Writing a Spring Boot Kafka integration test can be frustrating even by leveraging the spring-kafka-test
library that supplies an embedded Kafka. That’s mainly due to the complex test configuration that involves registering consumers and producers to read and write messages. A better approach is to treat Kafka integration tests as a black-box and avoid going into low-level details. That, however, is not feasible with purely relying on spring-kafka-test
and requires an external library which in this case is Test containers. In this article, we cover how to write Kafka integration test with Testcontainers.
Spring Kafka test vs Testcontainers
In one of the earlier posts, we showed how to test Spring Kafka consumer and producer with EmbeddedKafka. One may ask what the advantages of Testcontainers over EmbeddedKafka are.
Using Testcontainers, one can reduce test configuration to almost zero and not worry about how to read and write messages from and to Kafka when writing tests. Instead, a developer can entirely focus on testing needed functionalities.
Additionally, the Kafka Docker container can be kept alive, which can help with test performance and results in a testing environment closer to production.
Testcontainers dependency
As mentioned earlier, to use Testcontainers we need to add its dependencies to the project. Add the following to your project.
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.7</version>
<scope>test</scope>
</dependency>
</dependencies>
Spring Kafka consumer and producer under the test
For the sake of simplicity, we stick to the example explained on how to test Spring Kafka consumer and producer with the EmbeddedKafka article.
We have a User
microservice that exposes an endpoint (/random
). Calling the endpoint triggers the Kafka producer to emit events that will be consumed by a Kafka consumer and creates users, hypothetically. For the sake of simplicity, however, both the producer and the consumer are located at the User
microservice. Let’s look at the Kafka producer and consumer code.
package com.madadipouya.springkafkatest.kafka.producer;
import com.madadipouya.springkafkatest.dto.User;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class UserKafkaProducer {
private final KafkaTemplate<String, User> kafkaTemplate;
@Value("${spring.kafka.topic.name}")
private String topic;
@Value("${spring.kafka.replication.factor:1}")
private int replicationFactor;
@Value("${spring.kafka.partition.number:1}")
private int partitionNumber;
public UserKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void writeToKafka(User user) {
kafkaTemplate.send(topic, user.getUuid(), user);
}
@Bean
@Order(-1)
public NewTopic createNewTopic() {
return new NewTopic(topic, partitionNumber, (short) replicationFactor);
}
}
package com.madadipouya.springkafkatest.kafka.consumer;
import com.madadipouya.springkafkatest.dto.User;
import com.madadipouya.springkafkatest.service.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class UserKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);
private final UserService userService;
public UserKafkaConsumer(UserService userService) {
this.userService = userService;
}
@KafkaListener(topics = "${spring.kafka.topic.name}",
concurrency = "${spring.kafka.consumer.level.concurrency:3}")
public void logKafkaMessages(@Payload User user,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
@Header(KafkaHeaders.OFFSET) Long offset) {
logger.info("Received a message contains a user information with id {}, from {} topic, " +
"{} partition, and {} offset", user.getUuid(), topic, partition, offset);
userService.save(user);
}
}
Testing Spring Kafka consumer and producer with Testcontainers
In contrast to the EmbeddedKafka and spring-kafka-test
we don’t need to test the producer and the consumer separately. Both can be tested together. Additionally, the configuration is minimal. All we need to do is to configure the test to start the Kafka Docker container and bind it to a port that Spring Boot Test can access. The test class is as follows:
package com.madadipouya.springkafkatest.kafka;
import com.madadipouya.springkafkatest.dto.User;
import com.madadipouya.springkafkatest.kafka.consumer.UserKafkaConsumer;
import com.madadipouya.springkafkatest.kafka.producer.UserKafkaProducer;
import com.madadipouya.springkafkatest.service.UserService;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@Testcontainers
@SpringBootTest
class UserKafkaTestcontainersTest {
@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
}
@Autowired
private UserKafkaProducer userKafkaProducer;
@Autowired
private UserKafkaConsumer userKafkaConsumer;
@MockBean
private UserService userService;
@Test
void testProduceAndConsumeKafkaMessage() {
ArgumentCaptor<User> captor = ArgumentCaptor.forClass(User.class);
User user = new User("11111", "John", "Wick");
userKafkaProducer.writeToKafka(user);
verify(userService, timeout(5000)).save(captor.capture());
assertNotNull(captor.getValue());
assertEquals("11111", captor.getValue().getUuid());
assertEquals("John", captor.getValue().getFirstName());
assertEquals("Wick", captor.getValue().getLastName());
}
}
The test class is annotated with @Testcontainers
to signal that the test is using Testcontainer. Then we have the Kafka container definition at line 29
that defines the Kafka Docker image name and the tag.
Since we didn’t specify the Docker expose port, Testcontainers assigns a random port on each run. That’s why we must override the spring.kafka.bootstrap-servers
property at runtime. Thanks to the @DynamicPropertySource
it is easier than ever.
When it comes to testing the producer and the consumer, we autowired related classes as any standard @SpringBootTest
.
In the test case, firstly, we produce a message and then consume it. As threads responsible for writings a Kafka message and consuming a message are independent, we had to introduce timeout when verifying the consumer. At line 52
Mockito waits a maximum of 5 seconds for the condition to be true (the Kafka consumer consumes a message and calls userService#save
), otherwise it fails the test.
Conclusion
In this article, we covered how to write Kafka integration test with Testcontainers which is much simpler compared to EmbeddedKafka as one doesn’t need to do complex configuration and deep dive into details. Therefore, the code is much simpler, extendable, and future-proof.
You can find the working example on this GitHub repository that contains both EmbeddedKafka and Kafka Testcontainers examples at the link below,
https://github.com/kasramp/spring-kafka-test
Inline/featured images credits
- Featured image by chaipat aun from Pixabay