From 92ac03fc69ba0dccc8877156fbe732f7f94e5fbc Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Mon, 20 Jan 2025 11:28:23 +0200
Subject: [PATCH] chore: make per-thread QueueBackpressure objects global

Before this PR, QueueBackpressure objects were referenced from other
threads in an awkward way via Connection::WeakRef::EnsureMemoryBudget().
This PR removes the complexities of accessing these objects from
foreign threads.
---
 src/facade/dragonfly_connection.cc | 148 +++++++++++++++--------------
 src/facade/dragonfly_connection.h  |  22 ++---
 src/server/main_service.cc         |   6 +-
 3 files changed, 89 insertions(+), 87 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index a15166dd2029..cff6b7916b87 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -92,6 +92,8 @@ using absl::GetFlag;
 using nonstd::make_unexpected;
 
 namespace facade {
+
+
 namespace {
 
 void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
@@ -269,11 +271,12 @@ thread_local uint32_t free_req_release_weight = 0;
 const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "SHUTTING_DOWN",
                                                   "PRECLOSE"};
 
-}  // namespace
-
 // Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
 // in these queues.
-struct Connection::QueueBackpressure {
+struct QueueBackpressure {
+  QueueBackpressure() {
+  }
+
   // Block until subscriber memory usage is below limit, can be called from any thread.
   void EnsureBelowLimit();
 
@@ -296,14 +299,23 @@ struct Connection::QueueBackpressure {
   uint32_t pipeline_queue_max_len = 256;  // cached flag for pipeline queue max length.
 };
 
-thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
-thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;
-
-void Connection::QueueBackpressure::EnsureBelowLimit() {
+void QueueBackpressure::EnsureBelowLimit() {
   pubsub_ec.await(
       [this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
 }
 
+// Global array for each io thread to keep track of the total memory usage of the dispatch queues.
+QueueBackpressure* thread_queue_backpressure = nullptr;
+
+QueueBackpressure& GetQueueBackpressure() {
+  DCHECK(thread_queue_backpressure != nullptr);
+  return *thread_queue_backpressure;
+}
+
+}  // namespace
+
+thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
+
 void Connection::PipelineMessage::SetArgs(const RespVec& args) {
   auto* next = storage.data();
   for (size_t i = 0; i < args.size(); ++i) {
@@ -523,6 +535,30 @@ void UpdateLibNameVerMap(const string& name, const string& ver, int delta) {
 }
 
 }  // namespace
 
+void Connection::Init(unsigned io_threads) {
+  CHECK(thread_queue_backpressure == nullptr);
+  thread_queue_backpressure = new QueueBackpressure[io_threads];
+
+  for (unsigned i = 0; i < io_threads; ++i) {
+    auto& qbp = thread_queue_backpressure[i];
+    qbp.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
+    qbp.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
+    qbp.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
+    qbp.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);
+
+    if (qbp.publish_buffer_limit == 0 || qbp.pipeline_cache_limit == 0 ||
+        qbp.pipeline_buffer_limit == 0 || qbp.pipeline_queue_max_len == 0) {
+      LOG(ERROR) << "pipeline flag limit is 0";
+      exit(-1);
+    }
+  }
+}
+
+void Connection::Shutdown() {
+  delete[] thread_queue_backpressure;
+  thread_queue_backpressure = nullptr;
+}
+
 Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
                        ServiceInterface* service)
     : io_buf_(kMinReadSize),
@@ -611,37 +647,14 @@ void Connection::OnPostMigrateThread() {
     LaunchAsyncFiberIfNeeded();
   }
 
-  // Update tl variables
-  queue_backpressure_ = &tl_queue_backpressure_;
-
   stats_ = &tl_facade_stats->conn_stats;
   ++stats_->num_conns;
   stats_->read_buf_capacity += io_buf_.Capacity();
 }
 
 void Connection::OnConnectionStart() {
-  DCHECK(queue_backpressure_ == nullptr);
-
   ThisFiber::SetName("DflyConnection");
 
-  // We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
-  // may be created in a differrent thread from where it runs.
-  if (tl_queue_backpressure_.publish_buffer_limit == 0) {
-    tl_queue_backpressure_.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
-    tl_queue_backpressure_.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
-    tl_queue_backpressure_.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
-    tl_queue_backpressure_.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);
-
-    if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
-        tl_queue_backpressure_.pipeline_cache_limit == 0 ||
-        tl_queue_backpressure_.pipeline_buffer_limit == 0 ||
-        tl_queue_backpressure_.pipeline_queue_max_len == 0) {
-      LOG(ERROR) << "pipeline flag limit is 0";
-      exit(-1);
-    }
-  }
-
-  queue_backpressure_ = &tl_queue_backpressure_;
   stats_ = &tl_facade_stats->conn_stats;
 }
 
@@ -1016,19 +1029,18 @@ void Connection::ConnectionFlow() {
 
 void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
                                 absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
-  DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
   bool optimize_for_async = has_more;
-
-  if (optimize_for_async && queue_backpressure_->IsPipelineBufferOverLimit(
-                                stats_->dispatch_queue_bytes, dispatch_q_.size())) {
+  QueueBackpressure& qbp = GetQueueBackpressure();
+  if (optimize_for_async &&
+      qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) {
     stats_->pipeline_throttle_count++;
     LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit: pipeline_bytes "
                              << stats_->dispatch_queue_bytes << " queue_size " << dispatch_q_.size()
                              << ", consider increasing pipeline_buffer_limit/pipeline_queue_limit";
     fb2::NoOpLock noop;
-    queue_backpressure_->pipeline_cnd.wait(noop, [this] {
-      bool over_limits = queue_backpressure_->IsPipelineBufferOverLimit(
-          stats_->dispatch_queue_bytes, dispatch_q_.size());
+    qbp.pipeline_cnd.wait(noop, [this, &qbp] {
+      bool over_limits =
+          qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size());
       return !over_limits || (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
     });
     if (cc_->conn_closing)
@@ -1418,8 +1430,9 @@ void Connection::ClearPipelinedMessages() {
   }
 
   dispatch_q_.clear();
-  queue_backpressure_->pipeline_cnd.notify_all();
-  queue_backpressure_->pubsub_ec.notifyAll();
+  QueueBackpressure& qbp = GetQueueBackpressure();
+  qbp.pipeline_cnd.notify_all();
+  qbp.pubsub_ec.notifyAll();
 }
 
 string Connection::DebugInfo() const {
@@ -1465,7 +1478,7 @@ void Connection::AsyncFiber() {
 
   uint64_t prev_epoch = fb2::FiberSwitchEpoch();
   fb2::NoOpLock noop_lk;
-
+  QueueBackpressure& qbp = GetQueueBackpressure();
   while (!reply_builder_->GetError()) {
     DCHECK_EQ(socket()->proactor(), ProactorBase::me());
     cnd_.wait(noop_lk, [this] {
@@ -1493,7 +1506,7 @@ void Connection::AsyncFiber() {
     reply_builder_->SetBatchMode(dispatch_q_.size() > 1);
 
     bool subscriber_over_limit =
-        stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
+        stats_->dispatch_queue_subscriber_bytes >= qbp.publish_buffer_limit;
 
     // Special case: if the dispatch queue accumulated a big number of commands,
     // we can try to squash them
@@ -1518,7 +1531,7 @@ void Connection::AsyncFiber() {
     if (ShouldEndAsyncFiber(msg)) {
       RecycleMessage(std::move(msg));
       CHECK(dispatch_q_.empty()) << DebugInfo();
-      queue_backpressure_->pipeline_cnd.notify_all();
+      qbp.pipeline_cnd.notify_all();
       return;  // don't set conn closing flag
     }
 
@@ -1528,21 +1541,18 @@ void Connection::AsyncFiber() {
       RecycleMessage(std::move(msg));
    }
 
-    DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
-    if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes,
-                                                        dispatch_q_.size()) ||
+    if (!qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size()) ||
         dispatch_q_.empty()) {
-      queue_backpressure_->pipeline_cnd.notify_all();  // very cheap if noone is waiting on it.
+      qbp.pipeline_cnd.notify_all();  // very cheap if no one is waiting on it.
    }
 
-    if (subscriber_over_limit &&
-        stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
-      queue_backpressure_->pubsub_ec.notify();
+    if (subscriber_over_limit && stats_->dispatch_queue_subscriber_bytes < qbp.publish_buffer_limit)
+      qbp.pubsub_ec.notify();
   }
 
   DCHECK(cc_->conn_closing || reply_builder_->GetError());
   cc_->conn_closing = true;
-  queue_backpressure_->pipeline_cnd.notify_all();
+  qbp.pipeline_cnd.notify_all();
 }
 
 Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
@@ -1632,7 +1642,7 @@ Connection::WeakRef Connection::Borrow() {
   // unsafe. All external mechanisms that borrow references should register subscriptions.
   DCHECK_GT(cc_->subscriptions, 0);
 
-  return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex(), id_);
+  return WeakRef(self_, socket_->proactor()->GetPoolIndex(), id_);
 }
 
 void Connection::ShutdownThreadLocal() {
@@ -1709,7 +1719,8 @@ void Connection::SendAsync(MessageHandle msg) {
   msg.dispatch_ts = ProactorBase::GetMonotonicTimeNs();
 
   if (msg.IsPubMsg()) {
-    queue_backpressure_->subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
+    QueueBackpressure& qbp = GetQueueBackpressure();
+    qbp.subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
     stats_->dispatch_queue_subscriber_bytes += used_mem;
   }
 
@@ -1739,8 +1750,9 @@ void Connection::RecycleMessage(MessageHandle msg) {
   stats_->dispatch_queue_bytes -= used_mem;
   stats_->dispatch_queue_entries--;
 
+  QueueBackpressure& qbp = GetQueueBackpressure();
   if (msg.IsPubMsg()) {
-    queue_backpressure_->subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
+    qbp.subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
     stats_->dispatch_queue_subscriber_bytes -= used_mem;
   }
 
@@ -1752,7 +1764,7 @@ void Connection::RecycleMessage(MessageHandle msg) {
   // Retain pipeline message in pool.
   if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
     pending_pipeline_cmd_cnt_--;
-    if (stats_->pipeline_cmd_cache_bytes < queue_backpressure_->pipeline_cache_limit) {
+    if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
       stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
       pipeline_req_pool_.push_back(std::move(*pipe));
     }
@@ -1850,14 +1862,14 @@ void Connection::BreakOnce(uint32_t ev_mask) {
   }
 }
 
-void Connection::SetMaxQueueLenThreadLocal(uint32_t val) {
-  tl_queue_backpressure_.pipeline_queue_max_len = val;
-  tl_queue_backpressure_.pipeline_cnd.notify_all();
+void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
+  thread_queue_backpressure[tid].pipeline_queue_max_len = val;
+  thread_queue_backpressure[tid].pipeline_cnd.notify_all();
 }
 
-void Connection::SetPipelineBufferLimit(size_t val) {
-  tl_queue_backpressure_.pipeline_buffer_limit = val;
-  tl_queue_backpressure_.pipeline_cnd.notify_all();
+void Connection::SetPipelineBufferLimit(unsigned tid, size_t val) {
+  thread_queue_backpressure[tid].pipeline_buffer_limit = val;
+  thread_queue_backpressure[tid].pipeline_cnd.notify_all();
 }
 
 void Connection::GetRequestSizeHistogramThreadLocal(std::string* hist) {
@@ -1874,18 +1886,17 @@ void Connection::TrackRequestSize(bool enable) {
   }
 }
 
-Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
-                             unsigned thread, uint32_t client_id)
-    : ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
-  DCHECK(backpressure);
+Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
+                             uint32_t client_id)
+    : ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
 }
 
 unsigned Connection::WeakRef::Thread() const {
-  return thread_;
+  return thread_id_;
 }
 
 Connection* Connection::WeakRef::Get() const {
-  DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_));
+  DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_id_));
   // The connection can only be deleted on this thread, so
   // this pointer is valid until the next suspension.
   // Note: keeping a shared_ptr doesn't prolong the lifetime because
@@ -1904,9 +1915,7 @@ uint32_t Connection::WeakRef::GetClientId() const {
 bool Connection::WeakRef::EnsureMemoryBudget() const {
   // Simple optimization: If a connection was closed, don't check memory budget.
   if (!ptr_.expired()) {
-    // We don't rely on the connection ptr staying valid because we only access
-    // the threads backpressure
-    backpressure_->EnsureBelowLimit();
+    thread_queue_backpressure[thread_id_].EnsureBelowLimit();
     return true;
   }
   return false;
@@ -1931,7 +1940,6 @@ void ResetStats() {
   cstats.io_read_bytes = 0;
 
   tl_facade_stats->reply_stats = {};
-
   if (io_req_size_hist)
     io_req_size_hist->Clear();
 }
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index f574e3eeda56..eb4f7cc52570 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -54,9 +54,10 @@ class SinkReplyBuilder;
 // For pipelined requests, monitor and pubsub messages it uses
 // a separate dispatch queue that is processed on a separate fiber.
 class Connection : public util::Connection {
-  struct QueueBackpressure;
-
  public:
+  static void Init(unsigned io_threads);
+  static void Shutdown();
+
   Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
              ServiceInterface* service);
 
   ~Connection();
@@ -204,12 +205,10 @@ class Connection : public util::Connection {
    private:
     friend class Connection;
 
-    WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure, unsigned thread,
-            uint32_t client_id);
+    WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id, uint32_t client_id);
 
     std::weak_ptr<Connection> ptr_;
-    QueueBackpressure* backpressure_;
-    unsigned thread_;
+    unsigned thread_id_;
     uint32_t client_id_;
   };
 
@@ -306,8 +305,8 @@ class Connection : public util::Connection {
   bool IsHttp() const;
 
   // Sets max queue length locally in the calling thread.
-  static void SetMaxQueueLenThreadLocal(uint32_t val);
-  static void SetPipelineBufferLimit(size_t val);
+  static void SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val);
+  static void SetPipelineBufferLimit(unsigned tid, size_t val);
   static void GetRequestSizeHistogramThreadLocal(std::string* hist);
   static void TrackRequestSize(bool enable);
 
@@ -427,19 +426,12 @@ class Connection : public util::Connection {
   // Used to keep track of borrowed references. Does not really own itself
   std::shared_ptr<Connection> self_;
 
-  // Pointer to corresponding queue backpressure struct.
-  // Needed for access from different threads by EnsureAsyncMemoryBudget().
-  QueueBackpressure* queue_backpressure_ = nullptr;
-
   util::fb2::ProactorBase* migration_request_ = nullptr;
 
   // Pooled pipeline messages per-thread
   // Aggregated while handling pipelines, gradually released while handling regular commands.
   static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
 
-  // Per-thread queue backpressure structs.
-  static thread_local QueueBackpressure tl_queue_backpressure_;
-
   union {
     uint16_t flags_;
     struct {
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 4f2ae3cc3a41..1b2c391b527d 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -778,6 +778,7 @@ Service::~Service() {
 
 void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
   InitRedisTables();
+  facade::Connection::Init(pp_.size());
 
   config_registry.RegisterSetter(
       "maxmemory", [](const MemoryBytesFlag& flag) { max_memory_limit = flag.value; });
@@ -800,12 +801,12 @@ void Service::Init(util::AcceptServer* acceptor, std::vector
 
   config_registry.RegisterSetter("pipeline_queue_limit", [](uint32_t val) {
     shard_set->pool()->AwaitBrief(
-        [val](unsigned, auto*) { facade::Connection::SetMaxQueueLenThreadLocal(val); });
+        [val](unsigned tid, auto*) { facade::Connection::SetMaxQueueLenThreadLocal(tid, val); });
   });
 
   config_registry.RegisterSetter("pipeline_buffer_limit", [](size_t val) {
     shard_set->pool()->AwaitBrief(
-        [val](unsigned, auto*) { facade::Connection::SetPipelineBufferLimit(val); });
+        [val](unsigned tid, auto*) { facade::Connection::SetPipelineBufferLimit(tid, val); });
   });
 
   config_registry.RegisterMutable("replica_partial_sync");
@@ -910,6 +911,7 @@ void Service::Shutdown() {
 
   // wait for all the pending callbacks to stop.
   ThisFiber::SleepFor(10ms);
+  facade::Connection::Shutdown();
 }
 
 optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgList args,
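
Note (appended, not part of the patch): below is a minimal, self-contained C++ sketch of the ownership model this change moves to -- one backpressure record per IO thread, kept in a plain global array that is sized once at startup and indexed by thread id, instead of a thread_local instance reached through pointers handed out via WeakRef. All names in the sketch (Backpressure, InitBackpressure, ShutdownBackpressure, IsBelowPublishLimit, g_backpressure) are illustrative stand-ins and do not exist in the Dragonfly sources; only the shape of the pattern mirrors Connection::Init()/Shutdown() and WeakRef::EnsureMemoryBudget() above.

// Illustrative sketch only; hypothetical names, not Dragonfly code.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

namespace sketch {

struct Backpressure {
  std::atomic<size_t> subscriber_bytes{0};
  size_t publish_buffer_limit = 1 << 20;  // stand-in for --publish_buffer_limit
  uint32_t pipeline_queue_max_len = 256;  // stand-in for --pipeline_queue_limit
};

// One entry per IO thread, owned by a plain global array rather than a
// thread_local object. Any thread may index any entry by thread id.
Backpressure* g_backpressure = nullptr;
unsigned g_num_threads = 0;

void InitBackpressure(unsigned io_threads) {
  assert(g_backpressure == nullptr);
  g_backpressure = new Backpressure[io_threads];
  g_num_threads = io_threads;
}

void ShutdownBackpressure() {
  delete[] g_backpressure;
  g_backpressure = nullptr;
  g_num_threads = 0;
}

// What a foreign thread needs after the patch: only the owning thread id,
// not a pointer into the connection or its thread_local state.
bool IsBelowPublishLimit(unsigned thread_id) {
  assert(thread_id < g_num_threads);
  const Backpressure& bp = g_backpressure[thread_id];
  return bp.subscriber_bytes.load(std::memory_order_relaxed) <= bp.publish_buffer_limit;
}

}  // namespace sketch

int main() {
  sketch::InitBackpressure(4);  // e.g. one entry per IO thread at startup
  bool below = sketch::IsBelowPublishLimit(2);
  sketch::ShutdownBackpressure();
  return below ? 0 : 1;
}

The array form keeps cross-thread reads well defined while letting flag setters such as SetMaxQueueLenThreadLocal(tid, val) target exactly one thread's entry, which is what the main_service.cc hunks above rely on.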