How To: Code Functional style with spring kafka streams (plus some avro goodness)
Spring cloud-streams (with functional programming) is an interesting and much needed simplification to stream processing using spring, before we had to create multiple classes with complex annotations to get the job done, now the whole process had been reduced to a handful of beans.
Lets get started on how this can be achieved.
Assumptions:
- I assume that you have a kafka cluster running (either locally / on cloud / anywhere else) — if you need help setting up kafka please go through many thousands of tutorials available online — how to setup kafka cluster
- I assume you have an understanding of what functional programming is and have a dev env setup and ready to go
- you have about 20 minutes to spare to get this running
lets get going…
Create a new project using spring starter (https://start.spring.io)
You can choose any name, package, language,build tool for the project (the only thing I want to ensure is that you choose Spring boot version 2.6.1 and packaging as Jar ( there is nothing wrong with war packaging, please search online on how to run a spring-boot war file)
I prefer JDK 11 and above. so I have choosen JDK 11
Now select libraries / dependencies on the right
if you are using avro for schema then there are additional dependencies to add to build.gradle file
/** needed to use avro schema to send receive messages in kafka **/
implementation “io.confluent:kafka-avro-serializer:${confluentVersion}” implementation “io.confluent:kafka-streams-avro-serde:${confluentVersion}”
add
set(‘confluentVersion’, “5.2.0”) in ext { } block
The producer;processor;consume
create beans which can do the work of producing, processing, consuming messages; a sample would be something shown here, here and here
The Configuration
The sample code accompanying this post has config split into different files, a cloud specific config and a local config
This is accompanied by kafka stream specific configurations seen in application.yml
The codebase
The codebase can be found here …
Hope this helps…
Happy Coding :)