chore: make per-thread QueueBackpressure objects global #4482

Merged: 1 commit, Jan 20, 2025
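This change removes the per-thread thread_local QueueBackpressure instance (and the queue_backpressure_ pointer cached in each Connection) and replaces them with a single global array holding one QueueBackpressure entry per io thread. The array is allocated once in Connection::Init(), seeded from the pipeline/publish flags, and released in Connection::Shutdown(). The condensed sketch below shows only that ownership pattern; it is not Dragonfly's actual code, the flag plumbing is elided, and the tid parameter on the accessor is an illustration (the accessor in the diff resolves the calling thread internally).

#include <cassert>
#include <cstddef>
#include <cstdint>

namespace sketch {

struct QueueBackpressure {
  size_t publish_buffer_limit = 0;     // pubsub backpressure threshold
  size_t pipeline_buffer_limit = 0;    // pipeline backpressure threshold
  size_t pipeline_cache_limit = 0;     // cap on the pipeline message cache
  uint32_t pipeline_queue_max_len = 256;
};

// Before: thread_local QueueBackpressure tl_queue_backpressure_; (one hidden copy per thread).
// After: one explicitly owned global array, indexed by io-thread id.
QueueBackpressure* thread_queue_backpressure = nullptr;

void Init(unsigned io_threads) {
  assert(thread_queue_backpressure == nullptr);
  thread_queue_backpressure = new QueueBackpressure[io_threads];
  // The real Init() also copies the runtime flags into each entry and aborts if any limit is 0.
}

void Shutdown() {
  delete[] thread_queue_backpressure;
  thread_queue_backpressure = nullptr;
}

QueueBackpressure& GetQueueBackpressure(unsigned tid) {
  assert(thread_queue_backpressure != nullptr);
  return thread_queue_backpressure[tid];
}

}  // namespace sketch

int main() {
  sketch::Init(4);  // e.g. 4 io threads
  sketch::GetQueueBackpressure(2).pipeline_buffer_limit = 1 << 20;
  sketch::Shutdown();
  return 0;
}

Holding the entries in a plain global array, rather than in thread_local storage, is what lets other threads reach a specific thread's entry by index, which the tid-taking setters and WeakRef::EnsureMemoryBudget in the diff below rely on.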
148 changes: 78 additions & 70 deletions src/facade/dragonfly_connection.cc
@@ -92,6 +92,8 @@ using absl::GetFlag;
using nonstd::make_unexpected;

namespace facade {


namespace {

void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
@@ -269,11 +271,12 @@ thread_local uint32_t free_req_release_weight = 0;
const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "SHUTTING_DOWN",
"PRECLOSE"};

} // namespace

// Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
// in these queues.
struct Connection::QueueBackpressure {
struct QueueBackpressure {
QueueBackpressure() {
}

// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();

@@ -296,14 +299,23 @@ struct Connection::QueueBackpressure {
uint32_t pipeline_queue_max_len = 256; // cached flag for pipeline queue max length.
};

thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

void Connection::QueueBackpressure::EnsureBelowLimit() {
void QueueBackpressure::EnsureBelowLimit() {
pubsub_ec.await(
[this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
}

// Global array for each io thread to keep track of the total memory usage of the dispatch queues.
QueueBackpressure* thread_queue_backpressure = nullptr;

QueueBackpressure& GetQueueBackpressure() {
DCHECK(thread_queue_backpressure != nullptr);
return thread_queue_backpressure[ProactorBase::me()->GetPoolIndex()];
}

} // namespace

thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;

void Connection::PipelineMessage::SetArgs(const RespVec& args) {
auto* next = storage.data();
for (size_t i = 0; i < args.size(); ++i) {
@@ -523,6 +535,30 @@ void UpdateLibNameVerMap(const string& name, const string& ver, int delta) {
}
} // namespace

void Connection::Init(unsigned io_threads) {
CHECK(thread_queue_backpressure == nullptr);
thread_queue_backpressure = new QueueBackpressure[io_threads];

for (unsigned i = 0; i < io_threads; ++i) {
auto& qbp = thread_queue_backpressure[i];
qbp.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
qbp.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
qbp.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
qbp.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);

if (qbp.publish_buffer_limit == 0 || qbp.pipeline_cache_limit == 0 ||
qbp.pipeline_buffer_limit == 0 || qbp.pipeline_queue_max_len == 0) {
LOG(ERROR) << "pipeline flag limit is 0";
exit(-1);
}
}
}

void Connection::Shutdown() {
delete[] thread_queue_backpressure;
thread_queue_backpressure = nullptr;
}

Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize),
@@ -611,37 +647,14 @@ void Connection::OnPostMigrateThread() {
LaunchAsyncFiberIfNeeded();
}

// Update tl variables
queue_backpressure_ = &tl_queue_backpressure_;

stats_ = &tl_facade_stats->conn_stats;
++stats_->num_conns;
stats_->read_buf_capacity += io_buf_.Capacity();
}

void Connection::OnConnectionStart() {
DCHECK(queue_backpressure_ == nullptr);

ThisFiber::SetName("DflyConnection");

// We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
// may be created in a differrent thread from where it runs.
if (tl_queue_backpressure_.publish_buffer_limit == 0) {
tl_queue_backpressure_.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
tl_queue_backpressure_.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);

if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_cache_limit == 0 ||
tl_queue_backpressure_.pipeline_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_queue_max_len == 0) {
LOG(ERROR) << "pipeline flag limit is 0";
exit(-1);
}
}

queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
}

@@ -1016,19 +1029,18 @@ void Connection::ConnectionFlow() {

void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;

if (optimize_for_async && queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size())) {
QueueBackpressure& qbp = GetQueueBackpressure();
if (optimize_for_async &&
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) {
stats_->pipeline_throttle_count++;
LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit: pipeline_bytes "
<< stats_->dispatch_queue_bytes << " queue_size " << dispatch_q_.size()
<< ", consider increasing pipeline_buffer_limit/pipeline_queue_limit";
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
bool over_limits = queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size());
qbp.pipeline_cnd.wait(noop, [this, &qbp] {
bool over_limits =
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size());
return !over_limits || (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
@@ -1418,8 +1430,9 @@ void Connection::ClearPipelinedMessages() {
}

dispatch_q_.clear();
queue_backpressure_->pipeline_cnd.notify_all();
queue_backpressure_->pubsub_ec.notifyAll();
QueueBackpressure& qbp = GetQueueBackpressure();
qbp.pipeline_cnd.notify_all();
qbp.pubsub_ec.notifyAll();
}

string Connection::DebugInfo() const {
@@ -1465,7 +1478,7 @@ void Connection::AsyncFiber() {

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
fb2::NoOpLock noop_lk;

QueueBackpressure& qbp = GetQueueBackpressure();
while (!reply_builder_->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
cnd_.wait(noop_lk, [this] {
@@ -1493,7 +1506,7 @@ void Connection::AsyncFiber() {
reply_builder_->SetBatchMode(dispatch_q_.size() > 1);

bool subscriber_over_limit =
stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
stats_->dispatch_queue_subscriber_bytes >= qbp.publish_buffer_limit;

// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
@@ -1518,7 +1531,7 @@
if (ShouldEndAsyncFiber(msg)) {
RecycleMessage(std::move(msg));
CHECK(dispatch_q_.empty()) << DebugInfo();
queue_backpressure_->pipeline_cnd.notify_all();
qbp.pipeline_cnd.notify_all();
return; // don't set conn closing flag
}

@@ -1528,21 +1541,18 @@
RecycleMessage(std::move(msg));
}

DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes,
dispatch_q_.size()) ||
if (!qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size()) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
qbp.pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
}

if (subscriber_over_limit &&
stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
queue_backpressure_->pubsub_ec.notify();
if (subscriber_over_limit && stats_->dispatch_queue_subscriber_bytes < qbp.publish_buffer_limit)
qbp.pubsub_ec.notify();
}

DCHECK(cc_->conn_closing || reply_builder_->GetError());
cc_->conn_closing = true;
queue_backpressure_->pipeline_cnd.notify_all();
qbp.pipeline_cnd.notify_all();
}

Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
@@ -1632,7 +1642,7 @@ Connection::WeakRef Connection::Borrow() {
// unsafe. All external mechanisms that borrow references should register subscriptions.
DCHECK_GT(cc_->subscriptions, 0);

return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex(), id_);
return WeakRef(self_, socket_->proactor()->GetPoolIndex(), id_);
}

void Connection::ShutdownThreadLocal() {
@@ -1709,7 +1719,8 @@ void Connection::SendAsync(MessageHandle msg) {

msg.dispatch_ts = ProactorBase::GetMonotonicTimeNs();
if (msg.IsPubMsg()) {
queue_backpressure_->subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
QueueBackpressure& qbp = GetQueueBackpressure();
qbp.subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
stats_->dispatch_queue_subscriber_bytes += used_mem;
}

@@ -1739,8 +1750,9 @@ void Connection::RecycleMessage(MessageHandle msg) {
stats_->dispatch_queue_bytes -= used_mem;
stats_->dispatch_queue_entries--;

QueueBackpressure& qbp = GetQueueBackpressure();
if (msg.IsPubMsg()) {
queue_backpressure_->subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
qbp.subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
stats_->dispatch_queue_subscriber_bytes -= used_mem;
}

@@ -1752,7 +1764,7 @@
// Retain pipeline message in pool.
if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
pending_pipeline_cmd_cnt_--;
if (stats_->pipeline_cmd_cache_bytes < queue_backpressure_->pipeline_cache_limit) {
if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
pipeline_req_pool_.push_back(std::move(*pipe));
}
@@ -1850,14 +1862,14 @@ void Connection::BreakOnce(uint32_t ev_mask) {
}
}

void Connection::SetMaxQueueLenThreadLocal(uint32_t val) {
tl_queue_backpressure_.pipeline_queue_max_len = val;
tl_queue_backpressure_.pipeline_cnd.notify_all();
void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
thread_queue_backpressure[tid].pipeline_queue_max_len = val;
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
}

void Connection::SetPipelineBufferLimit(size_t val) {
tl_queue_backpressure_.pipeline_buffer_limit = val;
tl_queue_backpressure_.pipeline_cnd.notify_all();
void Connection::SetPipelineBufferLimit(unsigned tid, size_t val) {
thread_queue_backpressure[tid].pipeline_buffer_limit = val;
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
}

void Connection::GetRequestSizeHistogramThreadLocal(std::string* hist) {
@@ -1874,18 +1886,17 @@ void Connection::TrackRequestSize(bool enable) {
}
}

Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
DCHECK(backpressure);
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
uint32_t client_id)
: ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
}

unsigned Connection::WeakRef::Thread() const {
return thread_;
return thread_id_;
}

Connection* Connection::WeakRef::Get() const {
DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_));
DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_id_));
// The connection can only be deleted on this thread, so
// this pointer is valid until the next suspension.
// Note: keeping a shared_ptr doesn't prolong the lifetime because
@@ -1904,9 +1915,7 @@ uint32_t Connection::WeakRef::GetClientId() const {
bool Connection::WeakRef::EnsureMemoryBudget() const {
// Simple optimization: If a connection was closed, don't check memory budget.
if (!ptr_.expired()) {
// We don't rely on the connection ptr staying valid because we only access
// the threads backpressure
backpressure_->EnsureBelowLimit();
thread_queue_backpressure[thread_id_].EnsureBelowLimit();
return true;
}
return false;
@@ -1931,7 +1940,6 @@ void ResetStats() {
cstats.io_read_bytes = 0;

tl_facade_stats->reply_stats = {};

if (io_req_size_hist)
io_req_size_hist->Clear();
}
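Note: the call sites for the new Init()/Shutdown() statics live outside the two files shown here. Below is a hedged sketch of how a server bootstrap might wire them up; RunServer and RunListeners are placeholders rather than Dragonfly identifiers, and the include path is assumed.

#include "facade/dragonfly_connection.h"  // assumed include path

// Placeholder bootstrap/teardown wiring, not part of this diff.
void RunServer(unsigned io_thread_count) {
  // One QueueBackpressure entry per io thread must exist before any connection runs.
  facade::Connection::Init(io_thread_count);

  // RunListeners(io_thread_count);  // placeholder for the real accept/serve loop

  // All connections must be finished before the global array is released.
  facade::Connection::Shutdown();
}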
22 changes: 7 additions & 15 deletions src/facade/dragonfly_connection.h
@@ -54,9 +54,10 @@ class SinkReplyBuilder;
// For pipelined requests, monitor and pubsub messages it uses
// a separate dispatch queue that is processed on a separate fiber.
class Connection : public util::Connection {
struct QueueBackpressure;

public:
static void Init(unsigned io_threads);
static void Shutdown();

Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service);
~Connection();
@@ -204,12 +205,10 @@ class Connection : public util::Connection {
private:
friend class Connection;

WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure, unsigned thread,
uint32_t client_id);
WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id, uint32_t client_id);

std::weak_ptr<Connection> ptr_;
QueueBackpressure* backpressure_;
unsigned thread_;
unsigned thread_id_;
uint32_t client_id_;
};

@@ -306,8 +305,8 @@ class Connection : public util::Connection {
bool IsHttp() const;

// Sets max queue length locally in the calling thread.
static void SetMaxQueueLenThreadLocal(uint32_t val);
static void SetPipelineBufferLimit(size_t val);
static void SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val);
static void SetPipelineBufferLimit(unsigned tid, size_t val);
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
static void TrackRequestSize(bool enable);

@@ -427,19 +426,12 @@ class Connection : public util::Connection {
// Used to keep track of borrowed references. Does not really own itself
std::shared_ptr<Connection> self_;

// Pointer to corresponding queue backpressure struct.
// Needed for access from different threads by EnsureAsyncMemoryBudget().
QueueBackpressure* queue_backpressure_ = nullptr;

util::fb2::ProactorBase* migration_request_ = nullptr;

// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;

// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;

union {
uint16_t flags_;
struct {
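With the thread_local copies gone, SetMaxQueueLenThreadLocal() and SetPipelineBufferLimit() now take an explicit thread id and write into that thread's entry of the global array. A hedged sketch of fanning a runtime limit change out to every entry follows; ApplyPipelineLimits and io_threads are placeholders, and the real caller may still dispatch the call onto each io thread rather than writing cross-thread, which this diff does not show.

#include <cstddef>
#include <cstdint>

#include "facade/dragonfly_connection.h"  // assumed include path

// Placeholder helper: io_threads mirrors the value passed to Connection::Init().
void ApplyPipelineLimits(unsigned io_threads, uint32_t max_queue_len, size_t buffer_limit) {
  for (unsigned tid = 0; tid < io_threads; ++tid) {
    facade::Connection::SetMaxQueueLenThreadLocal(tid, max_queue_len);
    facade::Connection::SetPipelineBufferLimit(tid, buffer_limit);
  }
}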