Skip to content

Commit

Permalink
snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Mar 27, 2024
1 parent 5b58d5e commit 03f3137
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
36 changes: 30 additions & 6 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ impl Network {
let ping_client = ping_client;
ping_client.ping_loop(ctx, *ping_timeout).await
});
}
if peer == &self.key.public() {

}
service.run(ctx, stream).await?;
Ok(())
Expand Down Expand Up @@ -227,20 +230,41 @@ impl Network {
let key = self.key.public();
let mut outbound = self.outbound.subscribe();
let mut addrs = self.gossip.validator_addrs.subscribe();
// Current address of this node.
let mut my_addr = None;
while ctx.is_active() {
let _ : Result<(),_> = scope::run!(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),|ctx,s| async {
s.spawn::<()>(async {
loop {
// Wait for one of the following:
let _ : ctx::OrCanceled<()> = scope::run!(ctx,|ctx,s| async {
// loopback connection was established to a different address (this node's address has changed)
s.spawn_bg::<()>(async {
sync::wait_for(ctx, &mut outbound, |x| x.current().get(&key).map(|x|x.addr) != my_addr).await?;
Err(ctx::Canceled)
tracing::info!("loopback conn addr");
s.cancel();
Ok(())
});
sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?;
Err(ctx::Canceled)
// an announcement from the node's previous execution has been received which
// overrides our announcement (reannouncement is needed).
s.spawn_bg::<()>(async {
sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?;
tracing::info!("reannouncement");
s.cancel();
Ok(())
});
// timeout has passed.
ctx.sleep(ADDRESS_ANNOUNCER_INTERVAL).await?;
tracing::info!("timeout announcement");
Ok(())
}).await;
if !ctx.is_active() {
return;
}
// If a loopback connection exists, update the current address.
if let Some(conn) = outbound.borrow().current().get(&key) {
my_addr = Some(conn.addr);
}
// If address of this node is known, announce it.
if let Some(addr) = my_addr {
tracing::debug!("announcing validator address {addr}");
self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await;
}
}
Expand Down
15 changes: 5 additions & 10 deletions node/actors/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use test_casing::{test_casing, Product};
use tracing::Instrument as _;
use zksync_concurrency::{
ctx, oneshot, scope, sync,
ctx, net, oneshot, scope, sync,
testonly::{abort_on_panic, set_timeout},
time,
};
Expand Down Expand Up @@ -552,8 +552,8 @@ async fn validator_node_restart() {
let start = ctx.now_utc();
for clock_shift in [zero, sec, -2 * sec, 4 * sec, 10 * sec, -30 * sec] {
// Set the new addr to broadcast.
let addr0 = mk_addr(rng);
cfgs[0].public_addr = addr0.into();
cfgs[0].server_addr = net::tcp::testonly::reserve_listener();
cfgs[0].public_addr = (*cfgs[0].server_addr).into();
// Shift the UTC clock.
let now = start + clock_shift;
assert!(
Expand All @@ -570,13 +570,8 @@ async fn validator_node_restart() {
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0")));
tracing::info!("wait for the update to arrive to node1");
let sub = &mut node1.net.gossip.validator_addrs.subscribe();
sync::wait_for(ctx, sub, |got| {
let Some(got) = got.get(&setup.keys[0].public()) else {
return false;
};
got.msg.addr == addr0
})
.await?;
let want = Some(*cfgs[0].server_addr);
sync::wait_for(ctx, sub, |got| got.get(&setup.keys[0].public()).map(|x|x.msg.addr) == want).await?;
Ok(())
})
.await?;
Expand Down
6 changes: 6 additions & 0 deletions node/libs/concurrency/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ impl std::fmt::Debug for Utc {
}
}

impl std::fmt::Display for Utc {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
(time::OffsetDateTime::UNIX_EPOCH + self.0).fmt(f)
}
}

/// Start of the unix epoch.
pub const UNIX_EPOCH: Utc = Utc(Duration::ZERO);

Expand Down

0 comments on commit 03f3137

Please sign in to comment.