Skip to content

Pub Sub (4.0)

Mark Paluch edited this page Jun 2, 2016 · 4 revisions

lettuce provides support for Publish/Subscribe on Redis Standalone and Redis Cluster connections. The connection is notified on message/subscribed/unsubscribed events after subscribing to channels or patterns. Sync, async and reactive API's are available to operate on the connection.

Subscribing

A connection can notify multiple listeners that implement RedisPubSubListener (lettuce provides a RedisPubSubAdapter for convenience). All listener registrations are kept within the StatefulRedisPubSubConnection/StatefulRedisClusterConnection.

Synchronous example

StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub()
connection.addListener(new RedisPubSubListener<String, String>() { ... })

RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");

Asynchronous example

StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub()
connection.addListener(new RedisPubSubListener<String, String>() { ... })

RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");

Reactive API

The reactive API provides two hot Observables to listen on ChannelMessages and PatternMessages. The Observables receive all inbound messages. You can do filtering using the observable chain if you need to filter out the interesting ones, The Observable stops triggering events when the subscriber unsubscribes from it.

StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub()

RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();

reactive.observePatterns().doOnNext(patternMessage -> {...}).subscribe()

Redis Cluster example

StatefulRedisClusterConnection<String, String> connection = clusterClient.connectPubSub()
connection.addListener(new RedisPubSubListener<String, String>() { ... })

RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
Clone this wiki locally