Skip to content

Commit

Permalink
[indexer-alt] Remove coin balance bucket pruner (#20805)
Browse files Browse the repository at this point in the history
## Description 

This PR reimplements the coin balance bucket pruner using proper pruner
instead of a pipeline.
This is the last pipeline that uses consistency layer. So we also get
rid of all the related implementation code for it.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
lxfind authored Jan 9, 2025
1 parent 560668e commit 56fdf0b
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 388 deletions.
116 changes: 2 additions & 114 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
/// closed.
pub(super) fn collector<H: Handler + 'static>(
config: CommitterConfig,
checkpoint_lag: Option<u64>,
mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
tx: mpsc::Sender<BatchedRows<H>>,
metrics: Arc<IndexerMetrics>,
Expand All @@ -93,10 +92,6 @@ pub(super) fn collector<H: Handler + 'static>(
let mut poll = interval(config.collect_interval());
poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

// Data for checkpoints that have been received but not yet ready to be sent to committer due to lag constraint.
let mut received: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
let checkpoint_lag = checkpoint_lag.unwrap_or_default();

let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
&metrics.collected_checkpoint_timestamp_lag,
&metrics.latest_collected_checkpoint_timestamp_lag_ms,
Expand Down Expand Up @@ -186,8 +181,8 @@ pub(super) fn collector<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.inc();

received.insert(indexed.checkpoint(), indexed);
pending_rows += move_ready_checkpoints(&mut received, &mut pending, checkpoint_lag);
pending_rows += indexed.len();
pending.insert(indexed.checkpoint(), indexed.into());

if pending_rows >= H::MIN_EAGER_ROWS {
poll.reset_immediately()
Expand All @@ -198,34 +193,6 @@ pub(super) fn collector<H: Handler + 'static>(
})
}

/// Move all checkpoints from `received` that are within the lag range into `pending`.
/// Returns the number of rows moved.
fn move_ready_checkpoints<H: Handler>(
received: &mut BTreeMap<u64, IndexedCheckpoint<H>>,
pending: &mut BTreeMap<u64, PendingCheckpoint<H>>,
checkpoint_lag: u64,
) -> usize {
let tip = match (received.last_key_value(), pending.last_key_value()) {
(Some((cp, _)), None) | (None, Some((cp, _))) => *cp,
(Some((cp1, _)), Some((cp2, _))) => std::cmp::max(*cp1, *cp2),
(None, None) => return 0,
};

let mut moved_rows = 0;
while let Some(entry) = received.first_entry() {
let cp = *entry.key();
if cp + checkpoint_lag > tip {
break;
}

let indexed = entry.remove();
moved_rows += indexed.len();
pending.insert(cp, indexed.into());
}

moved_rows
}

#[cfg(test)]
mod tests {
use sui_field_count::FieldCount;
Expand Down Expand Up @@ -271,82 +238,6 @@ mod tests {
}
}

#[test]
fn test_move_ready_checkpoints_empty() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 10);
assert_eq!(moved, 0);
assert!(received.is_empty());
assert!(pending.is_empty());
}

#[test]
fn test_move_ready_checkpoints_within_lag() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();

// Add checkpoints 1-5 to received
for i in 1..=5 {
received.insert(
i,
IndexedCheckpoint::new(0, i, 0, 0, vec![Entry, Entry, Entry]),
);
}

// With lag of 2 and tip at 5, only checkpoints 1-3 should move
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 2);

assert_eq!(moved, 9); // 3 checkpoints * 3 rows each
assert_eq!(received.len(), 2); // 4,5 remain
assert_eq!(pending.len(), 3); // 1,2,3 moved
assert!(pending.contains_key(&1));
assert!(pending.contains_key(&2));
assert!(pending.contains_key(&3));
}

#[test]
fn test_move_ready_checkpoints_tip_from_pending() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();

// Add checkpoint 10 to pending to establish tip
pending.insert(
10,
PendingCheckpoint::from(IndexedCheckpoint::new(0, 10, 0, 0, vec![Entry])),
);

// Add checkpoints 1-5 to received
for i in 1..=5 {
received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry]));
}

// With lag of 3 and tip at 10, checkpoints 1-7 can move
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 3);

assert_eq!(moved, 5); // All 5 checkpoints moved, 1 row each
assert!(received.is_empty());
assert_eq!(pending.len(), 6); // Original + 5 new
}

#[test]
fn test_move_ready_checkpoints_no_eligible() {
let mut received = BTreeMap::new();
let mut pending = BTreeMap::new();

// Add checkpoints 8-10 to received
for i in 8..=10 {
received.insert(i, IndexedCheckpoint::new(0, i, 0, 0, vec![Entry]));
}

// With lag of 5 and tip at 10, no checkpoints can move
let moved = move_ready_checkpoints::<TestHandler>(&mut received, &mut pending, 5);

assert_eq!(moved, 0);
assert_eq!(received.len(), 3);
assert!(pending.is_empty());
}

#[tokio::test]
async fn test_collector_batches_data() {
let (processor_tx, processor_rx) = mpsc::channel(10);
Expand All @@ -356,7 +247,6 @@ mod tests {

let _collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
collector_tx,
metrics,
Expand Down Expand Up @@ -399,7 +289,6 @@ mod tests {

let collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
collector_tx,
metrics,
Expand Down Expand Up @@ -440,7 +329,6 @@ mod tests {

let _collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
collector_tx,
metrics.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ pub struct ConcurrentConfig {

/// Configuration for the pruner, that deletes old data.
pub pruner: Option<PrunerConfig>,

/// How many checkpoints lagged behind latest seen checkpoint to hold back writes for.
/// This is useful if pruning is implemented as a concurrent pipeline, and it must be behind
/// the pipeline it tries to prune from by a certain number of checkpoints, to ensure
/// consistency reads remain valid for a certain amount of time.
pub checkpoint_lag: Option<u64>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -204,7 +198,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
let ConcurrentConfig {
committer: committer_config,
pruner: pruner_config,
checkpoint_lag,
} = config;

let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
Expand All @@ -230,7 +223,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(

let collector = collector::<H>(
committer_config.clone(),
checkpoint_lag,
collector_rx,
collector_tx,
metrics.clone(),
Expand Down
Loading

0 comments on commit 56fdf0b

Please sign in to comment.