Design for Postgres event transport #11
Note that a Postgres-based event transport will not achieve application/bus atomicity. For that we will still need a Postgres-based layer residing in the database being written to, because Postgres does not support cross-database transactions.
Hi there, I just found Lightbus and the Postgres support looks interesting. Thinking about it, I wonder whether it would help to investigate this issue from a PostgreSQL point of view. For example, Postgres allows you to select items from a queue using "SELECT … FOR UPDATE SKIP LOCKED", or to use advisory locks; both can help when implementing a task queue. So instead of thinking in terms of Redis streams, perhaps think in terms of the semantics you are trying to achieve. For events this could mean "an event needs to be created and then consumed at least once", and then think about an efficient way to achieve this in Postgres. You kind of have this in

Please note that I am not saying your current approach/thoughts are bad or wrong in any way; I am merely trying to provide another perspective on the problem.
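As a concrete illustration of the "SELECT … FOR UPDATE SKIP LOCKED" pattern mentioned above, here is a minimal single-delivery job-queue sketch. The `jobs` table and its columns are hypothetical, not part of Lightbus:

```sql
-- Hypothetical job queue: each row is delivered to exactly one worker.
BEGIN;

-- Claim the oldest unprocessed job, skipping rows other workers have
-- already locked, so concurrent workers never block each other.
SELECT id, payload
FROM jobs
WHERE processed_at IS NULL
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- ... process the job in application code ...

-- Mark the claimed row as done, then release the lock.
UPDATE jobs SET processed_at = now() WHERE id = :claimed_id;
COMMIT;
```

Note this gives at-least-once delivery to a *single* consumer (one message in, one message out), which is the distinction the reply below turns on.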
Hey @apollo13, you make good points, and input is always very welcome. Thank you for taking the time. I definitely agree with you about coming at this from a Postgres point of view.

To give some history: at the back of my mind I was assuming this could simply be implemented as a typical Postgres queue. Events would be inserted into a table, and they would be selected out and processed (using one of the techniques you point out above). However, I realised this wouldn't work. That provides a job queue (one message goes in, one comes out), but Lightbus events use a fan-out design. In Lightbus there may be zero or more consumers[1] for an event, and each consumer will receive each event at least once.

At this point I decided to make some notes in this ticket, so what you see above is how far I got in thinking about it at the time. I think I was essentially trying to get some notes down around "we need to implement consumer groups in Postgres".[2]

Perhaps I could therefore crystallise the core requirement as: an event needs to be received by each listener at least once.

If it helps, off the top of my head I can see a couple of potential avenues to explore:
These are only rough ideas, and they probably need some refining. I could knock out some pseudocode if that would be of interest, and I'd certainly be happy to support you if you were interested in working on this. Even if not, thank you for spurring on some more thought about this 👍

[1] The Lightbus API calls consumers 'listeners'

[2] I suspect this is somewhat what I was getting at with "Determine which consumers are currently processing a message", i.e. we should be able to reclaim messages from dead consumers within our group.
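To make the "consumer groups in Postgres" idea concrete, one possible shape is a per-group cursor, anticipating the schema outlined later in this thread. This is only a sketch, and every table and column name here is hypothetical:

```sql
-- Each group records the highest message id it has processed. Because
-- every group keeps its own cursor into the same append-only messages
-- table, every group sees every message at least once: fan-out, rather
-- than a job queue.

-- Fetch messages this group has not yet seen:
SELECT m.id, m.content
FROM messages m
JOIN groups g ON g.stream = m.stream
WHERE g.name = 'example_group'
  AND m.id > g.latest_id
ORDER BY m.id;

-- Once the group's consumers have processed up to :last_id:
UPDATE groups SET latest_id = :last_id WHERE name = 'example_group';
```

Advancing `latest_id` only after processing gives at-least-once semantics: a crash before the `UPDATE` simply means the same messages are re-delivered.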
Ah right, this is probably what I was missing. And since the client creating the events doesn't know the consumers, there is no way to insert the event multiple times for different consumers either.

I very much like this as a problem description. I'll think about it and let you know if I can come up with something :) Thank you for taking the time to go into the details.
No problem at all. I'd love to hear what you come up with! Also, to expand on this:
I would like to explicitly add: it is possible that a future consumer group will be created which wishes to consume past events (i.e. event sourcing). So it is definitely true that one cannot know all the potential destinations for an event at the time the event is fired.
Adding some notes here regarding some discussion I've had with @apollo13 in the discourse channel:
Needed commands (mirroring the Redis event transport):

- xadd
- xread_group
- xpending
- xclaim
- xrevrange
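As a rough sketch of how these commands might map onto SQL (assuming the schema outlined below; nothing here is settled):

```sql
-- xadd: append a message to a stream
INSERT INTO messages (stream, content)
VALUES ('my_stream', '{"event": "example"}');

-- xrevrange: read recent messages, newest first
SELECT id, content FROM messages
WHERE stream = 'my_stream'
ORDER BY id DESC
LIMIT 10;

-- xpending: list messages claimed by consumers but not yet acknowledged
SELECT message_id, consumer_name, claimed_at FROM pending_messages;

-- xclaim: take over a message whose consumer appears to have died
UPDATE pending_messages
SET consumer_name = 'replacement_consumer', claimed_at = now()
WHERE message_id = 42
  AND claimed_at < now() - interval '60 seconds';
```

`xread_group` is the involved one: it would combine a read past the group's cursor with inserting the delivered messages into `pending_messages`.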
This requires that we will be able to:
Potential schema outline:

```
messages
    id          SERIAL
    created_at  TIMESTAMPTZ
    stream      VARCHAR(255)
    content     JSONB

groups
    id          SERIAL
    name        VARCHAR(255)
    stream      VARCHAR(255)
    latest_id   INT

pending_messages
    message_id    INT
    consumer_name VARCHAR(255)
    claimed_at    TIMESTAMPTZ
```
Potentially move each stream into its own table.
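The outline above could be realised as DDL roughly as follows. This is a sketch only; the constraints, defaults, and index are my additions and would need more thought:

```sql
CREATE TABLE messages (
    id         SERIAL PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    stream     VARCHAR(255) NOT NULL,
    content    JSONB NOT NULL
);
-- Reads are always "messages in this stream after id X",
-- so index on (stream, id).
CREATE INDEX messages_stream_id ON messages (stream, id);

CREATE TABLE groups (
    id        SERIAL PRIMARY KEY,
    name      VARCHAR(255) NOT NULL,
    stream    VARCHAR(255) NOT NULL,
    -- Cursor: highest message id this group has processed.
    -- A new group starting at 0 replays the stream's history,
    -- which matches the event-sourcing point above.
    latest_id INT NOT NULL DEFAULT 0,
    UNIQUE (name, stream)
);

CREATE TABLE pending_messages (
    message_id    INT NOT NULL REFERENCES messages (id),
    consumer_name VARCHAR(255) NOT NULL,
    claimed_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (message_id, consumer_name)
);
```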
Note: The old transaction event transport was deleted in 899e9b5