Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RedisStorage seems to be broken #155

Open
allo- opened this issue Aug 6, 2024 · 3 comments
Open

RedisStorage seems to be broken #155

allo- opened this issue Aug 6, 2024 · 3 comments

Comments

@allo-
Copy link

allo- commented Aug 6, 2024

When using RedisStorage, I get the error: unsupported operand type(s) for -: 'Pipeline' and 'int' at

pub_prev_id = str(e.id - 1)

Testing it a bit, I think this line is wrong:

event_id = pipe.incr("event_counter:" + channel)

and the results of the pipe are only available after pipe.execute(). I wanted to try event_id, _ = pipe.execute(), but the data in the pipe.setex call already needs the event_id.

Printing the result of pipe.execute without changing other code gives for example [2, True].

@jkarneges
Copy link
Member

Maybe @erfantarighi can look at this.

@allo-
Copy link
Author

allo- commented Oct 2, 2024

@erfantarighi Can you have a look?

I currently have storage disabled because of the issue, but in the project I am developing it would probably be good to have storage sooner or later and as I am already using Redis I think RedisStorage would be the best choice.

@stefanofusai
Copy link

stefanofusai commented Feb 14, 2025

@allo- I solved this by subclassing RedisStorage and overriding the append_event method. Here's my code, let me know your thoughts:

import json

from django_eventstream.event import Event
from django_eventstream.storage import EVENT_TIMEOUT
from django_eventstream.storage import RedisStorage as BaseRedisStorage


class RedisStorage(BaseRedisStorage):
    def append_event(self, channel: str, event_type: str, data: dict) -> Event:
        """
        Appends a new event to the storage for the specified channel.

        Args:
            channel (str): The name of the channel to append the event to.
            event_type (str): The type of the event.
            data (dict): The data associated with the event.

        Returns:
            Event: An Event object representing the appended event.
        """
        with self.redis.pipeline() as pipe:
            try:
                _event_id = pipe.incr("event_counter:" + channel)
                event_data = json.dumps({"type": event_type, "data": data})
                pipe.setex(
                    "event:" + channel + ":" + str(_event_id),
                    EVENT_TIMEOUT * 60,
                    event_data,
                )
                event_id, _ = pipe.execute()
                return Event(channel, event_type, data, id=event_id)

            except ConnectionError as e:
                raise ConnectionError("Failed to append event to Redis.") from e
EVENTSTREAM_STORAGE_CLASS = "myapp.storages.RedisStorage"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants