RSocket support in Spring messaging
class MessageHandler(private val builder: RSocketRequester.Builder) {
// ...
suspend fun stream(request: ServerRequest): ServerResponse {
val requester = builder
.dataMimeType(APPLICATION_CBOR)
.connectTcpAndAwait("localhost", 9898)
val replies = requester
.route("bot.messages")
.dataWithType(processor)
.retrieveFlow<Message>()
val broadcast = requester.route("bot.broadcast").retrieveFlow<Message>()
val messages = flowOf(replies, processor.asFlow(), broadcast).flattenMerge()
return ok().sse().bodyAndAwait(messages)
}
}