Configure Spring Cloud Stream with two Kafka brokers

Configuring multiple Kafka brokers in a single project is a common use case. One typical scenario is reading from one Kafka broker but writing to a different one. The task itself is not complex, but there are some nuances that one needs to pay attention to. In this article, we explain how to configure Spring Cloud Stream with two Kafka brokers.

Before starting on how to configure Spring Cloud Stream with two Kafka brokers, let's clarify a common misconception. There are several Kafka libraries available for Java, namely Spring Cloud Stream Kafka (https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html), Spring Kafka (https://spring.io/projects/spring-kafka), and the plain Kafka client (https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-AlternativeJava).

In this article, we focus only on Spring Cloud Stream Kafka. This distinction is important because the following configurations do not apply to the Spring Kafka or plain Kafka client libraries.
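
To make the distinction concrete, here is a minimal sketch (for contrast only, not used in the rest of this article) of how a write could look with Spring Kafka's KafkaTemplate. The topic name is borrowed from the example later in the article; none of the binder configuration shown below applies to this style.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// For contrast only: a plain Spring Kafka producer using KafkaTemplate.
// The Spring Cloud Stream binder configuration in this article does not apply here.
@Component
public class SpringKafkaWriter {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public SpringKafkaWriter(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void write(String message) {
        kafkaTemplate.send("com.user.aggregator", message);
    }
}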

Two Kafka brokers scenario

Let’s create an imaginary use case. We intend to integrate with a service that writes user logging events to a Kafka broker. Let’s call it Broker A.

After that, a microservice, the user aggregator, consumes the messages, makes some transformations, and writes them to a different Kafka broker. Let’s call it Broker B.

In such a case, we need to configure the user aggregator service so that it can operate with two Kafka brokers at the same time. For a better demonstration, have a look at the diagram below,

Spring Cloud Stream with two Kafka brokers

Configuring Spring Cloud Stream with two Kafka brokers

As stated earlier, using Spring Cloud Stream gives us an easy configuration advantage. All we have to do is define two different brokers in the application configuration file, application.yml.

For that, we create two custom binders named kafka-binder-a and kafka-binder-b. The first binder connects to the user logger Kafka broker and consumes from the com.user.logger topic. The other binder connects to the user aggregator Kafka broker and writes to the com.user.aggregator topic.

Lastly, in the binder configuration, we need to add the broker and ZooKeeper addresses that each binder is supposed to consume from or produce to. Note that as soon as more than one binder is defined there is no implicit default, so every binding must reference its binder explicitly (as we do below) or spring.cloud.stream.defaultBinder must be set. We can have something like the below,

spring:
  cloud:
    stream:
      bindings:
        logger-input:
          group: user-aggregator-consumer
          destination: com.user.logger
          binder: kafka-binder-a
          content-type: application/json
          consumer:
            headerMode: raw
            partitioned: true
        aggregator-output:
          destination: com.user.aggregator
          binder: kafka-binder-b
          content-type: application/json
          producer:
            partitionKeyExpression: headers['partitionKey']
            partitionCount: 32
      kafka:
        bindings:
          logger-input:
            consumer:
              startOffset: latest
              autoCommitOffset: true
              autoRebalanceEnabled: true
      binders:
        kafka-binder-a:
          type: kafka
          environment.spring.cloud.stream.kafka.binder:
            brokers: localhost:6667
            zkNodes: localhost:2181
            offsetUpdateTimeWindow: 1000
            autoAddPartitions: true
        kafka-binder-b:
          type: kafka
          environment.spring.cloud.stream.kafka.binder:
            brokers: localhost:9092
            zkNodes: localhost:2181
            offsetUpdateTimeWindow: 1000
            autoAddPartitions: true
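
Everything defined under environment.spring.cloud.stream.kafka.binder is scoped to that particular binder, which is what allows the two binders to point at completely independent clusters with their own brokers and ZooKeeper nodes.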

Now that we have the application configuration ready, all that is left to do is to create a consumer and a producer. The interesting thing is that the consumer and producer code is exactly the same as it would be if we consumed from and produced to a single broker; the binder selection happens entirely in the configuration. We provide an example of each just for the sake of clarity.

Producer,

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface KafkaSource {

    String AGGREGATOR_OUTPUT = "aggregator-output";

    @Output(KafkaSource.AGGREGATOR_OUTPUT)
    MessageChannel write();
}

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@EnableBinding(KafkaSource.class)
@Component
public class AggregatorStream {

    private static final long SEND_BUS_TIMEOUT = 3000;

    private final KafkaSource kafkaSource;

    public AggregatorStream(KafkaSource kafkaSource) {
        this.kafkaSource = kafkaSource;
    }

    // Wraps the domain payload in a Spring message and sets the partition key
    // header that the partitionKeyExpression in application.yml evaluates.
    public boolean write(Payload payload) {
        Message<Payload> message = MessageBuilder
                .withPayload(payload)
                .setHeader("partitionKey", payload.getId())
                .build();
        return kafkaSource.write().send(message, SEND_BUS_TIMEOUT);
    }
}
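
The Payload type used above is a domain class that the article does not show. A minimal sketch could look like the following, assuming only that it carries an id which the producer sets as the partition key; the other field names are hypothetical.

import lombok.Data;

// Hypothetical domain class for the aggregated event. The producer above
// only relies on getId(), which it sets as the partition key header.
@Data
public class Payload {

    private String id;
    private String userName;
    private String event;
}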

Consumer,

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaSink {

    String LOGGER_INPUT = "logger-input";

    @Input(KafkaSink.LOGGER_INPUT)
    SubscribableChannel listen();
}

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import static org.springframework.kafka.support.KafkaHeaders.OFFSET;
import static org.springframework.kafka.support.KafkaHeaders.RECEIVED_PARTITION_ID;
import static org.springframework.kafka.support.KafkaHeaders.RECEIVED_TOPIC;

@Slf4j
@EnableBinding(KafkaSink.class)
@Component
public class LoggerListener {
    
    private final TransformerService transformerService;
  
    public LoggerListener(TransformerService transformerService) {
        this.transformerService = transformerService;
    }
    
    // The Kafka-specific headers are injected for illustration; the
    // transformation itself only needs the payload.
    @StreamListener(KafkaSink.LOGGER_INPUT)
    public void listen(@Header(OFFSET) String offset,
                       @Header(RECEIVED_PARTITION_ID) String partitionId,
                       @Header(RECEIVED_TOPIC) String topic,
                       @Payload LoggerPayload payload) {

        log.debug("Received message from topic {}, partition {}, offset {}", topic, partitionId, offset);
        transformerService.transform(payload);
    }
}
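
The LoggerPayload and TransformerService referenced above are not part of the article either. The sketch below shows one hypothetical way to wire them so that every message consumed from Broker A is transformed and handed to the producer, which writes it to Broker B; all field and method names here are illustrative assumptions.

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

// Hypothetical incoming event, as written by the user logger service.
@Data
class LoggerPayload {

    private String userId;
    private String event;
}

// Hypothetical transformation step: maps the incoming logging event to the
// aggregator's own Payload type and hands it to the producer (AggregatorStream),
// which writes it to Broker B.
@Slf4j
@Service
class TransformerService {

    private final AggregatorStream aggregatorStream;

    TransformerService(AggregatorStream aggregatorStream) {
        this.aggregatorStream = aggregatorStream;
    }

    public void transform(LoggerPayload loggerPayload) {
        Payload payload = new Payload();
        payload.setId(loggerPayload.getUserId());
        payload.setEvent(loggerPayload.getEvent());

        boolean sent = aggregatorStream.write(payload);
        if (!sent) {
            log.warn("Failed to write aggregated event for user {}", payload.getId());
        }
    }
}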

Conclusion

In this article, we discussed how to configure Spring Cloud Stream, on both the consumer and the producer side, with two different Kafka brokers. If you are interested in writing integration tests for Kafka, check out the Kafka Testcontainers and embedded Kafka integration test tutorials.
