Skip to content

Commit

Permalink
fix: provide resp3 option to CapturingReplyBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Jan 3, 2025
1 parent 6e9409c commit 8611b2e
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/facade/reply_capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString

CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL)
CapturingReplyBuilder(bool is_resp_3, ReplyMode mode = ReplyMode::FULL)
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
SetResp3(is_resp_3);
}

using Payload = std::variant<std::monostate, Null, Error, long, double, SimpleString, BulkString,
Expand Down
2 changes: 1 addition & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt};

absl::InlinedVector<string_view, 5> args_view;
facade::CapturingReplyBuilder crb;
facade::CapturingReplyBuilder crb(false);
ConnectionContext local_cntx{cntx, stub_tx.get()};
absl::InsecureBitGen gen;
for (unsigned i = 0; i < batch.sz; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/http_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
facade::ConnectionContext* context = (facade::ConnectionContext*)http_cntx->user_data();
DCHECK(context);

facade::CapturingReplyBuilder reply_builder;
facade::CapturingReplyBuilder reply_builder(false);

// TODO: to finish this.
service->DispatchCommand(absl::MakeSpan(cmd_slices), &reply_builder, context);
Expand Down
4 changes: 3 additions & 1 deletion src/server/journal/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
} // namespace

JournalExecutor::JournalExecutor(Service* service)
: service_{service}, reply_builder_{facade::ReplyMode::NONE}, conn_context_{nullptr, nullptr} {
: service_{service},
reply_builder_{false, facade::ReplyMode::NONE},
conn_context_{nullptr, nullptr} {
conn_context_.is_replicating = true;
conn_context_.journal_emulated = true;
conn_context_.skip_acl_validation = true;
Expand Down
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,7 @@ optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionC
DCHECK(eval_cid);
tx->MultiSwitchCmd(eval_cid);

CapturingReplyBuilder crb{ReplyMode::ONLY_ERR};
CapturingReplyBuilder crb{false, ReplyMode::ONLY_ERR};
MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, true, true);

info->async_cmds_heap_mem = 0;
Expand Down
17 changes: 10 additions & 7 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,13 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor
return true;
}

OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard* es) {
OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard* es,
bool is_resp3) {
auto& sinfo = sharded_[es->shard_id()];
DCHECK(!sinfo.cmds.empty());

auto* local_tx = sinfo.local_tx.get();
facade::CapturingReplyBuilder crb;
facade::CapturingReplyBuilder crb(is_resp3);
ConnectionContext local_cntx{cntx_, local_tx};
if (cntx_->conn()) {
local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged();
Expand Down Expand Up @@ -244,14 +245,15 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
cntx_->cid = base_cid_;
auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); };
tx->PrepareSquashedMultiHop(base_cid_, cb);
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
tx->ScheduleSingleHop(
[this, rb](auto* tx, auto* es) { return SquashedHopCb(tx, es, rb->IsResp3()); });
} else {
#if 1
fb2::BlockingCounter bc(num_shards);
DVLOG(1) << "Squashing " << num_shards << " " << tx->DebugId();

auto cb = [this, tx, bc]() mutable {
this->SquashedHopCb(tx, EngineShard::tlocal());
auto cb = [this, tx, bc, rb]() mutable {
this->SquashedHopCb(tx, EngineShard::tlocal(), rb->IsResp3());
bc->Dec();
};

Expand All @@ -261,8 +263,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
}
bc->Wait();
#else
shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); },
[this](auto sid) { return !sharded_[sid].cmds.empty(); });
shard_set->RunBlockingInParallel(
[this, tx, rb](auto* es) { SquashedHopCb(tx, es, rb->IsResp3()); },
[this](auto sid) { return !sharded_[sid].cmds.empty(); });
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/multi_command_squasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class MultiCommandSquasher {
bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd);

// Callback that runs on shards during squashed hop.
facade::OpStatus SquashedHopCb(Transaction* parent_tx, EngineShard* es);
facade::OpStatus SquashedHopCb(Transaction* parent_tx, EngineShard* es, bool is_resp3);

// Execute all currently squashed commands. Return false if aborting on error.
bool ExecuteSquashed(facade::RedisReplyBuilder* rb);
Expand Down
2 changes: 1 addition & 1 deletion src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2745,7 +2745,7 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
facade::CapturingReplyBuilder crb{false};
ConnectionContext cntx{nullptr, nullptr};
cntx.is_replicating = true;
cntx.journal_emulated = true;
Expand Down
2 changes: 1 addition & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ error_code Replica::ConsumeRedisStream() {
conn_context.ns = &namespaces->GetDefaultNamespace();

// we never reply back on the commands.
facade::CapturingReplyBuilder null_builder{facade::ReplyMode::NONE};
facade::CapturingReplyBuilder null_builder{false, facade::ReplyMode::NONE};
ResetParser(true);

// Master waits for this command in order to start sending replication stream.
Expand Down

0 comments on commit 8611b2e

Please sign in to comment.