Merge pull request #1775 from robem/trs-unblock
TRS: Don't block forever on updates
shruti authored Aug 5, 2021
2 parents 584a1dd + 82712b5 commit 641f03a
Showing 3 changed files with 149 additions and 56 deletions.
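
What this change does: the thin replica server's subscription loops previously relied on Pop()/PopEventGroup(), which block indefinitely while the update queue is empty, so a cancelled gRPC stream could hang forever waiting for the next update. This commit adds timeout-based TryPop()/TryPopEventGroup() to SubUpdateBuffer and restructures the loops in thin_replica_impl.hpp to poll with a 100 ms timeout (kWaitForUpdateTimeout) and re-check context->IsCancelled() between attempts. The sketch below shows the wait-with-timeout pattern in isolation; it is illustrative only, with a std::queue standing in for the buffer's Boost spsc queue and try_pop standing in for the TRS API.

    #include <cassert>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <thread>

    int main() {
      std::mutex m;
      std::condition_variable cv;
      std::queue<int> q;

      // Wait up to `timeout` for an element. Returning false means "timed
      // out", so the caller can re-check for cancellation and call again.
      auto try_pop = [&](std::chrono::milliseconds timeout, int& out) {
        std::unique_lock<std::mutex> lock(m);
        if (!cv.wait_for(lock, timeout, [&] { return !q.empty(); })) {
          return false;
        }
        out = q.front();
        q.pop();
        return true;
      };

      int value = 0;
      assert(!try_pop(std::chrono::milliseconds(50), value));  // empty: times out

      std::thread producer([&] {
        std::lock_guard<std::mutex> lock(m);
        q.push(42);
        cv.notify_one();
      });
      producer.join();

      assert(try_pop(std::chrono::milliseconds(50), value));  // update available
      assert(value == 42);
      return 0;
    }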
46 changes: 42 additions & 4 deletions thin-replica-server/include/thin-replica-server/subscription_buffer.hpp
@@ -74,7 +74,7 @@ class SubUpdateBuffer {
       }
     }
     cv_.notify_one();
-  };
+  }
 
   // Add an update to the queue and notify waiting subscribers
   void PushEventGroup(const SubEventGroupUpdate& update) {
@@ -92,7 +92,7 @@ class SubUpdateBuffer {
       }
     }
     eg_cv_.notify_one();
-  };
+  }
 
   // Return the oldest update (block if queue is empty)
   void Pop(SubUpdate& out) {
@@ -107,7 +107,26 @@ class SubUpdateBuffer {
     }
 
     ConcordAssert(queue_.pop(out));
-  };
+  }
+
+  template <typename RepT, typename PeriodT>
+  bool TryPop(SubUpdate& out, const std::chrono::duration<RepT, PeriodT>& timeout) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    // Boost's spsc queue is wait-free but we want to block here
+    cv_.wait_for(lock, timeout, [this] { return too_slow_ || queue_.read_available(); });
+
+    if (too_slow_) {
+      // We throw an exception because we cannot handle the clean-up ourselves
+      // and it doesn't make sense to continue pushing/popping updates.
+      throw ConsumerTooSlow();
+    }
+
+    if (queue_.read_available()) {
+      ConcordAssert(queue_.pop(out));
+      return true;
+    }
+    return false;
+  }
 
   // Return the oldest update (event group if queue is empty)
   void PopEventGroup(SubEventGroupUpdate& out) {
@@ -122,7 +141,26 @@ class SubUpdateBuffer {
     }
 
     ConcordAssert(eg_queue_.pop(out));
-  };
+  }
+
+  template <typename RepT, typename PeriodT>
+  bool TryPopEventGroup(SubEventGroupUpdate& out, const std::chrono::duration<RepT, PeriodT>& timeout) {
+    std::unique_lock<std::mutex> lock(eg_mutex_);
+    // Boost's spsc queue is wait-free but we want to block here
+    eg_cv_.wait_for(lock, timeout, [this] { return eg_too_slow_ || eg_queue_.read_available(); });
+
+    if (eg_too_slow_) {
+      // We throw an exception because we cannot handle the clean-up ourselves
+      // and it doesn't make sense to continue pushing/popping updates.
+      throw ConsumerTooSlow();
+    }
+
+    if (eg_queue_.read_available()) {
+      ConcordAssert(eg_queue_.pop(out));
+      return true;
+    }
+    return false;
+  }
 
   void waitUntilNonEmpty() {
     std::unique_lock<std::mutex> lock(mutex_);
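
A note on the design: the underlying Boost spsc queue is wait-free, so the mutex/condition-variable pair exists only to let a consumer sleep until data arrives. TryPop() ignores the return value of wait_for() and re-checks too_slow_ and read_available() itself, so a plain timeout yields false while a lagging consumer still surfaces as a ConsumerTooSlow exception. A hypothetical caller is sketched below; it assumes SubUpdateBuffer, SubUpdate, and ConsumerTooSlow live in the concord::thin_replica namespace, and drain_one is not part of the repository.

    #include <chrono>

    #include "thin-replica-server/subscription_buffer.hpp"

    // Wait up to 100ms (mirroring kWaitForUpdateTimeout below) for one update.
    // Returns false on timeout; propagates ConsumerTooSlow if this consumer
    // fell too far behind the producer.
    bool drain_one(concord::thin_replica::SubUpdateBuffer& buffer,
                   concord::thin_replica::SubUpdate& out) {
      return buffer.TryPop(out, std::chrono::milliseconds(100));
    }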
108 changes: 56 additions & 52 deletions thin-replica-server/include/thin-replica-server/thin_replica_impl.hpp
@@ -89,6 +89,7 @@ class ThinReplicaImpl {
 
   using KvbAppFilterPtr = std::shared_ptr<kvbc::KvbAppFilter>;
   static constexpr size_t kSubUpdateBufferSize{1000u};
+  const std::chrono::milliseconds kWaitForUpdateTimeout{100};
   const std::string kCorrelationIdTag = "cid";
 
  public:
@@ -265,17 +266,17 @@
     }
     // Read, filter, and send live updates
     SubUpdate update;
-    while (!context->IsCancelled()) {
-      metrics_.queue_size.Get().Set(live_updates->Size());
-      try {
-        live_updates->Pop(update);
-      } catch (concord::thin_replica::ConsumerTooSlow& error) {
-        LOG_WARN(logger_, "Closing subscription: " << error.what());
-        break;
-      }
-      auto kvb_update = kvbc::KvbUpdate{update.block_id, update.correlation_id, std::move(update.immutable_kv_pairs)};
-      const auto& filtered_update = kvb_filter->filterUpdate(kvb_update);
-      try {
+    try {
+      while (!context->IsCancelled()) {
+        metrics_.queue_size.Get().Set(live_updates->Size());
+        bool is_update_available = false;
+        is_update_available = live_updates->TryPop(update, kWaitForUpdateTimeout);
+        if (not is_update_available) {
+          continue;
+        }
+        auto kvb_update =
+            kvbc::KvbUpdate{update.block_id, update.correlation_id, std::move(update.immutable_kv_pairs)};
+        const auto& filtered_update = kvb_filter->filterUpdate(kvb_update);
         if constexpr (std::is_same<DataT, com::vmware::concord::thin_replica::Data>()) {
           LOG_DEBUG(logger_, "Live updates send data");
           auto correlation_id = filtered_update.correlation_id;
@@ -294,20 +295,19 @@
           LOG_DEBUG(logger_, "Live updates send hash");
           sendHash(stream, update.block_id, kvb_filter->hashUpdate(filtered_update));
         }
-      } catch (std::exception& error) {
-        LOG_INFO(logger_, "Subscription stream closed: " << error.what());
-        break;
-      }
-      metrics_.last_sent_block_id.Get().Set(update.block_id);
-      if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) {
-        metrics_.updateAggregator();
-        update_aggregator_counter = 0;
-      }
-    }
+        metrics_.last_sent_block_id.Get().Set(update.block_id);
+        if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) {
+          metrics_.updateAggregator();
+          update_aggregator_counter = 0;
+        }
+      }
-    config_->subscriber_list.removeBuffer(live_updates);
-    live_updates->removeAllUpdates();
-    metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size());
-    metrics_.updateAggregator();
+    } catch (std::exception& error) {
+      LOG_INFO(logger_, "Subscription stream closed: " << error.what());
+    }
+    config_->subscriber_list.removeBuffer(live_updates);
+    live_updates->removeAllUpdates();
+    metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size());
+    metrics_.updateAggregator();
     if (context->IsCancelled()) {
       return grpc::Status::CANCELLED;
     }
@@ -342,17 +342,16 @@
 
     // Read, filter, and send live updates
     SubEventGroupUpdate sub_eg_update;
-    while (!context->IsCancelled()) {
-      metrics_.queue_size.Get().Set(live_updates->SizeEventGroupQueue());
-      try {
-        live_updates->PopEventGroup(sub_eg_update);
-      } catch (concord::thin_replica::ConsumerTooSlow& error) {
-        LOG_WARN(logger_, "Closing subscription: " << error.what());
-        break;
-      }
-      auto eg_update = kvbc::EgUpdate{sub_eg_update.event_group_id, std::move(sub_eg_update.event_group)};
-      const auto& filtered_eg_update = kvb_filter->filterEventGroupUpdate(eg_update);
-      try {
+    try {
+      while (not context->IsCancelled()) {
+        metrics_.queue_size.Get().Set(live_updates->SizeEventGroupQueue());
+        bool is_update_available = false;
+        is_update_available = live_updates->TryPopEventGroup(sub_eg_update, kWaitForUpdateTimeout);
+        if (not is_update_available) {
+          continue;
+        }
+        auto eg_update = kvbc::EgUpdate{sub_eg_update.event_group_id, std::move(sub_eg_update.event_group)};
+        const auto& filtered_eg_update = kvb_filter->filterEventGroupUpdate(eg_update);
         if constexpr (std::is_same<DataT, com::vmware::concord::thin_replica::Data>()) {
           // auto correlation_id = filtered_update.correlation_id; (TODO (Shruti) - Get correlation ID)
           // TODO (Shruti) : Get and propagate span context
@@ -361,20 +360,19 @@
           sendEventGroupHash(
               stream, sub_eg_update.event_group_id, kvb_filter->hashEventGroupUpdate(filtered_eg_update));
         }
-      } catch (std::exception& error) {
-        LOG_INFO(logger_, "Subscription stream closed: " << error.what());
-        break;
-      }
-      metrics_.last_sent_event_group_id.Get().Set(sub_eg_update.event_group_id);
-      if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) {
-        metrics_.updateAggregator();
-        update_aggregator_counter = 0;
-      }
-    }
+        metrics_.last_sent_event_group_id.Get().Set(sub_eg_update.event_group_id);
+        if (++update_aggregator_counter == config_->update_metrics_aggregator_thresh) {
+          metrics_.updateAggregator();
+          update_aggregator_counter = 0;
+        }
+      }
-    config_->subscriber_list.removeBuffer(live_updates);
-    live_updates->removeAllEventGroupUpdates();
-    metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size());
-    metrics_.updateAggregator();
+    } catch (std::exception& error) {
+      LOG_INFO(logger_, "Subscription stream closed: " << error.what());
+    }
+    config_->subscriber_list.removeBuffer(live_updates);
+    live_updates->removeAllEventGroupUpdates();
+    metrics_.subscriber_list_size.Get().Set(config_->subscriber_list.Size());
+    metrics_.updateAggregator();
     if (context->IsCancelled()) {
       LOG_WARN(logger_, "Subscription cancelled");
       return grpc::Status::CANCELLED;
@@ -672,9 +670,12 @@
     readAndSend<ServerContextT, ServerWriterT, DataT>(logger_, context, stream, start, end, kvb_filter);
 
     // Let's wait until we have at least one live update
-    live_updates->waitUntilNonEmpty();
-    if (context->IsCancelled()) {
-      throw StreamCancelled("StreamCancelled while waiting for the first live update");
+    bool is_update_available = false;
+    while (not is_update_available) {
+      is_update_available = live_updates->waitUntilNonEmpty(kWaitForUpdateTimeout);
+      if (context->IsCancelled()) {
+        throw StreamCancelled("StreamCancelled while waiting for the first live update");
+      }
     }
 
     // We are in sync already
@@ -740,9 +741,12 @@
     readAndSendEventGroups<ServerContextT, ServerWriterT, DataT>(logger_, context, stream, start, end, kvb_filter);
 
     // Let's wait until we have at least one live update
-    live_updates->waitForEventGroupUntilNonEmpty();
-    if (context->IsCancelled()) {
-      throw StreamCancelled("StreamCancelled while waiting for the first live update");
+    bool is_update_available = false;
+    while (not is_update_available) {
+      is_update_available = live_updates->waitForEventGroupUntilNonEmpty(kWaitForUpdateTimeout);
+      if (context->IsCancelled()) {
+        throw StreamCancelled("StreamCancelled while waiting for the first live update");
+      }
     }
 
     // We are in sync already
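
The restructured loops follow a standard cancellation-aware polling pattern: wake up at least every kWaitForUpdateTimeout, re-check the stream state, and only then wait again; moving the try/catch outside the while loop means any send failure ends the subscription and falls through to the cleanup below the loop. A reduced, self-contained model of that control flow, where a std::atomic<bool> stands in for context->IsCancelled() and a timed condition-variable wait stands in for TryPop():

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    int main() {
      std::mutex m;
      std::condition_variable cv;
      std::queue<int> updates;
      std::atomic<bool> cancelled{false};

      std::thread subscriber([&] {
        while (!cancelled) {
          std::unique_lock<std::mutex> lock(m);
          // Wake up at least every 100ms so cancellation is observed promptly.
          if (!cv.wait_for(lock, std::chrono::milliseconds(100),
                           [&] { return !updates.empty(); })) {
            continue;  // timed out: loop around and re-check `cancelled`
          }
          int update = updates.front();
          updates.pop();
          lock.unlock();
          std::cout << "sent update " << update << "\n";
        }
        std::cout << "subscription closed\n";  // cleanup would run here, once
      });

      {
        std::lock_guard<std::mutex> lock(m);
        updates.push(1);
      }
      cv.notify_one();

      std::this_thread::sleep_for(std::chrono::milliseconds(200));
      cancelled = true;  // like a client cancelling the gRPC stream
      subscriber.join();
      return 0;
    }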
51 changes: 51 additions & 0 deletions thin-replica-server/test/trs_sub_buffer_test.cpp
@@ -14,11 +14,14 @@
 #include "gtest/gtest.h"
 
 #include <atomic>
+#include <chrono>
 #include <future>
 #include <list>
 #include "Logger.hpp"
 #include "thin-replica-server/subscription_buffer.hpp"
 
+using namespace std::chrono_literals;
+
 namespace {
 
 using concord::kvbc::categorization::ImmutableInput;
@@ -233,6 +236,54 @@ TEST(trs_sub_buffer_test, happy_path_w_two_consumers_eg) {
   }
 }
 
+TEST(trs_sub_buffer_test, waiting_for_updates) {
+  auto updates = std::make_shared<SubUpdateBuffer>(10);
+  auto updates_eg = std::make_shared<SubUpdateBuffer>(10);
+
+  SubUpdate out;
+  SubEventGroupUpdate out_eg;
+
+  SubBufferList sub_list;
+  sub_list.addBuffer(updates);
+  sub_list.addBuffer(updates_eg);
+
+  ASSERT_FALSE(updates->waitUntilNonEmpty(10ms));
+  ASSERT_FALSE(updates_eg->waitForEventGroupUntilNonEmpty(10ms));
+  ASSERT_FALSE(updates->TryPop(out, 10ms));
+  ASSERT_FALSE(updates_eg->TryPopEventGroup(out_eg, 10ms));
+
+  std::atomic_bool reader_started;
+  auto reader = std::async(std::launch::async, [&] {
+    reader_started = true;
+
+    updates->waitUntilNonEmpty();
+    ASSERT_FALSE(updates->Empty());
+    ASSERT_TRUE(updates->waitUntilNonEmpty(10ms));
+
+    updates_eg->waitForEventGroupUntilNonEmpty();
+    ASSERT_FALSE(updates_eg->Empty());
+    ASSERT_TRUE(updates_eg->waitForEventGroupUntilNonEmpty(10ms));
+
+    updates->Pop(out);
+    ASSERT_TRUE(updates->TryPop(out, 10ms));
+    updates_eg->PopEventGroup(out_eg);
+    ASSERT_TRUE(updates_eg->TryPopEventGroup(out_eg, 10ms));
+  });
+
+  // Let's make sure that the reader thread starts first
+  while (!reader_started)
+    ;
+
+  SubUpdate update{};
+  SubEventGroupUpdate update_eg{};
+
+  // The reader is calling pop twice for each update type
+  sub_list.updateSubBuffers(update);
+  sub_list.updateSubBuffers(update);
+  sub_list.updateEventGroupSubBuffers(update_eg);
+  sub_list.updateEventGroupSubBuffers(update_eg);
+}
+
 } // namespace
 
 int main(int argc, char** argv) {
