Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis PubSub #226

Open
Owez opened this issue Oct 19, 2022 · 17 comments
Open

Redis PubSub #226

Owez opened this issue Oct 19, 2022 · 17 comments
Labels
A-redis Area: Redis / deadpool-redis enhancement New feature or request

Comments

@Owez
Copy link

Owez commented Oct 19, 2022

There's a .publish() method in deadpool redis but I can't find any way to subscribe to channels for pubsub usage in redis. How should I go about this, and are there any async alternatives if deadpool doesn't support it?

@bikeshedder bikeshedder added the A-redis Area: Redis / deadpool-redis label Oct 26, 2022
@bikeshedder
Copy link
Owner

deadpool_redis::Connection is just a wrapper around redis::aio::Connection which doesn't support into_pubsub as it requires ownership of the object and not just a reference to it. Most users don't need shdrt lived PubSub objects and therefore I never really bothered implementing a solution for this. Just create the object directly and keep it for the entire duration of the program. (or use Connection::take as explained below)

It could be possible to provide this feature by implementing some clever wrapping utilizing the redis::aio::Connectioninto_pubsub and `redis::aio::PubSub::into_connection methods. If this is indeed a valid use case of yours feel free to submit a PR for this feature.

In the mean time the easiest way to get a PubSub object is by calling Pool::get followed by a Object::take thus detaching the object from the pool and taking full ownership over the underlying redis::aio::Connection object:

let obj = deadpool_redis::Connection::take(pool.get().await?);
let pubsub = obj.into_pubsub();

Due to the take() call the object just won't be returned to the pool afterwards and there is currently no way to manually add a connection back to the pool.

@bikeshedder
Copy link
Owner

I'm closing this as I don't think it's very practical to use a pool for listening for messages. I might be totally wrong so please feel free to comment here and/or open a new issue describing a use case where this is needed.

@bikeshedder bikeshedder added the invalid The issue is invalid and has been raised in error. label Dec 9, 2022
@Owez
Copy link
Author

Owez commented Dec 10, 2022

Yep your right, this ended up being an issue with the redis pubsub implementation itself. I think it was a third party library which was merged into it?

@sp1ff
Copy link

sp1ff commented Mar 24, 2023

Would keyspace notifications be a use case? I need to know about changes to a particular key, but once they've happened, I'm done with it.

@bbaldino
Copy link

bbaldino commented May 2, 2023

I'm also interested in a pooled pubsub usecase. We're using an adhoc pubsub channel to add some request/response type semantic; a stream for the return channel means that the data would be persisted, and we want something more ephemeral so we don't have to worry about cleanup, so looking at using pubsub. Pubsub works well, but creating a new connection every time isn't very efficient.

@bikeshedder bikeshedder reopened this May 3, 2023
@bikeshedder bikeshedder added discussion and removed invalid The issue is invalid and has been raised in error. labels May 3, 2023
@bikeshedder

This comment was marked as outdated.

@bikeshedder
Copy link
Owner

I once again fell into the trap of confusing redis::aio::Connection with redis::Connection. I've just marked my previous comment as outdated as it was highly misleading.

#226 (comment) is still correct about the way it needs to be implemented. The connection wrapper needs to provide a as_pubsub method which calls into_pubsub underneath and returns a PubSub wrapper object. That wrapper must borrow mutably from connection so it can take the redis::aio::Connection from it, call into_pubsub on it and on drop return it to the connection wrapper by calling into_connection first and restoring the original state of the connection wrapper.

@Owez
Copy link
Author

Owez commented May 3, 2023

Would that solution be a workaround because of the current limitations in redis::aio?

My eventual solution was to write a custom PubSub implementation for my specific usecase and use deadpool for the plain TCP connections, but obviously that was a heavy-handed approach 🙂

@bikeshedder bikeshedder added enhancement New feature or request and removed discussion labels Dec 15, 2023
@carlocorradini
Copy link

Any update 😢

@paulkre
Copy link

paulkre commented Jul 28, 2024

redis::aio::Connection is deprecated and redis::aio::MultiplexedConnection does not have the into_pubsub method. It has a subscribe method in redis ^0.26 though. Can we upgrade the dependency on redis?

@attila-lin
Copy link
Contributor

redis::aio::Connection is deprecated and redis::aio::MultiplexedConnection does not have the into_pubsub method. It has a subscribe method in redis ^0.26 though. Can we upgrade the dependency on redis?

updated.

@musjj
Copy link

musjj commented Nov 7, 2024

Are there any updates here? What's currently the idiomatic way to subscribe to a stream of messages with deadpool-redis (like with PubSub::on_message)? Do you need to open a new connection every time? Like:

redis::Client::open("redis://127.0.0.1");

@bikeshedder
Copy link
Owner

I just checked the documentation and example code from the current redis version and it seams that PubSub now works very different from previous versions. Instead of calling into_pubsub you create a dedicated PubSub object via the Client::get_async_pubsub method.

After a bit more digging it is possible to convert a Connection into a PubSub but there doesn't seam to be a way to get back the original connection once it's been converted into a PubSub object.

I currently see only one way for this to work:

  • Create a separate pool implementation just for PubSub connections. e.g. deadpool_redis::PubSubPool which creates PubSub connections.

It really is a bummer that the redis crate makes a distinction between Connection, PubSub and Monitor which are all different types of connections which can't be interchanged.

It's weird as the MultiplexedConnection even provides methods for (un)subscribing but there doesn't seam to be a way to access the published messages: https://docs.rs/redis/latest/redis/aio/struct.MultiplexedConnection.html

@musjj
Copy link

musjj commented Nov 10, 2024

Yeah, the way the connection is designed feels very awkward. Even stranger is that PubSub can't even publish messages, betraying its name.

Would a change in redis-rs be necessary here? I thought about opening an issue there, but I'm not sure what kind of change would be ideal.

@bikeshedder
Copy link
Owner

Now that I understand the difference between RESP2 and RESP3 I think it's possible to provide pubsub support in deadpool-redis. The API is a bit weird as it doesn't utilize a stream but expects an UnboundedSender. It does require an active reader of that channel or it will otherwise result in an OOM situation sooner or later.

A better approach would be a stream like API that only starts buffering messages as long as there is a consumer waiting for messages. I can work around this in deadpool-redis but it requires an extra tokio task for every connection that reads from the channel.

Maybe there is a chance to change this to a Box<dyn Fn<PushInfo>> for a future redis crate release.

@bikeshedder
Copy link
Owner

Redis 0.27.6 just got released and after checking the current deadpool-redis API the following changes are required for this to work:

  1. Remove RedisConnectionInfo::protocol field. Changing the protocol version also requires changing the application code so I see very little use in making this configurable via environment variables or config files.
  2. The protocol_version still needs to be specified somehow. Either by adding a new required parameter to the Manager constructor function or creating two pool types. As RESP2 and RESP3 are basically incompatible with each other those distinct types might actually be a good thing.
  3. Add RecyclingMethod to deadpool-redis #307 is required to clean up subscriptions when recycling connections.
  4. Allow configuring Redis connections via AsyncConnectionConfig #357 was recently merged but contains the push_sender information that we need to control. For this to work can't give the user access to this setting and this PR needs to be reverted/reworked.
  5. The connection object has to store a way to access the event stream somehow. I'm a huge fan of APIs like while let Some(msg) = connection.stream() { ... } which makes consuming events a breeze.
  6. What about redis-cluster and redis-sentinel?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-redis Area: Redis / deadpool-redis enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

8 participants