-
Notifications
You must be signed in to change notification settings - Fork 999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: dragonfly connection refactorings #4434
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,7 @@ ABSL_FLAG(bool, migrate_connections, true, | |
"happen at most once per connection."); | ||
|
||
using namespace util; | ||
using namespace std; | ||
using absl::GetFlag; | ||
using nonstd::make_unexpected; | ||
|
||
|
@@ -239,7 +240,7 @@ void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp, | |
} | ||
|
||
// Write the data itself. | ||
std::array<iovec, 16> blobs; | ||
array<iovec, 16> blobs; | ||
unsigned index = 0; | ||
if (next != stack_buf) { | ||
blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)}; | ||
|
@@ -283,7 +284,7 @@ struct Connection::QueueBackpressure { | |
// Used by publisher/subscriber actors to make sure we do not publish too many messages | ||
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit. | ||
util::fb2::EventCount pubsub_ec; | ||
std::atomic_size_t subscriber_bytes = 0; | ||
atomic_size_t subscriber_bytes = 0; | ||
|
||
// Used by pipelining/execution fiber to throttle the incoming pipeline messages. | ||
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread. | ||
|
@@ -504,7 +505,7 @@ void Connection::AsyncOperations::operator()(const InvalidationMessage& msg) { | |
if (msg.invalidate_due_to_flush) { | ||
rbuilder->SendNull(); | ||
} else { | ||
std::string_view keys[] = {msg.key}; | ||
string_view keys[] = {msg.key}; | ||
rbuilder->SendBulkStrArr(keys); | ||
} | ||
} | ||
|
@@ -552,7 +553,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, | |
|
||
// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor). | ||
// We use it for reference counting and accessing `this` (without managing it). | ||
self_ = {std::make_shared<std::monostate>(), this}; | ||
self_ = {make_shared<std::monostate>(), this}; | ||
|
||
#ifdef DFLY_USE_SSL | ||
// Increment reference counter so Listener won't free the context while we're | ||
|
@@ -688,6 +689,7 @@ void Connection::HandleRequests() { | |
LOG(INFO) << "Error handshaking " << aresult.error().message(); | ||
return; | ||
} | ||
is_tls_ = 1; | ||
VLOG(1) << "TLS handshake succeeded"; | ||
} | ||
} | ||
|
@@ -756,7 +758,7 @@ void Connection::RegisterBreakHook(BreakerCb breaker_cb) { | |
breaker_cb_ = breaker_cb; | ||
} | ||
|
||
std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() const { | ||
pair<string, string> Connection::GetClientInfoBeforeAfterTid() const { | ||
if (!socket_) { | ||
LOG(DFATAL) << "unexpected null socket_ " | ||
<< " phase " << unsigned(phase_) << ", is_http: " << unsigned(is_http_); | ||
|
@@ -854,18 +856,18 @@ bool Connection::IsMain() const { | |
return static_cast<Listener*>(listener())->IsMainInterface(); | ||
} | ||
|
||
void Connection::SetName(std::string name) { | ||
void Connection::SetName(string name) { | ||
util::ThisFiber::SetName(absl::StrCat("DflyConnection_", name)); | ||
name_ = std::move(name); | ||
} | ||
|
||
void Connection::SetLibName(std::string name) { | ||
void Connection::SetLibName(string name) { | ||
UpdateLibNameVerMap(lib_name_, lib_ver_, -1); | ||
lib_name_ = std::move(name); | ||
UpdateLibNameVerMap(lib_name_, lib_ver_, +1); | ||
} | ||
|
||
void Connection::SetLibVersion(std::string version) { | ||
void Connection::SetLibVersion(string version) { | ||
UpdateLibNameVerMap(lib_name_, lib_ver_, -1); | ||
lib_ver_ = std::move(version); | ||
UpdateLibNameVerMap(lib_name_, lib_ver_, +1); | ||
|
@@ -1154,7 +1156,7 @@ auto Connection::ParseMemcache() -> ParserStatus { | |
if (MemcacheParser::IsStoreCmd(cmd.type)) { | ||
total_len += cmd.bytes_len + 2; | ||
if (io_buf_.InputLen() >= total_len) { | ||
std::string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2); | ||
string_view parsed_value = str.substr(consumed, cmd.bytes_len + 2); | ||
if (parsed_value[cmd.bytes_len] != '\r' && parsed_value[cmd.bytes_len + 1] != '\n') { | ||
builder->SendClientError("bad data chunk"); | ||
// We consume the whole buffer because we don't really know where it ends | ||
|
@@ -1241,6 +1243,29 @@ void Connection::HandleMigrateRequest() { | |
} | ||
} | ||
|
||
error_code Connection::HandleRecvSocket() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the only change in this file. the code is moved to HandleRecvSocket There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
io::MutableBytes append_buf = io_buf_.AppendBuffer(); | ||
DCHECK(!append_buf.empty()); | ||
|
||
phase_ = READ_SOCKET; | ||
error_code ec; | ||
|
||
::io::Result<size_t> recv_sz = socket_->Recv(append_buf); | ||
last_interaction_ = time(nullptr); | ||
|
||
if (!recv_sz) { | ||
return recv_sz.error(); | ||
} | ||
|
||
size_t commit_sz = *recv_sz; | ||
|
||
io_buf_.CommitWrite(commit_sz); | ||
stats_->io_read_bytes += commit_sz; | ||
++stats_->io_read_cnt; | ||
|
||
return ec; | ||
} | ||
|
||
auto Connection::IoLoop() -> variant<error_code, ParserStatus> { | ||
error_code ec; | ||
ParserStatus parse_status = OK; | ||
|
@@ -1250,25 +1275,11 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> { | |
|
||
do { | ||
HandleMigrateRequest(); | ||
|
||
io::MutableBytes append_buf = io_buf_.AppendBuffer(); | ||
DCHECK(!append_buf.empty()); | ||
|
||
phase_ = READ_SOCKET; | ||
|
||
::io::Result<size_t> recv_sz = peer->Recv(append_buf); | ||
last_interaction_ = time(nullptr); | ||
|
||
if (!recv_sz) { | ||
ec = recv_sz.error(); | ||
parse_status = OK; | ||
break; | ||
ec = HandleRecvSocket(); | ||
if (ec) { | ||
return ec; | ||
} | ||
|
||
io_buf_.CommitWrite(*recv_sz); | ||
stats_->io_read_bytes += *recv_sz; | ||
++stats_->io_read_cnt; | ||
|
||
phase_ = PROCESS; | ||
bool is_iobuf_full = io_buf_.AppendLen() == 0; | ||
|
||
|
@@ -1411,8 +1422,8 @@ void Connection::ClearPipelinedMessages() { | |
queue_backpressure_->pubsub_ec.notifyAll(); | ||
} | ||
|
||
std::string Connection::DebugInfo() const { | ||
std::string info = "{"; | ||
string Connection::DebugInfo() const { | ||
string info = "{"; | ||
|
||
absl::StrAppend(&info, "address=", uint64_t(this), ", "); | ||
absl::StrAppend(&info, "phase=", phase_, ", "); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am allergic to
unqualified lookup
unless it's used in generic code as a customization point 😄Ignore me 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i do not know what it means but I do not think our code has lots of ambiguous names. as long as we enable it locally in cc file I think it's fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long story short there are different taxonomy rules for looking up names qualified (e.g,
std::some_algo
etc) vs unqualified (e.g,some_algo
). This can lead to many surprises that are amplified in the presence of generic code (templates and their two phase lookup upon parsing and instantiation).A simple example without templates:
This is easy, the compiler looks
within the std
namespace and it findsstd::accumulate
.Now if you do the same
unqualified
The compiler will look in
namespaces
that areassociated
with the arguments passed in the function call. In our case it'sVector::Iterator
. Now here is a little bit of madness: this is NOT platform independend code*. It might work on some machines and it might fail to compile for others. Why?Because the
cpp standard does not mandate if the iterator is just a pointer or if it's wrapper struct
. If it's a typedef to anint
then guess what! The program won't compile. If the implementation is astruct Iterator
it will because the rules mandate that the namespaces for the compiler to search include the namespace of the class. However if it was an int, these namespaces are ignored so the compiler ends upsearching in a different way
.Also, because we did not
include algorithm
we might or might not pull the rightstd::accumulate
, or we might not even consider it (and which header includes what again is not defined by the cpp standard)This is just without the templates. You get templates and customization points and now you are dancing with
tag_dispatch
and the infamoustwo-step dance
(using declaration + followed by an unqualified lookup)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p.s. I am not advocating this here, just left the comment to explain some of the reasons I don't like it.
p.s.2 We also do not use a lot of templates so we are kinda on the safer side 😄