diff --git a/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp b/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp index 28c445e017..7d9b34cec9 100644 --- a/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp +++ b/client/thin-replica-client/include/client/thin-replica-client/grpc_connection.hpp @@ -23,6 +23,7 @@ #include "assertUtils.hpp" #include "thin_replica.grpc.pb.h" #include "replica_state_snapshot.grpc.pb.h" +#include "thread_pool.hpp" #include "Logger.hpp" using namespace std::chrono_literals; @@ -215,6 +216,11 @@ class GrpcConnection { std::unique_ptr trc_stub_; std::unique_ptr rss_stub_; + // A gRPC connection will either have an active hash-, data stream or no subscription at all. Reads on an active + // subscription stream have to be serialized and hence either we use multiple threads and a lock or start a single + // thread only. + concord::util::ThreadPool subscription_pool_{1}; + // TRS connection config std::unique_ptr config_; diff --git a/client/thin-replica-client/src/grpc_connection.cpp b/client/thin-replica-client/src/grpc_connection.cpp index 4451af155e..89ee4f0ab2 100644 --- a/client/thin-replica-client/src/grpc_connection.cpp +++ b/client/thin-replica-client/src/grpc_connection.cpp @@ -123,7 +123,7 @@ GrpcConnection::Result GrpcConnection::openDataStream(const SubscriptionRequest& data_context_.reset(new grpc::ClientContext()); data_context_->AddMetadata("client_id", client_id_); - auto stream = async(launch::async, [this, &request] { + auto stream = subscription_pool_.async([this, &request] { ReadLock read_lock(channel_mutex_); return trc_stub_->SubscribeToUpdates(data_context_.get(), request); }); @@ -169,7 +169,7 @@ GrpcConnection::Result GrpcConnection::readData(Data* data) { ConcordAssertNE(data_stream_, nullptr); ConcordAssertNE(data_context_, nullptr); - auto result = async(launch::async, [this, data] { return data_stream_->Read(data); }); + auto result = subscription_pool_.async([this, data] { return data_stream_->Read(data); }); auto status = result.wait_for(data_timeout_); if (status == future_status::timeout || status == future_status::deferred) { data_context_->TryCancel(); @@ -329,7 +329,7 @@ GrpcConnection::Result GrpcConnection::openHashStream(SubscriptionRequest& reque hash_context_.reset(new grpc::ClientContext()); hash_context_->AddMetadata("client_id", client_id_); - auto stream = async(launch::async, [this, &request] { + auto stream = subscription_pool_.async([this, &request] { ReadLock read_lock(channel_mutex_); return trc_stub_->SubscribeToUpdateHashes(hash_context_.get(), request); }); @@ -375,7 +375,7 @@ GrpcConnection::Result GrpcConnection::readHash(Hash* hash) { ConcordAssertNE(hash_stream_, nullptr); ConcordAssertNE(hash_context_, nullptr); - auto result = async(launch::async, [this, hash] { return hash_stream_->Read(hash); }); + auto result = subscription_pool_.async([this, hash] { return hash_stream_->Read(hash); }); auto status = result.wait_for(hash_timeout_); if (status == future_status::timeout || status == future_status::deferred) { hash_context_->TryCancel();