From 20f8321b34d6770de88357e7b080996508bd63f8 Mon Sep 17 00:00:00 2001
From: robem
Date: Wed, 4 Aug 2021 16:47:45 +0000
Subject: [PATCH 1/4] TRS: Don't block on new updates

The gRPC thread running this code might be blocked for a long time,
depending on the requests being processed. If the TRC decides that it
has been waiting too long, it cancels the subscription and tries again.
However, the TRS won't notice the cancellation at first because it is
blocked, and thereby occupies resources unnecessarily.

This change waits in a loop for a fixed amount of time. Not great, but
better than blocking unconditionally.
---
 .../thin-replica-server/thin_replica_impl.hpp | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp b/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
index 9f0f7cfcf9..27e5ad495d 100644
--- a/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
+++ b/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
@@ -89,6 +89,7 @@ class ThinReplicaImpl {
   using KvbAppFilterPtr = std::shared_ptr;
   static constexpr size_t kSubUpdateBufferSize{1000u};
+  const std::chrono::milliseconds kWaitForUpdateTimeout{100};
   const std::string kCorrelationIdTag = "cid";
 
  public:
@@ -672,9 +673,12 @@
       readAndSend(logger_, context, stream, start, end, kvb_filter);
 
       // Let's wait until we have at least one live update
-      live_updates->waitUntilNonEmpty();
-      if (context->IsCancelled()) {
-        throw StreamCancelled("StreamCancelled while waiting for the first live update");
+      bool is_update_available = false;
+      while (not is_update_available) {
+        is_update_available = live_updates->waitUntilNonEmpty(kWaitForUpdateTimeout);
+        if (context->IsCancelled()) {
+          throw StreamCancelled("StreamCancelled while waiting for the first live update");
+        }
       }
 
       // We are in sync already
@@ -740,9 +744,12 @@
       readAndSendEventGroups(logger_, context, stream, start, end, kvb_filter);
 
       // Let's wait until we have at least one live update
-      live_updates->waitForEventGroupUntilNonEmpty();
-      if (context->IsCancelled()) {
-        throw StreamCancelled("StreamCancelled while waiting for the first live update");
+      bool is_update_available = false;
+      while (not is_update_available) {
+        is_update_available = live_updates->waitForEventGroupUntilNonEmpty(kWaitForUpdateTimeout);
+        if (context->IsCancelled()) {
+          throw StreamCancelled("StreamCancelled while waiting for the first live update");
+        }
       }
 
       // We are in sync already

From ca4e60b2ceb2a7476a040acad9db985aa1ce7223 Mon Sep 17 00:00:00 2001
From: robem
Date: Wed, 4 Aug 2021 17:05:32 +0000
Subject: [PATCH 2/4] TRS: Use TryPop() instead of blocking

Similar to the previous commit, we want to check regularly whether the
gRPC stream has been cancelled in order to free resources. This change
introduces new versions of `Pop()` which block only for a limited
amount of time.
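
To illustrate the pattern (not part of the diff): a timed pop is a
standard condition-variable wait with a timeout. A self-contained
sketch of the same idea on a plain std::queue, with all names invented
for the example:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <utility>

    template <typename T>
    class TimedQueue {
     public:
      void Push(T value) {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          queue_.push(std::move(value));
        }
        cv_.notify_one();
      }

      // Returns false on timeout instead of blocking forever, so the
      // caller can check for stream cancellation and call again.
      template <typename Rep, typename Period>
      bool TryPop(T& out, const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); })) {
          return false;  // timed out while still empty
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
      }

     private:
      std::mutex mutex_;
      std::condition_variable cv_;
      std::queue<T> queue_;
    };

The caller loops on the false return and re-checks IsCancelled(),
which is exactly what the TRS subscription loop does after this patch.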
---
 .../subscription_buffer.hpp                   | 46 +++++++++++++++++--
 .../thin-replica-server/thin_replica_impl.hpp | 14 ++++--
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/thin-replica-server/include/thin-replica-server/subscription_buffer.hpp b/thin-replica-server/include/thin-replica-server/subscription_buffer.hpp
index 4c7ffe47f1..f5b989b73d 100644
--- a/thin-replica-server/include/thin-replica-server/subscription_buffer.hpp
+++ b/thin-replica-server/include/thin-replica-server/subscription_buffer.hpp
@@ -74,7 +74,7 @@ class SubUpdateBuffer {
       }
     }
     cv_.notify_one();
-  };
+  }
 
   // Add an update to the queue and notify waiting subscribers
   void PushEventGroup(const SubEventGroupUpdate& update) {
@@ -92,7 +92,7 @@ class SubUpdateBuffer {
       }
     }
     eg_cv_.notify_one();
-  };
+  }
 
   // Return the oldest update (block if queue is empty)
   void Pop(SubUpdate& out) {
@@ -107,7 +107,26 @@ class SubUpdateBuffer {
     }
 
     ConcordAssert(queue_.pop(out));
-  };
+  }
+
+  template <typename Rep, typename Period>
+  bool TryPop(SubUpdate& out, const std::chrono::duration<Rep, Period>& timeout) {
+    std::unique_lock lock(mutex_);
+    // Boost's spsc queue is wait-free but we want to block here
+    cv_.wait_for(lock, timeout, [this] { return too_slow_ || queue_.read_available(); });
+
+    if (too_slow_) {
+      // We throw an exception because we cannot handle the clean-up ourselves
+      // and it doesn't make sense to continue pushing/popping updates.
+      throw ConsumerTooSlow();
+    }
+
+    if (queue_.read_available()) {
+      ConcordAssert(queue_.pop(out));
+      return true;
+    }
+    return false;
+  }
 
   // Return the oldest event group update (block if queue is empty)
   void PopEventGroup(SubEventGroupUpdate& out) {
@@ -122,7 +141,26 @@ class SubUpdateBuffer {
     }
 
     ConcordAssert(eg_queue_.pop(out));
-  };
+  }
+
+  template <typename Rep, typename Period>
+  bool TryPopEventGroup(SubEventGroupUpdate& out, const std::chrono::duration<Rep, Period>& timeout) {
+    std::unique_lock lock(eg_mutex_);
+    // Boost's spsc queue is wait-free but we want to block here
+    eg_cv_.wait_for(lock, timeout, [this] { return eg_too_slow_ || eg_queue_.read_available(); });
+
+    if (eg_too_slow_) {
+      // We throw an exception because we cannot handle the clean-up ourselves
+      // and it doesn't make sense to continue pushing/popping updates.
+      throw ConsumerTooSlow();
+    }
+
+    if (eg_queue_.read_available()) {
+      ConcordAssert(eg_queue_.pop(out));
+      return true;
+    }
+    return false;
+  }
 
   void waitUntilNonEmpty() {
     std::unique_lock lock(mutex_);

diff --git a/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp b/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
index 27e5ad495d..82e320e4b8 100644
--- a/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
+++ b/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
@@ -268,12 +268,16 @@ class ThinReplicaImpl {
     SubUpdate update;
     while (!context->IsCancelled()) {
       metrics_.queue_size.Get().Set(live_updates->Size());
+      bool is_update_available = false;
       try {
-        live_updates->Pop(update);
+        is_update_available = live_updates->TryPop(update, kWaitForUpdateTimeout);
       } catch (concord::thin_replica::ConsumerTooSlow& error) {
         LOG_WARN(logger_, "Closing subscription: " << error.what());
         break;
       }
+      if (not is_update_available) {
+        continue;
+      }
       auto kvb_update = kvbc::KvbUpdate{update.block_id, update.correlation_id, std::move(update.immutable_kv_pairs)};
       const auto& filtered_update = kvb_filter->filterUpdate(kvb_update);
       try {
@@ -343,14 +347,18 @@
 
     // Read, filter, and send live updates
     SubEventGroupUpdate sub_eg_update;
-    while (!context->IsCancelled()) {
+    while (not context->IsCancelled()) {
       metrics_.queue_size.Get().Set(live_updates->SizeEventGroupQueue());
+      bool is_update_available = false;
       try {
-        live_updates->PopEventGroup(sub_eg_update);
+        is_update_available = live_updates->TryPopEventGroup(sub_eg_update, kWaitForUpdateTimeout);
       } catch (concord::thin_replica::ConsumerTooSlow& error) {
         LOG_WARN(logger_, "Closing subscription: " << error.what());
         break;
       }
+      if (not is_update_available) {
+        continue;
+      }
       auto eg_update = kvbc::EgUpdate{sub_eg_update.event_group_id, std::move(sub_eg_update.event_group)};
       const auto& filtered_eg_update = kvb_filter->filterEventGroupUpdate(eg_update);
       try {

From 20612089948f10c1a49b56d36bca38de867ac97b Mon Sep 17 00:00:00 2001
From: robem
Date: Wed, 4 Aug 2021 17:46:25 +0000
Subject: [PATCH 3/4] TRS: Try-catch all

gRPC swallows all exceptions that escape its threads, so we need to
make sure we catch them all ourselves for easier debugging.

This change refactors the exception handling by pulling the two
try-catch blocks out of the while loop, since both result in stream
termination anyway.
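
For illustration (not part of the diff): a self-contained toy, with all
names invented, showing the resulting control flow — a timeout just
continues the loop, while every exception, whether "consumer too slow"
or a send failure, ends the stream through a single top-level handler
instead of escaping into the thread runtime:

    #include <iostream>
    #include <stdexcept>

    struct ConsumerTooSlow : std::runtime_error {
      ConsumerTooSlow() : std::runtime_error("consumer too slow") {}
    };

    // Simulates TryPop: false means "timed out", an exception means the
    // subscription must be terminated.
    bool try_pop(int& out, int attempt) {
      if (attempt == 5) throw ConsumerTooSlow();
      out = attempt;
      return attempt % 2 == 0;
    }

    int main() {
      try {
        for (int attempt = 0; attempt < 10; ++attempt) {
          int update;
          if (!try_pop(update, attempt)) continue;  // timeout: loop again
          std::cout << "sent update " << update << "\n";
        }
        // normal-exit cleanup would run here, inside the try block
      } catch (const std::exception& error) {
        // ConsumerTooSlow and send failures both land here
        std::cout << "subscription closed: " << error.what() << "\n";
      }
      return 0;
    }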
---
 .../thin-replica-server/thin_replica_impl.hpp | 93 ++++++++-----------
 1 file changed, 41 insertions(+), 52 deletions(-)

diff --git a/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp b/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
index 82e320e4b8..c5f13ef88a 100644
--- a/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
+++ b/thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
@@ -266,21 +266,17 @@ class ThinReplicaImpl {
     }
     // Read, filter, and send live updates
     SubUpdate update;
-    while (!context->IsCancelled()) {
-      metrics_.queue_size.Get().Set(live_updates->Size());
-      bool is_update_available = false;
-      try {
+    try {
+      while (!context->IsCancelled()) {
+        metrics_.queue_size.Get().Set(live_updates->Size());
+        bool is_update_available = false;
         is_update_available = live_updates->TryPop(update, kWaitForUpdateTimeout);
-      } catch (concord::thin_replica::ConsumerTooSlow& error) {
-        LOG_WARN(logger_, "Closing subscription: " << error.what());
-        break;
-      }
-      if (not is_update_available) {
-        continue;
-      }
-      auto kvb_update = kvbc::KvbUpdate{update.block_id, update.correlation_id, std::move(update.immutable_kv_pairs)};
-      const auto& filtered_update = kvb_filter->filterUpdate(kvb_update);
-      try {
+        if (not is_update_available) {
+          continue;
+        }
+        auto kvb_update =
+            kvbc::KvbUpdate{update.block_id, update.correlation_id, std::move(update.immutable_kv_pairs)};
+        const auto& filtered_update = kvb_filter->filterUpdate(kvb_update);
         if constexpr (std::is_same()) {
           LOG_DEBUG(logger_, "Live updates send data");
           auto correlation_id = filtered_update.correlation_id;
@@ -299,20 +295,19 @@
           LOG_DEBUG(logger_, "Live updates send hash");
           sendHash(stream, update.block_id, kvb_filter->hashUpdate(filtered_update));
         }
-      } catch (std::exception& error) {
-        LOG_INFO(logger_, "Subscription stream closed: " << error.what());
-        break;
-      }
-      metrics_.last_sent_block_id.Get().Set(update.block_id);
-      if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) {
-        metrics_.updateAggregator();
-        update_aggregator_counter = 0;
+        metrics_.last_sent_block_id.Get().Set(update.block_id);
+        if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) {
+          metrics_.updateAggregator();
+          update_aggregator_counter = 0;
+        }
       }
+      config_->subscriber_list.removeBuffer(live_updates);
+      live_updates->removeAllUpdates();
+      metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size());
+      metrics_.updateAggregator();
+    } catch (std::exception& error) {
+      LOG_INFO(logger_, "Subscription stream closed: " << error.what());
     }
-    config_->subscriber_list.removeBuffer(live_updates);
-    live_updates->removeAllUpdates();
-    metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size());
-    metrics_.updateAggregator();
     if (context->IsCancelled()) {
       return grpc::Status::CANCELLED;
     }
@@ -347,21 +342,16 @@
 
     // Read, filter, and send live updates
     SubEventGroupUpdate sub_eg_update;
-    while (not context->IsCancelled()) {
-      metrics_.queue_size.Get().Set(live_updates->SizeEventGroupQueue());
-      bool is_update_available = false;
-      try {
+    try {
+      while (not context->IsCancelled()) {
+        metrics_.queue_size.Get().Set(live_updates->SizeEventGroupQueue());
+        bool is_update_available = false;
         is_update_available = live_updates->TryPopEventGroup(sub_eg_update, kWaitForUpdateTimeout);
-      } catch (concord::thin_replica::ConsumerTooSlow& error) {
" << error.what()); - break; - } - if (not is_update_available) { - continue; - } - auto eg_update = kvbc::EgUpdate{sub_eg_update.event_group_id, std::move(sub_eg_update.event_group)}; - const auto& filtered_eg_update = kvb_filter->filterEventGroupUpdate(eg_update); - try { + if (not is_update_available) { + continue; + } + auto eg_update = kvbc::EgUpdate{sub_eg_update.event_group_id, std::move(sub_eg_update.event_group)}; + const auto& filtered_eg_update = kvb_filter->filterEventGroupUpdate(eg_update); if constexpr (std::is_same()) { // auto correlation_id = filtered_update.correlation_id; (TODO (Shruti) - Get correlation ID) // TODO (Shruti) : Get and propagate span context @@ -370,20 +360,19 @@ class ThinReplicaImpl { sendEventGroupHash( stream, sub_eg_update.event_group_id, kvb_filter->hashEventGroupUpdate(filtered_eg_update)); } - } catch (std::exception& error) { - LOG_INFO(logger_, "Subscription stream closed: " << error.what()); - break; - } - metrics_.last_sent_event_group_id.Get().Set(sub_eg_update.event_group_id); - if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) { - metrics_.updateAggregator(); - update_aggregator_counter = 0; + metrics_.last_sent_event_group_id.Get().Set(sub_eg_update.event_group_id); + if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) { + metrics_.updateAggregator(); + update_aggregator_counter = 0; + } } + config_->subscriber_list.removeBuffer(live_updates); + live_updates->removeAllEventGroupUpdates(); + metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size()); + metrics_.updateAggregator(); + } catch (std::exception& error) { + LOG_INFO(logger_, "Subscription stream closed: " << error.what()); } - config_->subscriber_list.removeBuffer(live_updates); - live_updates->removeAllEventGroupUpdates(); - metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size()); - metrics_.updateAggregator(); if (context->IsCancelled()) { LOG_WARN(logger_, "Subscription cancelled"); return grpc::Status::CANCELLED; From 82712b5416c50fa589c0adcdd8add033843d9bc0 Mon Sep 17 00:00:00 2001 From: robem Date: Wed, 4 Aug 2021 21:27:55 +0000 Subject: [PATCH 4/4] SubBuffer: Test wait for updates The test makes sure that we unblock with the timer as well as unblock when an update is present. We're adding it due to the new Try* API. 
---
 .../test/trs_sub_buffer_test.cpp | 51 +++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/thin-replica-server/test/trs_sub_buffer_test.cpp b/thin-replica-server/test/trs_sub_buffer_test.cpp
index 4423507e0a..ed5a0deb05 100644
--- a/thin-replica-server/test/trs_sub_buffer_test.cpp
+++ b/thin-replica-server/test/trs_sub_buffer_test.cpp
@@ -14,11 +14,14 @@
 #include "gtest/gtest.h"
 #include
+#include
 #include
 #include
 #include "Logger.hpp"
 #include "thin-replica-server/subscription_buffer.hpp"
+
+using namespace std::chrono_literals;
 
 namespace {
 
 using concord::kvbc::categorization::ImmutableInput;
@@ -233,6 +236,54 @@ TEST(trs_sub_buffer_test, happy_path_w_two_consumers_eg) {
   }
 }
 
+TEST(trs_sub_buffer_test, waiting_for_updates) {
+  auto updates = std::make_shared<SubUpdateBuffer>(10);
+  auto updates_eg = std::make_shared<SubUpdateBuffer>(10);
+
+  SubUpdate out;
+  SubEventGroupUpdate out_eg;
+
+  SubBufferList sub_list;
+  sub_list.addBuffer(updates);
+  sub_list.addBuffer(updates_eg);
+
+  ASSERT_FALSE(updates->waitUntilNonEmpty(10ms));
+  ASSERT_FALSE(updates_eg->waitForEventGroupUntilNonEmpty(10ms));
+  ASSERT_FALSE(updates->TryPop(out, 10ms));
+  ASSERT_FALSE(updates_eg->TryPopEventGroup(out_eg, 10ms));
+
+  std::atomic_bool reader_started{false};
+  auto reader = std::async(std::launch::async, [&] {
+    reader_started = true;
+
+    updates->waitUntilNonEmpty();
+    ASSERT_FALSE(updates->Empty());
+    ASSERT_TRUE(updates->waitUntilNonEmpty(10ms));
+
+    updates_eg->waitForEventGroupUntilNonEmpty();
+    ASSERT_FALSE(updates_eg->Empty());
+    ASSERT_TRUE(updates_eg->waitForEventGroupUntilNonEmpty(10ms));
+
+    updates->Pop(out);
+    ASSERT_TRUE(updates->TryPop(out, 10ms));
+    updates_eg->PopEventGroup(out_eg);
+    ASSERT_TRUE(updates_eg->TryPopEventGroup(out_eg, 10ms));
+  });
+
+  // Let's make sure that the reader thread starts first
+  while (!reader_started)
+    ;
+
+  SubUpdate update{};
+  SubEventGroupUpdate update_eg{};
+
+  // The reader is calling pop twice for each update type
+  sub_list.updateSubBuffers(update);
+  sub_list.updateSubBuffers(update);
+  sub_list.updateEventGroupSubBuffers(update_eg);
+  sub_list.updateEventGroupSubBuffers(update_eg);
+}
+
 }  // namespace
 
 int main(int argc, char** argv) {