Redis Pub/Sub with Spring Boot

In a previous article, we explained how to use Redis as a caching layer with Spring Boot. This article covers a different Redis feature, Pub/Sub, and shows how to use it with Spring Boot.

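Redis is best known as a cache, but it can also act as a lightweight message broker through its Pub/Sub feature. In that role it is comparable to messaging platforms like RabbitMQ or Kafka, with one important difference: Redis Pub/Sub delivers each message at most once and does not persist it, so a subscriber that is offline when a message is published never receives it. It is also interesting to point out that Redis relies on Pub/Sub for some of its own features; for example, client-side caching can deliver its invalidation messages over a Pub/Sub channel.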

In this article, we reuse the Movie example from the Getting started with Spring Data Redis with Kotlin article and add a Redis publisher (producer) and subscriber (listener). Make sure you are familiar with that example project and its setup before continuing with the rest of this article.

Pub/Sub architecture

Before looking at the code, it helps to know when Redis Pub/Sub is useful. In a microservice architecture, a message broker is commonly used as a one-way (unidirectional) communication channel between two or more services.

In other words, an upstream microservice produces messages or events and writes them to a messaging channel (or topic). Consumers subscribe to that channel and process the messages independently of the producer.
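The publish/subscribe flow described above can be sketched without Redis at all. The following is a minimal, hypothetical in-memory broker written only to illustrate the pattern; the `InMemoryBroker` class and its methods are made up for this example and are not part of Redis or Spring. It also mimics the fire-and-forget behavior of Redis Pub/Sub, where a message published while nobody is subscribed is simply dropped.

```kotlin
// Hypothetical in-memory broker for illustration only; not part of Redis or Spring.
class InMemoryBroker {
    // Channel name -> callbacks of the currently registered subscribers.
    private val subscribers = mutableMapOf<String, MutableList<(String) -> Unit>>()

    fun subscribe(channel: String, handler: (String) -> Unit) {
        subscribers.getOrPut(channel) { mutableListOf() }.add(handler)
    }

    // Delivers the message to every current subscriber and returns how many received it.
    // Nothing is stored, so a message published without subscribers is lost.
    fun publish(channel: String, message: String): Int {
        val handlers = subscribers[channel].orEmpty()
        handlers.forEach { it(message) }
        return handlers.size
    }
}

fun main() {
    val broker = InMemoryBroker()
    val received = mutableListOf<String>()

    // Published before anyone subscribed: there are no receivers.
    val early = broker.publish("movie.update", "Movie 1 added")

    broker.subscribe("movie.update") { received.add("A got: $it") }
    broker.subscribe("movie.update") { received.add("B got: $it") }

    // Published after subscribing: each subscriber gets its own copy.
    val late = broker.publish("movie.update", "Movie 2 added")

    println("early receivers = $early, late receivers = $late")
    received.forEach(::println)
}
```

Every subscriber receives its own copy of a message, which is what distinguishes a topic from a work queue where each message goes to a single consumer.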

What we build in this project is a simplified version of that setup. Instead of having two microservices (a publisher and a subscriber), we code everything in a single service for simplicity’s sake. That way, we don’t need to create two projects and maintain two different sets of configuration.

This might look a little odd at first glance, but there is a justification for such an implementation, which we discuss in upcoming articles.

To make this concrete, have a look at this diagram:

[Diagram: Redis Pub/Sub]

As depicted in the above diagram, whenever a new Movie is added to the database, we publish a message to a Redis topic. All consumers of that topic, which in our case live in the same microservice, are then notified about the change. For now, we keep things simple: the consumers only log the messages they receive.

In the next section, we cover how to implement what is described above in a Spring Boot project using the Movie example discussed in the Getting started with Spring Data Redis with Kotlin article.

Redis Pub/Sub implementation with Spring Boot

Let’s look at the implementation of Redis Publisher/Subscriber in Spring Boot coded in Kotlin.

Adding Redis properties

The first thing we have to do is to add the following properties to either the application.properties or application.yml file of the project.

spring.data.redis.repositories.enabled=true
spring.data.redis.host=localhost
spring.data.redis.port=6379
spring.data.redis.topic=movie.update
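
For projects that use application.yml instead, the same four properties could be written as shown below. This is a direct translation of the properties above and nothing more. Note that spring.data.redis.topic is a custom key used by this example for the channel name, not a built-in Spring Boot property.

```yaml
spring:
  data:
    redis:
      repositories:
        enabled: true
      host: localhost
      port: 6379
      topic: movie.update
```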

Spring Boot Redis configuration

The next step is to create a configuration class for Redis and define the following beans:

  • JedisConnectionFactory (since we are using Jedis) – connection to Redis
  • RedisTemplate – template for serialization/deserialization
  • ChannelTopic – topic configuration
  • RedisMessageListenerContainer – listener configuration

Ultimately our Redis configuration should look like this:

package com.madadipouya.redis.springdata.example.config

import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.MessageListener
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.listener.ChannelTopic
import org.springframework.data.redis.listener.RedisMessageListenerContainer
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer

@Configuration
class RedisConfig(val messageListener: MessageListener) {

    // The property keys must match the ones defined in application.properties
    @Value("\${spring.data.redis.host}")
    lateinit var redisHost: String

    @Value("\${spring.data.redis.port}")
    lateinit var redisPort: String

    @Value("\${spring.data.redis.topic}")
    lateinit var redisTopic: String

    @Bean
    fun jedisConnectionFactory(): JedisConnectionFactory {
        val config = RedisStandaloneConfiguration(redisHost, redisPort.toInt())
        val jedisClientConfiguration = JedisClientConfiguration.builder().usePooling().build()
        val factory = JedisConnectionFactory(config, jedisClientConfiguration)
        factory.afterPropertiesSet()
        return factory
    }

    @Bean
    fun redisTemplate(): RedisTemplate<String, Any> {
        val template: RedisTemplate<String, Any> = RedisTemplate()
        template.setConnectionFactory(jedisConnectionFactory())
        template.valueSerializer = GenericJackson2JsonRedisSerializer()
        return template
    }

    @Bean
    fun topic(): ChannelTopic = ChannelTopic(redisTopic)

    @Bean
    fun newMessageListener(): MessageListenerAdapter = MessageListenerAdapter(messageListener)

    @Bean
    fun redisContainer(): RedisMessageListenerContainer {
        val container = RedisMessageListenerContainer()
        container.setConnectionFactory(jedisConnectionFactory())
        container.addMessageListener(newMessageListener(), topic())
        return container
    }
}

To avoid breaking the existing use of Redis as a caching layer, we configure the Redis template so that it can serialize and deserialize any type of data (such as Movie and Actor). To do that, we set GenericJackson2JsonRedisSerializer as the value serializer.

Implementing the producer

Now we need to implement the producer, which sends a "new Movie created" message to the Redis topic configured in the previous step. The producer code is as follows:

package com.madadipouya.redis.springdata.example.producer

import com.madadipouya.redis.springdata.example.model.Movie
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.listener.ChannelTopic
import org.springframework.stereotype.Component

@Component
class MovieAddedProducer(val template: RedisTemplate<String, Any>, val channelTopic: ChannelTopic) {

    companion object {
        val logger : Logger = LoggerFactory.getLogger(MovieAddedProducer::class.java)
    }

    fun publish(movie: Movie) {
        logger.info("Notifying subscribers on adding a new Movie {} {}", movie.id, movie.name)
        template.convertAndSend(channelTopic.topic, movie)
    }
}

The producer does nothing on its own, though; we need to call it from somewhere in the code. Since we want to fire an event whenever a new Movie is added, it makes sense to call it from the createMovie method of the DefaultMovieService.kt class. The relevant part of that class looks like this:

package com.madadipouya.redis.springdata.example.service.impl

import com.madadipouya.redis.springdata.example.controller.MovieController
import com.madadipouya.redis.springdata.example.model.Movie
import com.madadipouya.redis.springdata.example.producer.MovieAddedProducer
import com.madadipouya.redis.springdata.example.repository.ActorRepository
import com.madadipouya.redis.springdata.example.repository.MovieRepository
import com.madadipouya.redis.springdata.example.service.MovieService
import com.madadipouya.redis.springdata.example.service.exception.MovieNotFoundException
import org.springframework.stereotype.Service

@Service
class DefaultMovieService(val movieRepository: MovieRepository, val movieAddedProducer: MovieAddedProducer) : MovieService {
    override fun createMovie(movieDto: MovieController.MovieDto): Movie {
        val movie = movieRepository.save(Movie(name = movieDto.name.orEmpty(), genre = movieDto.genre.orEmpty(), year = movieDto.year))
        movieAddedProducer.publish(movie)
        return movie
    }
}

Implementing the consumer

The last step is to implement the consumer that reads messages from the topic. As with the producer, the implementation is straightforward:

package com.madadipouya.redis.springdata.example.subscriber

import com.fasterxml.jackson.databind.ObjectMapper
import com.madadipouya.redis.springdata.example.model.Movie
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.Message
import org.springframework.data.redis.connection.MessageListener
import org.springframework.stereotype.Component

@Component
class MovieAddedListener(val objectMapper: ObjectMapper) : MessageListener {

    companion object {
        val logger : Logger = LoggerFactory.getLogger(MovieAddedListener::class.java)
    }

    override fun onMessage(message: Message, pattern: ByteArray?) {
        // message.body holds the raw JSON bytes written by the producer's serializer
        val movieMap = objectMapper.readValue(message.body, Map::class.java)
        logger.info("Notified on a new Movie creation {}, {}", movieMap["id"], movieMap["name"])
    }

}

As mentioned earlier, the consumer only logs the incoming messages/events.

Conclusion

In this tutorial, we discussed how to use the Redis Pub/Sub feature with Spring Boot. We reused our Movie example project and added a publisher and a subscriber to it. Note, however, that both the publisher and the subscriber code reside in the same codebase. In future articles, you will learn how to use this Pub/Sub setup to update all SSE subscribers and keep them in sync when multiple instances of the Movie service are running.

You can access this article’s source code on GitHub at the link below:
https://github.com/kasramp/spring-data-redis-example-kotlin
