From 99ead6423b29560b4700d34884786ab5a8192375 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Mon, 22 Jan 2024 18:19:28 +0800 Subject: [PATCH] add python test for chunks sync by rpc --- node/sync/src/service.rs | 28 +++++----- tests/sync_test.py | 95 ++++++++++++++++++++++++-------- tests/test_all.py | 5 +- tests/test_framework/zgs_node.py | 9 ++- 4 files changed, 98 insertions(+), 39 deletions(-) diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index d99ddba2..bc407c41 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -399,20 +399,20 @@ impl SyncService { } // file may be removed, but remote peer still find one from the file location cache - let finalized = self.store.check_tx_completed(request.tx_id.seq).await?; - if !finalized { - info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized"); - // FIXME(zz): If remote removes a file, we will also get failure here. - // self.ctx - // .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized"); - self.ctx.send(NetworkMessage::SendErrorResponse { - peer_id, - error: RPCResponseErrorCode::InvalidRequest, - reason: "Tx not finalized".into(), - id: request_id, - }); - return Ok(()); - } + // let finalized = self.store.check_tx_completed(request.tx_id.seq).await?; + // if !finalized { + // info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized"); + // // FIXME(zz): If remote removes a file, we will also get failure here. + // // self.ctx + // // .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized"); + // self.ctx.send(NetworkMessage::SendErrorResponse { + // peer_id, + // error: RPCResponseErrorCode::InvalidRequest, + // reason: "Tx not finalized".into(), + // id: request_id, + // }); + // return Ok(()); + // } let result = self .store diff --git a/tests/sync_test.py b/tests/sync_test.py index 781927b5..157e9e2c 100755 --- a/tests/sync_test.py +++ b/tests/sync_test.py @@ -1,58 +1,109 @@ #!/usr/bin/env python3 import random +import time from test_framework.test_framework import TestFramework from utility.submission import create_submission -from utility.submission import submit_data +from utility.submission import submit_data, data_to_segments from utility.utils import ( assert_equal, wait_until, ) - class SyncTest(TestFramework): def setup_params(self): self.num_blockchain_nodes = 2 self.num_nodes = 2 + self.__deployed_contracts = 0 def run_test(self): - client1 = self.nodes[0] - client2 = self.nodes[1] - - self.stop_storage_node(1) + # By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false, + # and file or chunks sync should be triggered by rpc. + self.__test_sync_file_by_rpc() + self.__test_sync_chunks_by_rpc() - size = 256 * 1024 - chunk_data = random.randbytes(size) + def __test_sync_file_by_rpc(self): + self.log.info("Begin to test file sync by rpc") - submissions, data_root = create_submission(chunk_data) - self.log.info("data root: %s, submissions: %s", data_root, submissions) - self.contract.submit(submissions) + client1 = self.nodes[0] + client2 = self.nodes[1] - wait_until(lambda: self.contract.num_submissions() == 1) + # Create submission + chunk_data = random.randbytes(256 * 1024) + data_root = self.__create_submission(chunk_data) + # Ensure log entry sync from blockchain node wait_until(lambda: client1.zgs_get_file_info(data_root) is not None) assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False) + # Upload file to storage node segments = submit_data(client1, chunk_data) - self.log.info( - "segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments] - ) - + self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]) wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"]) - self.start_storage_node(1) - self.nodes[1].wait_for_rpc_connection() + # File should not be auto sync on node 2 + wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) + time.sleep(3) + assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False) - client2.admin_start_sync_file(0) + # Trigger file sync by rpc + assert(client2.admin_start_sync_file(0) is None) wait_until(lambda: client2.sycn_status_is_completed_or_unknown(0)) - wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"]) + + # Validate data assert_equal( - client2.zgs_download_segment(data_root, 0, 1), - client1.zgs_download_segment(data_root, 0, 1), + client2.zgs_download_segment(data_root, 0, 1024), + client1.zgs_download_segment(data_root, 0, 1024), ) + def __test_sync_chunks_by_rpc(self): + self.log.info("Begin to test chunks sync by rpc") + + client1 = self.nodes[0] + client2 = self.nodes[1] + + # Prepare 3 segments to upload + chunk_data = random.randbytes(256 * 1024 * 3) + data_root = self.__create_submission(chunk_data) + + # Ensure log entry sync from blockchain node + wait_until(lambda: client1.zgs_get_file_info(data_root) is not None) + assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False) + + # Upload only 2nd segment to storage node + segments = data_to_segments(chunk_data) + self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]) + assert(client1.zgs_upload_segment(segments[1]) is None) + + # segment 0 is not able to download + assert(client1.zgs_download_segment_decoded(data_root, 0, 1024) is None) + # segment 1 is available to download + assert_equal(client1.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256]) + # segment 2 is not able to download + assert(client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None) + + # Segment 1 should not be able to download on node 2 + wait_until(lambda: client2.zgs_get_file_info(data_root) is not None) + assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False) + assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None) + + # Trigger chunks sync by rpc + assert(client2.admin_start_sync_chunks(1, 1024, 2048) is None) + wait_until(lambda: client2.sycn_status_is_completed_or_unknown(1)) + wait_until(lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) is not None) + + # Validate data + assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256]) + + def __create_submission(self, chunk_data: bytes) -> str: + submissions, data_root = create_submission(chunk_data) + self.contract.submit(submissions) + self.__deployed_contracts += 1 + wait_until(lambda: self.contract.num_submissions() == self.__deployed_contracts) + self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions) + return data_root if __name__ == "__main__": SyncTest().main() diff --git a/tests/test_all.py b/tests/test_all.py index 2507a823..5f7236bf 100755 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -16,6 +16,7 @@ __file_path__ = os.path.dirname(os.path.realpath(__file__)) +CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux" def run_single_test(py, script, test_dir, index, port_min, port_max): try: @@ -61,7 +62,7 @@ def run(): if not os.path.exists(dir_name): os.makedirs(dir_name, exist_ok=True) - conflux_path = os.path.join(dir_name, "conflux") + conflux_path = os.path.join(dir_name, CONFLUX_BINARY) if not os.path.exists(conflux_path): build_conflux(conflux_path) @@ -155,7 +156,7 @@ def build_conflux(conflux_path): os.chdir(destination_path) os.system("cargo build --release --bin conflux") - path = os.path.join(destination_path, "target", "release", "conflux") + path = os.path.join(destination_path, "target", "release", CONFLUX_BINARY) shutil.copyfile(path, conflux_path) if not is_windows_platform(): diff --git a/tests/test_framework/zgs_node.py b/tests/test_framework/zgs_node.py index a8307a7f..c94e53b0 100644 --- a/tests/test_framework/zgs_node.py +++ b/tests/test_framework/zgs_node.py @@ -1,6 +1,6 @@ import os import shutil -import sys +import base64 from config.node_config import ZGS_CONFIG from test_framework.blockchain_node import NodeType, TestNode @@ -85,6 +85,10 @@ def zgs_upload_segment(self, segment): def zgs_download_segment(self, data_root, start_index, end_index): return self.rpc.zgs_downloadSegment([data_root, start_index, end_index]) + + def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes: + encodedSegment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index]) + return None if encodedSegment is None else base64.b64decode(encodedSegment) def zgs_get_file_info(self, data_root): return self.rpc.zgs_getFileInfo([data_root]) @@ -98,6 +102,9 @@ def shutdown(self): def admin_start_sync_file(self, tx_seq): return self.rpc.admin_startSyncFile([tx_seq]) + + def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int): + return self.rpc.admin_startSyncChunks([tx_seq, start_chunk_index, end_chunk_index]) def admin_get_sync_status(self, tx_seq): return self.rpc.admin_getSyncStatus([tx_seq])