How to seek Kafka offset in Spring Kafka Listener

How to seek Kafka offset in Spring Kafka Listener

How to seek Kafka offset in Spring Kafka Listener. Sometimes it happens that you need to change the Kafka offset in the application manually to point to a specific offset. For me, I needed this for troubleshooting purposes to know why a certain message in the pipeline was failing to get processed. But there might be other use for that as well.

Anyhow, it is relatively easy to seek the Kafka offset in Spring Kafka Listener at run time. All it is needed is to implement the ConsumerSeekAwareinterface in the same class of Spring Kafka listener. To know which class is your Spring Kafka Listener, you might need to search your project for @KafkaListenerto find it.

The ConsumerSeekAwarehas three methods which are:

  • registerSeekCallback: allows registering a custom callback.
  • onPartitionsAssigned: triggers when partitions assigned to the application. It also triggers on the application startup.
  • onIdleContainer: triggers when the Kafka container is idle.

According to your need, you can write your implementation in one or all three of the above methods. In my case, I prefer to just only implement onPartitionsAssignedwhich as stated earlier will trigger on the application startup. One example of this implementation is like follow:

import lombok.extern.slf4j.Slf4j;
import ma.glasnost.orika.MapperFacade;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MessageListener implements ConsumerSeekAware {

    @KafkaListener(
            id = "${message.listener.id}",
            topics = "${message.listener.input-kafka.topic}",
            groupId = "${message.listener.groupId}",
            containerFactory = "messageFactory")
    public void handleMessage(String message,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                                   @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    	// consumer implementation
        ack.acknowledge();
    }


    // Implenentations of ConsumerSeekAware

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // register custom callback
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Seek all the assigned partition to a certain offset `10234575L`
        assignments.keySet().forEach(partition -> callback.seek("com.madadipouya.message.events", partition.partition(), 10234575L));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
  		// executes when the Kafka container is idle
    }
}

Resources

Inline/featured images credits