Skip to content

Commit 23d1942

Browse files
committed
using a direct consumer for force reingest
1 parent 1d1e72d commit 23d1942

File tree

1 file changed

+21
-17
lines changed
  • nft_ingester/src/bin/ingester

1 file changed

+21
-17
lines changed

nft_ingester/src/bin/ingester/main.rs

+21-17
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ pub async fn main() -> Result<(), IngesterError> {
535535
let signature_fetcher = usecase::signature_fetcher::SignatureFetcher::new(
536536
rocks_clone,
537537
transactions_getter,
538-
tx_ingester,
538+
tx_ingester.clone(),
539539
metrics_state.rpc_backfiller_metrics.clone(),
540540
);
541541
let cloned_keep_running = keep_running.clone();
@@ -572,6 +572,7 @@ pub async fn main() -> Result<(), IngesterError> {
572572
if config.run_sequence_consistent_checker {
573573
let force_reingestable_slot_processor =
574574
Arc::new(ForceReingestableSlotGetter::new(rocks_storage.clone()));
575+
575576
let slots_collector = SlotsCollector::new(
576577
force_reingestable_slot_processor.clone(),
577578
big_table_client.big_table_inner_client.clone(),
@@ -604,28 +605,31 @@ pub async fn main() -> Result<(), IngesterError> {
604605
}
605606
}));
606607

607-
// run an additional slot persister
608+
// run an additional direct slot persister
608609
let rx = shutdown_rx.resubscribe();
609-
let consumer = force_reingestable_slot_processor.clone();
610610
let producer = big_table_client.clone();
611611
let metrics = Arc::new(BackfillerMetricsConfig::new());
612612
metrics.register_with_prefix(&mut metrics_state.registry, "force_slot_persister_");
613-
let backfiller_clone = backfiller.clone();
613+
614+
let consumer = Arc::new(DirectBlockParser::new(
615+
tx_ingester.clone(),
616+
rocks_storage.clone(),
617+
metrics_state.backfiller_metrics.clone(),
618+
));
619+
620+
let transactions_parser = Arc::new(TransactionsParser::new(
621+
rocks_storage.clone(),
622+
force_reingestable_slot_processor.clone(),
623+
consumer,
624+
producer,
625+
metrics.clone(),
626+
backfiller_config.workers_count,
627+
backfiller_config.chunk_size,
628+
));
629+
614630
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
615631
info!("Running slot force persister...");
616-
if let Err(e) = backfiller_clone
617-
.run_perpetual_slot_processing(
618-
metrics,
619-
force_reingestable_slot_processor.clone(),
620-
consumer,
621-
producer,
622-
Duration::from_secs(backfiller_config.wait_period_sec),
623-
rx,
624-
)
625-
.await
626-
{
627-
error!("Error while running perpetual force slot persister: {}", e);
628-
}
632+
transactions_parser.parse_transactions(rx).await;
629633
info!("Force slot persister finished working");
630634
}));
631635
}

0 commit comments

Comments
 (0)