-
Notifications
You must be signed in to change notification settings - Fork 749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use BucketQueueView
in the protocol.
#2807
base: main
Are you sure you want to change the base?
Changes from all commits
4363e6c
ed4fae0
19979d2
eeac30d
39fb362
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,10 +29,10 @@ use linera_execution::{ | |
ServiceRuntimeEndpoint, TransactionTracker, | ||
}; | ||
use linera_views::{ | ||
bucket_queue_view::BucketQueueView, | ||
context::Context, | ||
log_view::LogView, | ||
map_view::MapView, | ||
queue_view::QueueView, | ||
reentrant_collection_view::ReentrantCollectionView, | ||
register_view::RegisterView, | ||
set_view::SetView, | ||
|
@@ -183,6 +183,13 @@ impl BundleInInbox { | |
} | ||
} | ||
|
||
// The number of timestamp in a bucket | ||
// The `TimestampedBundleInInbox` type contains 4 cryptohashes, 1 blockheight | ||
// an index, two enums and the ChannelName. Only the ChannelName has an unbounded | ||
// size but we can expect the size to be reasonably small, so a total size of 100 | ||
// seems reasonable for the storing of the data. | ||
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could also decide not create a constant altogether. @afck What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean just inline the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I was wondering if inlining the value wouldn't be just fine. Changing the value breaks the storage format -- which is internal to a validator but still... |
||
|
||
/// A view accessing the state of a chain. | ||
#[derive(Debug, RootView, ClonableView, SimpleObject)] | ||
#[graphql(cache_control(no_cache))] | ||
|
@@ -217,7 +224,8 @@ where | |
/// Mailboxes used to receive messages indexed by their origin. | ||
pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>, | ||
/// A queue of unskippable bundles, with the timestamp when we added them to the inbox. | ||
pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>, | ||
pub unskippable_bundles: | ||
BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>, | ||
/// Unskippable bundles that have been removed but are still in the queue. | ||
pub removed_unskippable_bundles: SetView<C, BundleInInbox>, | ||
/// Mailboxes used to send messages, indexed by their target. | ||
|
@@ -633,10 +641,10 @@ where | |
} | ||
if !removed_unskippable.is_empty() { | ||
// Delete all removed bundles from the front of the unskippable queue. | ||
let maybe_front = self.unskippable_bundles.front().await?; | ||
let maybe_front = self.unskippable_bundles.front(); | ||
if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) { | ||
self.unskippable_bundles.delete_front(); | ||
while let Some(ts_entry) = self.unskippable_bundles.front().await? { | ||
self.unskippable_bundles.delete_front().await?; | ||
while let Some(ts_entry) = self.unskippable_bundles.front() { | ||
if !removed_unskippable.remove(&ts_entry.entry) { | ||
if !self | ||
.removed_unskippable_bundles | ||
|
@@ -647,7 +655,7 @@ where | |
} | ||
self.removed_unskippable_bundles.remove(&ts_entry.entry)?; | ||
} | ||
self.unskippable_bundles.delete_front(); | ||
self.unskippable_bundles.delete_front().await?; | ||
} | ||
} | ||
for entry in removed_unskippable { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,8 @@ use linera_base::data_types::{ArithmeticError, BlockHeight}; | |
#[cfg(with_testing)] | ||
use linera_views::context::{create_test_memory_context, MemoryContext}; | ||
use linera_views::{ | ||
bucket_queue_view::BucketQueueView, | ||
context::Context, | ||
queue_view::QueueView, | ||
register_view::RegisterView, | ||
views::{ClonableView, View, ViewError}, | ||
}; | ||
|
@@ -15,6 +15,12 @@ use linera_views::{ | |
#[path = "unit_tests/outbox_tests.rs"] | ||
mod outbox_tests; | ||
|
||
// The number of block heights in a bucket | ||
// The `BlockHeight` has just 8 bytes so the size is constant. | ||
// This means that by choosing a size of 1000, we have a | ||
// reasonable size that will not create any memory issues. | ||
const BLOCKHEIGHT_BUCKET_SIZE: usize = 1000; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
/// The state of an outbox | ||
/// * An outbox is used to send messages to another chain. | ||
/// * Internally, this is implemented as a FIFO queue of (increasing) block heights. | ||
|
@@ -31,7 +37,7 @@ where | |
pub next_height_to_schedule: RegisterView<C, BlockHeight>, | ||
/// Keep sending these certified blocks of ours until they are acknowledged by | ||
/// receivers. | ||
pub queue: QueueView<C, BlockHeight>, | ||
pub queue: BucketQueueView<C, BlockHeight, BLOCKHEIGHT_BUCKET_SIZE>, | ||
} | ||
|
||
impl<C> OutboxStateView<C> | ||
|
@@ -59,11 +65,11 @@ where | |
height: BlockHeight, | ||
) -> Result<Vec<BlockHeight>, ViewError> { | ||
let mut updates = Vec::new(); | ||
while let Some(h) = self.queue.front().await? { | ||
while let Some(h) = self.queue.front().cloned() { | ||
if h > height { | ||
break; | ||
} | ||
self.queue.delete_front(); | ||
self.queue.delete_front().await?; | ||
updates.push(h); | ||
} | ||
Ok(updates) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,6 +124,7 @@ fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize | |
/// The size `N` has to be chosen by taking into account the size of the type `T` | ||
/// and the basic size of a block. For example a total size of 100bytes to 10KB | ||
/// seems adequate. | ||
#[derive(Debug)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we generally derive |
||
pub struct BucketQueueView<C, T, const N: usize> { | ||
context: C, | ||
/// The buckets of stored data. If missing, then it has not been loaded. The first index is always loaded. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this comment. It's going to be obsolete very quickly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would probable even remove the first line.