diff --git a/src/controller.rs b/src/controller.rs index 2a05a91..6336ac7 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,14 +1,11 @@ -use std::ops::Sub; -use std::time::{Duration, SystemTime}; +use std::time::Duration; -use chrono::{DateTime, Utc}; use eyre::WrapErr; use futures_ticker::Ticker; use futures_util::StreamExt; -use libp2p::PeerId; use sqlx::SqlitePool; use tokio::select; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Semaphore}; use crate::chain::inclusion_claim_correct; use crate::config::{ChainInclusionMode, Config}; @@ -129,13 +126,13 @@ impl Controller { self.handle_event(event).await; } _ = self.sync_ticker.next() => { - self.do_sync().await; + self.request_sync().await; } } } } - async fn do_sync(&self) { + async fn request_sync(&self) { let from = Some( chrono::Utc::now() - Duration::from_secs(60 * 60 * self.config.sync_lookback_hours), ); @@ -176,9 +173,13 @@ impl Controller { tracing::info!(histogram.sync_request_processed = 1); } P2PEvent::SyncResponse { premints } => { - for premint in premints { - let _ = self.validate_and_insert(premint).await; - } + let sem = Semaphore::new(10); + futures_util::future::join_all(premints.into_iter().map(|p| async { + let permit = sem.acquire().await.unwrap(); + let _ = self.validate_and_insert(p).await; + drop(permit); + })) + .await; } } } @@ -269,7 +270,7 @@ impl Controller { } } ControllerCommands::Sync => { - self.do_sync().await; + self.request_sync().await; } } Ok(())