
chore(event_ingestion): Try to adapt to new design #159

Merged
merged 13 commits into from
Jan 17, 2025

Conversation

felipecsl (Member)

Description

@rasendubi this is a stacked PR on top of #158 and is my attempt to incorporate your proposed design on #157 on top of the existing code.

I'm sure this has a lot of issues and is nowhere near "done" or "complete"; it's more of an initial proposal to check:

  1. do I have the correct understanding about what you're proposing?
  2. is this how you envisioned integrating this?

There are still a bunch of unanswered questions, mainly how we handle retries with this design.
For now, I'm just putting it up for feedback. Feel free to iterate on the same branch if you'd like.


changeset-bot bot commented Jan 17, 2025

⚠️ No Changeset found

Latest commit: 4557cf4

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@felipecsl felipecsl requested a review from rasendubi January 17, 2025 00:20
pub batch: Vec<T>,
/// `None` means the message does not require a flush.
/// `Some` contains a list of watchers.
pub flush: Option<()>,
Collaborator

Is there any reason you changed this to Option<()>? (that's basically a bool)

It used to contain oneshot sender, so it can notify the other end when flush is complete. We would wait for it to know when to return from a flush()

Member Author

Honestly, it was just because I couldn't figure out what it was for, and this seemed to make things simpler for now. No real reason beyond that.

@@ -1,9 +1,9 @@
use std::collections::HashSet;
use crate::events::event::Event;
use crate::event_ingestion::event::Event;
Collaborator

nit: I usually split imports into 3 sections in this order:

  • std
  • external
  • local
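Applied to the snippet above, that convention would read something like this (the external group is illustrative; this file may have no external imports):

```rust
// std
use std::collections::HashSet;

// external (illustrative only)
// use serde::Serialize;

// local
use crate::event_ingestion::event::Event;
```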

Member Author

I just let the IDE handle imports for me 😂

batch,
flush: _flush,
}) => {
let result = event_delivery.deliver(batch.clone()).await;
Collaborator

deliver() should take the batch by reference, so we don't have to copy the whole batch of events

Member Author

I asked ChatGPT and apparently handling deserialization with references is trickier than I thought. I'll punt this to a follow-up PR as well; please feel free to send a PR demonstrating how to do this cleanly.

Collaborator

Yes, deserialization is trickier, but deliver() only needs serialization (which doesn't get complicated with references)

Member Author

Cool, I'll give it another shot.

Comment on lines 51 to 57
delivery_status
.send(DeliveryStatus {
failure: batch,
success: Vec::new(),
})
.await
.ok()?;
Collaborator

Retrying inline is the simplest thing we can do for v1. (In that case, we can ignore delivery statuses for now)

You may take a look at exponential-backoff for inspiration (or feel free to use the library)

Member Author

Thanks for the recommendation, will address that next.

Comment on lines +42 to +45
pub struct EventDispatcher<T> {
config: EventDispatcherConfig,
event_queue: T,
}
Collaborator

Event dispatcher should be a thin wrapper around Sender<BatchedMessage<Event>> now

Comment on lines 160 to 167
assert_eq!(
failed_events,
vec![QueuedEvent {
event,
attempts: 1,
status: QueuedEventStatus::Failed
}]
);
Collaborator

This assert is failing because we don't use an explicit queue. If we implement retries inline, I think we can just drop the queue and queued events

Member Author

For now, I just updated it to assert on the failure vec in the receiver: 235e096#diff-cc0578a9294bc56ba4e2187fc2ce89c0da05c725b894fe4b8e57ef7d28ffb2c7

Member Author

Not sure how to handle retries yet with this architecture, so I'll punt for now.

@felipecsl felipecsl merged commit 903eae4 into felipecsl--retry-failed-events Jan 17, 2025
32 checks passed
@felipecsl felipecsl deleted the felipecsl--events-new-arch branch January 17, 2025 22:40
felipecsl added a commit that referenced this pull request Jan 22, 2025
* chore(events): Redelivery failed events in batches

* cleanup tests

* use linked hash set to enforce event uniqueness and max retries

* cargo fmt

* more accurate retries

* replace hashmap with separate queues

* use HashSet for failed events

* chore(event_ingestion): Try to adapt to new design (#159)

* chore(event_ingestion): Try to adapt to new design

* add `sync` flag for `tokio` dependency (#160)

* add sync feature to tokio crate

* add changeset

* chore: bump versions before release (#161)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* chore: fix typo in publish.yml update_ruby_lockfile step (#162)

* chore(ruby): update lockfile (#163)

* chore: hide event ingestion under a feature flag (#164)

* chore: bump versions before release (#165)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* chore(ruby): update lockfile (#166)

* enable feature, use tokio timer advance

* more code review comments

* fix failing tests

* DRY up

---------

Co-authored-by: Leo Romanovsky <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Oleksii Shmalko <[email protected]>

* nuke VecEventQUeue

* more cleanup

* WIP retry failures

* remove this

* fix tests

* code review + exponential backoff

* update channel size

---------

Co-authored-by: Leo Romanovsky <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Oleksii Shmalko <[email protected]>