How to seek Kafka offset in Spring Kafka Listener
Sometimes you need to manually move a Kafka consumer in your application to a specific offset. I needed this for troubleshooting, to find out why a certain message in the pipeline was failing to be processed, but there may be other uses as well.
Anyhow, it is relatively easy to seek the Kafka offset in a Spring Kafka listener at run time. All that is needed is to implement the ConsumerSeekAware interface in the same class as the Spring Kafka listener. To find out which class that is, search your project for @KafkaListener.
The ConsumerSeekAware interface has three methods:
- registerSeekCallback: called by the container to hand the listener a callback, which the listener can keep and use to seek at any later time.
- onPartitionsAssigned: triggers when partitions are assigned to the application, which includes application startup.
- onIdleContainer: triggers when the Kafka container is idle (this relies on idle detection being enabled on the container).
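If you want to seek at an arbitrary time rather than only on assignment, the callback received in registerSeekCallback has to be stored somewhere. The ConsumerSeekAware Javadoc suggests keeping it per consumer thread when the listener runs with concurrency. The following is only a minimal sketch of that bookkeeping: SeekCallbackRegistry is a hypothetical helper, not part of Spring Kafka, and the type parameter C stands in for ConsumerSeekCallback so that the example has no dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring Kafka class): remembers one callback per thread.
// In a real listener, C would be ConsumerSeekAware.ConsumerSeekCallback.
final class SeekCallbackRegistry<C> {

    private final Map<Thread, C> callbacks = new ConcurrentHashMap<>();

    // Intended to be called from registerSeekCallback(...); stores the
    // callback under the thread that is making the call.
    void register(C callback) {
        callbacks.put(Thread.currentThread(), callback);
    }

    // Returns the callback registered by the calling thread, or null if there is none.
    C forCurrentThread() {
        return callbacks.get(Thread.currentThread());
    }

    // Number of threads that have registered a callback so far.
    int size() {
        return callbacks.size();
    }
}
```

Spring Kafka 2.3 and later also provide AbstractConsumerSeekAware, which takes care of this kind of bookkeeping, so a hand-written registry like the one above is mostly useful on older versions.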
Depending on your needs, you can put your implementation in one or all three of the above methods. In my case, I prefer to implement only onPartitionsAssigned, which, as stated earlier, also triggers on application startup. One example of such an implementation is as follows:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MessageListener implements ConsumerSeekAware {

    // Note: KafkaHeaders.RECEIVED_PARTITION_ID was renamed to RECEIVED_PARTITION in Spring Kafka 3.0.
    // The Acknowledgment parameter requires a manual ack mode on the container factory.
    @KafkaListener(
            id = "${message.listener.id}",
            topics = "${message.listener.input-kafka.topic}",
            groupId = "${message.listener.groupId}",
            containerFactory = "messageFactory")
    public void handleMessage(String message,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                              @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {
        // consumer implementation
        ack.acknowledge();
    }

    // Implementations of ConsumerSeekAware

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // keep the callback here if you need to seek at an arbitrary later time
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Seek all the assigned partitions to a certain offset `10234575L`.
        // The topic name is taken from the assignment itself, so it always matches
        // the topic the listener is subscribed to (e.g. "com.madadipouya.message.events").
        assignments.keySet().forEach(partition ->
                callback.seek(partition.topic(), partition.partition(), 10234575L));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // executes when the Kafka container is idle
    }
}
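An offset only identifies a message within a single partition, so when troubleshooting one failing message you may prefer to move only the partition that holds it and leave the others alone. The sketch below shows one way to do that, assuming you know the partition number. PartitionSeeker and Seeker are hypothetical names introduced for illustration; Seeker simply mirrors the shape of ConsumerSeekCallback.seek(String topic, int partition, long offset), which keeps the logic free of Spring dependencies and easy to unit test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in with the same shape as ConsumerSeekCallback.seek(topic, partition, offset).
@FunctionalInterface
interface Seeker {
    void seek(String topic, int partition, long offset);
}

// Hypothetical helper: seeks only the partitions that have an explicit target offset.
final class PartitionSeeker {

    private PartitionSeeker() {
    }

    // For every assigned partition of `topic` that has an entry in `targets`
    // (partition number -> offset), performs the seek and records the partition.
    // Partitions without an entry are left at their current position.
    static List<Integer> seekTargets(String topic,
                                     Iterable<Integer> assignedPartitions,
                                     Map<Integer, Long> targets,
                                     Seeker seeker) {
        List<Integer> moved = new ArrayList<>();
        for (int partition : assignedPartitions) {
            Long offset = targets.get(partition);
            if (offset != null) {
                seeker.seek(topic, partition, offset);
                moved.add(partition);
            }
        }
        return moved;
    }
}
```

Inside onPartitionsAssigned, the partition numbers could be collected from assignments.keySet() and callback::seek passed in as the seeker. The returned list is handy for logging which partitions were actually moved.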
Resources
- https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerSeekAware.html
- http://blog.empeccableweb.com/wp/2016/11/30/manual-offsets-in-kafka-consumers-example/