Skip to content

Commit

Permalink
easy(indexer-alt): clarify names of tuning parameters
Browse files Browse the repository at this point in the history
## Description
Clarify the purpose of various tuning parameters by renaming them
(e.g. `BATCH_SIZE` → `MIN_EAGER_ROWS`, `CHUNK_SIZE` → `MAX_CHUNK_ROWS`,
`MAX_PENDING_SIZE` → `MAX_PENDING_ROWS`).

## Test plan
:eyes:
  • Loading branch information
amnn committed Oct 29, 2024
1 parent 3e5c7bb commit 233da93
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 28 deletions.
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ impl Processor for EvEmitMod {

#[async_trait::async_trait]
impl Handler for EvEmitMod {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(ev_emit_mod::table)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl Processor for EvStructInst {

#[async_trait::async_trait]
impl Handler for EvStructInst {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(ev_struct_inst::table)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/handlers/kv_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ impl Processor for KvObjects {

#[async_trait::async_trait]
impl Handler for KvObjects {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_objects::table)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ impl Processor for KvTransactions {

#[async_trait::async_trait]
impl Handler for KvTransactions {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_transactions::table)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ impl Processor for TxAffectedObjects {

#[async_trait::async_trait]
impl Handler for TxAffectedObjects {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(tx_affected_objects::table)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ impl Processor for TxBalanceChanges {

#[async_trait::async_trait]
impl Handler for TxBalanceChanges {
const BATCH_SIZE: usize = 100;
const CHUNK_SIZE: usize = 1000;
const MAX_PENDING_SIZE: usize = 10000;
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(tx_balance_changes::table)
Expand Down
11 changes: 7 additions & 4 deletions crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ impl<H: Handler> Pending<H> {
/// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
/// chunk size.
fn batch_into(&mut self, batch: &mut Batched<H>) {
if batch.values.len() + self.values.len() > H::CHUNK_SIZE {
let mut for_batch = self.values.split_off(H::CHUNK_SIZE - batch.values.len());
if batch.values.len() + self.values.len() > H::MAX_CHUNK_ROWS {
let mut for_batch = self
.values
.split_off(H::MAX_CHUNK_ROWS - batch.values.len());

std::mem::swap(&mut self.values, &mut for_batch);
batch.watermark.push(self.watermark.take(for_batch.len()));
batch.values.extend(for_batch);
Expand Down Expand Up @@ -159,7 +162,7 @@ pub(super) fn collector<H: Handler + 'static>(
}
}

Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_ROWS => {
metrics
.total_collector_rows_received
.with_label_values(&[H::NAME])
Expand All @@ -168,7 +171,7 @@ pub(super) fn collector<H: Handler + 'static>(
pending_rows += indexed.values.len();
pending.insert(indexed.checkpoint(), indexed.into());

if pending_rows >= H::BATCH_SIZE {
if pending_rows >= H::MIN_EAGER_ROWS {
poll.reset_immediately()
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ const MAX_WATERMARK_UPDATES: usize = 10_000;
#[async_trait::async_trait]
pub trait Handler: Processor {
/// If at least this many rows are pending, the committer will commit them eagerly.
const BATCH_SIZE: usize = 50;
const MIN_EAGER_ROWS: usize = 50;

/// If there are more than this many rows pending, the committer will only commit this many in
/// one operation.
const CHUNK_SIZE: usize = 200;
const MAX_CHUNK_ROWS: usize = 200;

/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_SIZE: usize = 1000;
const MAX_PENDING_ROWS: usize = 1000;

/// Take a chunk of values and commit them to the database, returning the number of rows
/// affected.
Expand Down Expand Up @@ -88,7 +88,7 @@ impl<H: Handler> Batched<H> {
/// The batch is full if it has more than enough values to write to the database, or more than
/// enough watermarks to update.
fn is_full(&self) -> bool {
self.values.len() >= H::CHUNK_SIZE || self.watermark.len() >= MAX_WATERMARK_UPDATES
self.values.len() >= H::MAX_CHUNK_ROWS || self.watermark.len() >= MAX_WATERMARK_UPDATES
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ pub(super) fn committer<H: Handler + 'static>(
// rows to write, and they are already in the batch, or we can process the next
// checkpoint to extract them.

if pending_rows < H::MIN_BATCH_ROWS {
if pending_rows < H::MIN_EAGER_ROWS {
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod committer;
#[async_trait::async_trait]
pub trait Handler: Processor {
/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_BATCH_ROWS: usize = 50;
const MIN_EAGER_ROWS: usize = 50;

/// Maximum number of checkpoints to try and write in a single batch. The larger this number
/// is, the more chances the pipeline has to merge redundant writes, but the longer each write
Expand Down

0 comments on commit 233da93

Please sign in to comment.