How to: Dynamically start/stop kafka listener in SpringBoot apps
Scenario: For whatever reason (as it usually is the case) you need to start a kafka consumer at a certain time and stop soon after the processing is done.
Solution #1: Its best to start the app when the kafka messages need to be processed and stop it soon after the processing is done.
Solution #2: If Solution #1 isn’t right for you, then…
set id property while registering Kafka Listener
@KafkaListener(id="assigned_listener_id", autoStartup = "false", topics = "topic-to-listen-to")
public void listen(Message message){
// interesting message processing logic
}
setting autoStartup to “false” will cause the kafka listener to not start with the application, set it to “true” if you need it to start the listener with the app. This property defaults to “true”
then use KafkaListenerEndpointRegistry bean to control start/stop of the listener
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
use this code to start the listener
/**
* invoke this method to start the listener
*/
public void startListener(){
kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").start();
}
use this method to stop it
/**
* invoke this method to stop the listener
*/
public void stopListener(){
kafkaListenerEndpointRegistry.getListenerContainer("assigned_listener_id").stop(()->{
log.info("Listener Stopped.");
});
}
Solution #3: this is when you want to create a consumer programatically and control start/stop. Bikas Katwal has written a great post explaining this approach here.
Happy Coding…