Test Spring Kafka consumer and producer with EmbeddedKafka

Writing tests is rarely a developer’s favorite task, especially when an external system is involved. In our case, that system is Kafka. Working with Kafka can be challenging on its own, and testing the code that talks to it is even more daunting. However, thanks to the spring-kafka-test library, we can verify whether a Kafka consumer or producer works as expected. In this article, we go through how to test a Spring Kafka consumer and producer with EmbeddedKafka by writing some JUnit 5 tests.

Why bother writing tests for Kafka consumer and producer

At first glance, writing tests for a Kafka consumer and producer may look silly. One may object that such tests exercise the Kafka library more than our own code. That’s partly true: during the process we inevitably have to write a message to or read it from a topic, serialize or deserialize it, and check the payload. But these tests bring many benefits, especially in complex codebases.

You don’t want your application to run into problems in the production environment because, for example, it has failed to deserialize a message.

Additionally, you may want to check what happens when undesirable payloads are sent or received: does your code handle them gracefully, or does it start throwing exceptions and stop functioning? That’s why you may need to write tests for your Kafka components, even though doing so has some overhead and takes a fair amount of hassle to get working.

Who is this guide for?

This article is useful for anyone who uses Spring or Spring Boot with the Spring Kafka library. If you use the low-level Apache Kafka library or Spring Cloud Stream Kafka, you need to look somewhere else. Here, we only cover how to test Spring Kafka components.

For testing, we are going to use another Spring library called spring-kafka-test. It provides a lot of functionality that eases the testing process and takes care of many headaches. Additionally, we use JUnit 5, not 4. If you are still using JUnit 4, you may need to change some of the annotations accordingly.

A simple Spring Kafka consumer and producer use case

To make this concrete, let’s work on an example. Assume we have a User service that exposes an endpoint (/random). When the endpoint is hit, the controller fires a Kafka event (in this case, creating a random user), which results in a message being produced. On the other side, a Kafka consumer in the same project reads the messages and logs them.

Both the consumer and the producer use String for the key and JSON for the value (de)serialization.

Even though the explained scenario is pretty rudimentary (and even silly), it covers the majority of the flows of real Kafka usage (producing and consuming messages).

Without further ado, let’s look at the consumer and producer classes. The complete working example is in the GitHub repository linked at the end of this article.

That’s all there is to the Kafka consumer and producer classes. They are pretty straightforward and, perhaps surprisingly, all the configuration is done in application.properties rather than in configuration classes.
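As a rough illustration of what such a properties-only setup can look like, here is a minimal sketch. It is not the project’s actual file: the group id, package name, broker address and the app.kafka.topic key are hypothetical values chosen for this example, while the spring.kafka.* keys are standard Spring Boot properties.

```properties
# Broker address (assumed local default; the tests override this value)
spring.kafka.bootstrap-servers=localhost:9092

# Consumer: String keys, JSON values
spring.kafka.consumer.group-id=user-service
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# JsonDeserializer only instantiates classes from trusted packages (hypothetical package name)
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.user

# Producer: String keys, JSON values
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Application-defined property holding the topic name (hypothetical)
app.kafka.topic=users
```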

Testing the Spring Kafka producer

Let’s start with the producer because it’s easier. To test it, we need a test Kafka consumer that takes care of reading messages from Kafka. But before that, we need to do some configuration for EmbeddedKafka: we have to define a topic name and the number of partitions, and annotate the test class accordingly.

The test class has three crucial annotations:

  1. @EmbeddedKafka – enables the embedded Kafka for the test class.
  2. @SpringBootTest(properties) – overrides the Kafka broker address and port with the random one created by the embedded Kafka, which spring-kafka-test exposes through the spring.embedded.kafka.brokers property. Some tutorials hardcode the port with @EmbeddedKafka(ports = 9092), which is an anti-pattern, especially for CI pipelines and parallel test runs, because a fixed port can clash with another process or test.
  3. @TestInstance(TestInstance.Lifecycle.PER_CLASS) – creates one test instance per class instead of one per test method. It is useful for speeding up the tests, since the setup and teardown around the embedded Kafka can run once per class rather than for each test case.

Then we create a KafkaMessageListenerContainer, whose properties are set in the setUp method, and a BlockingQueue that holds the records the container receives from Kafka.

Additionally, we autowire the EmbeddedKafkaBroker and the producer under test, UserKafkaProducer.

The setUp method configures the embedded Kafka, starts the container and listens to a topic. Finally, the tearDown method stops the container after all tests have run.

Now let’s write a test for the UserKafkaProducer class.

First, we create a test user and use the producer under test to write it to Kafka. After that, we poll the record from the queue with a timeout of 100 milliseconds and assert on its payload.
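The hand-off between the listener container and the test can be sketched without any Kafka dependency. The snippet below is a minimal plain-JDK illustration, not the code of the test class: the names RecordQueueSketch, onMessage and poll are made up for this example, and a simple key/value pair stands in for a ConsumerRecord. It shows a listener thread adding records to a BlockingQueue while the test thread polls with a timeout and gets null when nothing arrives in time.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration only: no Kafka classes, all names are hypothetical.
final class RecordQueueSketch {

    // Written by the listener thread, read by the test thread.
    private final BlockingQueue<Map.Entry<String, String>> records = new LinkedBlockingQueue<>();

    // Stand-in for the listener callback that the container would invoke per record.
    void onMessage(String key, String value) {
        records.add(new SimpleEntry<>(key, value));
    }

    // Waits up to timeoutMs for the next record; returns null if none arrived in time.
    Map.Entry<String, String> poll(long timeoutMs) {
        try {
            return records.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        RecordQueueSketch sketch = new RecordQueueSketch();

        // Simulate the listener thread delivering one record asynchronously.
        new Thread(() -> sketch.onMessage("user-1", "{\"name\":\"Jane\"}")).start();

        Map.Entry<String, String> record = sketch.poll(1000);
        System.out.println(record.getKey() + " -> " + record.getValue());

        // Nothing else was published, so this poll times out and returns null.
        System.out.println(sketch.poll(100));
    }
}
```

Because a timed-out poll returns null, a timeout that is too short shows up as a failed assertion rather than a hang. On a slow CI machine, a value more generous than 100 milliseconds may be needed.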

Note that we use String for both the key and the value in the consumer record, although in the producer code the message value is JSON. That’s why, to ease the assertions, we convert the string value to a User object using Jackson and then assert its properties.

Testing the Spring Kafka consumer

Similar to the producer test case, we have to set up the embedded Kafka first. But this time, we don’t need a BlockingQueue or a KafkaMessageListenerContainer. Instead, we need to create a Kafka producer to write to a topic.

Apart from the obvious overlaps with the producer test, there are a couple of things to note here:

  • Producer<String, String> producer is the test producer, and its configuration is initialized in the setUp method. All the configs in that method use default values, so there is nothing to be concerned about.
  • The class under test, UserKafkaConsumer, is wrapped in a Mockito spy so that we can capture the arguments passed to its method. That is necessary because UserKafkaConsumer only logs the message, so there is otherwise no way to read the consumed payload. Read the “When and how to spy objects in Mockito” article to learn more about spy objects.
  • There are also four ArgumentCaptors, one per argument of the UserKafkaConsumer#logKafkaMessages method, which capture the argument values once the method runs.

Now that we have everything in place, we can write a test for the UserKafkaConsumer as follows,

In this test, we first use the test producer to write a message to a specific partition of the Kafka topic.

Then, since the consumer reacts to the produced message asynchronously, instead of calling Thread.sleep(x) we use Mockito.verify with a timeout and capture the arguments in the same call. If the consumer method (logKafkaMessages in this case) doesn’t execute within five seconds, the test fails. Finally, the test asserts the captured arguments.
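To make the idea of bounded waiting concrete, here is a small plain-JDK sketch. It does not use Mockito. It swaps in a CountDownLatch to show the same behavior that verify with a timeout gives us: return as soon as the call has happened, and give up only after the time limit. The names AwaitConsumerSketch, consume and awaitPayload are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK illustration only: a latch replaces Mockito's timed verification.
final class AwaitConsumerSketch {

    private final CountDownLatch invoked = new CountDownLatch(1);
    private final AtomicReference<String> captured = new AtomicReference<>();

    // Stand-in for the consumer method: records its argument, then signals the waiter.
    void consume(String payload) {
        captured.set(payload);
        invoked.countDown();
    }

    // Returns the captured payload, or null if consume() was not called within timeoutMs.
    String awaitPayload(long timeoutMs) {
        try {
            return invoked.await(timeoutMs, TimeUnit.MILLISECONDS) ? captured.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        AwaitConsumerSketch sketch = new AwaitConsumerSketch();

        // Simulate the listener thread invoking the consumer asynchronously.
        new Thread(() -> sketch.consume("hello")).start();

        // Returns as soon as consume() has run; only a missing call costs the full timeout.
        System.out.println(sketch.awaitPayload(5000));
    }
}
```

Unlike a fixed Thread.sleep(x), this kind of wait costs the full five seconds only when the consumer never runs. A passing test finishes as soon as the message is handled.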

Well, that’s all about how to test Spring Kafka consumer and producer with EmbeddedKafka.

In the next sections, I discuss some of my findings while writing this piece up.

Lack of good documentation and guidelines

Unfortunately, there is not much documentation or guidance available on writing tests with EmbeddedKafka. Oddly enough, some tutorials mistakenly test the functionality of EmbeddedKafka itself rather than the consumer or producer code.

A side note on Testcontainers

A much better alternative for testing any Kafka-related component is the Testcontainers library. We have already covered how to work with it in the “Integration test with Testcontainers in Java” article. The approach for Kafka is very similar to the Elasticsearch use case shown there.

Even though Testcontainers provides a close-to-real experience, its tests are heavier and usually take much longer to run. Additionally, they are considered integration tests rather than unit tests. Hence, Testcontainers may not be the best option for testing edge and failure cases. What is demonstrated here aims at writing tests with embedded Kafka for fast execution.

Conclusion

In this article, we covered how to test a Spring Kafka consumer and producer with EmbeddedKafka using the spring-kafka-test library. This approach helps developers test corner cases quickly and reserve Testcontainers for happy-path tests, since those take longer to run.

If you are interested, you can find the working example in the GitHub repository linked below:

https://github.com/kasramp/spring-kafka-test
