Skip to content

Commit

Permalink
chore: fixes the parse error for xread/xreadgroup with unbalanced ids
Browse files Browse the repository at this point in the history
Partially addresses #4193

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Dec 9, 2024
1 parent bf4f45a commit 38677a2
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
10 changes: 7 additions & 3 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad
}

/* Avoid overflow when trying to add an element to the stream (listpack
* can only host up to 32bit length sttrings, and also a total listpack size
* can only host up to 32bit length strings, and also a total listpack size
* can't be bigger than 32bit length. */
size_t totelelen = 0;
for (size_t i = 0; i < fields.size(); i++) {
Expand Down Expand Up @@ -2114,8 +2114,12 @@ std::optional<ReadOpts> ParseReadArgsOrReply(CmdArgList args, bool read_group,

size_t pair_count = args.size() - opts.streams_arg;
if ((pair_count % 2) != 0) {
const auto m = "Unbalanced list of streams: for each stream key an ID must be specified";
builder->SendError(m, kSyntaxErr);
const char* cmd_name = read_group ? "xreadgroup" : "xread";
const char* symbol = read_group ? ">" : "$";
const auto msg = absl::StrCat("Unbalanced '", cmd_name,
"' list of streams: for each stream key an ID or '", symbol,
"' must be specified");
builder->SendError(msg, kSyntaxErr);
return std::nullopt;
}
streams_count = pair_count / 2;
Expand Down
11 changes: 8 additions & 3 deletions src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ TEST_F(StreamFamilyTest, XReadInvalidArgs) {
EXPECT_THAT(resp, ErrArg("syntax error"));

// Unbalanced list of streams.
resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "s3", "0", "0"});
EXPECT_THAT(resp, ErrArg("syntax error"));
resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "0", "0"});
EXPECT_THAT(resp, ErrArg("value is not an integer"));

// Wrong type.
Run({"set", "foo", "v"});
Expand Down Expand Up @@ -442,7 +442,12 @@ TEST_F(StreamFamilyTest, XReadGroupInvalidArgs) {

// Unbalanced list of streams.
resp = Run({"xreadgroup", "group", "group", "alice", "streams", "s1", "s2", "s3", "0", "0"});
EXPECT_THAT(resp, ErrArg("syntax error"));
EXPECT_THAT(resp, ErrArg("Unbalanced 'xreadgroup' list of streams: for each stream key an ID or "
"'>' must be specified"));

resp = Run({"XREAD", "COUNT", "1", "STREAMS", "mystream"});
ASSERT_THAT(resp, ErrArg("Unbalanced 'xread' list of streams: for each stream key an ID or '$' "
"must be specified"));
}

TEST_F(StreamFamilyTest, XReadGroupEmpty) {
Expand Down
3 changes: 0 additions & 3 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1563,9 +1563,6 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
string_view arg = ArgS(args, i);
if (absl::EqualsIgnoreCase(arg, "STREAMS")) {
size_t left = args.size() - i - 1;
if (left < 2 || left % 2 != 0)
return OpStatus::SYNTAX_ERR;

return KeyIndex(i + 1, i + 1 + (left / 2));
}
}
Expand Down

0 comments on commit 38677a2

Please sign in to comment.