Pub-sub Redis in Micronaut
How to create a pub-sub Redis architecture using Micronaut
A few years ago I struggled a lot to get a simple Redis Pub-sub working locally. I'm working on a private project that illustrates the Kong framework in action. For that I need some sort of queue system, and I decided to venture into one that is relatively new to me.
I started out by thinking of a way to test this first, because I wanted to approach it in a more thought-out way. The tests help because, at this point, I'm not interested in having something fully running; I just want to start with testing, as it should be. Going through the Micronaut website, I found out that I would need a few dependencies, namely this one:
<dependency>
    <groupId>io.micronaut.redis</groupId>
    <artifactId>micronaut-redis-lettuce</artifactId>
    <version>5.2.0</version>
</dependency>
This is the Lettuce dependency. Lettuce is a Redis client that gives access to a lot of Redis functionality. Then I created this configuration in the application.yml file:
redis:
  uri: redis://localhost
  timeout: 30s
I added the timeout to give the connection enough time to be established. This may be removed in the future.
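To make the two settings more concrete, here is a minimal sketch of what they roughly correspond to when expressed directly with Lettuce's RedisURI. It is an assumption about how the values end up in the client, not a description of Micronaut's internals, and buildRedisUri is a hypothetical helper that does not exist in the project.

```kotlin
import io.lettuce.core.RedisURI
import java.time.Duration

// Hypothetical helper: mirrors the application.yml values by hand.
fun buildRedisUri(): RedisURI {
    // Same URI as in the configuration; no port is given, so Lettuce applies its default.
    val uri = RedisURI.create("redis://localhost")
    // Same 30 second timeout as in the configuration.
    uri.timeout = Duration.ofSeconds(30)
    return uri
}

fun main() {
    val uri = buildRedisUri()
    println("host=${uri.host} port=${uri.port} timeout=${uri.timeout}")
}
```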
Because I love all sorts of tests, and in this case I was interested in seeing how Redis behaves, I resorted to Testcontainers. If you do not know it, Testcontainers is a framework that allows you to start and stop containers programmatically, via code, using your Docker environment locally or in a pipeline. For now, I created this abstraction to make that happen:
abstract class AbstractBuyOddYuccaConcertContainerTest {
    companion object {
        @Container
        @JvmField
        val postgreSQLContainer: TestPostgresSQLContainer = TestPostgresSQLContainer("postgres:14")
            .withUsername("kong")
            .withPassword("kong")
            .withDatabaseName("yucca")
            .withExposedPorts(POSTGRESQL_PORT)
            .withCreateContainerCmdModifier { cmd ->
                cmd.withHostConfig(
                    HostConfig().withPortBindings(
                        PortBinding(
                            bindPort(POSTGRESQL_PORT),
                            ExposedPort(POSTGRESQL_PORT)
                        )
                    )
                )
            }

        @Container
        @JvmField
        val redis: GenericContainer<*> = GenericContainer(parse("redis:5.0.3-alpine"))
            .withExposedPorts(REDIS_PORT)
            .withCreateContainerCmdModifier { cmd ->
                cmd.withHostConfig(
                    HostConfig().withPortBindings(
                        PortBinding(
                            bindPort(REDIS_PORT),
                            ExposedPort(REDIS_PORT)
                        )
                    )
                )
            }

        private val config = ClassicConfiguration()

        init {
            postgreSQLContainer.start()
            redis.start()
            config.setDataSource(
                postgreSQLContainer.jdbcUrl,
                postgreSQLContainer.username,
                postgreSQLContainer.password
            )
            config.schemas = arrayOf("ticket")
            Flyway(config).migrate()
        }
    }
}
In this case I'm not only using Redis, but also Postgres. That is for other functionality in my project; the point here is to explain how I got to test the first Redis functionality in it.
This is the test:
@MicronautTest
internal class ReservationsServiceTest @Inject constructor(
    private val reservationsService: ReservationsService
) : AbstractBuyOddYuccaConcertContainerTest() {
    @Test
    fun `should get all reservations`(): Unit = runBlocking {
        reservationsService.getAll().toList().shouldNotBeNull()
    }

    companion object {
        @JvmStatic
        @BeforeAll
        fun setup() {
            redis.start()
            postgreSQLContainer.start()
        }
    }
}
The reason why I'm, at this moment, forcing the containers to start in the @BeforeAll method is that @MicronautTest doesn't seem to work very well with the Testcontainers framework in my current setup: the Redis client seems to be configured well before the containers start.
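One alternative that might avoid this ordering problem is Micronaut Test's TestPropertyProvider, which lets a test class hand properties to the application context before it starts. The sketch below is not what the project currently does and is untested against it: it assumes the container runs on a random mapped port instead of the fixed port binding used above, and RedisPropertySketchTest and redisUri are hypothetical names.

```kotlin
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import io.micronaut.test.support.TestPropertyProvider
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName

const val REDIS_PORT = 6379

// Hypothetical helper: builds the value for the redis.uri property.
fun redisUri(host: String, port: Int): String = "redis://$host:$port"

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // required by TestPropertyProvider
class RedisPropertySketchTest : TestPropertyProvider {

    // Called before the application context is built, so the container
    // is already running when the Redis client gets configured.
    override fun getProperties(): Map<String, String> {
        if (!redis.isRunning) redis.start()
        return mapOf("redis.uri" to redisUri(redis.host, redis.getMappedPort(REDIS_PORT)))
    }

    @Test
    fun `redis container is running`() {
        assertTrue(redis.isRunning)
    }

    companion object {
        // Assumption: same image as in the abstract class, but without a fixed host port.
        val redis: GenericContainer<*> = GenericContainer(DockerImageName.parse("redis:5.0.3-alpine"))
            .withExposedPorts(REDIS_PORT)
    }
}
```

With this shape the redis.uri from application.yml would be overridden for the test only, so the forced start in @BeforeAll may no longer be needed.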
Of course, for this part I didn't strictly adhere to the idea of writing the integration test first and the implementation afterwards. What I did was to implement the service first, in this way:
@Singleton
class ReservationsService(
    private val receiptRepository: ReceiptRepository,
    private val redisClient: RedisClient
) {
    init {
        val statefulRedisPubSubConnection = redisClient.connectPubSub()
        statefulRedisPubSubConnection.addListener(Listener())
        val redisPubSubAsyncCommands = statefulRedisPubSubConnection.async()
        redisPubSubAsyncCommands.subscribe("channel1")
        redisClient.connectPubSub().async().publish("channel1", "test")
    }

    @OptIn(DelicateCoroutinesApi::class)
    suspend fun createTicket(ticketDto: @Valid TicketDto) = GlobalScope.launch {
        receiptRepository.save(Receipt()).toDto
    }

    fun getAll(): Flow<Receipt> = receiptRepository.findAll()
}

class Listener : RedisPubSubAdapter<String, String>() {
    override fun message(p0: String?, p1: String?) {
    }
}
Of course, in the future this is meant to be where the production code starts. For now, this class represents the first successful result in trying to access Redis. Among the interfaces readily available in the Lettuce framework, the only one that suits me at the moment for implementing a Pub-Sub mechanism is RedisClient. The reason why this is the only code that works for me for now is that I realized that, for Pub-Sub to work, I need one async commands instance for the publisher and another one for the subscriber, each obtained from its own connection. Once the RedisClient is injected, the experimental code works like this:
- Create a Pub-Sub connection
val statefulRedisPubSubConnection = redisClient.connectPubSub()
- Add a listener. It has to implement the RedisPubSubListener interface; RedisPubSubAdapter implements all of its methods, and they are all empty by default
statefulRedisPubSubConnection.addListener(Listener())
- Get the commands instance to be able to issue commands
val redisPubSubAsyncCommands = statefulRedisPubSubConnection.async()
- Create a channel by subscribing to it
redisPubSubAsyncCommands.subscribe("channel1")
- Send the first message on another connection
redisClient.connectPubSub().async().publish("channel1", "test")
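The steps above can be sketched as a small standalone program outside of Micronaut. It assumes a Redis server reachable at redis://localhost:6379, and it uses synchronous commands instead of the async ones so that the subscribe call has returned before the message is published. RecordingListener and the 5 second wait are illustrative choices, not part of the project.

```kotlin
import io.lettuce.core.RedisClient
import io.lettuce.core.pubsub.RedisPubSubAdapter
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical listener: remembers the last message and releases a latch.
class RecordingListener(private val latch: CountDownLatch) : RedisPubSubAdapter<String, String>() {
    @Volatile
    var lastMessage: String? = null
        private set

    override fun message(channel: String?, message: String?) {
        lastMessage = message
        latch.countDown()
    }
}

fun main() {
    // Assumption: a local Redis instance on the default port.
    val redisClient = RedisClient.create("redis://localhost:6379")
    val latch = CountDownLatch(1)
    val listener = RecordingListener(latch)

    // Subscriber side: dedicated Pub-Sub connection with the listener attached.
    val subscriberConnection = redisClient.connectPubSub()
    subscriberConnection.addListener(listener)
    subscriberConnection.sync().subscribe("channel1")

    // Publisher side: a second, separate connection.
    val publisherConnection = redisClient.connect()
    publisherConnection.sync().publish("channel1", "test")

    // Wait briefly for the listener to be called, then report what arrived.
    val received = latch.await(5, TimeUnit.SECONDS)
    println("received=$received message=${listener.lastMessage}")

    publisherConnection.close()
    subscriberConnection.close()
    redisClient.shutdown()
}
```

The publisher here uses a plain connection rather than a second Pub-Sub connection; either can issue publish, and the point is only that it is not the connection that subscribed.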
Now I'm very happy to see this working locally. I can spin up the Redis container, connect to it and see the first Pub-Sub working! If you are interested in following the changes in my project, please have a look at it here: https://github.com/jesperancinha/buy-odd-yucca-concert/tree/master/buy-oyc-api-service/src/main. My goal with this quick post was to show you how I came to understand the basics of a Pub-Sub system implemented with Redis. You can find more documentation about this project here: https://github.com/jesperancinha/buy-odd-yucca-concert/tree/master/buy-oyc-api-service.
References