Distributed SSE with Spring SseEmitter and Redis Pub/Sub

Distributed SSE with Spring SseEmitter and Redis Pub/Sub

In the last article, we demonstrated how to use Server-Sent Events with Spring MVC SseEmitter. In this post, we expand the scope of the previous example to support distributed SSE. To achieve that we rely on Redis Pub/Sub which was discussed previously.

Before starting the tutorial about distributed SSE with Spring SseEmitter and Redis Pub/Sub, we highly recommend you look at the previous articles in this series:

Problem statement

The previous implementation of SseEmitter works perfectly for a single instance service. However, one of its shortcomings is the lack of support for a distributed environment. That means that if multiple instances of the Movie Service are running they have no way to communicate with each other. As a result, when a new movie is added (some) subscribers may miss the update depending on where the POST create Movie request routes. For a better demonstration, let’s have a look at the diagram below:

Distributed-SSE-Diagram-1

As depicted in the above diagram, a request to add John Wick 3 movie was routed to Instance C of the Movie Service. Since the instances do not communicate about the creation event, User B and User C that subscribe to Instance A and Instance B will miss the update on John Wick 3. Only User D that subscribes to Instance C receives the update.

Redis Pub/Sub as a communication channel

To solve the communication problem, we can rely on Redis Pub/Sub feature. For that, the receiver instance informs other instances so each can notify their subscribers about the latest update. Let’s have a look at the following diagram,

Distributed-SSE-Diagram-2

As you can see, once User A sends a request to create John Wick 3, the request routes to Instance C. Then that instance informs two other instances, A and B by producing a message. Once Instance A and Instance B were informed about the latest update, each notifies their subscribers (User B, User C) respectively. Additionally, Instance C updates its subscriber, User D.

One interesting phenomenon regarding the above diagram is that `Instance C` informs itself about the new update. That is because the instance listens to the same topic since it should receive updates from other instances. Hence, `Instance C` does not need to inform its subscribers differently. In other words, all instances treat updates in the same way, whether updates are trigged by self or through another instance.

Implementation of distributed SSE with Redis Pub/Sub

The implementation is straightforward. All we have to do is to change the previous code we explained here. Instead of directly notifying the subscribers, we write messages (new movie details) to Redis. For that, we need to change the Movie Service as follows:

package com.madadipouya.redis.springdata.example.service.impl

import com.madadipouya.redis.springdata.example.controller.MovieController
import com.madadipouya.redis.springdata.example.model.Movie
import com.madadipouya.redis.springdata.example.producer.MovieAddedProducer
import com.madadipouya.redis.springdata.example.repository.MovieRepository
import com.madadipouya.redis.springdata.example.service.MovieService
import com.madadipouya.redis.springdata.example.service.exception.MovieNotFoundException
import org.springframework.stereotype.Service

@Service
class DefaultMovieService(val movieRepository: MovieRepository, val movieAddedProducer: MovieAddedProducer) : MovieService {

    // removed for brevity
  
    override fun createMovie(movieDto: MovieController.MovieDto): Movie {
        val movie = movieRepository.save(Movie(name = movieDto.name.orEmpty(), genre = movieDto.genre.orEmpty(), year = movieDto.year))
        movieAddedProducer.publish(movie)
        return movie
    }
  
    // removed for brevity
}

As for the producer, the code is even simpler. We only need to serialize messages and write them to the Redis topic,

package com.madadipouya.redis.springdata.example.producer

import com.madadipouya.redis.springdata.example.model.Movie
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.listener.ChannelTopic
import org.springframework.stereotype.Component

@Component
class MovieAddedProducer(val template: RedisTemplate<String, Any>, val channelTopic: ChannelTopic) {

    companion object {
        val logger : Logger = LoggerFactory.getLogger(MovieAddedProducer::class.java)
    }

    fun publish(movie: Movie) {
        logger.info("Notifying subscribers on adding a new Movie {} {}", movie.id, movie.name)
        template.convertAndSend(channelTopic.topic, movie)
    }
}

The next step is the (movie) payload deserialization in the Redis listener that then proceed by calling Subscription Service. Naturally, the aforementioned service notifies its subscribers about the new movie. The code implementation is like below,

package com.madadipouya.redis.springdata.example.consumer

import com.fasterxml.jackson.databind.ObjectMapper
import com.madadipouya.redis.springdata.example.model.Movie
import com.madadipouya.redis.springdata.example.subscription.service.SubscriptionService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.data.redis.connection.Message
import org.springframework.data.redis.connection.MessageListener
import org.springframework.stereotype.Component

@Component
class MovieAddedListener(val objectMapper: ObjectMapper, val subscriptionService: SubscriptionService) : MessageListener {

    companion object {
        val logger : Logger = LoggerFactory.getLogger(MovieAddedListener::class.java)
    }

    override fun onMessage(message: Message, pattern: ByteArray?) {
        val movie = manualMap(message)
        subscriptionService.notifySubscribers(movie)
        logger.info("Notified on a new Movie creation {}, {}", movie.id, movie.name)
    }

    // Jackson is unable to deserialize empty list  of Actor. That's why need to manually map
    // Or not mixing model/entity with DTO
    fun manualMap(message: Message) : Movie {
        val movieMap = objectMapper.readValue(message.toString(), Map::class.java)
        val movie = Movie(movieMap["name"] as String, movieMap["genre"] as String, movieMap["year"] as Int)
        movie.id = movieMap["id"] as String?
        return movie
    }
}

Keep in mind that we have to deserialize the payload manually. That is because of the incompatibility between Jackson Library and Kotlin due to the use of List in the Movie class. Ideally, entities should be kept separate from DTOs. In this example, for simplicity’s sake, we skipped that. Instead did the mapping manually.

Conclusion

In this post, we discussed how to create a distributed SSE with Spring SseEmitter and Redis Pub/Sub as the backbone to communicate between different application instances and fan out any updates. That way, all nodes get informed about the latest changes and can emit events to the subscribed clients via SSE.

You can find the full source code of the project on GitHub at the link below,

https://github.com/kasramp/spring-data-redis-example-kotlin

Inline/featured images credits