indexer-alt: sequential pipeline (#20053)
## Description

Introduce a new kind of pipeline for indexing that needs to commit data in
checkpoint order. This will be used for indexing data that would previously
have gone into `objects` or `objects_snapshot`, where rows are modified in
place, and so can't be committed out-of-order.

Sequential pipelines are split into two parts (see the sketch after this
list):

- A `processor`, which is shared with the existing concurrent pipeline, and
is responsible for turning checkpoint data into values to be sent to the
database.
- A `committer`, which is responsible for batching up prefixes of updates
and sending them to the DB when they are complete (no gaps between the last
write and what has been buffered).
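
To make the split concrete, here is a minimal, self-contained sketch of the
two halves. Every name in it (`CheckpointData`, `Processor`,
`SequentialHandler`, `TxCount`) is an illustrative stand-in, not the crate's
actual API:

```rust
/// Stand-in for the checkpoint payload served by the ingestion service.
struct CheckpointData {
    sequence_number: u64,
    transactions: Vec<String>,
}

/// The `processor` half: turns checkpoint data into values to be sent to
/// the database. This half is shared with the concurrent pipeline.
trait Processor {
    const NAME: &'static str;
    type Value;

    fn process(checkpoint: &CheckpointData) -> Vec<Self::Value>;
}

/// The `committer` half, specific to sequential pipelines: values are
/// folded into a batch, which is only written out once it covers a gap-free
/// prefix of checkpoints.
trait SequentialHandler: Processor {
    type Batch: Default;

    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>);
}

/// Toy pipeline that records the number of transactions per checkpoint.
struct TxCount;

impl Processor for TxCount {
    const NAME: &'static str = "tx_count";
    type Value = (u64, usize);

    fn process(checkpoint: &CheckpointData) -> Vec<Self::Value> {
        vec![(checkpoint.sequence_number, checkpoint.transactions.len())]
    }
}

impl SequentialHandler for TxCount {
    type Batch = Vec<(u64, usize)>;

    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
        batch.extend(values);
    }
}

fn main() {
    let cp = CheckpointData { sequence_number: 7, transactions: vec!["tx".into()] };
    let mut batch: <TxCount as SequentialHandler>::Batch = Default::default();
    TxCount::batch(&mut batch, TxCount::process(&cp));
    assert_eq!(batch, vec![(7, 1)]);
}
```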

The key design constraints of the sequential pipeline are as follows:

- Although the committer must write out rows in order, it can buffer the
results of checkpoints processed out-of-order.
- It uses the ingestion service's regulator for back-pressure: The
ingestion service is only allowed to run ahead of all sequential
pipelines by its buffer size, which bounds the memory that each pipeline
must use to buffer pending writes.
- Sequential pipelines have different tuning parameters compared to
concurrent pipelines:
  - `MIN_BATCH_ROWS`: The threshold for eagerly writing to the DB.
  - `MAX_BATCH_CHECKPOINTS`: The maximum number of checkpoints that will be
    batched together in a single transaction.
- They guarantee atomicity using DB transactions: All the writes for a
single checkpoint, and the corresponding watermark update are put into
the same DB transaction.
- They support simplifying/merging writes to the DB: If the same object is
modified multiple times across multiple checkpoints, only the latest write
will make it to the DB (the sketch below illustrates the batching rules).
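
As a concrete illustration of these constraints, the toy committer below
buffers out-of-order checkpoint results and only writes once it holds a
gap-free prefix of at least `MIN_BATCH_ROWS` rows. It is a sketch with
made-up names and a `String` row type, not the crate's implementation, and
it omits the write-merging step for brevity:

```rust
use std::collections::BTreeMap;

/// Illustrative threshold for eagerly writing to the DB.
const MIN_BATCH_ROWS: usize = 4;

struct Committer {
    /// One past the last checkpoint folded into the batch.
    next_checkpoint: u64,
    /// Highest checkpoint "written" so far, if any.
    watermark: Option<u64>,
    /// Out-of-order results, keyed by checkpoint; may contain gaps.
    pending: BTreeMap<u64, Vec<String>>,
    /// Gap-free rows waiting to be written.
    batch: Vec<String>,
}

impl Committer {
    fn new() -> Self {
        Committer {
            next_checkpoint: 0,
            watermark: None,
            pending: BTreeMap::new(),
            batch: Vec::new(),
        }
    }

    fn receive(&mut self, checkpoint: u64, rows: Vec<String>) {
        self.pending.insert(checkpoint, rows);

        // Fold the contiguous prefix into the batch, stopping at a gap.
        while let Some(rows) = self.pending.remove(&self.next_checkpoint) {
            self.batch.extend(rows);
            self.next_checkpoint += 1;
        }

        // Write eagerly once the threshold is reached. In the real pipeline,
        // the rows and the corresponding watermark update would share a
        // single DB transaction, making the write atomic.
        if self.batch.len() >= MIN_BATCH_ROWS {
            self.watermark = Some(self.next_checkpoint - 1);
            println!("commit {} rows, watermark {:?}", self.batch.len(), self.watermark);
            self.batch.clear();
        }
    }
}

fn main() {
    let mut committer = Committer::new();
    committer.receive(1, vec!["b".into(), "c".into()]); // buffered: gap at 0
    committer.receive(2, vec!["d".into()]);             // still gapped
    committer.receive(0, vec!["a".into()]);             // prefix complete: commits 4 rows
}
```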

## Test plan

This change is primarily tested by the `sum_obj_types` pipeline
introduced in the next change.

## Stack

- #20050 
- #20051
- #20052 

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
amnn committed Oct 29, 2024
1 parent 3f45c76 commit 5ee814a
Showing 4 changed files with 505 additions and 15 deletions.
crates/sui-indexer-alt/src/lib.rs (65 additions, 15 deletions)

@@ -8,7 +8,7 @@ use db::{Db, DbConfig};
 use ingestion::{IngestionConfig, IngestionService};
 use metrics::{IndexerMetrics, MetricsService};
 use models::watermarks::CommitterWatermark;
-use pipeline::{concurrent, PipelineConfig};
+use pipeline::{concurrent, sequential, PipelineConfig, Processor};
 use task::graceful_shutdown;
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
@@ -136,23 +136,15 @@ impl Indexer {
 
     /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
     /// they will be idle until the ingestion service starts, and serves it checkpoint data.
+    ///
+    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
+    /// keep the watermark table up-to-date with the highest point they can guarantee all data
+    /// exists for, for their pipeline.
     pub async fn concurrent_pipeline<H: concurrent::Handler + 'static>(&mut self) -> Result<()> {
-        if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(H::NAME) {
+        let Some(watermark) = self.add_pipeline::<H>().await? else {
             info!("Skipping pipeline {}", H::NAME);
             return Ok(());
-        }
-
-        let mut conn = self.db.connect().await.context("Failed DB connection")?;
-
-        let watermark = CommitterWatermark::get(&mut conn, H::NAME)
-            .await
-            .with_context(|| format!("Failed to get watermark for {}", H::NAME))?;
-
-        // TODO(amnn): Test this (depends on supporting migrations and tempdb).
-        self.first_checkpoint_from_watermark = watermark
-            .as_ref()
-            .map_or(0, |w| w.checkpoint_hi_inclusive as u64 + 1)
-            .min(self.first_checkpoint_from_watermark);
+        };
 
         let (processor, collector, committer, watermark) = concurrent::pipeline::<H>(
             watermark,
Expand All @@ -171,6 +163,37 @@ impl Indexer {
Ok(())
}

/// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
/// they will be idle until the ingestion service starts, and serves it checkpoint data.
///
/// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may
/// be required to handle pipelines that modify data in-place (where each update is not an
/// insert, but could be a modification of an existing row, where ordering between updates is
/// important).
pub async fn sequential_pipeline<H: sequential::Handler + 'static>(&mut self) -> Result<()> {
let Some(watermark) = self.add_pipeline::<H>().await? else {
info!("Skipping pipeline {}", H::NAME);
return Ok(());
};

let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();

let (processor, committer) = sequential::pipeline::<H>(
watermark,
self.pipeline_config.clone(),
self.db.clone(),
checkpoint_rx,
watermark_tx,
self.metrics.clone(),
self.cancel.clone(),
);

self.handles.push(processor);
self.handles.push(committer);

Ok(())
}

/// Start ingesting checkpoints. Ingestion either starts from the configured
/// `first_checkpoint`, or it is calculated based on the watermarks of all active pipelines.
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
@@ -216,4 +239,31 @@
             metrics_handle.await.unwrap();
         }))
     }
+
+    /// Update the indexer's first checkpoint based on the watermark for the pipeline being added
+    /// for handler `P` (as long as it's enabled). Returns `Ok(None)` if the pipeline is disabled,
+    /// `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and
+    /// `Ok(Some(Some(watermark)))` if the pipeline is enabled and the watermark is found.
+    async fn add_pipeline<P: Processor + 'static>(
+        &mut self,
+    ) -> Result<Option<Option<CommitterWatermark<'static>>>> {
+        if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(P::NAME) {
+            info!("Skipping pipeline {}", P::NAME);
+            return Ok(None);
+        }
+
+        let mut conn = self.db.connect().await.context("Failed DB connection")?;
+
+        let watermark = CommitterWatermark::get(&mut conn, P::NAME)
+            .await
+            .with_context(|| format!("Failed to get watermark for {}", P::NAME))?;
+
+        // TODO(amnn): Test this (depends on supporting migrations and tempdb).
+        self.first_checkpoint_from_watermark = watermark
+            .as_ref()
+            .map_or(0, |w| w.checkpoint_hi_inclusive as u64 + 1)
+            .min(self.first_checkpoint_from_watermark);
+
+        Ok(Some(watermark))
+    }
 }
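
The nested `Option` returned by `add_pipeline` encodes three outcomes:
`None` means the pipeline is disabled, `Some(None)` means it is enabled with
no stored watermark, and `Some(Some(watermark))` means it is enabled and
resuming. A small standalone sketch of how a caller interprets this (with a
simplified stand-in for `CommitterWatermark`):

```rust
struct Watermark {
    checkpoint_hi_inclusive: i64,
}

/// Returns the first checkpoint this pipeline needs, or `None` if it is
/// disabled and should not constrain where ingestion starts.
fn first_checkpoint(outcome: Option<Option<Watermark>>) -> Option<u64> {
    match outcome {
        None => None,          // pipeline disabled
        Some(None) => Some(0), // no watermark: start from genesis
        Some(Some(w)) => Some(w.checkpoint_hi_inclusive as u64 + 1),
    }
}

fn main() {
    assert_eq!(first_checkpoint(None), None);
    assert_eq!(first_checkpoint(Some(None)), Some(0));
    let w = Watermark { checkpoint_hi_inclusive: 41 };
    assert_eq!(first_checkpoint(Some(Some(w))), Some(42));
}
```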
crates/sui-indexer-alt/src/pipeline/mod.rs (6 additions, 0 deletions)

@@ -9,6 +9,7 @@ pub use processor::Processor;
 
 pub(crate) mod concurrent;
 mod processor;
+pub(crate) mod sequential;
 
 /// Tracing message for the watermark update will be logged at info level at least this many
 /// checkpoints.
@@ -100,6 +101,11 @@ impl<P: Processor> Indexed<P> {
         }
     }
 
+    /// Number of rows from this checkpoint
+    fn len(&self) -> usize {
+        self.values.len()
+    }
+
     /// The checkpoint sequence number that this data is from
     fn checkpoint(&self) -> u64 {
         self.watermark.checkpoint_hi_inclusive as u64