Write Kafka integration test with Testcontainers

Writing a Spring Boot Kafka integration test can be frustrating, even when leveraging the spring-kafka-test library that supplies an embedded Kafka broker. That is mainly due to the complex test configuration, which involves registering consumers and producers to read and write messages. A better approach is to treat Kafka integration tests as a black box and avoid going into low-level details. That, however, is not feasible by relying purely on spring-kafka-test; it requires an external library, which in this case is Testcontainers. In this article, we cover how to write a Kafka integration test with Testcontainers.

Spring Kafka test vs Testcontainers

In one of the earlier posts, we showed how to test a Spring Kafka consumer and producer with EmbeddedKafka. One may ask what the advantages of Testcontainers over EmbeddedKafka are.

Using Testcontainers, one can reduce the test configuration to almost zero and not worry about how to read and write messages from and to Kafka when writing tests. Instead, a developer can focus entirely on testing the needed functionality.

Additionally, the Kafka Docker container can be kept alive across tests, which helps with test performance and results in a testing environment closer to production.
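
For example, one way to keep the broker alive is the singleton container pattern, in which the container is started once and shared by every test class that needs it. Below is a minimal sketch; the class name is illustrative and not part of the article’s project, and withReuse(true) only takes effect when testcontainers.reuse.enable=true is set in ~/.testcontainers.properties:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Illustrative singleton holder: the container starts once per JVM (or is even reused
// between runs when Testcontainers reuse is enabled) instead of once per test class.
public final class SharedKafkaContainer {

    private static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
                    .withReuse(true); // opt-in reuse; requires testcontainers.reuse.enable=true locally

    private SharedKafkaContainer() {
    }

    public static KafkaContainer getInstance() {
        if (!KAFKA.isRunning()) {
            KAFKA.start();
        }
        return KAFKA;
    }
}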

Testcontainers dependency

As mentioned earlier, to use Testcontainers we need to add its dependencies to the project. Add the following to your pom.xml; besides the core and Kafka modules, the junit-jupiter module provides the @Testcontainers and @Container annotations used in the test further down.

<dependencies>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
  </dependency>
</dependencies>

Spring Kafka consumer and producer under test

For the sake of simplicity, we stick to the example explained in the article on how to test a Spring Kafka consumer and producer with EmbeddedKafka.

We have a User microservice that exposes an endpoint (/random). Calling the endpoint triggers the Kafka producer to emit an event, which is then consumed by a Kafka consumer that, hypothetically, creates a user. For the sake of simplicity, however, both the producer and the consumer live in the User microservice. Let’s look at the Kafka producer and consumer code.

package com.madadipouya.springkafkatest.kafka.producer;

import com.madadipouya.springkafkatest.dto.User;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class UserKafkaProducer {

    private final KafkaTemplate<String, User> kafkaTemplate;

    @Value("${spring.kafka.topic.name}")
    private String topic;

    @Value("${spring.kafka.replication.factor:1}")
    private int replicationFactor;

    @Value("${spring.kafka.partition.number:1}")
    private int partitionNumber;

    public UserKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void writeToKafka(User user) {
        kafkaTemplate.send(topic, user.getUuid(), user);
    }

    @Bean
    @Order(-1)
    public NewTopic createNewTopic() {
        return new NewTopic(topic, partitionNumber, (short) replicationFactor);
    }
}

And here is the Kafka consumer:

package com.madadipouya.springkafkatest.kafka.consumer;

import com.madadipouya.springkafkatest.dto.User;
import com.madadipouya.springkafkatest.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class UserKafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);

    private final UserService userService;

    public UserKafkaConsumer(UserService userService) {
        this.userService = userService;
    }

    @KafkaListener(topics = "${spring.kafka.topic.name}",
            concurrency = "${spring.kafka.consumer.level.concurrency:3}")
    public void logKafkaMessages(@Payload User user,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                                 @Header(KafkaHeaders.OFFSET) Long offset) {
        logger.info("Received a message contains a user information with id {}, from {} topic, " +
                "{} partition, and {} offset", user.getUuid(), topic, partition, offset);
        userService.save(user);
    }
}
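
Both classes, as well as the test further down, rely on a simple User DTO that the article does not show. A minimal sketch of its assumed shape, inferred from the constructor and getters used in the code (the real class may differ, and JSON (de)serialization is assumed to be configured for the Kafka template and listener), could look like this:

package com.madadipouya.springkafkatest.dto;

// Assumed shape of the DTO, inferred from how the producer, consumer, and test use it.
public class User {

    private String uuid;
    private String firstName;
    private String lastName;

    // Default constructor and setters so a JSON deserializer can rebuild the object.
    public User() {
    }

    public User(String uuid, String firstName, String lastName) {
        this.uuid = uuid;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getUuid() {
        return uuid;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}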

Testing Spring Kafka consumer and producer with Testcontainers

In contrast to EmbeddedKafka and spring-kafka-test, we don’t need to test the producer and the consumer separately; both can be tested together. Additionally, the configuration is minimal. All we need to do is configure the test to start the Kafka Docker container and bind it to a port that the Spring Boot test can access. The test class is as follows:

package com.madadipouya.springkafkatest.kafka;

import com.madadipouya.springkafkatest.dto.User;
import com.madadipouya.springkafkatest.kafka.consumer.UserKafkaConsumer;
import com.madadipouya.springkafkatest.kafka.producer.UserKafkaProducer;
import com.madadipouya.springkafkatest.service.UserService;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@Testcontainers
@SpringBootTest
class UserKafkaTestcontainersTest {

    @Container
    static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }

    @Autowired
    private UserKafkaProducer userKafkaProducer;

    @Autowired
    private UserKafkaConsumer userKafkaConsumer;

    @MockBean
    private UserService userService;

    @Test
    void testProduceAndConsumeKafkaMessage() {
        ArgumentCaptor<User> captor = ArgumentCaptor.forClass(User.class);
        User user = new User("11111", "John", "Wick");

        userKafkaProducer.writeToKafka(user);

        verify(userService, timeout(5000)).save(captor.capture());
        assertNotNull(captor.getValue());
        assertEquals("11111", captor.getValue().getUuid());
        assertEquals("John", captor.getValue().getFirstName());
        assertEquals("Wick", captor.getValue().getLastName());
    }
}

The test class is annotated with @Testcontainers to signal that the test uses Testcontainers. Then we have the Kafka container definition, the static KafkaContainer field annotated with @Container, which specifies the Kafka Docker image name and tag.

Since we didn’t specify an exposed port, Testcontainers maps the broker to a random host port on each run. That’s why we must override the spring.kafka.bootstrap-servers property at runtime. Thanks to @DynamicPropertySource, it is easier than ever.
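
Only the bootstrap servers need to be overridden for this example, but the same method can register additional properties. One common tweak, shown here purely as an assumption (the article’s application configuration may already cover it), is setting the consumer offset reset to earliest so the listener does not miss a message produced before its partitions are assigned:

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
        // Assumed extra property, not part of the original test: start reading from the
        // earliest offset so a message sent before partition assignment is still consumed.
        registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
    }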

When it comes to exercising the producer and the consumer, we autowire the related classes as in any standard @SpringBootTest.

In the test case, we first produce a message and then let the listener consume it. As the threads responsible for writing a Kafka message and consuming it are independent, we had to introduce a timeout when verifying the consumer: Mockito’s timeout(5000) waits a maximum of 5 seconds for the condition to become true (the Kafka consumer consumes the message and calls userService#save); otherwise, it fails the test.
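
As a side note, the same verification can be made stricter by combining the wait with an expected invocation count, still using standard Mockito. The fragment below is not standalone; it reuses the userService mock and the captor from the test above:

        // Waits at most 5 seconds until exactly one save(..) invocation has been recorded.
        verify(userService, timeout(5000).times(1)).save(captor.capture());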

Conclusion

In this article, we covered how to write a Kafka integration test with Testcontainers, which is much simpler than using EmbeddedKafka since one doesn’t need to do complex configuration or dive deep into low-level details. As a result, the test code is simpler, more extendable, and more future-proof.

You can find a working example that contains both the EmbeddedKafka and the Kafka Testcontainers tests in the GitHub repository linked below:

https://github.com/kasramp/spring-kafka-test
