Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add python test for chunks sync by rpc #6

Merged
merged 1 commit into the base branch
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions node/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,20 @@ impl SyncService {
}

// file may be removed, but remote peer still find one from the file location cache
let finalized = self.store.check_tx_completed(request.tx_id.seq).await?;
if !finalized {
info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized");
// FIXME(zz): If remote removes a file, we will also get failure here.
// self.ctx
// .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized");
self.ctx.send(NetworkMessage::SendErrorResponse {
peer_id,
error: RPCResponseErrorCode::InvalidRequest,
reason: "Tx not finalized".into(),
id: request_id,
});
return Ok(());
}
// let finalized = self.store.check_tx_completed(request.tx_id.seq).await?;
// if !finalized {
// info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized");
// // FIXME(zz): If remote removes a file, we will also get failure here.
// // self.ctx
// // .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized");
// self.ctx.send(NetworkMessage::SendErrorResponse {
// peer_id,
// error: RPCResponseErrorCode::InvalidRequest,
// reason: "Tx not finalized".into(),
// id: request_id,
// });
// return Ok(());
// }

let result = self
.store
Expand Down
95 changes: 73 additions & 22 deletions tests/sync_test.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,109 @@
#!/usr/bin/env python3

import random
import time

from test_framework.test_framework import TestFramework
from utility.submission import create_submission
from utility.submission import submit_data
from utility.submission import submit_data, data_to_segments
from utility.utils import (
assert_equal,
wait_until,
)


class SyncTest(TestFramework):
def setup_params(self):
    """Configure the test topology: two blockchain nodes and two storage nodes."""
    # Running total of on-chain submissions made by this test; used to wait
    # until the contract reports the expected submission count.
    self.__deployed_contracts = 0
    self.num_blockchain_nodes = 2
    self.num_nodes = 2

def run_test(self):
    """Entry point: exercise file sync and chunks sync, both triggered by rpc."""
    # Stop storage node 1 up front so nothing is synced automatically;
    # the sub-tests below restart it and drive sync explicitly.
    self.stop_storage_node(1)

    # By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
    # and file or chunks sync should be triggered by rpc.
    self.__test_sync_file_by_rpc()
    self.__test_sync_chunks_by_rpc()

def __test_sync_file_by_rpc(self):
    """Upload a whole file to node 0, then sync it to node 1 via admin_startSyncFile.

    NOTE(review): this span was a garbled diff render — removed and added lines
    were interleaved (duplicated submission/log statements and an
    `assert_equal` call with four positional arguments). Reconstructed to the
    post-change version of the test.
    """
    self.log.info("Begin to test file sync by rpc")

    client1 = self.nodes[0]
    client2 = self.nodes[1]

    # Create submission
    chunk_data = random.randbytes(256 * 1024)
    data_root = self.__create_submission(chunk_data)

    # Ensure log entry sync from blockchain node
    wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
    assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)

    # Upload file to storage node 1 and wait until it is finalized there.
    segments = submit_data(client1, chunk_data)
    self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
    wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])

    # Bring storage node 1 back online (it was stopped in run_test).
    self.start_storage_node(1)
    self.nodes[1].wait_for_rpc_connection()

    # File should not be auto sync on node 2
    wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
    time.sleep(3)
    assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)

    # Trigger file sync by rpc
    assert(client2.admin_start_sync_file(0) is None)
    wait_until(lambda: client2.sycn_status_is_completed_or_unknown(0))
    wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])

    # Validate data: both nodes must serve identical bytes for the first segment.
    assert_equal(
        client2.zgs_download_segment(data_root, 0, 1024),
        client1.zgs_download_segment(data_root, 0, 1024),
    )

