From 4f8141b9a96dc80e45aa094f1b67dc0913b5758d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 9 Jan 2025 15:11:33 +0200 Subject: [PATCH] chore: dragonfly connection refactorings 1. Move socket read code into a dedicated function. Remove std:: prefix in the code. 2. Add an optional io_uring bufring registration. It is currently unused and disabled by default. --- src/facade/dragonfly_connection.cc | 67 +++++++++++++++++------------- src/facade/dragonfly_connection.h | 3 ++ src/facade/facade_types.h | 4 ++ src/server/dfly_main.cc | 35 +++++++++++++++- 4 files changed, 80 insertions(+), 29 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index da7d884fc9ca..a15166dd2029 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -87,6 +87,7 @@ ABSL_FLAG(bool, migrate_connections, true, "happen at most once per connection."); using namespace util; +using namespace std; using absl::GetFlag; using nonstd::make_unexpected; @@ -239,7 +240,7 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span resp, } // Write the data itself. - std::array blobs; + array blobs; unsigned index = 0; if (next != stack_buf) { blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)}; @@ -283,7 +284,7 @@ struct Connection::QueueBackpressure { // Used by publisher/subscriber actors to make sure we do not publish too many messages // into the queue. Thread-safe to allow safe access in EnsureBelowLimit. util::fb2::EventCount pubsub_ec; - std::atomic_size_t subscriber_bytes = 0; + atomic_size_t subscriber_bytes = 0; // Used by pipelining/execution fiber to throttle the incoming pipeline messages. // Used together with pipeline_buffer_limit to limit the pipeline usage per thread. 
@@ -504,7 +505,7 @@ void Connection::AsyncOperations::operator()(const InvalidationMessage& msg) { if (msg.invalidate_due_to_flush) { rbuilder->SendNull(); } else { - std::string_view keys[] = {msg.key}; + string_view keys[] = {msg.key}; rbuilder->SendBulkStrArr(keys); } } @@ -552,7 +553,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, // Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor). // We use it for reference counting and accessing `this` (without managing it). - self_ = {std::make_shared(), this}; + self_ = {make_shared(), this}; #ifdef DFLY_USE_SSL // Increment reference counter so Listener won't free the context while we're @@ -688,6 +689,7 @@ void Connection::HandleRequests() { LOG(INFO) << "Error handshaking " << aresult.error().message(); return; } + is_tls_ = 1; VLOG(1) << "TLS handshake succeeded"; } } @@ -756,7 +758,7 @@ void Connection::RegisterBreakHook(BreakerCb breaker_cb) { breaker_cb_ = breaker_cb; } -std::pair Connection::GetClientInfoBeforeAfterTid() const { +pair Connection::GetClientInfoBeforeAfterTid() const { if (!socket_) { LOG(DFATAL) << "unexpected null socket_ " << " phase " << unsigned(phase_) << ", is_http: " << unsigned(is_http_); @@ -854,18 +856,18 @@ bool Connection::IsMain() const { return static_cast(listener())->IsMainInterface(); } -void Connection::SetName(std::string name) { +void Connection::SetName(string name) { util::ThisFiber::SetName(absl::StrCat("DflyConnection_", name)); name_ = std::move(name); } -void Connection::SetLibName(std::string name) { +void Connection::SetLibName(string name) { UpdateLibNameVerMap(lib_name_, lib_ver_, -1); lib_name_ = std::move(name); UpdateLibNameVerMap(lib_name_, lib_ver_, +1); } -void Connection::SetLibVersion(std::string version) { +void Connection::SetLibVersion(string version) { UpdateLibNameVerMap(lib_name_, lib_ver_, -1); lib_ver_ = std::move(version); UpdateLibNameVerMap(lib_name_, lib_ver_, +1); @@ 
-1154,7 +1156,7 @@ auto Connection::ParseMemcache() -> ParserStatus { if (MemcacheParser::IsStoreCmd(cmd.type)) { total_len += cmd.bytes_len + 2; if (io_buf_.InputLen() >= total_len) { - std::string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2); + string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2); if (parsed_value[cmd.bytes_len] != '\r' && parsed_value[cmd.bytes_len + 1] != '\n') { builder->SendClientError("bad data chunk"); // We consume the whole buffer because we don't really know where it ends @@ -1241,6 +1243,29 @@ void Connection::HandleMigrateRequest() { } } +error_code Connection::HandleRecvSocket() { + io::MutableBytes append_buf = io_buf_.AppendBuffer(); + DCHECK(!append_buf.empty()); + + phase_ = READ_SOCKET; + error_code ec; + + ::io::Result recv_sz = socket_->Recv(append_buf); + last_interaction_ = time(nullptr); + + if (!recv_sz) { + return recv_sz.error(); + } + + size_t commit_sz = *recv_sz; + + io_buf_.CommitWrite(commit_sz); + stats_->io_read_bytes += commit_sz; + ++stats_->io_read_cnt; + + return ec; +} + auto Connection::IoLoop() -> variant { error_code ec; ParserStatus parse_status = OK; @@ -1250,25 +1275,11 @@ auto Connection::IoLoop() -> variant { do { HandleMigrateRequest(); - - io::MutableBytes append_buf = io_buf_.AppendBuffer(); - DCHECK(!append_buf.empty()); - - phase_ = READ_SOCKET; - - ::io::Result recv_sz = peer->Recv(append_buf); - last_interaction_ = time(nullptr); - - if (!recv_sz) { - ec = recv_sz.error(); - parse_status = OK; - break; + ec = HandleRecvSocket(); + if (ec) { + return ec; } - io_buf_.CommitWrite(*recv_sz); - stats_->io_read_bytes += *recv_sz; - ++stats_->io_read_cnt; - phase_ = PROCESS; bool is_iobuf_full = io_buf_.AppendLen() == 0; @@ -1411,8 +1422,8 @@ void Connection::ClearPipelinedMessages() { queue_backpressure_->pubsub_ec.notifyAll(); } -std::string Connection::DebugInfo() const { - std::string info = "{"; +string Connection::DebugInfo() const { + string info = "{"; 
absl::StrAppend(&info, "address=", uint64_t(this), ", "); absl::StrAppend(&info, "phase=", phase_, ", "); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 3eced43ae1b4..f574e3eeda56 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -365,6 +365,8 @@ class Connection : public util::Connection { PipelineMessagePtr GetFromPipelinePool(); void HandleMigrateRequest(); + std::error_code HandleRecvSocket(); + bool ShouldEndAsyncFiber(const MessageHandle& msg); void LaunchAsyncFiberIfNeeded(); // Async fiber is started lazily @@ -449,6 +451,7 @@ class Connection : public util::Connection { bool migration_enabled_ : 1; bool migration_in_process_ : 1; bool is_http_ : 1; + bool is_tls_ : 1; }; }; }; diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 7f8b1b6e23b7..65864bf48a6b 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -184,6 +184,10 @@ extern __thread FacadeStats* tl_facade_stats; void ResetStats(); +// Constants for socket bufring. +constexpr uint16_t kRecvSockGid = 0; +constexpr size_t kRecvBufSize = 128; + } // namespace facade namespace std { diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index a41231d7f832..38acad0b7257 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -23,7 +23,7 @@ #endif #ifdef __linux__ -#include +#include "util/fibers/uring_proactor.h" #endif #include @@ -81,6 +81,9 @@ ABSL_FLAG(bool, version_check, true, "If true, Will monitor for new releases on Dragonfly servers once a day."); ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter."); +ABSL_FLAG(uint16_t, uring_recv_buffer_cnt, 0, + "How many socket recv buffers of size 128 to allocate per thread. " 
+ "Relevant only for modern kernels with io_uring enabled"); using namespace util; using namespace facade; @@ -603,6 +606,35 @@ void SetupAllocationTracker(ProactorPool* pool) { #endif } +void RegisterBufRings(ProactorPool* pool) { +#ifdef __linux__ + auto bufcnt = absl::GetFlag(FLAGS_uring_recv_buffer_cnt); + if (bufcnt == 0) { + return; + } + + if (dfly::kernel_version < 602 || pool->at(0)->GetKind() != ProactorBase::IOURING) { + LOG(WARNING) << "uring_recv_buffer_cnt is only supported on kernels >= 6.2 and with " + "io_uring proactor"; + return; + } + + // We need a power of 2 length. + bufcnt = absl::bit_ceil(bufcnt); + pool->AwaitBrief([&](unsigned, ProactorBase* pb) { + auto up = static_cast(pb); + int res = up->RegisterBufferRing(facade::kRecvSockGid, bufcnt, facade::kRecvBufSize); + if (res != 0) { + LOG(ERROR) << "Failed to register buf ring for proactor " + << util::detail::SafeErrorMessage(res); + exit(1); + } + }); + LOG(INFO) << "Registered a bufring with " << bufcnt << " buffers of size " << facade::kRecvBufSize + << " per thread "; +#endif +} + } // namespace } // namespace dfly @@ -791,6 +823,7 @@ Usage: dragonfly [FLAGS] pool->Run(); SetupAllocationTracker(pool.get()); + RegisterBufRings(pool.get()); AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true); acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));