Test Spring Kafka consumer and producer with EmbeddedKafka

Writing tests is many developers' least favorite task, especially when an external party is involved. In our case, that party is Kafka. Not only can working with Kafka be challenging, but writing tests for any code that talks to Kafka is even more daunting. Thankfully, with the spring-kafka-test library, we can verify whether a Kafka consumer or a producer works as expected. In this article, we go through how to test a Spring Kafka consumer and producer with EmbeddedKafka by writing some JUnit 5 tests.

Why bother writing tests for Kafka consumer and producer

At first glance, writing tests for a Kafka consumer and producer may look silly. One may argue that such tests exercise the functionality of the Kafka library more than our own code. That is partly true: in the process we inevitably write or read a message from a topic, serialize or deserialize it, and check the payload. But these tests bring many benefits, especially in complex codebases.

You don’t want your application to run into problems in the production environment because, for example, it has failed to deserialize a message.

Additionally, you may want to check cases of sending or receiving undesirable payloads and see how your code reacts: whether it manages to handle them gracefully, or starts throwing exceptions and stops functioning. That is why it is worth writing tests for your Kafka components, even though it has some overhead and takes some effort to get working.

Who is this guide for?

This article is useful for anyone who uses Spring or Spring Boot with the Spring Kafka library. If you use the low-level Apache Kafka client library, or even Spring Cloud Stream Kafka, you will need to look elsewhere. Here, we only cover how to test Spring Kafka components.

For testing, we are going to use another Spring library called spring-kafka-test. It provides plenty of functionality that eases the testing process and takes care of many headaches. Additionally, we use JUnit 5, not 4. If you are still using JUnit 4, you may need to change some of the annotations accordingly.
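
For reference, with Maven the dependency looks roughly like this (the version is omitted because it is usually managed by the Spring Boot dependency BOM):

<!-- spring-kafka-test is only needed at test time -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>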

A simple Spring Kafka consumer and producer use case

To make things concrete, let's work through an example. Assume we have a User service that exposes an endpoint (/random). When the endpoint is hit, the controller fires a Kafka event (in this case, creating a random user) that results in producing a message. On the other hand, there is a Kafka consumer in the same project that reads messages and logs them.

Both the consumer and the producer use String de/serialization for the key and JSON for the value.

Even though the scenario is pretty rudimentary, it covers the majority of the flows in real Kafka usage (producing and consuming messages).
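
Throughout the examples, messages carry a simple User DTO. The exact class lives in the linked repository; a minimal sketch matching the constructor and getters used below could be,

package com.madadipouya.springkafkatest.dto;

// Minimal sketch of the User DTO, inferred from its usage in this article.
// The no-arg constructor and setters are needed for Jackson JSON deserialization.
public class User {

    private String uuid;
    private String firstName;
    private String lastName;

    public User() {
    }

    public User(String uuid, String firstName, String lastName) {
        this.uuid = uuid;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}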

Without further ado, let’s look at the consumer and producer classes,

package com.madadipouya.springkafkatest.kafka.producer;

import com.madadipouya.springkafkatest.dto.User;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class UserKafkaProducer {

    private final KafkaTemplate<String, User> kafkaTemplate;

    @Value("${spring.kafka.topic.name}")
    private String topic;

    @Value("${spring.kafka.replication.factor:1}")
    private int replicationFactor;

    @Value("${spring.kafka.partition.number:1}")
    private int partitionNumber;

    public UserKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void writeToKafka(User user) {
        // The user's UUID becomes the message key; the User object is JSON-serialized as the value
        kafkaTemplate.send(topic, user.getUuid(), user);
    }

    @Bean
    @Order(-1)
    public NewTopic createNewTopic() {
        // Creates the topic on application startup if it does not exist yet
        return new NewTopic(topic, partitionNumber, (short) replicationFactor);
    }
}

package com.madadipouya.springkafkatest.kafka.consumer;

import com.madadipouya.springkafkatest.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class UserKafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);

    @KafkaListener(topics = "${spring.kafka.topic.name}",
            concurrency = "${spring.kafka.consumer.level.concurrency:3}")
    public void logKafkaMessages(@Payload User user,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                                 @Header(KafkaHeaders.OFFSET) Long offset) {
        logger.info("Received a message contains a user information with id {}, from {} topic, " +
                        "{} partition, and {} offset", user.getUuid(), topic, partition, offset);
    }
}

That's all for the Kafka consumer and producer classes. As you have seen, they are pretty straightforward, and all the configuration is done in application.properties instead of configuration classes. See below,

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic.name=com.madadipouya.kafka.user
spring.kafka.replication.factor=3
spring.kafka.partition.number=2
spring.kafka.consumer.group-id=kafka-user-listener
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.madadipouya.springkafkatest.dto.User
spring.kafka.consumer.level.concurrency=5
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.value.default.type=com.madadipouya.springkafkatest.dto.User
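
For completeness, the /random endpoint mentioned earlier only needs to build a user and hand it to the producer. A hypothetical sketch follows (the controller name and the hardcoded user are illustrative, not taken from the actual repository):

package com.madadipouya.springkafkatest.controller;

import java.util.UUID;

import com.madadipouya.springkafkatest.dto.User;
import com.madadipouya.springkafkatest.kafka.producer.UserKafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller that fires the Kafka event described in the scenario
@RestController
public class UserController {

    private final UserKafkaProducer userKafkaProducer;

    public UserController(UserKafkaProducer userKafkaProducer) {
        this.userKafkaProducer = userKafkaProducer;
    }

    @GetMapping("/random")
    public User createRandomUser() {
        // A "random" user; real user generation is left to the actual service
        User user = new User(UUID.randomUUID().toString(), "John", "Wick");
        userKafkaProducer.writeToKafka(user);
        return user;
    }
}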

Testing the Spring Kafka producer

Let's start with the producer because it is easier. First, we need to define a test Kafka consumer that takes care of reading messages from Kafka. But before that, we need some configuration for EmbeddedKafka: we have to annotate the test class, point a listener container at the topic, and wire everything up as follows,

@EmbeddedKafka
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserKafkaProducerTest {

    private BlockingQueue<ConsumerRecord<String, String>> records;

    private KafkaMessageListenerContainer<String, String> container;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private UserKafkaProducer producer;

    @Autowired
    private ObjectMapper objectMapper;

    @BeforeAll
    void setUp() {
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties());
        ContainerProperties containerProperties = new ContainerProperties("com.madadipouya.kafka.user");
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    private Map<String, Object> getConsumerProperties() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(),
                ConsumerConfig.GROUP_ID_CONFIG, "consumer",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
                ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10",
                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }
}

The test class has three crucial annotations,

  1. @EmbeddedKafka – enables the embedded Kafka broker for the test class.
  2. @SpringBootTest(properties) – overrides the Kafka broker address and port, using the random port created by the embedded Kafka instead. Some tutorials hardcode the port with @EmbeddedKafka(ports = 9092), which is an anti-pattern, especially for CI pipelines and test parallelization.
  3. @TestInstance(TestInstance.Lifecycle.PER_CLASS) – creates one test instance per class. It speeds up the tests by avoiding bootstrapping and shutting down the embedded Kafka for each test case.

Then we created a KafkaMessageListenerContainer, whose properties are set in the setUp method, and a BlockingQueue that collects the records read from Kafka.

Additionally, we wired in the EmbeddedKafkaBroker and the producer under test, UserKafkaProducer.

The setUp method configures the test consumer, starts the container, and subscribes to the topic. Finally, the tearDown method stops the container after all tests have run.

Now let's write a test for the aforementioned UserKafkaProducer class.

class UserKafkaProducerTest {
  @Test
  void testWriteToKafka() throws InterruptedException, JsonProcessingException {
      // Create a user and write to Kafka
      User user = new User("11111", "John", "Wick");
      producer.writeToKafka(user);

      // Read the message (John Wick user) with a test consumer from Kafka and assert its properties
      ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
      assertNotNull(message);
      assertEquals("11111", message.key());
      User result = objectMapper.readValue(message.value(), User.class);
      assertNotNull(result);
      assertEquals("John", result.getFirstName());
      assertEquals("Wick", result.getLastName());
  }
}

First, we created a test user and then used the producer under test to write it to Kafka. After that, we polled the record with a timeout of 500 milliseconds and asserted the payload.

Note that we used String for both key and value in the consumer record, even though the producer serializes the message value as JSON. That is why, to ease the assertions, we converted the string value to a User instance using Jackson and then asserted its properties.

Testing the Spring Kafka consumer

Similar to the producer test case, we have to set up the embedded Kafka first. But this time, we do not need the BlockingQueue and KafkaMessageListenerContainer. Instead, we need to create a test Kafka producer to write to the topic as follows,

@EmbeddedKafka
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserKafkaConsumerTest {

    private final String TOPIC_NAME = "com.madadipouya.kafka.user";

    private Producer<String, String> producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ObjectMapper objectMapper;

    @SpyBean
    private UserKafkaConsumer userKafkaConsumer;

    @Captor
    ArgumentCaptor<User> userArgumentCaptor;

    @Captor
    ArgumentCaptor<String> topicArgumentCaptor;

    @Captor
    ArgumentCaptor<Integer> partitionArgumentCaptor;

    @Captor
    ArgumentCaptor<Long> offsetArgumentCaptor;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
    }
    
    @AfterAll
    void shutdown() {
        producer.close();
    }
}

Apart from the obvious overlaps with the producer test, there are a couple of things to note here:

  • Producer<String, String> producer defines a test producer whose configuration is initialized in the setUp method. All configs in that method use the KafkaTestUtils defaults, so there is nothing to be concerned about.
  • The class under test, UserKafkaConsumer, is a spy to allow us to capture the arguments passed to its method. That is because UserKafkaConsumer only logs the message; there is no other way to read the consumed payload. Read the “When and how to spy objects in Mockito” article to learn more about spy objects.
  • Subsequently, there are four ArgumentCaptors, matching the UserKafkaConsumer#logKafkaMessages method arguments, to capture the argument values once the method runs.

Now that we have everything in place, we can write a test for the UserKafkaConsumer as follows,

class UserKafkaConsumerTest {

  @Test
  void testLogKafkaMessages() throws JsonProcessingException {
      // Write a message (John Wick user) to Kafka using a test producer
      String uuid = "11111";
      String message = objectMapper.writeValueAsString(new User(uuid, "John", "Wick"));
      producer.send(new ProducerRecord<>(TOPIC_NAME, 0, uuid, message));
      producer.flush();

      // Read the message and assert its properties
      verify(userKafkaConsumer, timeout(5000).times(1))
              .logKafkaMessages(userArgumentCaptor.capture(), topicArgumentCaptor.capture(),
                      partitionArgumentCaptor.capture(), offsetArgumentCaptor.capture());

      User user = userArgumentCaptor.getValue();
      assertNotNull(user);
      assertEquals("11111", user.getUuid());
      assertEquals("John", user.getFirstName());
      assertEquals("Wick", user.getLastName());
      assertEquals(TOPIC_NAME, topicArgumentCaptor.getValue());
      assertEquals(0, partitionArgumentCaptor.getValue());
      assertEquals(0, offsetArgumentCaptor.getValue());
  }
}

In the above code, first, we use the test producer to write a message to Kafka in a specific partition.

Then, since the consumer reacts to the produced message asynchronously, instead of adding a Thread.sleep(x) we use Mockito.verify with a timeout, inside which we also capture the arguments. If the consumer method (logKafkaMessages in this case) does not execute within five seconds, the test fails. Finally, the test asserts the captured arguments.

Well, that’s all about how to test Spring Kafka consumer and producer with EmbeddedKafka.

In the next sections, I discuss some of my findings from writing up this piece.

Lack of good documentation and guidelines

Unfortunately, there is not much documentation or guidance available on writing tests with EmbeddedKafka. Oddly enough, some tutorials mistakenly test the functionality of EmbeddedKafka itself rather than the consumer or producer code.

A note on Testcontainers

A more realistic alternative for testing any Kafka-related component is the Testcontainers library. For that, you can refer to our write Kafka integration test with Testcontainers article. Furthermore, you can learn Testcontainers basics from the integration test with Testcontainers in Java article.

Even though Testcontainers provides a close-to-real experience, the tests usually take much longer and are heavier. Additionally, they are considered integration tests rather than unit tests. Hence, Testcontainers may not be the best option for testing edge and failure cases. What is demonstrated here aims at writing tests with embedded Kafka for fast execution.
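
For comparison, a minimal Testcontainers setup could look like the sketch below. It assumes the org.testcontainers:kafka module and its JUnit 5 integration are on the classpath; the image tag is only an example.

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class UserKafkaIntegrationTest {

    // Spins up a real Kafka broker in Docker: heavier but closer to production
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // Points Spring Kafka at the container instead of the embedded broker
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    // ... the test methods themselves stay the same as in the embedded variant
}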

Conclusion

In this article, we covered how to test a Spring Kafka consumer and producer with EmbeddedKafka using the spring-kafka-test library. That helps developers test corner cases quickly and reserve Testcontainers for happy-path tests, which take longer to run.

If you are interested, you can find the working example on GitHub Spring Kafka Test repository.
