Pub Sub

Mark Paluch edited this page Jan 21, 2017 · 11 revisions

lettuce supports Publish/Subscribe on Redis Standalone and Redis Cluster connections. After subscribing to channels or patterns, the connection is notified of message, subscribed, and unsubscribed events. Synchronous, asynchronous, and reactive APIs are provided for interacting with Redis Publish/Subscribe features.

Subscribing

A connection can notify multiple listeners that implement RedisPubSubListener (lettuce provides a RedisPubSubAdapter for convenience). All listener registrations are kept within the StatefulRedisPubSubConnection/StatefulRedisClusterConnection.
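The listener and adapter relationship described above can be sketched without the library. This is a minimal, library-free model under stated assumptions: `PubSubListener`, `PubSubAdapter`, and `ListenerRegistry` are hypothetical stand-ins invented for illustration, not lettuce classes, and the real listener interface has more callbacks than shown here. It only demonstrates why an adapter with empty defaults is convenient and how one connection can fan events out to several listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified listener contract (not the lettuce interface).
interface PubSubListener<K, V> {
    void message(K channel, V message);
    void subscribed(K channel, long count);
    void unsubscribed(K channel, long count);
}

// Adapter with empty defaults so subclasses override only what they need.
class PubSubAdapter<K, V> implements PubSubListener<K, V> {
    @Override public void message(K channel, V message) { }
    @Override public void subscribed(K channel, long count) { }
    @Override public void unsubscribed(K channel, long count) { }
}

// Stand-in for the connection: keeps registrations and notifies every listener.
class ListenerRegistry<K, V> {
    private final List<PubSubListener<K, V>> listeners = new CopyOnWriteArrayList<>();

    void addListener(PubSubListener<K, V> listener) { listeners.add(listener); }

    void fireSubscribed(K channel, long count) {
        for (PubSubListener<K, V> l : listeners) l.subscribed(channel, count);
    }

    void fireMessage(K channel, V message) {
        for (PubSubListener<K, V> l : listeners) l.message(channel, message);
    }
}

class ListenerDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ListenerRegistry<String, String> connection = new ListenerRegistry<>();

        // Listener "a" only cares about messages.
        connection.addListener(new PubSubAdapter<String, String>() {
            @Override public void message(String channel, String message) {
                log.add("a:message:" + channel + ":" + message);
            }
        });
        // Listener "b" only cares about subscription confirmations.
        connection.addListener(new PubSubAdapter<String, String>() {
            @Override public void subscribed(String channel, long count) {
                log.add("b:subscribed:" + channel + ":" + count);
            }
        });

        connection.fireSubscribed("channel", 1);
        connection.fireMessage("channel", "hello");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each listener overrides a single callback and ignores the rest, which is the convenience an adapter class is meant to provide.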

Example 1. Synchronous subscription
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

// application flow continues

Example 2. Asynchronous subscription
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

// application flow continues

Reactive API

The reactive API provides hot Observables for listening to ChannelMessages and PatternMessages. These Observables receive all inbound messages, so use operators in the observable chain to select the messages you are interested in. An Observable stops emitting events to a subscriber once that subscriber unsubscribes from it.
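The hot-stream behaviour described above (all inbound messages arrive, filtering happens in the chain, delivery stops on unsubscribe) can be modelled with plain Java. This is only a sketch under stated assumptions: `Msg` and `HotSource` are hypothetical types invented for illustration and are not part of lettuce or RxJava. With the real API the same idea would likely be expressed with a `filter` operator on the Observable returned by `observeChannels()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical message type: a channel name plus a payload.
final class Msg {
    final String channel;
    final String body;

    Msg(String channel, String body) {
        this.channel = channel;
        this.body = body;
    }
}

// Hypothetical hot source: emits only to subscribers registered at emit time.
class HotSource {
    private final List<Consumer<Msg>> subscribers = new CopyOnWriteArrayList<>();

    // Registers a filtered subscriber and returns a handle that unsubscribes it.
    Runnable subscribe(Predicate<Msg> filter, Consumer<Msg> onNext) {
        Consumer<Msg> guarded = m -> {
            if (filter.test(m)) {
                onNext.accept(m);
            }
        };
        subscribers.add(guarded);
        return () -> subscribers.remove(guarded);
    }

    void emit(Msg m) {
        for (Consumer<Msg> s : subscribers) {
            s.accept(m);
        }
    }
}

class HotSourceDemo {
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        HotSource source = new HotSource();

        source.emit(new Msg("news", "missed"));     // no subscriber yet: dropped
        Runnable unsubscribe = source.subscribe(
                m -> m.channel.equals("news"),      // keep only the "news" channel
                m -> seen.add(m.body));
        source.emit(new Msg("news", "first"));      // delivered
        source.emit(new Msg("sports", "ignored"));  // filtered out
        unsubscribe.run();
        source.emit(new Msg("news", "late"));       // after unsubscribe: not delivered
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the source is hot, the message emitted before subscribing is never replayed, and only the message that passes the filter while the subscription is active reaches the subscriber.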

Example 3. Reactive subscription
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();

RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observeChannels().doOnNext(channelMessage -> {...}).subscribe();

// application flow continues

Example 4. Redis Cluster Publish/Subscribe
StatefulRedisPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });

RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");