def __test_sync_chunks_by_rpc(self):
    """Upload one of three segments on node 0, then pull exactly that chunk
    range to node 1 via admin_startSyncChunks and verify the synced bytes."""
    self.log.info("Begin to test chunks sync by rpc")

    uploader = self.nodes[0]
    syncer = self.nodes[1]

    # Prepare 3 segments to upload
    chunk_data = random.randbytes(3 * 256 * 1024)
    data_root = self.__create_submission(chunk_data)

    # Ensure log entry sync from blockchain node
    wait_until(lambda: uploader.zgs_get_file_info(data_root) is not None)
    assert_equal(uploader.zgs_get_file_info(data_root)["finalized"], False)

    # Upload only 2nd segment to storage node
    segments = data_to_segments(chunk_data)
    self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
    assert uploader.zgs_upload_segment(segments[1]) is None

    # Only segment 1 (chunks [1024, 2048)) should now be downloadable on node 0.
    middle_segment = chunk_data[1024 * 256 : 2048 * 256]
    assert uploader.zgs_download_segment_decoded(data_root, 0, 1024) is None
    assert_equal(uploader.zgs_download_segment_decoded(data_root, 1024, 2048), middle_segment)
    assert uploader.zgs_download_segment_decoded(data_root, 2048, 3072) is None

    # Segment 1 must not be available on node 1 yet (no auto sync).
    wait_until(lambda: syncer.zgs_get_file_info(data_root) is not None)
    assert_equal(syncer.zgs_get_file_info(data_root)["finalized"], False)
    assert syncer.zgs_download_segment_decoded(data_root, 1024, 2048) is None

    # Trigger chunks sync by rpc
    assert syncer.admin_start_sync_chunks(1, 1024, 2048) is None
    wait_until(lambda: syncer.sycn_status_is_completed_or_unknown(1))
    wait_until(lambda: syncer.zgs_download_segment_decoded(data_root, 1024, 2048) is not None)

    # Validate data
    assert_equal(syncer.zgs_download_segment_decoded(data_root, 1024, 2048), middle_segment)

def __create_submission(self, chunk_data: bytes) -> str:
    """Submit `chunk_data` on chain and block until the submission is counted.

    Returns the data root of the new submission.
    """
    submissions, data_root = create_submission(chunk_data)
    self.contract.submit(submissions)

    # One more submission is now pending; wait for the contract to reflect it.
    self.__deployed_contracts += 1
    expected_count = self.__deployed_contracts
    wait_until(lambda: self.contract.num_submissions() == expected_count)

    self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
    return data_root

# Entry point: execute the sync test suite when this file is run directly.
if __name__ == "__main__":
    SyncTest().main()
5 changes: 3 additions & 2 deletions tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

__file_path__ = os.path.dirname(os.path.realpath(__file__))

# Name of the conflux blockchain binary; Windows builds carry an .exe suffix.
CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux"

def run_single_test(py, script, test_dir, index, port_min, port_max):
try:
Expand Down Expand Up @@ -61,7 +62,7 @@ def run():
if not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)

conflux_path = os.path.join(dir_name, "conflux")
conflux_path = os.path.join(dir_name, CONFLUX_BINARY)
if not os.path.exists(conflux_path):
build_conflux(conflux_path)

Expand Down Expand Up @@ -155,7 +156,7 @@ def build_conflux(conflux_path):
os.chdir(destination_path)
os.system("cargo build --release --bin conflux")

path = os.path.join(destination_path, "target", "release", "conflux")
path = os.path.join(destination_path, "target", "release", CONFLUX_BINARY)
shutil.copyfile(path, conflux_path)

if not is_windows_platform():
Expand Down
9 changes: 8 additions & 1 deletion tests/test_framework/zgs_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import shutil
import sys
import base64

from config.node_config import ZGS_CONFIG
from test_framework.blockchain_node import NodeType, TestNode
Expand Down Expand Up @@ -85,6 +85,10 @@ def zgs_upload_segment(self, segment):

def zgs_download_segment(self, data_root, start_index, end_index):
    """Fetch the raw rpc result for a segment of the file at `data_root`,
    covering chunk indices [start_index, end_index)."""
    request_params = [data_root, start_index, end_index]
    return self.rpc.zgs_downloadSegment(request_params)

def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes:
    """Download a segment and base64-decode the rpc reply into raw bytes.

    Returns None when the node does not have the requested chunk range.
    """
    # PEP 8: local renamed from camelCase `encodedSegment` to snake_case.
    encoded_segment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index])
    if encoded_segment is None:
        return None
    return base64.b64decode(encoded_segment)

def zgs_get_file_info(self, data_root):
    """Query the node's metadata for the file identified by `data_root`
    (callers in this repo read the "finalized" flag from the result)."""
    request_params = [data_root]
    return self.rpc.zgs_getFileInfo(request_params)
Expand All @@ -98,6 +102,9 @@ def shutdown(self):

def admin_start_sync_file(self, tx_seq):
    """Ask the node to start syncing the whole file of transaction `tx_seq`."""
    request_params = [tx_seq]
    return self.rpc.admin_startSyncFile(request_params)

def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int):
    """Ask the node to sync only the given chunk range of transaction `tx_seq`
    (presumably half-open [start, end), like zgs_downloadSegment — verify)."""
    request_params = [tx_seq, start_chunk_index, end_chunk_index]
    return self.rpc.admin_startSyncChunks(request_params)

def admin_get_sync_status(self, tx_seq):
    """Return the node's sync status for transaction `tx_seq`."""
    request_params = [tx_seq]
    return self.rpc.admin_getSyncStatus(request_params)
Expand Down
Loading