Skip to content

Commit

Permalink
Merge pull request #12 from luckychacha/main
Browse files Browse the repository at this point in the history
Feat: Add restart logic when websocket connection failed, Add Block Submit Threshold
  • Loading branch information
Acaishiba authored Jan 18, 2024
2 parents 28d73b0 + 6aae2b7 commit 6335457
Showing 1 changed file with 64 additions and 23 deletions.
87 changes: 64 additions & 23 deletions node/src/avail_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ where
<<B as BlockT>::Header as HeaderT>::Number: Into<u32>,
{
let block_number = Some(BlockNumber::from(block_num));
let block_hash = avail_client.rpc().block_hash(block_number).await?;
let block_hash = avail_client
.rpc()
.block_hash(block_number)
.await
.map_err(|_| anyhow::anyhow!("DA Layer error: reconnect required"))?;
// log::info!("block_hash:{:?}", block_hash);
let rpc_response = avail_client.rpc().block(block_hash).await?;
let rpc_response = avail_client
.rpc()
.block(block_hash)
.await
.map_err(|_| anyhow::anyhow!("DA Layer error: reconnect required"))?;
let submitted_block = rpc_response.unwrap();
let find_result = submitted_block
.block
Expand Down Expand Up @@ -151,6 +159,8 @@ where
let avail_client_ref = guard.as_ref().unwrap();

let block_number: u32 = (*notification.header.number()).into();
let mut reconnect_flag = false;

// Query
if block_number % 5 == 0 {
// Sync From Pallet
Expand Down Expand Up @@ -183,7 +193,18 @@ where
};

if last_submit_block_confirm != last_submit_block {
let avail_latest_finalized_height = get_avail_latest_finalized_height(avail_client_ref).await.unwrap();
let latest_finalized_height_result = get_avail_latest_finalized_height(avail_client_ref).await.map_or_else(
|_e| {
log::info!("================ QUERY TASK | DA Layer error: reconnect required ================");
reconnect_flag = true;
Err(0)
},
|height| Ok(height),
);
if latest_finalized_height_result.is_err() {
continue;
}
let avail_latest_finalized_height = latest_finalized_height_result.unwrap();
log::info!(
"================ QUERY TASK | Avail Block Query Range: {:?} to {:?} ================",
last_avail_scan_block_confirm,
Expand All @@ -199,29 +220,37 @@ where
for block_number in last_avail_scan_block_confirm..=avail_latest_finalized_height {
log::info!("================ QUERY TASK | search avail block:{:?} ================", block_number);
for block_number_solo in last_submit_block_confirm + 1..=last_submit_block {
if let Ok(find_result) = query_block_exist(
match query_block_exist(
avail_client_ref,
client.clone(),
block_number,
block_number_solo,
)
.await
{
log::info!(
"================ QUERY TASK | solo block:{:?}, find result:{:?}========",
block_number_solo,
find_result
);
if !find_result {
break;
Ok(find_result) => {
log::info!(
"================ QUERY TASK | solo block:{:?}, find result:{:?}========",
block_number_solo,
find_result
);
if !find_result {
break;
}
confirm_block_number = block_number_solo;
last_avail_scan_block = block_number;
},
Err(e) => {
if e.to_string().contains("DA Layer error: reconnect required") {
log::info!("================ QUERY TASK | DA Layer error: reconnect required ================");
reconnect_flag = true;
break;
}
log::info!(
"Query task DA Layer error block_number: {:?} not found in DA Layer",
block_number
);
}
confirm_block_number = block_number_solo;
last_avail_scan_block = block_number;
} else {
log::info!(
"Query task DA Layer error block_number: {:?} not found in DA Layer",
block_number
)
}
}
}
Expand Down Expand Up @@ -266,9 +295,15 @@ where
continue;
}

// log::info!("================ SUBMIT TASK | after: last_submit_block:{:?} ================", last_submit_block);
log::info!("================ SUBMIT TASK | submit latest_final_height:{:?} ================", latest_final_height);
for block_number_solo in last_submit_block + 1..=latest_final_height {
// If last_submit_block + 1..=latest_final_height is larger than 30, submit 30 blocks
let batch_submit_block_number = if latest_final_height - last_submit_block > 30 {
last_submit_block + 30
} else {
latest_final_height
};
log::info!("================ SUBMIT TASK | submit latest_final_height:{:?} batch_submit_block_number: {:?} ================", latest_final_height, batch_submit_block_number);

for block_number_solo in last_submit_block + 1..=batch_submit_block_number {
let rollup_block_hash =
client.block_hash(block_number_solo.into()).unwrap().unwrap();
let rollup_block: Option<sp_runtime::generic::SignedBlock<B>> =
Expand All @@ -295,8 +330,10 @@ where
),
Err(_e) => {
log::info!(
"Submit task DA Layer error : failed due to closed websocket connection"
)
"Submit task DA Layer error : failed due to closed websocket connection"
);
reconnect_flag = true;
break;
},
};
},
Expand All @@ -317,6 +354,10 @@ where
avail_record_local.awaiting_inherent_processing = true;
}
}

if reconnect_flag {
*avail_client.lock().await = None::<OnlineClient<AvailConfig>>;
}
}
}
});
Expand Down

0 comments on commit 6335457

Please sign in to comment.