Skip to content

Commit 7b5c364

Browse files
committed
feat(nft_ingester): improve cancellation of signature fetching
1 parent b81b651 commit 7b5c364

File tree

5 files changed

+72
-55
lines changed

5 files changed

+72
-55
lines changed

nft_ingester/src/bin/backfill/main.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,7 @@ async fn main() {
251251

252252
let current_rate = if elapsed > 0.0 { (count as f64) / elapsed } else { 0.0 };
253253

254-
// Update rate
255-
{
256-
let mut rate_guard = rate.lock().unwrap();
257-
*rate_guard = current_rate;
258-
}
254+
*rate.lock().unwrap() = current_rate;
259255

260256
// Update for next iteration
261257
last_time = current_time;

nft_ingester/src/bin/ingester/main.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,11 @@ pub async fn main() -> Result<(), IngesterError> {
597597
let program_id = mpl_bubblegum::programs::MPL_BUBBLEGUM_ID;
598598
while !cancellation_token.is_cancelled() {
599599
match signature_fetcher
600-
.fetch_signatures(program_id, args.rpc_retry_interval_millis)
600+
.fetch_signatures(
601+
program_id,
602+
args.rpc_retry_interval_millis,
603+
cancellation_token.child_token(),
604+
)
601605
.await
602606
{
603607
Ok(_) => {

nft_ingester/src/bin/migrator/main.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,11 @@ impl JsonMigrator {
215215

216216
if tasks_buffer.is_empty() {
217217
drop(tasks_buffer);
218-
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
218+
cancellation_token
219+
.run_until_cancelled(tokio::time::sleep(
220+
tokio::time::Duration::from_secs(10),
221+
))
222+
.await;
219223
continue;
220224
}
221225

nft_ingester/src/cleaners/fork_cleaner.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@ pub async fn run_fork_cleaner(
2727
fork_cleaner.clean_forks(cancellation_token.child_token()).await;
2828
metrics.set_scans_latency(start.elapsed().as_secs_f64());
2929
metrics.inc_total_scans();
30-
tokio::select! {
31-
_ = tokio_sleep(Duration::from_secs(sequence_consistent_checker_wait_period_sec)) => {},
32-
_ = cancellation_token.cancelled() => {
33-
info!("Received stop signal, stopping cleaning forks!");
34-
break;
35-
}
30+
if cancellation_token
31+
.run_until_cancelled(tokio_sleep(Duration::from_secs(
32+
sequence_consistent_checker_wait_period_sec,
33+
)))
34+
.await
35+
.is_none()
36+
{
37+
info!("Received stop signal, stopping cleaning forks!");
38+
break;
3639
}
3740
}
3841

usecase/src/signature_fetcher.rs

+52-42
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use interface::{
99
};
1010
use metrics_utils::{MetricStatus, RpcBackfillerMetricsConfig};
1111
use solana_sdk::{pubkey::Pubkey, signature::Signature};
12+
use tokio_util::sync::CancellationToken;
1213
use tracing::info;
1314

1415
pub struct SignatureFetcher<T, SP, TI>
@@ -45,30 +46,43 @@ where
4546
&self,
4647
program_id: Pubkey,
4748
rpc_retry_interval_millis: u64,
49+
cancellation_token: CancellationToken,
4850
) -> Result<(), StorageError> {
49-
let signature = self.data_layer.first_persisted_signature_for(program_id).await?;
51+
let signature = cancellation_token
52+
.run_until_cancelled(self.data_layer.first_persisted_signature_for(program_id))
53+
.await
54+
.unwrap_or(Ok(None))?;
5055
if signature.is_none() {
5156
return Ok(());
5257
}
5358
let signature = signature.unwrap();
5459
info!("Start fetching signatures...");
55-
let mut all_signatures = match self
56-
.rpc
57-
.get_signatures_by_address(signature, program_id)
60+
let mut all_signatures = cancellation_token
61+
.run_until_cancelled(async move {
62+
match self
63+
.rpc
64+
.get_signatures_by_address(signature, program_id)
65+
.await
66+
.map_err(|e| StorageError::Common(e.to_string()))
67+
{
68+
Ok(all_signatures) => {
69+
self.metrics.inc_fetch_signatures(
70+
"get_signatures_by_address",
71+
MetricStatus::SUCCESS,
72+
);
73+
Ok(all_signatures)
74+
},
75+
Err(e) => {
76+
self.metrics.inc_fetch_signatures(
77+
"get_signatures_by_address",
78+
MetricStatus::FAILURE,
79+
);
80+
Err(e)
81+
},
82+
}
83+
})
5884
.await
59-
.map_err(|e| StorageError::Common(e.to_string()))
60-
{
61-
Ok(all_signatures) => {
62-
self.metrics
63-
.inc_fetch_signatures("get_signatures_by_address", MetricStatus::SUCCESS);
64-
all_signatures
65-
},
66-
Err(e) => {
67-
self.metrics
68-
.inc_fetch_signatures("get_signatures_by_address", MetricStatus::FAILURE);
69-
return Err(e);
70-
},
71-
};
85+
.unwrap_or_else(|| Ok(vec![]))?;
7286

7387
if all_signatures.is_empty() {
7488
return Ok(());
@@ -85,14 +99,20 @@ where
8599
all_signatures.sort_by(|a, b| a.slot.cmp(&b.slot));
86100
// we need to split the list into batches of BATCH_SIZE
87101

88-
// todo: use Rust's chunks instead
89-
let mut batch_start = 0;
90-
while batch_start < all_signatures.len() {
91-
let batch_end = std::cmp::min(batch_start + BATCH_SIZE, all_signatures.len());
92-
let batch = &all_signatures[batch_start..batch_end];
93-
let missing_signatures =
94-
self.data_layer.missing_signatures(program_id, batch.to_vec()).await?;
95-
batch_start = batch_end;
102+
// this variable is required to account for the last batch potentially being smaller than
103+
// the rest
104+
let mut last_batch_index = 0;
105+
106+
for signatures in all_signatures.chunks(BATCH_SIZE) {
107+
if cancellation_token.is_cancelled() {
108+
break;
109+
}
110+
let missing_signatures = cancellation_token
111+
.run_until_cancelled(
112+
self.data_layer.missing_signatures(program_id, signatures.to_vec()),
113+
)
114+
.await
115+
.unwrap_or_else(|| Ok(vec![]))?;
96116
if missing_signatures.is_empty() {
97117
continue;
98118
}
@@ -101,12 +121,9 @@ where
101121
missing_signatures.len(),
102122
program_id
103123
);
104-
105124
let signatures: Vec<Signature> =
106125
missing_signatures.iter().map(|s| s.signature).collect();
107-
108126
let tx_cnt = signatures.len();
109-
110127
let counter = 0;
111128

112129
Self::process_transactions(
@@ -125,24 +142,21 @@ where
125142

126143
let fake_key = SignatureWithSlot {
127144
signature: Default::default(),
128-
slot: all_signatures[batch_end - 1].slot,
145+
slot: all_signatures[last_batch_index + missing_signatures.len() - 1].slot,
129146
};
130147
info!(
131148
"Ingested {} transactions. Dropping signatures for program {} before slot {}.",
132149
tx_cnt, program_id, fake_key.slot
133150
);
134151
self.data_layer.drop_signatures_before(program_id, fake_key).await?;
152+
last_batch_index += missing_signatures.len();
135153
}
136-
let fake_key = SignatureWithSlot {
137-
signature: Default::default(),
138-
slot: all_signatures[all_signatures.len() - 1].slot,
139-
};
140154

141155
info!(
142-
"Finished fetching signatures for program {}. Dropping signatures before slot {}.",
143-
program_id, fake_key.slot
156+
"Finished fetching signatures for program {}. Dropped signatures before slot {}.",
157+
program_id,
158+
all_signatures[all_signatures.len() - 1].slot
144159
);
145-
self.data_layer.drop_signatures_before(program_id, fake_key).await?;
146160
Ok(())
147161
}
148162

@@ -229,6 +243,7 @@ mod tests {
229243
};
230244
use metrics_utils::RpcBackfillerMetricsConfig;
231245
use mockall::predicate::{self, eq};
246+
use tokio_util::sync::CancellationToken;
232247

233248
#[tokio::test]
234249
async fn test_fetch_signatures_with_over_batch_limit_elements_should_complete_without_infinite_loop(
@@ -259,17 +274,12 @@ mod tests {
259274
.with(eq(program_id), predicate::always())
260275
.times(2)
261276
.returning(move |_, _| Ok(vec![]));
262-
data_layer
263-
.expect_drop_signatures_before()
264-
.with(eq(program_id), predicate::always())
265-
.times(1)
266-
.returning(move |_, _| Ok(()));
267277
let fetcher = super::SignatureFetcher::new(
268278
Arc::new(data_layer),
269279
Arc::new(rpc),
270280
Arc::new(ingester),
271281
metrics,
272282
);
273-
fetcher.fetch_signatures(program_id, 0).await.unwrap();
283+
fetcher.fetch_signatures(program_id, 0, CancellationToken::new()).await.unwrap();
274284
}
275285
}

0 commit comments

Comments
 (0)