Skip to content

Commit

Permalink
sync status
Browse files Browse the repository at this point in the history
  • Loading branch information
chong-he committed Nov 6, 2024
1 parent 2ace7c7 commit e87f9b9
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 50 deletions.
176 changes: 127 additions & 49 deletions validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
if let Some(slot_clock) = slot_clock {
match check_node_health(&self.beacon_node, log).await {
Ok((head, is_optimistic, el_offline)) => {
println!("{} {} {}", head, is_optimistic, el_offline);
let Some(slot_clock_head) = slot_clock.now() else {
let e = match slot_clock.is_prior_to_genesis() {
Some(true) => CandidateError::PreGenesis,
Expand Down Expand Up @@ -274,6 +275,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
Err(e) => {
// Set the health as `Err` which is sorted last in the list.
*self.health.write().await = Err(e);
println!("{:?}", e);
Err(e)
}
}
Expand Down Expand Up @@ -703,6 +705,7 @@ impl ApiTopic {
mod tests {
use std::str::FromStr;

use logging::test_logger;
use strum::VariantNames;

use eth2::Timeouts;
Expand Down Expand Up @@ -842,33 +845,57 @@ mod tests {
}

#[tokio::test]
async fn check_fallback_broadcast_with_bn_delay() {
let test_rig = ValidatorTestRig::new().await;
async fn update_all_candidates_should_update_sync_status() {
let mut mock_beacon_node_one = MockBeaconNode::<E>::new().await;
let mut mock_beacon_node_two = MockBeaconNode::<E>::new().await;
let mut mock_beacon_node_three = MockBeaconNode::<E>::new().await;

let beacon_node_one =
CandidateBeaconNode::new(mock_beacon_node_one.beacon_api_client.clone(), 0);
CandidateBeaconNode::<E>::new(mock_beacon_node_one.beacon_api_client.clone(), 0);
let beacon_node_two =
CandidateBeaconNode::new(mock_beacon_node_two.beacon_api_client.clone(), 1);

let beacon_node_fallback = BeaconNodeFallback::new(
vec![beacon_node_one, beacon_node_two],
CandidateBeaconNode::<E>::new(mock_beacon_node_two.beacon_api_client.clone(), 1);
let beacon_node_three =
CandidateBeaconNode::<E>::new(mock_beacon_node_three.beacon_api_client.clone(), 2);

let beacon_node_fallback: BeaconNodeFallback<TestingSlotClock, E> = BeaconNodeFallback::new(
// Put this out of order to be sorted later
vec![
beacon_node_two.clone(),
beacon_node_three.clone(),
beacon_node_one.clone(),
],
Config::default(),
vec![ApiTopic::Blocks], // to broadcast blocks to both bns
test_rig.spec.clone(),
test_rig.logger.clone(),
vec![], // to broadcast blocks to both bns
Arc::new(MainnetEthSpec::default_spec()),
test_logger(),
);

let mock1 = mock_beacon_node_one.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(5));
let mock2 = mock_beacon_node_two.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(0));
// BeaconNodeHealthTier 1
mock_beacon_node_one.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: false,
el_offline: false,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
// BeaconNodeHealthTier 3
mock_beacon_node_two.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: false,
el_offline: true,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});
// BeaconNodeHealthTier 5
mock_beacon_node_three.mock_get_node_syncing(eth2::types::SyncingData {
is_syncing: false,
is_optimistic: true,
el_offline: false,
head_slot: Slot::new(1),
sync_distance: Slot::new(0),
});

// Update all beacon nodes health
beacon_node_fallback.update_all_candidates().await;
let first_beacon_node = {
let candidates = beacon_node_fallback.candidates.read().await;
candidates.first().cloned()
};

// logging to check health
{
Expand All @@ -880,39 +907,90 @@ mod tests {
candidate.health.read().await
);
}
}

let block_service: BlockService<TestingSlotClock, MainnetEthSpec> =
BlockServiceBuilder::new()
.slot_clock(test_rig.slot_clock)
.validator_store(test_rig.validator_store.clone())
.beacon_nodes(Arc::new(first_beacon_node))
.runtime_context(test_rig.runtime_context)
.build()
.unwrap();

let validators = test_rig.validator_store.initialized_validators();
let validators = validators.read();
let first_validator = validators.validator_definitions().first().unwrap();
let unsigned_block = UnsignedBlock::Blinded(BlindedBeaconBlock::Deneb(
BeaconBlockDeneb::empty(&test_rig.spec),
));
let voting_pubkey = first_validator.voting_public_key.clone();
let result = block_service
.publish_block_for_testing(Slot::new(1), &voting_pubkey.into(), unsigned_block)
.await;

mock1.expect(1).assert();
mock2.expect(1).assert();

let received_blocks_one = mock_beacon_node_one.received_blocks.lock().unwrap();
let received_blocks_two = mock_beacon_node_two.received_blocks.lock().unwrap();

assert_eq!(received_blocks_one.len(), 1);
assert_eq!(received_blocks_two.len(), 0);

if let Err(e) = result {
panic!("Expected block to be broadcasted to BN, but failed with error: {e:?}");
assert_eq!(
vec![beacon_node_one, beacon_node_two, beacon_node_three],
*candidates
);
}
}

// #[tokio::test]
// async fn check_fallback_broadcast_with_bn_delay() {
// let test_rig = ValidatorTestRig::new().await;
// let mut mock_beacon_node_one = MockBeaconNode::<E>::new().await;
// let mut mock_beacon_node_two = MockBeaconNode::<E>::new().await;

// let beacon_node_one =
// CandidateBeaconNode::new(mock_beacon_node_one.beacon_api_client.clone(), 0);
// let beacon_node_two =
// CandidateBeaconNode::new(mock_beacon_node_two.beacon_api_client.clone(), 1);

// let beacon_node_fallback = BeaconNodeFallback::new(
// vec![beacon_node_one, beacon_node_two],
// Config::default(),
// vec![ApiTopic::Blocks], // to broadcast blocks to both bns
// test_rig.spec.clone(),
// test_rig.logger.clone(),
// );

// let mock1 = mock_beacon_node_one.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(0));
// let mock2 = mock_beacon_node_two.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(0));

// // logging to check health
// {
// let candidates = beacon_node_fallback.candidates.read().await;
// for (i, candidate) in candidates.iter().enumerate() {
// println!(
// "Candidate {} health: {:?}",
// i,
// candidate.health.read().await
// );
// }
// }

// let block_service: BlockService<TestingSlotClock, MainnetEthSpec> =
// BlockServiceBuilder::new()
// .slot_clock(test_rig.slot_clock)
// .validator_store(test_rig.validator_store.clone())
// .beacon_nodes(Arc::new(beacon_node_fallback))
// .runtime_context(test_rig.runtime_context)
// .build()
// .unwrap();

// // Update all beacon nodes health
// beacon_node_fallback.update_all_candidates().await;
// let first_beacon_node = {
// let candidates = beacon_node_fallback.candidates.read().await;
// candidates.first().cloned()
// };

// let validators = test_rig.validator_store.initialized_validators();
// let validators = validators.read();
// let first_validator = validators.validator_definitions().first().unwrap();
// let unsigned_block = UnsignedBlock::Blinded(BlindedBeaconBlock::Deneb(
// BeaconBlockDeneb::empty(&test_rig.spec),
// ));
// let voting_pubkey = first_validator.voting_public_key.clone();
// let result = block_service
// .publish_block_for_testing(Slot::new(1), &voting_pubkey.into(), unsigned_block)
// .await;

// mock1.expect(1).assert();
// mock2.expect(1).assert();

// let received_blocks_one = mock_beacon_node_one.received_blocks.lock().unwrap();
// let received_blocks_two = mock_beacon_node_two.received_blocks.lock().unwrap();

// assert_eq!(received_blocks_one.len(), 1);
// assert_eq!(received_blocks_two.len(), 0);

// // first success fn
// // --broadcast
// // request

// if let Err(e) = result {
// panic!("Expected block to be broadcasted to BN, but failed with error: {e:?}");
// }
// }
}
14 changes: 13 additions & 1 deletion validator_client/src/testing/mock_beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use eth2::types::{FullBlockContents, ProduceBlockV3Metadata};
use eth2::types::{FullBlockContents, GenericResponse, ProduceBlockV3Metadata, SyncingData};
use eth2::{
BeaconNodeHttpClient, CONSENSUS_BLOCK_VALUE_HEADER, CONSENSUS_VERSION_HEADER,
EXECUTION_PAYLOAD_BLINDED_HEADER, EXECUTION_PAYLOAD_VALUE_HEADER,
Expand Down Expand Up @@ -105,6 +105,18 @@ impl<E: EthSpec> MockBeaconNode<E> {
self
}

pub fn mock_get_node_syncing(&mut self, response: SyncingData) {
let path_pattern = Regex::new(r"^/eth/v1/node/syncing$").unwrap();

let data = GenericResponse::from(response);

self.server
.mock("GET", Matcher::Regex(path_pattern.to_string()))
.with_status(200)
.with_body(serde_json::to_string(&data).unwrap())
.create();
}

pub fn mock_post_beacon_blinded_blocks_v1(&mut self, delay: Duration) -> Mock {
let path_pattern = Regex::new(r"^/eth/v1/beacon/blinded_blocks$").unwrap();
let log = self.log.clone();
Expand Down

0 comments on commit e87f9b9

Please sign in to comment.