From e87f9b9128e8ed9916a4139c58e00e0c021d08ec Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 6 Nov 2024 14:13:53 +0800 Subject: [PATCH] sync status --- validator_client/src/beacon_node_fallback.rs | 176 +++++++++++++----- .../src/testing/mock_beacon_node.rs | 14 +- 2 files changed, 140 insertions(+), 50 deletions(-) diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 349a5e5e0b..25c430c022 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -230,6 +230,7 @@ impl CandidateBeaconNode { if let Some(slot_clock) = slot_clock { match check_node_health(&self.beacon_node, log).await { Ok((head, is_optimistic, el_offline)) => { + println!("{} {} {}", head, is_optimistic, el_offline); let Some(slot_clock_head) = slot_clock.now() else { let e = match slot_clock.is_prior_to_genesis() { Some(true) => CandidateError::PreGenesis, @@ -274,6 +275,7 @@ impl CandidateBeaconNode { Err(e) => { // Set the health as `Err` which is sorted last in the list. *self.health.write().await = Err(e); + println!("{:?}", e); Err(e) } } @@ -703,6 +705,7 @@ impl ApiTopic { mod tests { use std::str::FromStr; + use logging::test_logger; use strum::VariantNames; use eth2::Timeouts; @@ -842,33 +845,57 @@ mod tests { } #[tokio::test] - async fn check_fallback_broadcast_with_bn_delay() { - let test_rig = ValidatorTestRig::new().await; + async fn update_all_candidates_should_update_sync_status() { let mut mock_beacon_node_one = MockBeaconNode::::new().await; let mut mock_beacon_node_two = MockBeaconNode::::new().await; + let mut mock_beacon_node_three = MockBeaconNode::::new().await; let beacon_node_one = - CandidateBeaconNode::new(mock_beacon_node_one.beacon_api_client.clone(), 0); + CandidateBeaconNode::::new(mock_beacon_node_one.beacon_api_client.clone(), 0); let beacon_node_two = - CandidateBeaconNode::new(mock_beacon_node_two.beacon_api_client.clone(), 1); - - let beacon_node_fallback = BeaconNodeFallback::new( - vec![beacon_node_one, beacon_node_two], + CandidateBeaconNode::::new(mock_beacon_node_two.beacon_api_client.clone(), 1); + let beacon_node_three = + CandidateBeaconNode::::new(mock_beacon_node_three.beacon_api_client.clone(), 2); + + let beacon_node_fallback: BeaconNodeFallback = BeaconNodeFallback::new( + // Put this out of order to be sorted later + vec![ + beacon_node_two.clone(), + beacon_node_three.clone(), + beacon_node_one.clone(), + ], Config::default(), - vec![ApiTopic::Blocks], // to broadcast blocks to both bns - test_rig.spec.clone(), - test_rig.logger.clone(), + vec![], // to broadcast blocks to both bns + Arc::new(MainnetEthSpec::default_spec()), + test_logger(), ); - let mock1 = mock_beacon_node_one.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(5)); - let mock2 = mock_beacon_node_two.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(0)); + // BeaconNodeHealthTier 1 + mock_beacon_node_one.mock_get_node_syncing(eth2::types::SyncingData { + is_syncing: false, + is_optimistic: false, + el_offline: false, + head_slot: Slot::new(1), + sync_distance: Slot::new(0), + }); + // BeaconNodeHealthTier 3 + mock_beacon_node_two.mock_get_node_syncing(eth2::types::SyncingData { + is_syncing: false, + is_optimistic: false, + el_offline: true, + head_slot: Slot::new(1), + sync_distance: Slot::new(0), + }); + // BeaconNodeHealthTier 5 + mock_beacon_node_three.mock_get_node_syncing(eth2::types::SyncingData { + is_syncing: false, + is_optimistic: true, + el_offline: false, + head_slot: Slot::new(1), + sync_distance: Slot::new(0), + }); - // Update all beacon nodes health beacon_node_fallback.update_all_candidates().await; - let first_beacon_node = { - let candidates = beacon_node_fallback.candidates.read().await; - candidates.first().cloned() - }; // logging to check health { @@ -880,39 +907,90 @@ mod tests { candidate.health.read().await ); } - } - let block_service: BlockService = - BlockServiceBuilder::new() - .slot_clock(test_rig.slot_clock) - .validator_store(test_rig.validator_store.clone()) - .beacon_nodes(Arc::new(first_beacon_node)) - .runtime_context(test_rig.runtime_context) - .build() - .unwrap(); - - let validators = test_rig.validator_store.initialized_validators(); - let validators = validators.read(); - let first_validator = validators.validator_definitions().first().unwrap(); - let unsigned_block = UnsignedBlock::Blinded(BlindedBeaconBlock::Deneb( - BeaconBlockDeneb::empty(&test_rig.spec), - )); - let voting_pubkey = first_validator.voting_public_key.clone(); - let result = block_service - .publish_block_for_testing(Slot::new(1), &voting_pubkey.into(), unsigned_block) - .await; - - mock1.expect(1).assert(); - mock2.expect(1).assert(); - - let received_blocks_one = mock_beacon_node_one.received_blocks.lock().unwrap(); - let received_blocks_two = mock_beacon_node_two.received_blocks.lock().unwrap(); - - assert_eq!(received_blocks_one.len(), 1); - assert_eq!(received_blocks_two.len(), 0); - - if let Err(e) = result { - panic!("Expected block to be broadcasted to BN, but failed with error: {e:?}"); + assert_eq!( + vec![beacon_node_one, beacon_node_two, beacon_node_three], + *candidates + ); } } + + // #[tokio::test] + // async fn check_fallback_broadcast_with_bn_delay() { + // let test_rig = ValidatorTestRig::new().await; + // let mut mock_beacon_node_one = MockBeaconNode::::new().await; + // let mut mock_beacon_node_two = MockBeaconNode::::new().await; + + // let beacon_node_one = + // CandidateBeaconNode::new(mock_beacon_node_one.beacon_api_client.clone(), 0); + // let beacon_node_two = + // CandidateBeaconNode::new(mock_beacon_node_two.beacon_api_client.clone(), 1); + + // let beacon_node_fallback = BeaconNodeFallback::new( + // vec![beacon_node_one, beacon_node_two], + // Config::default(), + // vec![ApiTopic::Blocks], // to broadcast blocks to both bns + // test_rig.spec.clone(), + // test_rig.logger.clone(), + // ); + + // let mock1 = mock_beacon_node_one.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(0)); + // let mock2 = mock_beacon_node_two.mock_post_beacon_blinded_blocks_v1(Duration::from_secs(0)); + + // // logging to check health + // { + // let candidates = beacon_node_fallback.candidates.read().await; + // for (i, candidate) in candidates.iter().enumerate() { + // println!( + // "Candidate {} health: {:?}", + // i, + // candidate.health.read().await + // ); + // } + // } + + // let block_service: BlockService = + // BlockServiceBuilder::new() + // .slot_clock(test_rig.slot_clock) + // .validator_store(test_rig.validator_store.clone()) + // .beacon_nodes(Arc::new(beacon_node_fallback)) + // .runtime_context(test_rig.runtime_context) + // .build() + // .unwrap(); + + // // Update all beacon nodes health + // beacon_node_fallback.update_all_candidates().await; + // let first_beacon_node = { + // let candidates = beacon_node_fallback.candidates.read().await; + // candidates.first().cloned() + // }; + + // let validators = test_rig.validator_store.initialized_validators(); + // let validators = validators.read(); + // let first_validator = validators.validator_definitions().first().unwrap(); + // let unsigned_block = UnsignedBlock::Blinded(BlindedBeaconBlock::Deneb( + // BeaconBlockDeneb::empty(&test_rig.spec), + // )); + // let voting_pubkey = first_validator.voting_public_key.clone(); + // let result = block_service + // .publish_block_for_testing(Slot::new(1), &voting_pubkey.into(), unsigned_block) + // .await; + + // mock1.expect(1).assert(); + // mock2.expect(1).assert(); + + // let received_blocks_one = mock_beacon_node_one.received_blocks.lock().unwrap(); + // let received_blocks_two = mock_beacon_node_two.received_blocks.lock().unwrap(); + + // assert_eq!(received_blocks_one.len(), 1); + // assert_eq!(received_blocks_two.len(), 0); + + // // first success fn + // // --broadcast + // // request + + // if let Err(e) = result { + // panic!("Expected block to be broadcasted to BN, but failed with error: {e:?}"); + // } + // } } diff --git a/validator_client/src/testing/mock_beacon_node.rs b/validator_client/src/testing/mock_beacon_node.rs index 47f06c3c64..39ea25227b 100644 --- a/validator_client/src/testing/mock_beacon_node.rs +++ b/validator_client/src/testing/mock_beacon_node.rs @@ -7,7 +7,7 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; -use eth2::types::{FullBlockContents, ProduceBlockV3Metadata}; +use eth2::types::{FullBlockContents, GenericResponse, ProduceBlockV3Metadata, SyncingData}; use eth2::{ BeaconNodeHttpClient, CONSENSUS_BLOCK_VALUE_HEADER, CONSENSUS_VERSION_HEADER, EXECUTION_PAYLOAD_BLINDED_HEADER, EXECUTION_PAYLOAD_VALUE_HEADER, @@ -105,6 +105,18 @@ impl MockBeaconNode { self } + pub fn mock_get_node_syncing(&mut self, response: SyncingData) { + let path_pattern = Regex::new(r"^/eth/v1/node/syncing$").unwrap(); + + let data = GenericResponse::from(response); + + self.server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(200) + .with_body(serde_json::to_string(&data).unwrap()) + .create(); + } + pub fn mock_post_beacon_blinded_blocks_v1(&mut self, delay: Duration) -> Mock { let path_pattern = Regex::new(r"^/eth/v1/beacon/blinded_blocks$").unwrap(); let log = self.log.clone();