chore(event_ingestion): Try to adapt to new design #159
Conversation
pub batch: Vec<T>,
/// `None` means the message does not require a flush.
/// `Some` contains a list of watchers.
pub flush: Option<()>,
Is there any reason you changed this to `Option<()>`? (That's basically a `bool`.)
It used to contain a oneshot sender, so it can notify the other end when the flush is complete. We would wait for it to know when to return from `flush()`.
honestly it was just because I couldn't figure out what it was for and this seemed to make things simpler for now. No real reason beyond that
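For context, a minimal sketch of what the oneshot-based version could look like (assuming tokio's `oneshot` channel; this is illustrative, not the exact previous code):

```rust
use tokio::sync::oneshot;

pub struct BatchedMessage<T> {
    pub batch: Vec<T>,
    /// `None` means the message does not require a flush.
    /// `Some` carries a sender used to signal the caller once the
    /// flush has actually completed.
    pub flush: Option<oneshot::Sender<()>>,
}

// On the delivery side, after the batch has been sent out:
// if let Some(ack) = message.flush {
//     let _ = ack.send(()); // wakes up the task awaiting flush()
// }
```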
@@ -1,9 +1,9 @@
use std::collections::HashSet;
use crate::events::event::Event;
use crate::event_ingestion::event::Event;
nit: I usually split imports into 3 sections in this order:
- std
- external
- local
I just let the IDE handle imports for me 😂
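For illustration, the suggested grouping (module paths here are just examples):

```rust
// std
use std::collections::HashSet;

// external
use tokio::sync::mpsc;

// local (this crate)
use crate::event_ingestion::event::Event;
```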
batch,
flush: _flush,
}) => {
    let result = event_delivery.deliver(batch.clone()).await;
`deliver()` should take the batch by reference, so we don't have to copy the whole batch of events.
I asked ChatGPT and apparently handling deserialization with references is more tricky than I thought. I'll punt this to a follow-up PR as well; please feel free to send a PR demonstrating how to do this cleanly.
Yes, deserialization is more tricky, but `deliver()` only needs serialization (which doesn't get complicated with references).
Cool, I'll give it another shot.
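A rough sketch of what a by-reference `deliver()` might look like, assuming an HTTP client like `reqwest` and `serde` for serialization (names and signature are illustrative, not the crate's actual API):

```rust
use serde::Serialize;

// Serialization only needs a shared reference, so the caller keeps
// ownership of the batch and nothing is cloned.
async fn deliver<T: Serialize>(
    client: &reqwest::Client,
    url: &str,
    batch: &[T],
) -> Result<(), reqwest::Error> {
    client
        .post(url)
        .json(batch) // `&[T]` serializes fine; no owned Vec required
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```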
delivery_status
    .send(DeliveryStatus {
        failure: batch,
        success: Vec::new(),
    })
    .await
    .ok()?;
Retrying inline is the simplest thing we can do for v1. (In that case, we can ignore delivery statuses for now.)
You may take a look at the exponential-backoff crate for inspiration (or feel free to use the library).
Thanks for the recommendation, will address that next.
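A minimal sketch of inline retries with exponential backoff, reusing the hypothetical `deliver()` from above (attempt count and delays are arbitrary):

```rust
use std::time::Duration;

// Retry a failed delivery a few times, doubling the delay after each
// failure. A real implementation would likely add jitter and a cap,
// as the exponential-backoff crate does.
async fn deliver_with_retries<T: serde::Serialize>(
    client: &reqwest::Client,
    url: &str,
    batch: &[T],
) -> Result<(), reqwest::Error> {
    let max_attempts = 5;
    let mut delay = Duration::from_millis(100);
    for attempt in 1..=max_attempts {
        match deliver(client, url, batch).await {
            Ok(()) => return Ok(()),
            Err(err) if attempt == max_attempts => return Err(err),
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay *= 2;
            }
        }
    }
    unreachable!("the loop always returns on the last attempt")
}
```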
pub struct EventDispatcher<T> {
    config: EventDispatcherConfig,
    event_queue: T,
}
Event dispatcher should be a thin wrapper around `Sender<BatchedMessage<Event>>` now.
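Roughly something like this sketch (assuming a tokio mpsc channel and the `BatchedMessage`/`Event` types discussed above):

```rust
use tokio::sync::mpsc;

// The dispatcher no longer owns a queue; it only pushes messages onto
// the channel that the batching/delivery tasks consume.
pub struct EventDispatcher {
    sender: mpsc::Sender<BatchedMessage<Event>>,
}

impl EventDispatcher {
    pub async fn dispatch(
        &self,
        event: Event,
    ) -> Result<(), mpsc::error::SendError<BatchedMessage<Event>>> {
        self.sender
            .send(BatchedMessage {
                batch: vec![event],
                flush: None, // no flush acknowledgement needed here
            })
            .await
    }
}
```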
assert_eq!(
    failed_events,
    vec![QueuedEvent {
        event,
        attempts: 1,
        status: QueuedEventStatus::Failed
    }]
);
This assert is failing because we don't use an explicit queue. If we implement retries inline, I think we can just drop the queue and queued events
For now I just updated it to assert on the failure vec in the receiver: 235e096#diff-cc0578a9294bc56ba4e2187fc2ce89c0da05c725b894fe4b8e57ef7d28ffb2c7
Not sure how to handle retries yet with this architecture, so I'll punt for now.
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* chore(events): Redelivery failed events in batches
* cleanup tests
* use linked hash set to enforce event uniqueness and max retries
* cargo fmt
* more accurate retries
* replace hashmap with separate queues
* use HashSet for failed events
* chore(event_ingestion): Try to adapt to new design (#159)
* chore(event_ingestion): Try to adapt to new design
* add `sync` flag for `tokio` dependency (#160)
* add sync feature to tokio crate
* add changeset
* chore: bump versions before release (#161)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* chore: fix typo in publish.yml update_ruby_lockfile step (#162)
* chore(ruby): update lockfile (#163)
* chore: hide event ingestion under a feature flag (#164)
* chore: bump versions before release (#165)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* chore(ruby): update lockfile (#166)
* enable feature, use tokio timer advance
* more code review comments
* fix failing tests
* DRY up
---------
Co-authored-by: Leo Romanovsky <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Oleksii Shmalko <[email protected]>
* nuke VecEventQUeue
* more cleanup
* WIP retry failures
* remove this
* fix tests
* code review + exponential backoff
* update channel size
---------
Co-authored-by: Leo Romanovsky <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Oleksii Shmalko <[email protected]>
Description
@rasendubi this is a stacked PR on top of #158 and is my attempt to incorporate your proposed design on #157 on top of the existing code.
I'm sure this has a lot of issues and is nowhere near "done" or "complete", but it's more of an initial proposal to see if this is the right direction.
There are still a bunch of questions left unanswered, mainly how we handle retries with this design.
For now, just putting it up for feedback. Feel free to iterate on the same branch if you'd like