Skip to content

Commit

Permalink
Return from add_event and remove_event whether it was new. (#1870)
Browse files Browse the repository at this point in the history
  • Loading branch information
afck authored Apr 5, 2024
1 parent 538ce38 commit 1c1dbd8
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 58 deletions.
26 changes: 17 additions & 9 deletions linera-chain/src/inbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ where
}

/// Consumes an event from the inbox.
pub(crate) async fn remove_event(&mut self, event: &Event) -> Result<(), InboxError> {
///
/// Returns `true` if the event was already known, i.e. it was present in `added_events`.
pub(crate) async fn remove_event(&mut self, event: &Event) -> Result<bool, InboxError> {
// Record the latest cursor.
let cursor = Cursor::from(event);
ensure!(
Expand All @@ -179,7 +181,7 @@ where
tracing::trace!("Skipping previously received event {:?}", previous_event);
}
// Reconcile the event with the next added event, or mark it as removed.
match self.added_events.front().await? {
let already_known = match self.added_events.front().await? {
Some(previous_event) => {
// Rationale: If the two cursors are equal, then the events should match.
// Otherwise, at this point we know that `self.next_cursor_to_add >
Expand All @@ -195,19 +197,23 @@ where
);
self.added_events.delete_front();
tracing::trace!("Consuming event {:?}", event);
true
}
None => {
tracing::trace!("Marking event as expected: {:?}", event);
self.removed_events.push_back(event.clone());
false
}
}
};
self.next_cursor_to_remove.set(cursor.try_add_one()?);
Ok(())
Ok(already_known)
}

/// Pushes an event to the inbox. The verifications should not fail in production unless
/// many validators are faulty.
pub(crate) async fn add_event(&mut self, event: Event) -> Result<(), InboxError> {
///
/// Returns `true` if the event was new, `false` if it was already in `removed_events`.
pub(crate) async fn add_event(&mut self, event: Event) -> Result<bool, InboxError> {
// Record the latest cursor.
let cursor = Cursor::from(&event);
ensure!(
Expand All @@ -218,7 +224,7 @@ where
}
);
// Find if the message was removed ahead of time.
match self.removed_events.front().await? {
let newly_added = match self.removed_events.front().await? {
Some(previous_event) => {
if Cursor::from(&previous_event) == cursor {
// We already executed this message by anticipation. Remove it from
Expand All @@ -242,14 +248,16 @@ where
}
);
}
false
}
None => {
// Otherwise, schedule the message for execution.
self.added_events.push_back(event)
self.added_events.push_back(event);
true
}
}
};
self.next_cursor_to_add.set(cursor.try_add_one()?);
Ok(())
Ok(newly_added)
}
}

