From 20d7ace85af4c53db9f831eafec4ed6487c08111 Mon Sep 17 00:00:00 2001 From: "Michael L. Szulczewski" Date: Mon, 15 Aug 2022 01:16:39 -0400 Subject: [PATCH 1/2] Enable atomizer to wait until synched with watchtowers Previously, the atomizer controller would make one attempt to connect to watchtowers and would return false if the attempt failed. This required watchtowers to be initialized before atomizers, which required adding a sleep to the atomizer end-to-end integration test so the watchtowers had time to initialize. This change enables the atomizer to make multiple attempts to connect to watchtowers and precludes the need for the sleep in the integration test. Signed-off-by: Michael L. Szulczewski --- src/uhs/atomizer/atomizer/controller.cpp | 12 ++++++++---- tests/integration/atomizer_end_to_end_test.cpp | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/uhs/atomizer/atomizer/controller.cpp b/src/uhs/atomizer/atomizer/controller.cpp index 0ecf78ef9..5518f3cb6 100644 --- a/src/uhs/atomizer/atomizer/controller.cpp +++ b/src/uhs/atomizer/atomizer/controller.cpp @@ -58,11 +58,15 @@ namespace cbdc::atomizer { } auto controller::init() -> bool { - if(!m_watchtower_network.cluster_connect( - m_opts.m_watchtower_internal_endpoints)) { - m_logger->error("Failed to connect to watchtowers."); - return false; + m_watchtower_network.cluster_connect( + m_opts.m_watchtower_internal_endpoints, + false); + while(!m_watchtower_network.connected_to_one()) { + // TODO: should we limit the number of attempts? 
+ m_logger->warn("Waiting to connect to watchtowers..."); + std::this_thread::sleep_for(std::chrono::seconds(1)); } + m_logger->info("Connected to watchtower network."); auto raft_params = nuraft::raft_params(); raft_params.election_timeout_lower_bound_ diff --git a/tests/integration/atomizer_end_to_end_test.cpp b/tests/integration/atomizer_end_to_end_test.cpp index 37337c296..8613f64ac 100644 --- a/tests/integration/atomizer_end_to_end_test.cpp +++ b/tests/integration/atomizer_end_to_end_test.cpp @@ -47,7 +47,7 @@ class atomizer_end_to_end_test : public ::testing::Test { ASSERT_TRUE(m_ctl_watchtower->init()); }); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // std::this_thread::sleep_for(std::chrono::milliseconds(100)); ASSERT_TRUE(m_ctl_atomizer->init()); ASSERT_TRUE(m_ctl_archiver->init()); From 371d24f776b8a81a69547a49081cc91d3ed8233a Mon Sep 17 00:00:00 2001 From: "Michael L. Szulczewski" Date: Mon, 22 Aug 2022 01:59:38 -0400 Subject: [PATCH 2/2] Enable components to wait for connecting to other components Previously, controllers for components such as the atomizer archiver, sentinel, shard, and watchtower would make one attempt to connect to other components and would return false if the attempt failed. This led to lots of failures when running 'docker compose --file docker-compose-atomizer.yml up'. This commit enables components to make multiple attempts to connect. Signed-off-by: Michael L. 
Szulczewski --- src/uhs/atomizer/archiver/controller.cpp | 8 +++--- src/uhs/atomizer/sentinel/controller.cpp | 12 ++++++--- src/uhs/atomizer/shard/controller.cpp | 30 ++++++++++++++-------- src/uhs/atomizer/watchtower/controller.cpp | 5 ++-- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/src/uhs/atomizer/archiver/controller.cpp b/src/uhs/atomizer/archiver/controller.cpp index 27975a1cb..ea5f2f64e 100644 --- a/src/uhs/atomizer/archiver/controller.cpp +++ b/src/uhs/atomizer/archiver/controller.cpp @@ -103,10 +103,12 @@ namespace cbdc::archiver { auto controller::init_atomizer_connection() -> bool { m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints, false); - if(!m_atomizer_network.connected_to_one()) { - m_logger->error("Failed to connect to any atomizers."); - return false; + while(!m_atomizer_network.connected_to_one()) { + // TODO: should we limit the number of attempts? + m_logger->warn("Waiting to connect to atomizers..."); + std::this_thread::sleep_for(std::chrono::seconds(1)); } + m_logger->info("Connected to atomizer network."); m_atomizer_handler_thread = m_atomizer_network.start_handler([&](auto&& pkt) { diff --git a/src/uhs/atomizer/sentinel/controller.cpp b/src/uhs/atomizer/sentinel/controller.cpp index 9dc21cf26..250925e71 100644 --- a/src/uhs/atomizer/sentinel/controller.cpp +++ b/src/uhs/atomizer/sentinel/controller.cpp @@ -39,16 +39,20 @@ namespace cbdc::sentinel { shard.second, "..."); auto sock = std::make_unique(); - if(!sock->connect(shard)) { - m_logger->error("failed to connect"); - return false; + while(!sock->connect(shard)) { + m_logger->warn("Failed to connect to", + shard.first, + ":", + shard.second, + ". 
Retrying..."); + std::this_thread::sleep_for(std::chrono::seconds(1)); } const auto peer_id = m_shard_network.add(std::move(sock)); const auto& shard_range = m_opts.m_shard_ranges[i]; m_shard_data.push_back(shard_info{shard_range, peer_id}); - m_logger->info("done"); + m_logger->info("Done"); } m_shard_dist = decltype(m_shard_dist)(0, m_shard_data.size() - 1); diff --git a/src/uhs/atomizer/shard/controller.cpp b/src/uhs/atomizer/shard/controller.cpp index eb85c5bb0..b9c8942f6 100644 --- a/src/uhs/atomizer/shard/controller.cpp +++ b/src/uhs/atomizer/shard/controller.cpp @@ -49,22 +49,30 @@ namespace cbdc::shard { return false; } - if(!m_archiver_client.init()) { - m_logger->error("Failed to connect to archiver"); - return false; + static constexpr auto retry_delay = std::chrono::seconds(1); + while(!m_archiver_client.init()) { + m_logger->warn("Failed to connect to archiver. Retrying..."); + std::this_thread::sleep_for(retry_delay); } - - if(!m_watchtower_network.cluster_connect( - m_opts.m_watchtower_internal_endpoints)) { - m_logger->error("Failed to connect to watchtowers."); - return false; + m_logger->info("Connected to archiver."); + + m_watchtower_network.cluster_connect( + m_opts.m_watchtower_internal_endpoints, + false); + while(!m_watchtower_network.connected_to_one()) { + // TODO: should we limit the number of attempts? + m_logger->warn("Waiting to connect to watchtowers..."); + std::this_thread::sleep_for(retry_delay); } + m_logger->info("Connected to watchtower network."); m_atomizer_network.cluster_connect(m_opts.m_atomizer_endpoints, false); - if(!m_atomizer_network.connected_to_one()) { - m_logger->error("Failed to connect to any atomizers"); - return false; + while(!m_atomizer_network.connected_to_one()) { + // TODO: should we limit the number of attempts? 
+ m_logger->warn("Waiting to connect to atomizers ..."); + std::this_thread::sleep_for(retry_delay); } + m_logger->info("Connected to atomizer network."); m_atomizer_client = m_atomizer_network.start_handler([&](auto&& pkt) { return atomizer_handler(std::forward<decltype(pkt)>(pkt)); diff --git a/src/uhs/atomizer/watchtower/controller.cpp b/src/uhs/atomizer/watchtower/controller.cpp index 7ffe9a2e7..e04381915 100644 --- a/src/uhs/atomizer/watchtower/controller.cpp +++ b/src/uhs/atomizer/watchtower/controller.cpp @@ -73,14 +73,15 @@ auto cbdc::watchtower::controller::init() -> bool { // Since atomizers require a watchtower and the archiver requires an // atomizer, this has to be allowed to fail. The network will reconnect // when an atomizer comes online. - m_logger->warn("Failed to connect to any atomizers, waiting..."); + m_logger->warn("Waiting to connect to atomizers..."); std::this_thread::sleep_for(retry_delay); } while(!m_archiver_client.init()) { - m_logger->warn("Failed to connect to archiver, retrying..."); + m_logger->warn("Failed to connect to archiver. Retrying..."); std::this_thread::sleep_for(retry_delay); } + m_logger->info("Connected to archiver."); m_atomizer_thread = m_atomizer_network.start_handler([&](auto&& pkt) { return atomizer_handler(std::forward<decltype(pkt)>(pkt));