Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nft_ingester): improve cancellation of signature fetching #450

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,11 @@ pub async fn main() -> Result<(), IngesterError> {
let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
while !cancellation_token.is_cancelled() {
match signature_fetcher
.fetch_signatures(program_id, args.rpc_retry_interval_millis)
.fetch_signatures(
program_id,
args.rpc_retry_interval_millis,
cancellation_token.child_token(),
)
.await
{
Ok(_) => {
Expand Down
6 changes: 5 additions & 1 deletion nft_ingester/src/bin/migrator/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ impl JsonMigrator {

if tasks_buffer.is_empty() {
drop(tasks_buffer);
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
cancellation_token
.run_until_cancelled(tokio::time::sleep(
tokio::time::Duration::from_secs(10),
))
.await;
continue;
}

Expand Down
15 changes: 9 additions & 6 deletions nft_ingester/src/cleaners/fork_cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ pub async fn run_fork_cleaner(
fork_cleaner.clean_forks(cancellation_token.child_token()).await;
metrics.set_scans_latency(start.elapsed().as_secs_f64());
metrics.inc_total_scans();
tokio::select! {
_ = tokio_sleep(Duration::from_secs(sequence_consistent_checker_wait_period_sec)) => {},
_ = cancellation_token.cancelled() => {
info!("Received stop signal, stopping cleaning forks!");
break;
}
if cancellation_token
.run_until_cancelled(tokio_sleep(Duration::from_secs(
sequence_consistent_checker_wait_period_sec,
)))
.await
.is_none()
{
info!("Received stop signal, stopping cleaning forks!");
break;
}
}

Expand Down
101 changes: 62 additions & 39 deletions usecase/src/signature_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use interface::{
};
use metrics_utils::{MetricStatus, RpcBackfillerMetricsConfig};
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use tokio_util::sync::CancellationToken;
use tracing::info;

pub struct SignatureFetcher<T, SP, TI>
Expand Down Expand Up @@ -45,30 +46,43 @@ where
&self,
program_id: Pubkey,
rpc_retry_interval_millis: u64,
cancellation_token: CancellationToken,
) -> Result<(), StorageError> {
let signature = self.data_layer.first_persisted_signature_for(program_id).await?;
let signature = cancellation_token
.run_until_cancelled(self.data_layer.first_persisted_signature_for(program_id))
.await
.unwrap_or(Ok(None))?;
if signature.is_none() {
return Ok(());
}
let signature = signature.unwrap();
info!("Start fetching signatures...");
let mut all_signatures = match self
.rpc
.get_signatures_by_address(signature, program_id)
let mut all_signatures = cancellation_token
.run_until_cancelled(async move {
match self
.rpc
.get_signatures_by_address(signature, program_id)
.await
.map_err(|e| StorageError::Common(e.to_string()))
{
Ok(all_signatures) => {
self.metrics.inc_fetch_signatures(
"get_signatures_by_address",
MetricStatus::SUCCESS,
);
Ok(all_signatures)
},
Err(e) => {
self.metrics.inc_fetch_signatures(
"get_signatures_by_address",
MetricStatus::FAILURE,
);
Err(e)
},
}
})
.await
.map_err(|e| StorageError::Common(e.to_string()))
{
Ok(all_signatures) => {
self.metrics
.inc_fetch_signatures("get_signatures_by_address", MetricStatus::SUCCESS);
all_signatures
},
Err(e) => {
self.metrics
.inc_fetch_signatures("get_signatures_by_address", MetricStatus::FAILURE);
return Err(e);
},
};
.unwrap_or_else(|| Ok(vec![]))?;

if all_signatures.is_empty() {
return Ok(());
Expand All @@ -85,14 +99,21 @@ where
all_signatures.sort_by(|a, b| a.slot.cmp(&b.slot));
// we need to split the list into batches of BATCH_SIZE

// todo: use Rust's chunks instead
let mut batch_start = 0;
while batch_start < all_signatures.len() {
let batch_end = std::cmp::min(batch_start + BATCH_SIZE, all_signatures.len());
let batch = &all_signatures[batch_start..batch_end];
let missing_signatures =
self.data_layer.missing_signatures(program_id, batch.to_vec()).await?;
batch_start = batch_end;
// this variable is required to account for the last batch potentially being smaller than
// the rest
let mut last_batch_index = 0;

for signatures in all_signatures.chunks(BATCH_SIZE) {
if cancellation_token.is_cancelled() {
break;
}
let signatures_len = signatures.len();
let missing_signatures = cancellation_token
.run_until_cancelled(
self.data_layer.missing_signatures(program_id, signatures.to_vec()),
)
.await
.unwrap_or_else(|| Ok(vec![]))?;
if missing_signatures.is_empty() {
continue;
}
Expand All @@ -101,12 +122,9 @@ where
missing_signatures.len(),
program_id
);

let signatures: Vec<Signature> =
missing_signatures.iter().map(|s| s.signature).collect();

let tx_cnt = signatures.len();

let counter = 0;

Self::process_transactions(
Expand All @@ -125,24 +143,28 @@ where

let fake_key = SignatureWithSlot {
signature: Default::default(),
slot: all_signatures[batch_end - 1].slot,
slot: all_signatures[last_batch_index + signatures_len - 1].slot,
};
info!(
"Ingested {} transactions. Dropping signatures for program {} before slot {}.",
tx_cnt, program_id, fake_key.slot
);
self.data_layer.drop_signatures_before(program_id, fake_key).await?;
last_batch_index += signatures_len;
}
let fake_key = SignatureWithSlot {
signature: Default::default(),
slot: all_signatures[all_signatures.len() - 1].slot,
};

info!(
"Finished fetching signatures for program {}. Dropping signatures before slot {}.",
program_id, fake_key.slot
);
self.data_layer.drop_signatures_before(program_id, fake_key).await?;
if !cancellation_token.is_cancelled() {
let fake_key = SignatureWithSlot {
signature: Default::default(),
slot: all_signatures[all_signatures.len() - 1].slot,
};

info!(
"Finished fetching signatures for program {}. Dropping signatures before slot {}.",
program_id, fake_key.slot
);
self.data_layer.drop_signatures_before(program_id, fake_key).await?;
}
Ok(())
}

Expand Down Expand Up @@ -229,6 +251,7 @@ mod tests {
};
use metrics_utils::RpcBackfillerMetricsConfig;
use mockall::predicate::{self, eq};
use tokio_util::sync::CancellationToken;

#[tokio::test]
async fn test_fetch_signatures_with_over_batch_limit_elements_should_complete_without_infinite_loop(
Expand Down Expand Up @@ -270,6 +293,6 @@ mod tests {
Arc::new(ingester),
metrics,
);
fetcher.fetch_signatures(program_id, 0).await.unwrap();
fetcher.fetch_signatures(program_id, 0, CancellationToken::new()).await.unwrap();
}
}
Loading