Expand Down
118 changes: 69 additions & 49 deletions linera-chain/src/unit_tests/inbox_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ async fn test_inbox_add_then_remove_skippable() {
let hash = CryptoHash::test_hash("1");
let mut view = InboxStateView::new().await;
// Add one event.
view.add_event(make_event(hash, 0, 0, [0])).await.unwrap();
assert!(view.add_event(make_event(hash, 0, 0, [0])).await.unwrap());
// Remove the same event
view.remove_event(&make_event(hash, 0, 0, [0]))
assert!(view
.remove_event(&make_event(hash, 0, 0, [0]))
.await
.unwrap();
.unwrap());
// Fail to add an old event.
assert_matches!(
view.add_event(make_event(hash, 0, 0, [0])).await,
Expand All @@ -65,23 +66,24 @@ async fn test_inbox_add_then_remove_skippable() {
Err(InboxError::IncorrectOrder { .. })
);
// Add two more events.
view.add_event(make_event(hash, 0, 1, [1])).await.unwrap();
view.add_event(make_event(hash, 1, 0, [2])).await.unwrap();
assert!(view.add_event(make_event(hash, 0, 1, [1])).await.unwrap());
assert!(view.add_event(make_event(hash, 1, 0, [2])).await.unwrap());
// Fail to remove non-matching event.
assert_matches!(
view.remove_event(&make_event(hash, 0, 1, [0])).await,
Err(InboxError::UnexpectedEvent { .. })
);
// Fail to remove non-matching even (hash).
// Fail to remove non-matching event (hash).
assert_matches!(
view.remove_event(&make_event(CryptoHash::test_hash("2"), 0, 1, [1]))
.await,
Err(InboxError::UnexpectedEvent { .. })
);
// OK to skip events.
view.remove_event(&make_event(hash, 1, 0, [2]))
assert!(view
.remove_event(&make_event(hash, 1, 0, [2]))
.await
.unwrap();
.unwrap());
// Inbox is empty again.
assert_eq!(view.added_events.count(), 0);
assert_eq!(view.removed_events.count(), 0);
Expand All @@ -92,11 +94,12 @@ async fn test_inbox_remove_then_add_skippable() {
let hash = CryptoHash::test_hash("1");
let mut view = InboxStateView::new().await;
// Remove one event by anticipation.
view.remove_event(&make_event(hash, 0, 0, [0]))
assert!(!view
.remove_event(&make_event(hash, 0, 0, [0]))
.await
.unwrap();
.unwrap());
// Add the same event
view.add_event(make_event(hash, 0, 0, [0])).await.unwrap();
assert!(!view.add_event(make_event(hash, 0, 0, [0])).await.unwrap());
// Fail to remove an old event.
assert_matches!(
view.remove_event(&make_event(hash, 0, 0, [0])).await,
Expand All @@ -108,12 +111,14 @@ async fn test_inbox_remove_then_add_skippable() {
Err(InboxError::IncorrectOrder { .. })
);
// Remove two more events.
view.remove_event(&make_event(hash, 0, 1, [1]))
assert!(!view
.remove_event(&make_event(hash, 0, 1, [1]))
.await
.unwrap();
view.remove_event(&make_event(hash, 1, 1, [3]))
.unwrap());
assert!(!view
.remove_event(&make_event(hash, 1, 1, [3]))
.await
.unwrap();
.unwrap());
// Fail to add non-matching event.
assert_matches!(
view.add_event(make_event(hash, 0, 1, [0])).await,
Expand All @@ -131,15 +136,15 @@ async fn test_inbox_remove_then_add_skippable() {
Err(InboxError::UnexpectedEvent { .. })
);
// OK to backfill the two consumed events, with one skippable event in the middle.
view.add_event(make_event(hash, 0, 1, [1])).await.unwrap();
assert!(!view.add_event(make_event(hash, 0, 1, [1])).await.unwrap());
// Cannot add an unskippable event that was visibly skipped already.
assert_matches!(
view.add_event(make_unskippable_event(hash, 1, 0, [2]))
.await,
Err(InboxError::UnexpectedEvent { .. })
);
view.add_event(make_event(hash, 1, 0, [2])).await.unwrap();
view.add_event(make_event(hash, 1, 1, [3])).await.unwrap();
assert!(!view.add_event(make_event(hash, 1, 0, [2])).await.unwrap());
assert!(!view.add_event(make_event(hash, 1, 1, [3])).await.unwrap());
// Inbox is empty again.
assert_eq!(view.added_events.count(), 0);
assert_eq!(view.removed_events.count(), 0);
Expand All @@ -150,13 +155,15 @@ async fn test_inbox_add_then_remove_unskippable() {
let hash = CryptoHash::test_hash("1");
let mut view = InboxStateView::new().await;
// Add one event.
view.add_event(make_unskippable_event(hash, 0, 0, [0]))
assert!(view
.add_event(make_unskippable_event(hash, 0, 0, [0]))
.await
.unwrap();
.unwrap());
// Remove the same event
view.remove_event(&make_unskippable_event(hash, 0, 0, [0]))
assert!(view
.remove_event(&make_unskippable_event(hash, 0, 0, [0]))
.await
.unwrap();
.unwrap());
// Fail to add an old event.
assert_matches!(
view.add_event(make_unskippable_event(hash, 0, 0, [0]))
Expand All @@ -170,12 +177,14 @@ async fn test_inbox_add_then_remove_unskippable() {
Err(InboxError::IncorrectOrder { .. })
);
// Add two more events.
view.add_event(make_unskippable_event(hash, 0, 1, [1]))
assert!(view
.add_event(make_unskippable_event(hash, 0, 1, [1]))
.await
.unwrap();
view.add_event(make_unskippable_event(hash, 1, 0, [2]))
.unwrap());
assert!(view
.add_event(make_unskippable_event(hash, 1, 0, [2]))
.await
.unwrap();
.unwrap());
// Fail to remove non-matching event.
assert_matches!(
view.remove_event(&make_unskippable_event(hash, 0, 1, [0]))
Expand All @@ -199,12 +208,14 @@ async fn test_inbox_add_then_remove_unskippable() {
Err(InboxError::UnskippableEvent {event })
if event == make_unskippable_event(hash, 0, 1, [1])
);
view.remove_event(&make_unskippable_event(hash, 0, 1, [1]))
assert!(view
.remove_event(&make_unskippable_event(hash, 0, 1, [1]))
.await
.unwrap();
view.remove_event(&make_unskippable_event(hash, 1, 0, [2]))
.unwrap());
assert!(view
.remove_event(&make_unskippable_event(hash, 1, 0, [2]))
.await
.unwrap();
.unwrap());
// Inbox is empty again.
assert_eq!(view.added_events.count(), 0);
assert_eq!(view.removed_events.count(), 0);
Expand All @@ -215,13 +226,15 @@ async fn test_inbox_remove_then_add_unskippable() {
let hash = CryptoHash::test_hash("1");
let mut view = InboxStateView::new().await;
// Remove one event by anticipation.
view.remove_event(&make_unskippable_event(hash, 0, 0, [0]))
assert!(!view
.remove_event(&make_unskippable_event(hash, 0, 0, [0]))
.await
.unwrap();
.unwrap());
// Add the same event
view.add_event(make_unskippable_event(hash, 0, 0, [0]))
assert!(!view
.add_event(make_unskippable_event(hash, 0, 0, [0]))
.await
.unwrap();
.unwrap());
// Fail to remove an old event.
assert_matches!(
view.remove_event(&make_unskippable_event(hash, 0, 0, [0]))
Expand All @@ -235,12 +248,14 @@ async fn test_inbox_remove_then_add_unskippable() {
Err(InboxError::IncorrectOrder { .. })
);
// Remove two more events.
view.remove_event(&make_unskippable_event(hash, 0, 1, [1]))
assert!(!view
.remove_event(&make_unskippable_event(hash, 0, 1, [1]))
.await
.unwrap();
view.remove_event(&make_unskippable_event(hash, 1, 1, [3]))
.unwrap());
assert!(!view
.remove_event(&make_unskippable_event(hash, 1, 1, [3]))
.await
.unwrap();
.unwrap());
// Fail to add non-matching event.
assert_matches!(
view.add_event(make_unskippable_event(hash, 0, 1, [0]))
Expand All @@ -265,18 +280,20 @@ async fn test_inbox_remove_then_add_unskippable() {
Err(InboxError::UnexpectedEvent { .. })
);
// OK to add the two events.
view.add_event(make_unskippable_event(hash, 0, 1, [1]))
assert!(!view
.add_event(make_unskippable_event(hash, 0, 1, [1]))
.await
.unwrap();
.unwrap());
// Cannot add an unskippable event that was visibly skipped already.
assert_matches!(
view.add_event(make_unskippable_event(hash, 1, 0, [2]))
.await,
Err(InboxError::UnexpectedEvent { .. })
);
view.add_event(make_unskippable_event(hash, 1, 1, [3]))
assert!(!view
.add_event(make_unskippable_event(hash, 1, 1, [3]))
.await
.unwrap();
.unwrap());
// Inbox is empty again.
assert_eq!(view.added_events.count(), 0);
assert_eq!(view.removed_events.count(), 0);
Expand All @@ -287,10 +304,11 @@ async fn test_inbox_add_then_remove_mixed() {
let hash = CryptoHash::test_hash("1");
let mut view = InboxStateView::new().await;
// Add two events.
view.add_event(make_unskippable_event(hash, 0, 1, [1]))
assert!(view
.add_event(make_unskippable_event(hash, 0, 1, [1]))
.await
.unwrap();
view.add_event(make_event(hash, 1, 0, [2])).await.unwrap();
.unwrap());
assert!(view.add_event(make_event(hash, 1, 0, [2])).await.unwrap());
// Fail to remove non-matching event (skippability).
assert_matches!(
view.remove_event(&make_event(hash, 0, 1, [1])).await,
Expand All @@ -313,12 +331,14 @@ async fn test_inbox_add_then_remove_mixed() {
Err(InboxError::UnskippableEvent { event })
if event == make_unskippable_event(hash, 0, 1, [1])
);
view.remove_event(&make_unskippable_event(hash, 0, 1, [1]))
assert!(view
.remove_event(&make_unskippable_event(hash, 0, 1, [1]))
.await
.unwrap();
view.remove_event(&make_event(hash, 1, 0, [2]))
.unwrap());
assert!(view
.remove_event(&make_event(hash, 1, 0, [2]))
.await
.unwrap();
.unwrap());
// Inbox is empty again.
assert_eq!(view.added_events.count(), 0);
assert_eq!(view.removed_events.count(), 0);
Expand Down

0 comments on commit 1c1dbd8

Please sign in to comment.