Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster slots parsing tests #149

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 274 additions & 7 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,108 @@ const char *__asan_default_options(void) {

#include <stdbool.h>

/* Helper to create a valkeyReply that contains a bulkstring. */
valkeyReply *create_cluster_nodes_reply(const char *bulkstr) {
valkeyReply *reply;
valkeyReply *create_reply(const char *buf, size_t len);
char *resp_encode_array(char *p, sds *resp);

/* Helper to create a valkeyReply that contains a bulkstring. */
valkeyReply *create_cluster_nodes_reply(const char *str) {
/* Create a RESP Bulk String. */
char cmd[1024];
int len = sprintf(cmd, "$%zu\r\n%s\r\n", strlen(bulkstr), bulkstr);
char buf[1024];
int len = sprintf(buf, "$%zu\r\n%s\r\n", strlen(str), str);

return create_reply(buf, len);
}

/* Create a valkeyReply. */
/* Helper to create a cluster slots response.
* Parses the string using a rudimentary JSON like format which accepts:
* - arrays example: [elem1, elem2]
* - strings example: 'mystring'
* - integers example: 123
* - null example: null
* See resp_encode_array for details.
*/
valkeyReply *create_cluster_slots_reply(const char *str) {
sds resp = sdsempty();

char *s = strdup(str);
resp_encode_array(s, &resp);
free(s);

valkeyReply *reply = create_reply(resp, sdslen(resp));
sdsfree(resp);
return reply;
}

/* Create a valkeyReply from a RESP encoded buffer. */
valkeyReply *create_reply(const char *buf, size_t len) {
valkeyReply *reply;
valkeyReader *reader = valkeyReaderCreate();
valkeyReaderFeed(reader, cmd, len);
valkeyReaderFeed(reader, buf, len);
assert(valkeyReaderGetReply(reader, (void **)&reply) == VALKEY_OK);
valkeyReaderFree(reader);
return reply;
}

/* Primitive parser of a JSON subset which is encoded to RESP.
* The string `p` is parsed to a RESP encoded string and returned
* by the pre-allocated sds string `resp`.
* The function returns the next character to parse in p.
* The parser accepts:
* - arrays: [elem1, elem2]
* - strings: 'mystring'
* - integers: 123
* - null values: null
*/
char *resp_encode_array(char *p, sds *resp) {
int elements = 0;
sds s = sdsempty();
while (*p != '\0') {
if (*p == '\'') {
/* Parse and encode a string. */
char *str = ++p; // Skip first ' and find next '
while (*p != '\'' && *p != '\0')
++p;
assert(*p != '\0'); /* Premature end of indata */
*p = '\0';
s = sdscatfmt(s, "$%i\r\n%s\r\n", strlen(str), str);
++p; /* Skip last ' */
elements += 1;
} else if (*p >= '0' && *p <= '9') {
/* Parse and encode an integer. */
char *start = p;
while (*p >= '0' && *p <= '9')
++p;
int integer = vk_atoi(start, (p - start));
s = sdscatfmt(s, ":%i\r\n", integer);
elements += 1;
} else if (*p == '[') {
/* Parse and encode an array in current array. */
p = resp_encode_array(++p, &s);
elements += 1;
} else if (*p == ']') {
/* Finalize the current array. */
*resp = sdscatfmt(*resp, "*%i\r\n%s", elements, s);
sdsfree(s);
return ++p;
} else if (*p == 'n') {
/* Parse and encode a null bulk string as in RESP2 */
if ((strlen(p) >= 4) && memcmp(p, "null", 4) == 0) {
s = sdscat(s, "$-1\r\n");
elements += 1;
p += 4;
} else {
++p; // ignore
}
} else {
++p; // ignore
}
}
assert(elements == 1); /* Only accept a single top array */
*resp = sdscat(*resp, s);
sdsfree(s);
return p;
}

/* Parse a cluster nodes reply from a basic deployment. */
void test_parse_cluster_nodes(bool parse_replicas) {
valkeyClusterContext *cc = valkeyClusterContextInit();
Expand Down Expand Up @@ -426,6 +512,181 @@ void test_parse_cluster_nodes_with_legacy_format(void) {
valkeyClusterFree(cc);
}

/* Parse a cluster slots reply from a basic deployment. */
void test_parse_cluster_slots(bool parse_replicas) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;

if (parse_replicas)
cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE;

valkeyReply *reply = create_cluster_slots_reply(
"[[0, 5460, ['127.0.0.1', 30001, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca', ['hostname', 'localhost']],"
" ['127.0.0.1', 30004, '07c37dfeb235213a872192d90877d0cd55635b91', ['hostname', 'localhost']]],"
" [5461, 10922, ['127.0.0.1', 30002, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1', ['hostname', 'localhost']],"
Comment on lines +525 to +528
Copy link
Collaborator

@zuiderkwast zuiderkwast Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! I guess it's pretty easy to convert this pseudo-JSON to RESP, althought it adds some non-trivial code to the testing. My first though is that single-quoted stings are not allowed in JSON. :)

My second thought is that it might be possible to construct a parse tree structure that is readable and easy to serialize to RESP, without parsing. Ideally, test code should be trivial, so there is no doubt if there's a bug, is it a bug in the test code or in the real implementation?

Here is a reply literal with the first slot range (untested). It's much bigger than the pseudo-JSON obviously.

valkeyReply reply = {.type = VALKEY_REPLY_ARRAY, .elements = 3, .element = (valkeyReply[]){
    {.type = VALKEY_REPLY_ARRAY, .elements = 4, .element = (valkeyReply[]){
        {.type = VALKEY_REPLY_INTEGER, .integer = 0},
        {.type = VALKEY_REPLY_INTEGER, .integer = 5460},
        {.type = VALKEY_REPLY_ARRAY, .elements = 4, .element = (valkeyReply[]){
            {.type = VALKEY_REPLY_STRING, .str = "127.0.0.1"},
            {.type = VALKEY_REPLY_INTEGER, .integer = 30005},
            {.type = VALKEY_REPLY_STRING, .str = "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca"},
            {.type = VALKEY_REPLY_ARRAY, .elements = 2, .element = (valkeyReply[]){
                {.type = VALKEY_REPLY_STRING, .str = "hostname"},
                {.type = VALKEY_REPLY_STRING, .str = "localhost"} 
            }
        }
    },
    ...
};

" ['127.0.0.1', 30005, '6ec23923021cf3ffec47632106199cb7f496ce01', ['hostname', 'localhost']]],"
" [10923, 16383, ['127.0.0.1', 30003, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f', ['hostname', 'localhost']]"
" ['127.0.0.1', 30006, '824fe116063bc5fcf9f4ffd895bc17aee7731ac3', ['hostname', 'localhost']]]]");

dict *nodes = parse_cluster_slots(cc, reply);
freeReplyObject(reply);

assert(nodes);
assert(dictSize(nodes) == 3); /* 3 primaries */
dictInitIterator(&di, nodes);
/* Verify node 1 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:30001") == 0);
assert(strcmp(node->host, "127.0.0.1") == 0);
assert(node->port == 30001);
assert(node->role == VALKEY_ROLE_PRIMARY);
assert(listLength(node->slots) == 1); /* 1 slot range */
slot = listNodeValue(listFirst(node->slots));
assert(slot->start == 0);
assert(slot->end == 5460);
if (parse_replicas) {
assert(listLength(node->replicas) == 1);
node = listNodeValue(listFirst(node->replicas));
assert(strcmp(node->addr, "127.0.0.1:30004") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
} else {
assert(node->replicas == NULL);
}
/* Verify node 2 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:30002") == 0);
assert(strcmp(node->host, "127.0.0.1") == 0);
assert(node->port == 30002);
assert(node->role == VALKEY_ROLE_PRIMARY);
assert(listLength(node->slots) == 1); /* 1 slot range */
slot = listNodeValue(listFirst(node->slots));
assert(slot->start == 5461);
assert(slot->end == 10922);
if (parse_replicas) {
assert(listLength(node->replicas) == 1);
node = listNodeValue(listFirst(node->replicas));
assert(strcmp(node->addr, "127.0.0.1:30005") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
} else {
assert(node->replicas == NULL);
}
/* Verify node 3 */
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:30003") == 0);
assert(strcmp(node->host, "127.0.0.1") == 0);
assert(node->port == 30003);
assert(node->role == VALKEY_ROLE_PRIMARY);
assert(listLength(node->slots) == 1); /* 1 slot range */
slot = listNodeValue(listFirst(node->slots));
assert(slot->start == 10923);
assert(slot->end == 16383);
if (parse_replicas) {
assert(listLength(node->replicas) == 1);
node = listNodeValue(listFirst(node->replicas));
assert(strcmp(node->addr, "127.0.0.1:30006") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
} else {
assert(node->replicas == NULL);
}

dictRelease(nodes);
valkeyClusterFree(cc);
}

void test_parse_cluster_slots_with_empty_ip(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();

valkeyReply *reply = create_cluster_slots_reply(
"[[0, 5460, ['', 6379, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']],"
" [5461, 10922, ['127.0.0.1', 6379, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']],"
" [10923, 16383, ['127.0.0.1', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]");

dict *nodes = parse_cluster_slots(cc, reply);
freeReplyObject(reply);

assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);

valkeyClusterFree(cc);
}

void test_parse_cluster_slots_with_null_ip(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();

valkeyReply *reply = create_cluster_slots_reply(
"[[0, 5460, [null, 6379, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']],"
" [5461, 10922, ['127.0.0.1', 6379, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']],"
" [10923, 16383, ['127.0.0.1', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]");

dict *nodes = parse_cluster_slots(cc, reply);
freeReplyObject(reply);

assert(nodes == NULL);
assert(cc->err == VALKEY_ERR_OTHER);

valkeyClusterFree(cc);
}

/* Parse a cluster slots reply containing a primary with multiple replicas. */
void test_parse_cluster_slots_with_multiple_replicas(void) {
valkeyClusterContext *cc = valkeyClusterContextInit();
valkeyClusterNode *node;
cluster_slot *slot;
dictIterator di;
listIter li;

cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE;

valkeyReply *reply = create_cluster_slots_reply(
"[[0, 16383, ['127.0.0.1', 30001, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca'],"
" ['127.0.0.1', 30004, '07c37dfeb235213a872192d90877d0cd55635b91'],"
" ['127.0.0.1', 30005, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1'],"
" ['127.0.0.1', 30006, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f'],"
" ['127.0.0.1', 30002, '6ec23923021cf3ffec47632106199cb7f496ce01'],"
" ['127.0.0.1', 30003, '824fe116063bc5fcf9f4ffd895bc17aee7731ac3']]]");

dict *nodes = parse_cluster_slots(cc, reply);
freeReplyObject(reply);

/* Verify master. */
assert(nodes);
assert(dictSize(nodes) == 1); /* 1 master */
dictInitIterator(&di, nodes);
node = dictGetEntryVal(dictNext(&di));
assert(strcmp(node->addr, "127.0.0.1:30001") == 0);
assert(strcmp(node->host, "127.0.0.1") == 0);
assert(node->port == 30001);
assert(node->role == VALKEY_ROLE_PRIMARY);
assert(listLength(node->slots) == 1); /* 1 slot range */
slot = listNodeValue(listFirst(node->slots));
assert(slot->start == 0);
assert(slot->end == 16383);

/* Verify replicas. */
assert(listLength(node->replicas) == 5);
listRewind(node->replicas, &li);
node = listNodeValue(listNext(&li));
assert(strcmp(node->addr, "127.0.0.1:30004") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
node = listNodeValue(listNext(&li));
assert(strcmp(node->addr, "127.0.0.1:30005") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
node = listNodeValue(listNext(&li));
assert(strcmp(node->addr, "127.0.0.1:30006") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
node = listNodeValue(listNext(&li));
assert(strcmp(node->addr, "127.0.0.1:30002") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);
node = listNodeValue(listNext(&li));
assert(strcmp(node->addr, "127.0.0.1:30003") == 0);
assert(node->role == VALKEY_ROLE_REPLICA);

dictRelease(nodes);
valkeyClusterFree(cc);
}

int main(void) {
test_parse_cluster_nodes(false /* replicas not parsed */);
test_parse_cluster_nodes(true /* replicas parsed */);
Expand All @@ -436,5 +697,11 @@ int main(void) {
test_parse_cluster_nodes_with_multiple_replicas();
test_parse_cluster_nodes_with_parse_error();
test_parse_cluster_nodes_with_legacy_format();

test_parse_cluster_slots(false /* replicas not parsed */);
test_parse_cluster_slots(true /* replicas parsed */);
test_parse_cluster_slots_with_empty_ip();
test_parse_cluster_slots_with_null_ip();
test_parse_cluster_slots_with_multiple_replicas();
return 0;
}
Loading