Skip to content

Commit

Permalink
Sync recent announced files with priority (#300)
Browse files Browse the repository at this point in the history
* Upgrade Rust toolchain to avoid the proc-macro issue with the latest rust-analyzer

* Update random sync metrics

* Adjust default config for sync layer

* Cache the recently announced files for random sync with priority

* Fix clippy

* fix auto sync failure reason issue

* Add cached ready txs info in rpc

* fix lint
  • Loading branch information
boqiu authored Dec 18, 2024
1 parent 910b5af commit 8790fe1
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 85 deletions.
4 changes: 2 additions & 2 deletions .github/actions/setup-rust/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name: Setup Rust (cache & toolchain)
runs:
using: composite
steps:
- name: Install toolchain 1.75.0
- name: Install toolchain 1.78.0
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.75.0
toolchain: 1.78.0
components: rustfmt, clippy

- uses: Swatinem/rust-cache@v2
8 changes: 0 additions & 8 deletions node/rpc/src/admin/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub struct RpcServerImpl {

#[async_trait]
impl RpcServer for RpcServerImpl {
#[tracing::instrument(skip(self), err)]
async fn find_file(&self, tx_seq: u64) -> RpcResult<()> {
info!("admin_findFile({tx_seq})");

Expand All @@ -39,7 +38,6 @@ impl RpcServer for RpcServerImpl {
}
}

#[tracing::instrument(skip(self), err)]
async fn shutdown(&self) -> RpcResult<()> {
info!("admin_shutdown()");

Expand All @@ -51,7 +49,6 @@ impl RpcServer for RpcServerImpl {
.map_err(|e| error::internal_error(format!("Failed to send shutdown command: {:?}", e)))
}

#[tracing::instrument(skip(self), err)]
async fn start_sync_file(&self, tx_seq: u64) -> RpcResult<()> {
info!("admin_startSyncFile({tx_seq})");

Expand All @@ -72,7 +69,6 @@ impl RpcServer for RpcServerImpl {
}
}

#[tracing::instrument(skip(self), err)]
async fn start_sync_chunks(
&self,
tx_seq: u64,
Expand Down Expand Up @@ -102,7 +98,6 @@ impl RpcServer for RpcServerImpl {
}
}

#[tracing::instrument(skip(self), err)]
async fn terminate_sync(&self, tx_seq: u64) -> RpcResult<bool> {
info!("admin_terminateSync({tx_seq})");

Expand Down Expand Up @@ -131,7 +126,6 @@ impl RpcServer for RpcServerImpl {
}
}

#[tracing::instrument(skip(self), err)]
async fn get_sync_status(&self, tx_seq: u64) -> RpcResult<String> {
info!("admin_getSyncStatus({tx_seq})");

Expand All @@ -148,7 +142,6 @@ impl RpcServer for RpcServerImpl {
}
}

#[tracing::instrument(skip(self), err)]
async fn get_sync_info(&self, tx_seq: Option<u64>) -> RpcResult<HashMap<u64, FileSyncInfo>> {
info!(?tx_seq, "admin_getSyncInfo()");

Expand All @@ -163,7 +156,6 @@ impl RpcServer for RpcServerImpl {
}
}

#[tracing::instrument(skip(self), err)]
async fn get_network_info(&self) -> RpcResult<NetworkInfo> {
info!("admin_getNetworkInfo()");

Expand Down
1 change: 0 additions & 1 deletion node/rpc/src/zgs/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub struct RpcServerImpl {

#[async_trait]
impl RpcServer for RpcServerImpl {
#[tracing::instrument(skip(self), err)]
async fn get_status(&self) -> RpcResult<Status> {
info!("zgs_getStatus()");
let sync_progress = self
Expand Down
10 changes: 8 additions & 2 deletions node/sync/src/auto_sync/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{controllers::SyncState, SyncRequest, SyncResponse, SyncSender};
use crate::{
controllers::{FailureReason, SyncState},
SyncRequest, SyncResponse, SyncSender,
};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration};
Expand Down Expand Up @@ -126,7 +129,10 @@ impl Batcher {
"Failed to sync file and terminate the failed file sync"
);
self.terminate_file_sync(tx_seq, false).await;
Ok(Some(SyncResult::Failed))
match reason {
FailureReason::TimeoutFindFile => Ok(Some(SyncResult::Timeout)),
_ => Ok(Some(SyncResult::Failed)),
}
}

// finding peers timeout
Expand Down
26 changes: 13 additions & 13 deletions node/sync/src/auto_sync/batcher_random.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{batcher::Batcher, sync_store::SyncStore};
use super::{batcher::Batcher, metrics::RandomBatcherMetrics, sync_store::SyncStore};
use crate::{
auto_sync::{batcher::SyncResult, metrics, sync_store::Queue},
auto_sync::{batcher::SyncResult, sync_store::Queue},
Config, SyncSender,
};
use anyhow::Result;
Expand All @@ -19,6 +19,7 @@ pub struct RandomBatcherState {
pub tasks: Vec<u64>,
pub pending_txs: usize,
pub ready_txs: usize,
pub cached_ready_txs: usize,
}

#[derive(Clone)]
Expand All @@ -27,6 +28,7 @@ pub struct RandomBatcher {
config: Config,
batcher: Batcher,
sync_store: Arc<SyncStore>,
metrics: Arc<RandomBatcherMetrics>,
}

impl RandomBatcher {
Expand All @@ -36,6 +38,7 @@ impl RandomBatcher {
store: Store,
sync_send: SyncSender,
sync_store: Arc<SyncStore>,
metrics: Arc<RandomBatcherMetrics>,
) -> Self {
Self {
name,
Expand All @@ -47,17 +50,19 @@ impl RandomBatcher {
sync_send,
),
sync_store,
metrics,
}
}

pub async fn get_state(&self) -> Result<RandomBatcherState> {
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
let (pending_txs, ready_txs, cached_ready_txs) = self.sync_store.stat().await?;

Ok(RandomBatcherState {
name: self.name.clone(),
tasks: self.batcher.tasks().await,
pending_txs,
ready_txs,
cached_ready_txs,
})
}

Expand All @@ -71,11 +76,10 @@ impl RandomBatcher {
}

loop {
// if let Ok(state) = self.get_state().await {
// metrics::RANDOM_STATE_TXS_SYNCING.update(state.tasks.len() as u64);
// metrics::RANDOM_STATE_TXS_READY.update(state.ready_txs as u64);
// metrics::RANDOM_STATE_TXS_PENDING.update(state.pending_txs as u64);
// }
if let Ok(state) = self.get_state().await {
self.metrics
.update_state(state.ready_txs, state.pending_txs);
}

match self.sync_once().await {
Ok(true) => {}
Expand Down Expand Up @@ -106,11 +110,7 @@ impl RandomBatcher {
};

debug!(%tx_seq, ?sync_result, "Completed to sync file, state = {:?}", self.get_state().await);
match sync_result {
SyncResult::Completed => metrics::RANDOM_SYNC_RESULT_COMPLETED.mark(1),
SyncResult::Failed => metrics::RANDOM_SYNC_RESULT_FAILED.inc(1),
SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1),
}
self.metrics.update_result(sync_result);

if matches!(sync_result, SyncResult::Completed) {
self.sync_store.remove(tx_seq).await?;
Expand Down
2 changes: 1 addition & 1 deletion node/sync/src/auto_sync/historical_tx_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl HistoricalTxWriter {
}

pub async fn get_state(&self) -> Result<HistoricalTxWriterState> {
let (pending_txs, ready_txs) = self.sync_store.stat().await?;
let (pending_txs, ready_txs, _) = self.sync_store.stat().await?;

Ok(HistoricalTxWriterState {
next_tx_seq: self.next_tx_seq.load(Ordering::Relaxed),
Expand Down
7 changes: 6 additions & 1 deletion node/sync/src/auto_sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
batcher_random::RandomBatcher,
batcher_serial::SerialBatcher,
historical_tx_writer::HistoricalTxWriter,
metrics,
sync_store::{Queue, SyncStore},
};

Expand All @@ -45,11 +46,12 @@ impl AutoSyncManager {
// use v2 db to avoid reading v1 files that announced from the whole network instead of neighbors
Arc::new(SyncStore::new_with_name(
store.clone(),
config.ready_txs_cache_cap,
"pendingv2",
"readyv2",
))
} else {
Arc::new(SyncStore::new(store.clone()))
Arc::new(SyncStore::new(store.clone(), 0))
};
let catched_up = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -83,6 +85,7 @@ impl AutoSyncManager {
store.clone(),
sync_send.clone(),
sync_store,
metrics::RANDOM_ANNOUNCED.clone(),
);
executor.spawn(random.clone().start(catched_up.clone()), "auto_sync_random");

Expand All @@ -96,6 +99,7 @@ impl AutoSyncManager {
if config.neighbors_only {
let historical_sync_store = Arc::new(SyncStore::new_with_name(
store.clone(),
0,
"pendingv2_historical",
"readyv2_historical",
));
Expand All @@ -111,6 +115,7 @@ impl AutoSyncManager {
store,
sync_send,
historical_sync_store,
metrics::RANDOM_HISTORICAL.clone(),
);
executor.spawn(
random_historical.start(catched_up.clone()),
Expand Down
51 changes: 43 additions & 8 deletions node/sync/src/auto_sync/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
use std::sync::Arc;

use metrics::{register_meter, Counter, CounterUsize, Gauge, GaugeUsize, Histogram, Meter, Sample};
use metrics::{
register_meter, register_meter_with_group, Counter, CounterUsize, Gauge, GaugeUsize, Histogram,
Meter, Sample,
};

use super::batcher::SyncResult;

/// Bundle of metric handles (two gauges and three meters) used to report the
/// state and sync results of one random batcher.
///
/// Every field is an `Arc`, so the derived `Clone` shares the same underlying
/// metric objects instead of creating new ones.
#[derive(Clone)]
pub struct RandomBatcherMetrics {
/// Gauge set to the `ready_txs` value passed to `update_state`.
pub ready_txs: Arc<dyn Gauge<usize>>,
/// Gauge set to the `pending_txs` value passed to `update_state`.
pub pending_txs: Arc<dyn Gauge<usize>>,

/// Meter marked once for every `SyncResult::Completed` passed to `update_result`.
pub completed_qps: Arc<dyn Meter>,
/// Meter marked once for every `SyncResult::Failed` passed to `update_result`.
pub failed_qps: Arc<dyn Meter>,
/// Meter marked once for every `SyncResult::Timeout` passed to `update_result`.
pub timeout_qps: Arc<dyn Meter>,
}

impl RandomBatcherMetrics {
    /// Registers the two gauges and three meters under `group_name` and
    /// bundles the resulting handles.
    ///
    /// The metrics are registered in the order: `ready_txs`, `pending_txs`,
    /// `completed_qps`, `failed_qps`, `timeout_qps`.
    pub fn new(group_name: &str) -> Self {
        let ready_txs = GaugeUsize::register_with_group(group_name, "ready_txs");
        let pending_txs = GaugeUsize::register_with_group(group_name, "pending_txs");
        let completed_qps = register_meter_with_group(group_name, "completed_qps");
        let failed_qps = register_meter_with_group(group_name, "failed_qps");
        let timeout_qps = register_meter_with_group(group_name, "timeout_qps");

        Self {
            ready_txs,
            pending_txs,
            completed_qps,
            failed_qps,
            timeout_qps,
        }
    }

    /// Writes the given counts to the `ready_txs` and `pending_txs` gauges,
    /// in that order.
    pub fn update_state(&self, ready_txs: usize, pending_txs: usize) {
        let ready_gauge = &self.ready_txs;
        let pending_gauge = &self.pending_txs;

        ready_gauge.update(ready_txs);
        pending_gauge.update(pending_txs);
    }

    /// Marks one event on the meter that corresponds to `result`.
    pub fn update_result(&self, result: SyncResult) {
        // Pick the meter first so that the mark happens in exactly one place.
        let meter = match result {
            SyncResult::Completed => &self.completed_qps,
            SyncResult::Failed => &self.failed_qps,
            SyncResult::Timeout => &self.timeout_qps,
        };
        meter.mark(1);
    }
}

lazy_static::lazy_static! {
// sequential auto sync
Expand All @@ -14,11 +54,6 @@ lazy_static::lazy_static! {
pub static ref SEQUENTIAL_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_sequential_sync_result_timeout");

// random auto sync
// pub static ref RANDOM_STATE_TXS_SYNCING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_syncing", 1024);
// pub static ref RANDOM_STATE_TXS_READY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_ready", 1024);
// pub static ref RANDOM_STATE_TXS_PENDING: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("sync_auto_random_state_txs_pending", 1024);

pub static ref RANDOM_SYNC_RESULT_COMPLETED: Arc<dyn Meter> = register_meter("sync_auto_random_sync_result_completed");
pub static ref RANDOM_SYNC_RESULT_FAILED: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_failed");
pub static ref RANDOM_SYNC_RESULT_TIMEOUT: Arc<dyn Counter<usize>> = CounterUsize::register("sync_auto_random_sync_result_timeout");
pub static ref RANDOM_ANNOUNCED: Arc<RandomBatcherMetrics> = Arc::new(RandomBatcherMetrics::new("sync_auto_random_announced"));
pub static ref RANDOM_HISTORICAL: Arc<RandomBatcherMetrics> = Arc::new(RandomBatcherMetrics::new("sync_auto_random_historical"));
}
Loading

0 comments on commit 8790fe1

Please sign in to comment.