Skip to content

Commit

Permalink
fix: reply_builder should properly serialize bulk strings endings. (#…
Browse files Browse the repository at this point in the history
…4441)

Due to a corner-case bug, reply builder could add \0\0 to the end of bulk strings, instead of
\r\n. The bug slipped our tests because redis-py parser most probably does not validate the ending
as long as everything else is consistent.

This PR:
1. Adds a test that catches the bug
2. Adds a debug check that verifies the destination pointer is consistent with the iovec being used.
3. Fixes the bug.

Fixes #4424
  • Loading branch information
romange authored Jan 12, 2025
1 parent e39e682 commit 7a7b671
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
27 changes: 16 additions & 11 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,20 @@ template <typename... Ts> void SinkReplyBuilder::WritePieces(Ts&&... pieces) {
if (size_t required = (piece_size(pieces) + ...); buffer_.AppendLen() <= required)
Flush(required);

auto iovec_end = [](const iovec& v) { return reinterpret_cast<char*>(v.iov_base) + v.iov_len; };

// Ensure last iovec points to buffer segment
char* dest = reinterpret_cast<char*>(buffer_.AppendBuffer().data());
if (vecs_.empty() || ((char*)vecs_.back().iov_base) + vecs_.back().iov_len != dest)
NextVec({dest, 0});
if (vecs_.empty()) {
vecs_.push_back(iovec{dest, 0});
} else if (iovec_end(vecs_.back()) != dest) {
if (vecs_.size() >= IOV_MAX - 2)
Flush();
dest = reinterpret_cast<char*>(buffer_.AppendBuffer().data());
vecs_.push_back(iovec{dest, 0});
}

dest = reinterpret_cast<char*>(buffer_.AppendBuffer().data());
DCHECK(iovec_end(vecs_.back()) == dest);
char* ptr = dest;
([&]() { ptr = write_piece(pieces, ptr); }(), ...);

Expand All @@ -124,7 +132,9 @@ template <typename... Ts> void SinkReplyBuilder::WritePieces(Ts&&... pieces) {
}

void SinkReplyBuilder::WriteRef(std::string_view str) {
NextVec(str);
if (vecs_.size() >= IOV_MAX - 2)
Flush();
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
total_size_ += str.size();
}

Expand Down Expand Up @@ -183,7 +193,7 @@ void SinkReplyBuilder::FinishScope() {
if (ref_bytes > buffer_.AppendLen())
return Flush(ref_bytes);

// Copy all extenral references to buffer to safely keep batching
// Copy all external references to buffer to safely keep batching
for (size_t i = guaranteed_pieces_; i < vecs_.size(); i++) {
auto ib = buffer_.InputBuffer();
if (vecs_[i].iov_base >= ib.data() && vecs_[i].iov_base <= ib.data() + ib.size())
Expand All @@ -198,12 +208,6 @@ void SinkReplyBuilder::FinishScope() {
guaranteed_pieces_ = vecs_.size(); // all vecs are pieces
}

void SinkReplyBuilder::NextVec(std::string_view str) {
if (vecs_.size() >= IOV_MAX - 2)
Flush();
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
}

MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), all_(0) {
}

Expand Down Expand Up @@ -312,6 +316,7 @@ void RedisReplyBuilderBase::SendBulkString(std::string_view str) {
if (str.size() <= kMaxInlineSize)
return WritePieces(kLengthPrefix, uint32_t(str.size()), kCRLF, str, kCRLF);

DVLOG(1) << "SendBulk " << str.size();
WritePieces(kLengthPrefix, uint32_t(str.size()), kCRLF);
WriteRef(str);
WritePieces(kCRLF);
Expand Down
4 changes: 1 addition & 3 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ class SinkReplyBuilder {
void WritePieces(Ts&&... pieces); // Copy pieces into buffer and reference buffer
void WriteRef(std::string_view str); // Add iovec bypassing buffer

void FinishScope(); // Called when scope ends
void NextVec(std::string_view str);

void FinishScope(); // Called when scope ends to flush buffer if needed
void Send();

protected:
Expand Down
17 changes: 17 additions & 0 deletions src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,23 @@ TEST_F(RedisReplyBuilderTest, Issue3449) {
EXPECT_EQ(10000, parse_result.args.size());
}

TEST_F(RedisReplyBuilderTest, Issue4424) {
vector<string> records;
for (unsigned i = 0; i < 800; ++i) {
records.push_back(string(100, 'a'));
}

for (unsigned j = 0; j < 2; ++j) {
builder_->SendBulkStrArr(records);
ASSERT_TRUE(NoErrors());
ParsingResults parse_result = Parse();
ASSERT_FALSE(parse_result.IsError()) << int(parse_result.result);
ASSERT_TRUE(parse_result.Verify(SinkSize()));
EXPECT_EQ(800, parse_result.args.size());
sink_.Clear();
}
}

static void BM_FormatDouble(benchmark::State& state) {
vector<double> values;
char buf[64];
Expand Down

0 comments on commit 7a7b671

Please sign in to comment.