chore: dragonfly connection refactorings #4434

Merged
merged 1 commit into from
Jan 9, 2025
67 changes: 39 additions & 28 deletions src/facade/dragonfly_connection.cc
@@ -87,6 +87,7 @@ ABSL_FLAG(bool, migrate_connections, true,
"happen at most once per connection.");

using namespace util;
+using namespace std;
Contributor

@kostasrim kostasrim Jan 9, 2025

I am allergic to unqualified lookup unless it's used in generic code as a customization point 😄

Ignore me 😄

Collaborator Author

I do not know what that means, but I do not think our code has many ambiguous names. As long as we enable it locally in a .cc file, I think it's fine.

Contributor

@kostasrim kostasrim Jan 9, 2025

Long story short, there are different rules for looking up qualified names (e.g., std::some_algo) versus unqualified ones (e.g., some_algo). This can lead to many surprises, which are amplified in generic code (templates and their two-phase lookup, at parsing and at instantiation).

A simple example without templates:

// Omitting #include <numeric> on purpose
std::accumulate(c.begin(), c.end(), adder());

This is easy: the compiler looks in the std namespace and finds std::accumulate.

Now if you do the same unqualified:

accumulate(...)

The compiler will also look in the namespaces associated with the arguments passed in the function call (argument-dependent lookup). In our case the argument type is Vector::Iterator. Now here is a little bit of madness: this is NOT platform-independent code. It might compile on some machines and fail to compile on others. Why?

Because the C++ standard does not mandate whether the iterator is just a pointer or a wrapper struct. If it's a typedef for a raw pointer such as int*, then guess what! The program won't compile. If the implementation is a struct Iterator, it will, because the rules mandate that the namespaces the compiler searches include the namespace of the class. A raw pointer to int has no associated namespace, so the compiler never looks inside std.

Also, because we did not include <numeric>, we might or might not pull in the right std::accumulate, or we might not even consider it (which header includes which is, again, not defined by the C++ standard).
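The lookup behaviour described in this comment can be sketched in a few lines. This is only an illustration: the namespace `lib`, the type `Iter`, and the function `sum` are hypothetical stand-ins for `std`, a class-type iterator, and `std::accumulate`. The point is that the unqualified call compiles only when the argument type drags its namespace into the search.

```cpp
#include <cassert>
#include <vector>

namespace lib {  // hypothetical library namespace, standing in for std

// A class-type iterator: namespace lib is associated with it for ADL.
struct Iter {
  const int* p;
};

// Reachable from an unqualified call only through ADL on lib::Iter.
inline int sum(Iter first, Iter last) {
  int total = 0;
  for (const int* it = first.p; it != last.p; ++it) total += *it;
  return total;
}

// Same algorithm for raw pointers. A pointer to int has no associated
// namespace, so unqualified calls outside lib cannot see this overload.
inline int sum(const int* first, const int* last) {
  int total = 0;
  for (; first != last; ++first) total += *first;
  return total;
}

}  // namespace lib

inline int sum_with_class_iterators(const std::vector<int>& v) {
  lib::Iter first{v.data()};
  lib::Iter last{v.data() + v.size()};
  return sum(first, last);  // unqualified: ADL searches lib because of lib::Iter
}

inline int sum_with_raw_pointers(const std::vector<int>& v) {
  // return sum(v.data(), v.data() + v.size());  // error: 'sum' is not declared here
  return lib::sum(v.data(), v.data() + v.size());  // must be qualified
}

// Small self-check tying the two paths together.
inline void adl_self_check() {
  std::vector<int> v{1, 2, 3};
  assert(sum_with_class_iterators(v) == sum_with_raw_pointers(v));
}
```

If a standard library implements its vector iterator as a plain pointer, the unqualified call ends up in the second situation, which is why the same source can build on one platform and fail on another.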

And this is without templates. Add templates and customization points, and now you are dancing with tag dispatch and the infamous two-step dance (a using declaration followed by an unqualified call).
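For readers unfamiliar with the two-step idiom mentioned here, a minimal sketch follows. The namespace `geo`, the type `Point`, the counter `swap_calls`, and the helper `exchange_values` are all made up for the example; only `std::swap` is real.

```cpp
#include <cassert>
#include <utility>

namespace geo {  // hypothetical user namespace

struct Point {
  int x;
  int y;
};

int swap_calls = 0;  // counts how often the custom overload runs

// Customization point: a type-specific swap living next to the type.
void swap(Point& a, Point& b) noexcept {
  ++swap_calls;
  std::swap(a.x, b.x);
  std::swap(a.y, b.y);
}

}  // namespace geo

// Generic code: step 1 brings std::swap in as the fallback, step 2 calls
// unqualified so ADL can add overloads from the argument's own namespace.
template <typename T>
void exchange_values(T& a, T& b) {
  using std::swap;
  swap(a, b);
}

// Self-check: the custom overload wins for geo::Point.
inline void two_step_self_check() {
  geo::Point p{1, 2};
  geo::Point q{3, 4};
  int before = geo::swap_calls;
  exchange_values(p, q);
  assert(geo::swap_calls == before + 1);
}
```

For `geo::Point` the non-template `geo::swap` is preferred over the `std::swap` template; for `int` only `std::swap` is a candidate. Writing `std::swap(a, b)` inside the template would silently bypass the custom overload.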

Contributor

P.S. I am not advocating a change here; I just left the comment to explain some of the reasons I don't like it.

P.S. 2: We also do not use many templates, so we are kinda on the safer side 😄

using absl::GetFlag;
using nonstd::make_unexpected;

@@ -239,7 +240,7 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
}

// Write the data itself.
-std::array<iovec, 16> blobs;
+array<iovec, 16> blobs;
unsigned index = 0;
if (next != stack_buf) {
blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)};
@@ -283,7 +284,7 @@ struct Connection::QueueBackpressure {
// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount pubsub_ec;
-std::atomic_size_t subscriber_bytes = 0;
+atomic_size_t subscriber_bytes = 0;

// Used by pipelining/execution fiber to throttle the incoming pipeline messages.
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
@@ -504,7 +505,7 @@ void Connection::AsyncOperations::operator()(const InvalidationMessage& msg) {
if (msg.invalidate_due_to_flush) {
rbuilder->SendNull();
} else {
-std::string_view keys[] = {msg.key};
+string_view keys[] = {msg.key};
rbuilder->SendBulkStrArr(keys);
}
}
@@ -552,7 +553,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,

// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
// We use it for reference counting and accessing `this` (without managing it).
-self_ = {std::make_shared<std::monostate>(), this};
+self_ = {make_shared<std::monostate>(), this};

#ifdef DFLY_USE_SSL
// Increment reference counter so Listener won't free the context while we're
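The aliasing-constructor trick named in the comment of this hunk can be sketched in isolation. This is not the Dragonfly class: `Conn`, its `id` member, and `Borrow()` are hypothetical, and the sketch assumes C++17 for `std::monostate`. It shows a `shared_ptr` that points at `this` while owning only a dummy object, so observers can detect the object's death without the pointer ever deleting it.

```cpp
#include <cassert>
#include <memory>
#include <variant>

struct Conn {  // hypothetical stand-in for a connection object
  int id = 7;
  std::shared_ptr<Conn> self;  // get() == this, but owns only a monostate

  Conn() {
    // Aliasing constructor: share ownership of the dummy monostate control
    // block while storing `this` as the pointer. Releasing `self` frees the
    // monostate and never deletes the Conn itself.
    self = std::shared_ptr<Conn>(std::make_shared<std::monostate>(), this);
  }

  // Copying would duplicate a pointer to the original object, so forbid it.
  Conn(const Conn&) = delete;
  Conn& operator=(const Conn&) = delete;

  // Observers hold a weak reference and must lock() before use.
  std::weak_ptr<Conn> Borrow() const { return self; }
};

// Self-check: the weak reference expires when the Conn goes away.
inline void aliasing_self_check() {
  std::weak_ptr<Conn> w;
  {
    Conn c;
    w = c.Borrow();
    assert(!w.expired());
  }
  assert(w.expired());
}
```

A holder of the weak reference that manages to `lock()` it keeps the control block alive, not the `Conn`; the object's lifetime is still controlled by whoever created it.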
@@ -688,6 +689,7 @@ void Connection::HandleRequests() {
LOG(INFO) << "Error handshaking " << aresult.error().message();
return;
}
+is_tls_ = 1;
VLOG(1) << "TLS handshake succeeded";
}
}
@@ -756,7 +758,7 @@ void Connection::RegisterBreakHook(BreakerCb breaker_cb) {
breaker_cb_ = breaker_cb;
}

-std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() const {
+pair<string, string> Connection::GetClientInfoBeforeAfterTid() const {
if (!socket_) {
LOG(DFATAL) << "unexpected null socket_ "
<< " phase " << unsigned(phase_) << ", is_http: " << unsigned(is_http_);
@@ -854,18 +856,18 @@ bool Connection::IsMain() const {
return static_cast<Listener*>(listener())->IsMainInterface();
}

-void Connection::SetName(std::string name) {
+void Connection::SetName(string name) {
util::ThisFiber::SetName(absl::StrCat("DflyConnection_", name));
name_ = std::move(name);
}

-void Connection::SetLibName(std::string name) {
+void Connection::SetLibName(string name) {
UpdateLibNameVerMap(lib_name_, lib_ver_, -1);
lib_name_ = std::move(name);
UpdateLibNameVerMap(lib_name_, lib_ver_, +1);
}

-void Connection::SetLibVersion(std::string version) {
+void Connection::SetLibVersion(string version) {
UpdateLibNameVerMap(lib_name_, lib_ver_, -1);
lib_ver_ = std::move(version);
UpdateLibNameVerMap(lib_name_, lib_ver_, +1);
@@ -1154,7 +1156,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
if (MemcacheParser::IsStoreCmd(cmd.type)) {
total_len += cmd.bytes_len + 2;
if (io_buf_.InputLen() >= total_len) {
-std::string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2);
+string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2);
if (parsed_value[cmd.bytes_len] != '\r' && parsed_value[cmd.bytes_len + 1] != '\n') {
builder->SendClientError("bad data chunk");
// We consume the whole buffer because we don't really know where it ends
@@ -1241,6 +1243,29 @@ void Connection::HandleMigrateRequest() {
}
}

+error_code Connection::HandleRecvSocket() {
Collaborator Author

The only change in this file: the code is moved to HandleRecvSocket.

Contributor

Plus the boilerplate in RegisterBufRings.

+io::MutableBytes append_buf = io_buf_.AppendBuffer();
+DCHECK(!append_buf.empty());

+phase_ = READ_SOCKET;
+error_code ec;

+::io::Result<size_t> recv_sz = socket_->Recv(append_buf);
+last_interaction_ = time(nullptr);

+if (!recv_sz) {
+return recv_sz.error();
+}

+size_t commit_sz = *recv_sz;

+io_buf_.CommitWrite(commit_sz);
+stats_->io_read_bytes += commit_sz;
+++stats_->io_read_cnt;

+return ec;
+}

auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
error_code ec;
ParserStatus parse_status = OK;
@@ -1250,25 +1275,11 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {

do {
HandleMigrateRequest();

-io::MutableBytes append_buf = io_buf_.AppendBuffer();
-DCHECK(!append_buf.empty());

-phase_ = READ_SOCKET;

-::io::Result<size_t> recv_sz = peer->Recv(append_buf);
-last_interaction_ = time(nullptr);

-if (!recv_sz) {
-ec = recv_sz.error();
-parse_status = OK;
-break;
+ec = HandleRecvSocket();
+if (ec) {
+return ec;
}

-io_buf_.CommitWrite(*recv_sz);
-stats_->io_read_bytes += *recv_sz;
-++stats_->io_read_cnt;

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

@@ -1411,8 +1422,8 @@ void Connection::ClearPipelinedMessages() {
queue_backpressure_->pubsub_ec.notifyAll();
}

-std::string Connection::DebugInfo() const {
-std::string info = "{";
+string Connection::DebugInfo() const {
+string info = "{";

absl::StrAppend(&info, "address=", uint64_t(this), ", ");
absl::StrAppend(&info, "phase=", phase_, ", ");
3 changes: 3 additions & 0 deletions src/facade/dragonfly_connection.h
@@ -365,6 +365,8 @@ class Connection : public util::Connection {
PipelineMessagePtr GetFromPipelinePool();

void HandleMigrateRequest();
+std::error_code HandleRecvSocket();

bool ShouldEndAsyncFiber(const MessageHandle& msg);

void LaunchAsyncFiberIfNeeded(); // Async fiber is started lazily
@@ -449,6 +451,7 @@ class Connection : public util::Connection {
bool migration_enabled_ : 1;
bool migration_in_process_ : 1;
bool is_http_ : 1;
+bool is_tls_ : 1;
};
};
};
4 changes: 4 additions & 0 deletions src/facade/facade_types.h
@@ -184,6 +184,10 @@ extern __thread FacadeStats* tl_facade_stats;

void ResetStats();

+// Constants for socket bufring.
+constexpr uint16_t kRecvSockGid = 0;
+constexpr size_t kRecvBufSize = 128;

} // namespace facade

namespace std {
35 changes: 34 additions & 1 deletion src/server/dfly_main.cc
@@ -23,7 +23,7 @@
#endif

#ifdef __linux__
-#include <liburing.h>
+#include "util/fibers/uring_proactor.h"
#endif

#include <mimalloc.h>
@@ -81,6 +81,9 @@ ABSL_FLAG(bool, version_check, true,
"If true, Will monitor for new releases on Dragonfly servers once a day.");

ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter.");
+ABSL_FLAG(uint16_t, uring_recv_buffer_cnt, 0,
+"How many socket recv buffers of size 256 to allocate per thread."
+"Relevant only for modern kernels with io_uring enabled");

using namespace util;
using namespace facade;
@@ -603,6 +606,35 @@ void SetupAllocationTracker(ProactorPool* pool) {
#endif
}

+void RegisterBufRings(ProactorPool* pool) {
+#ifdef __linux__
+auto bufcnt = absl::GetFlag(FLAGS_uring_recv_buffer_cnt);
+if (bufcnt == 0) {
+return;
+}

+if (dfly::kernel_version < 602 || pool->at(0)->GetKind() != ProactorBase::IOURING) {
+LOG(WARNING) << "uring_recv_buffer_cnt is only supported on kernels >= 6.2 and with "
+"io_uring proactor";
+return;
+}

+// We need a power of 2 length.
+bufcnt = absl::bit_ceil(bufcnt);
+pool->AwaitBrief([&](unsigned, ProactorBase* pb) {
+auto up = static_cast<fb2::UringProactor*>(pb);
+int res = up->RegisterBufferRing(facade::kRecvSockGid, bufcnt, facade::kRecvBufSize);
+if (res != 0) {
+LOG(ERROR) << "Failed to register buf ring for proactor "
+<< util::detail::SafeErrorMessage(res);
+exit(1);
+}
+});
+LOG(INFO) << "Registered a bufring with " << bufcnt << " buffers of size " << facade::kRecvBufSize
+<< " per thread ";
+#endif
+}

} // namespace
} // namespace dfly
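The "We need a power of 2 length" step in RegisterBufRings rounds the configured buffer count up before registering the ring. A portable sketch of that rounding is below, assuming a 16-bit input like the flag above. `RoundUpToPowerOfTwo` and `RingSlot` are illustrative names, not functions from Dragonfly, helio, or liburing; the diff itself relies on `absl::bit_ceil`.

```cpp
#include <cassert>
#include <cstdint>

// Smallest power of two that is >= n, with 0 mapping to 1. The 16-bit input
// guarantees the result (at most 65536) fits in the 32-bit return type.
constexpr std::uint32_t RoundUpToPowerOfTwo(std::uint16_t n) {
  std::uint32_t p = 1;
  while (p < n) p <<= 1;
  return p;
}

// Why a power of two is convenient for a ring: a running position maps to a
// slot with a single mask instead of a modulo.
constexpr std::uint32_t RingSlot(std::uint32_t position, std::uint32_t entries) {
  return position & (entries - 1);  // valid only when entries is a power of two
}

// Self-check for the two helpers.
inline void ring_self_check() {
  assert(RoundUpToPowerOfTwo(100) == 128);
  assert(RingSlot(130, 128) == 130 % 128);
}
```

With this rounding, asking for 100 buffers yields a ring of 128, so the number actually registered can be larger than the flag value.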

@@ -791,6 +823,7 @@ Usage: dragonfly [FLAGS]
pool->Run();

SetupAllocationTracker(pool.get());
+RegisterBufRings(pool.get());

AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true);
acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));