Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BucketQueueView in the protocol. #2807

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use linera_execution::{
ServiceRuntimeEndpoint, TransactionTracker,
};
use linera_views::{
bucket_queue_view::BucketQueueView,
context::Context,
log_view::LogView,
map_view::MapView,
queue_view::QueueView,
reentrant_collection_view::ReentrantCollectionView,
register_view::RegisterView,
set_view::SetView,
Expand Down Expand Up @@ -183,6 +183,13 @@ impl BundleInInbox {
}
}

// The number of timestamp in a bucket
// The `TimestampedBundleInInbox` type contains 4 cryptohashes, 1 blockheight
// an index, two enums and the ChannelName. Only the ChannelName has an unbounded
// size but we can expect the size to be reasonably small, so a total size of 100
// seems reasonable for the storing of the data.
Comment on lines +187 to +190
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this comment. It's going to be obsolete very quickly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probable even remove the first line.

const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
Copy link
Contributor

@ma2bd ma2bd Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIMESTAMPED_BUNDLES_BUCKET_SIZE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also decide not create a constant altogether.

@afck What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean just inline the 100? Or make it configurable? I think this is an example where I wouldn't make it configurable, since this is very hard to explain to the user and there is probably simply a range of reasonable values one of which we can fix.

Copy link
Contributor

@ma2bd ma2bd Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was wondering if inlining the value wouldn't be just fine. Changing the value breaks the storage format -- which is internal to a validator but still...


/// A view accessing the state of a chain.
#[derive(Debug, RootView, ClonableView, SimpleObject)]
#[graphql(cache_control(no_cache))]
Expand Down Expand Up @@ -217,7 +224,8 @@ where
/// Mailboxes used to receive messages indexed by their origin.
pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
/// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
pub unskippable_bundles:
BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
/// Unskippable bundles that have been removed but are still in the queue.
pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
/// Mailboxes used to send messages, indexed by their target.
Expand Down Expand Up @@ -633,10 +641,10 @@ where
}
if !removed_unskippable.is_empty() {
// Delete all removed bundles from the front of the unskippable queue.
let maybe_front = self.unskippable_bundles.front().await?;
let maybe_front = self.unskippable_bundles.front();
if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
self.unskippable_bundles.delete_front();
while let Some(ts_entry) = self.unskippable_bundles.front().await? {
self.unskippable_bundles.delete_front().await?;
while let Some(ts_entry) = self.unskippable_bundles.front() {
if !removed_unskippable.remove(&ts_entry.entry) {
if !self
.removed_unskippable_bundles
Expand All @@ -647,7 +655,7 @@ where
}
self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
}
self.unskippable_bundles.delete_front();
self.unskippable_bundles.delete_front().await?;
}
}
for entry in removed_unskippable {
Expand Down
14 changes: 10 additions & 4 deletions linera-chain/src/outbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use linera_base::data_types::{ArithmeticError, BlockHeight};
#[cfg(with_testing)]
use linera_views::context::{create_test_memory_context, MemoryContext};
use linera_views::{
bucket_queue_view::BucketQueueView,
context::Context,
queue_view::QueueView,
register_view::RegisterView,
views::{ClonableView, View, ViewError},
};
Expand All @@ -15,6 +15,12 @@ use linera_views::{
#[path = "unit_tests/outbox_tests.rs"]
mod outbox_tests;

// The number of block heights in a bucket
// The `BlockHeight` has just 8 bytes so the size is constant.
// This means that by choosing a size of 1000, we have a
// reasonable size that will not create any memory issues.
const BLOCKHEIGHT_BUCKET_SIZE: usize = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BLOCK_HEIGHT_BUCKET_SIZE


/// The state of an outbox
/// * An outbox is used to send messages to another chain.
/// * Internally, this is implemented as a FIFO queue of (increasing) block heights.
Expand All @@ -31,7 +37,7 @@ where
pub next_height_to_schedule: RegisterView<C, BlockHeight>,
/// Keep sending these certified blocks of ours until they are acknowledged by
/// receivers.
pub queue: QueueView<C, BlockHeight>,
pub queue: BucketQueueView<C, BlockHeight, BLOCKHEIGHT_BUCKET_SIZE>,
}

impl<C> OutboxStateView<C>
Expand Down Expand Up @@ -59,11 +65,11 @@ where
height: BlockHeight,
) -> Result<Vec<BlockHeight>, ViewError> {
let mut updates = Vec::new();
while let Some(h) = self.queue.front().await? {
while let Some(h) = self.queue.front().cloned() {
if h > height {
break;
}
self.queue.delete_front();
self.queue.delete_front().await?;
updates.push(h);
}
Ok(updates)
Expand Down
2 changes: 1 addition & 1 deletion linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ where
let chain = &mut self.state.chain;
if let (Some(epoch), Some(entry)) = (
chain.execution_state.system.epoch.get(),
chain.unskippable_bundles.front().await?,
chain.unskippable_bundles.front(),
) {
let ownership = chain.execution_state.system.ownership.get();
let elapsed = self
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/chain_worker/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ where
let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
for outbox in outboxes {
let outbox = outbox.expect("Only existing outboxes should be referenced by `indices`");
let front = outbox.queue.front().await?;
if front.is_some_and(|key| key <= height) {
let front = outbox.queue.front();
if front.is_some_and(|key| *key <= height) {
return Ok(false);
}
}
Expand Down
20 changes: 10 additions & 10 deletions linera-service-graphql-client/gql/service_schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ A block height to identify blocks in a chain
scalar BlockHeight


type BucketQueueView_BlockHeight_e824a938 {
entries(count: Int): [BlockHeight!]!
}

type BucketQueueView_TimestampedBundleInInbox_5a630c55 {
entries(count: Int): [TimestampedBundleInInbox!]!
}

"""
An origin and cursor of a unskippable bundle that is no longer in our inbox.
"""
Expand Down Expand Up @@ -290,7 +298,7 @@ type ChainStateExtendedView {
"""
A queue of unskippable bundles, with the timestamp when we added them to the inbox.
"""
unskippableBundles: QueueView_TimestampedBundleInInbox_5a630c55!
unskippableBundles: BucketQueueView_TimestampedBundleInInbox_5a630c55!
"""
Unskippable bundles that have been removed but are still in the queue.
"""
Expand Down Expand Up @@ -847,7 +855,7 @@ type OutboxStateView {
Keep sending these certified blocks of ours until they are acknowledged by
receivers.
"""
queue: QueueView_BlockHeight_e824a938!
queue: BucketQueueView_BlockHeight_e824a938!
}

"""
Expand Down Expand Up @@ -927,18 +935,10 @@ type QueryRoot {
version: VersionInfo!
}

type QueueView_BlockHeight_e824a938 {
entries(count: Int): [BlockHeight!]!
}

type QueueView_MessageBundle_f4399f0b {
entries(count: Int): [MessageBundle!]!
}

type QueueView_TimestampedBundleInInbox_5a630c55 {
entries(count: Int): [TimestampedBundleInInbox!]!
}

"""
The recipient of a transfer
"""
Expand Down
1 change: 1 addition & 0 deletions linera-views/src/views/bucket_queue_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize
/// The size `N` has to be chosen by taking into account the size of the type `T`
/// and the basic size of a block. For example a total size of 100bytes to 10KB
/// seems adequate.
#[derive(Debug)]
Copy link
Contributor

@ma2bd ma2bd Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we generally derive Debug for views?

pub struct BucketQueueView<C, T, const N: usize> {
context: C,
/// The buckets of stored data. If missing, then it has not been loaded. The first index is always loaded.
Expand Down
Loading