From 41a92c6ab489b7c070036fefe4bda557c51e9dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 17 Jan 2025 10:12:25 +0100 Subject: [PATCH] Add cluster slots parsing tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- tests/ut_slotmap_update.c | 281 +++++++++++++++++++++++++++++++++++++- 1 file changed, 274 insertions(+), 7 deletions(-) diff --git a/tests/ut_slotmap_update.c b/tests/ut_slotmap_update.c index 7d19688..7127c7d 100644 --- a/tests/ut_slotmap_update.c +++ b/tests/ut_slotmap_update.c @@ -19,22 +19,108 @@ const char *__asan_default_options(void) { #include -/* Helper to create a valkeyReply that contains a bulkstring. */ -valkeyReply *create_cluster_nodes_reply(const char *bulkstr) { - valkeyReply *reply; +valkeyReply *create_reply(const char *buf, size_t len); +char *resp_encode_array(char *p, sds *resp); +/* Helper to create a valkeyReply that contains a bulkstring. */ +valkeyReply *create_cluster_nodes_reply(const char *str) { /* Create a RESP Bulk String. */ - char cmd[1024]; - int len = sprintf(cmd, "$%zu\r\n%s\r\n", strlen(bulkstr), bulkstr); + char buf[1024]; + int len = sprintf(buf, "$%zu\r\n%s\r\n", strlen(str), str); + + return create_reply(buf, len); +} - /* Create a valkeyReply. */ +/* Helper to create a cluster slots response. + * Parses the string using a rudimentary JSON like format which accepts: + * - arrays example: [elem1, elem2] + * - strings example: 'mystring' + * - integers example: 123 + * - null example: null + * See resp_encode_array for details. + */ +valkeyReply *create_cluster_slots_reply(const char *str) { + sds resp = sdsempty(); + + char *s = strdup(str); + resp_encode_array(s, &resp); + free(s); + + valkeyReply *reply = create_reply(resp, sdslen(resp)); + sdsfree(resp); + return reply; +} + +/* Create a valkeyReply from a RESP encoded buffer. */ +valkeyReply *create_reply(const char *buf, size_t len) { + valkeyReply *reply; valkeyReader *reader = valkeyReaderCreate(); - valkeyReaderFeed(reader, cmd, len); + valkeyReaderFeed(reader, buf, len); assert(valkeyReaderGetReply(reader, (void **)&reply) == VALKEY_OK); valkeyReaderFree(reader); return reply; } +/* Primitive parser of a JSON subset which is encoded to RESP. + * The string `p` is parsed to a RESP encoded string and returned + * by the pre-allocated sds string `resp`. + * The function returns the next character to parse in p. + * The parser accepts: + * - arrays: [elem1, elem2] + * - strings: 'mystring' + * - integers: 123 + * - null values: null + */ +char *resp_encode_array(char *p, sds *resp) { + int elements = 0; + sds s = sdsempty(); + while (*p != '\0') { + if (*p == '\'') { + /* Parse and encode a string. */ + char *str = ++p; // Skip first ' and find next ' + while (*p != '\'' && *p != '\0') + ++p; + assert(*p != '\0'); /* Premature end of indata */ + *p = '\0'; + s = sdscatfmt(s, "$%i\r\n%s\r\n", strlen(str), str); + ++p; /* Skip last ' */ + elements += 1; + } else if (*p >= '0' && *p <= '9') { + /* Parse and encode an integer. */ + char *start = p; + while (*p >= '0' && *p <= '9') + ++p; + int integer = vk_atoi(start, (p - start)); + s = sdscatfmt(s, ":%i\r\n", integer); + elements += 1; + } else if (*p == '[') { + /* Parse and encode an array in current array. */ + p = resp_encode_array(++p, &s); + elements += 1; + } else if (*p == ']') { + /* Finalize the current array. */ + *resp = sdscatfmt(*resp, "*%i\r\n%s", elements, s); + sdsfree(s); + return ++p; + } else if (*p == 'n') { + /* Parse and encode a null bulk string as in RESP2 */ + if ((strlen(p) >= 4) && memcmp(p, "null", 4) == 0) { + s = sdscat(s, "$-1\r\n"); + elements += 1; + p += 4; + } else { + ++p; // ignore + } + } else { + ++p; // ignore + } + } + assert(elements == 1); /* Only accept a single top array */ + *resp = sdscat(*resp, s); + sdsfree(s); + return p; +} + /* Parse a cluster nodes reply from a basic deployment. */ void test_parse_cluster_nodes(bool parse_replicas) { valkeyClusterContext *cc = valkeyClusterContextInit(); @@ -426,6 +512,181 @@ void test_parse_cluster_nodes_with_legacy_format(void) { valkeyClusterFree(cc); } +/* Parse a cluster slots reply from a basic deployment. */ +void test_parse_cluster_slots(bool parse_replicas) { + valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterNode *node; + cluster_slot *slot; + dictIterator di; + + if (parse_replicas) + cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; + + valkeyReply *reply = create_cluster_slots_reply( + "[[0, 5460, ['127.0.0.1', 30001, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca', ['hostname', 'localhost']]," + " ['127.0.0.1', 30004, '07c37dfeb235213a872192d90877d0cd55635b91', ['hostname', 'localhost']]]," + " [5461, 10922, ['127.0.0.1', 30002, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1', ['hostname', 'localhost']]," + " ['127.0.0.1', 30005, '6ec23923021cf3ffec47632106199cb7f496ce01', ['hostname', 'localhost']]]," + " [10923, 16383, ['127.0.0.1', 30003, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f', ['hostname', 'localhost']]" + " ['127.0.0.1', 30006, '824fe116063bc5fcf9f4ffd895bc17aee7731ac3', ['hostname', 'localhost']]]]"); + + dict *nodes = parse_cluster_slots(cc, reply); + freeReplyObject(reply); + + assert(nodes); + assert(dictSize(nodes) == 3); /* 3 primaries */ + dictInitIterator(&di, nodes); + /* Verify node 1 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:30001") == 0); + assert(strcmp(node->host, "127.0.0.1") == 0); + assert(node->port == 30001); + assert(node->role == VALKEY_ROLE_PRIMARY); + assert(listLength(node->slots) == 1); /* 1 slot range */ + slot = listNodeValue(listFirst(node->slots)); + assert(slot->start == 0); + assert(slot->end == 5460); + if (parse_replicas) { + assert(listLength(node->replicas) == 1); + node = listNodeValue(listFirst(node->replicas)); + assert(strcmp(node->addr, "127.0.0.1:30004") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + } else { + assert(node->replicas == NULL); + } + /* Verify node 2 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:30002") == 0); + assert(strcmp(node->host, "127.0.0.1") == 0); + assert(node->port == 30002); + assert(node->role == VALKEY_ROLE_PRIMARY); + assert(listLength(node->slots) == 1); /* 1 slot range */ + slot = listNodeValue(listFirst(node->slots)); + assert(slot->start == 5461); + assert(slot->end == 10922); + if (parse_replicas) { + assert(listLength(node->replicas) == 1); + node = listNodeValue(listFirst(node->replicas)); + assert(strcmp(node->addr, "127.0.0.1:30005") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + } else { + assert(node->replicas == NULL); + } + /* Verify node 3 */ + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:30003") == 0); + assert(strcmp(node->host, "127.0.0.1") == 0); + assert(node->port == 30003); + assert(node->role == VALKEY_ROLE_PRIMARY); + assert(listLength(node->slots) == 1); /* 1 slot range */ + slot = listNodeValue(listFirst(node->slots)); + assert(slot->start == 10923); + assert(slot->end == 16383); + if (parse_replicas) { + assert(listLength(node->replicas) == 1); + node = listNodeValue(listFirst(node->replicas)); + assert(strcmp(node->addr, "127.0.0.1:30006") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + } else { + assert(node->replicas == NULL); + } + + dictRelease(nodes); + valkeyClusterFree(cc); +} + +void test_parse_cluster_slots_with_empty_ip(void) { + valkeyClusterContext *cc = valkeyClusterContextInit(); + + valkeyReply *reply = create_cluster_slots_reply( + "[[0, 5460, ['', 6379, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']]," + " [5461, 10922, ['127.0.0.1', 6379, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']]," + " [10923, 16383, ['127.0.0.1', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]"); + + dict *nodes = parse_cluster_slots(cc, reply); + freeReplyObject(reply); + + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + + valkeyClusterFree(cc); +} + +void test_parse_cluster_slots_with_null_ip(void) { + valkeyClusterContext *cc = valkeyClusterContextInit(); + + valkeyReply *reply = create_cluster_slots_reply( + "[[0, 5460, [null, 6379, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']]," + " [5461, 10922, ['127.0.0.1', 6379, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']]," + " [10923, 16383, ['127.0.0.1', 6379, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']]]"); + + dict *nodes = parse_cluster_slots(cc, reply); + freeReplyObject(reply); + + assert(nodes == NULL); + assert(cc->err == VALKEY_ERR_OTHER); + + valkeyClusterFree(cc); +} + +/* Parse a cluster slots reply containing a primary with multiple replicas. */ +void test_parse_cluster_slots_with_multiple_replicas(void) { + valkeyClusterContext *cc = valkeyClusterContextInit(); + valkeyClusterNode *node; + cluster_slot *slot; + dictIterator di; + listIter li; + + cc->flags |= VALKEYCLUSTER_FLAG_ADD_SLAVE; + + valkeyReply *reply = create_cluster_slots_reply( + "[[0, 16383, ['127.0.0.1', 30001, 'e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca']," + " ['127.0.0.1', 30004, '07c37dfeb235213a872192d90877d0cd55635b91']," + " ['127.0.0.1', 30005, '67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1']," + " ['127.0.0.1', 30006, '292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f']," + " ['127.0.0.1', 30002, '6ec23923021cf3ffec47632106199cb7f496ce01']," + " ['127.0.0.1', 30003, '824fe116063bc5fcf9f4ffd895bc17aee7731ac3']]]"); + + dict *nodes = parse_cluster_slots(cc, reply); + freeReplyObject(reply); + + /* Verify master. */ + assert(nodes); + assert(dictSize(nodes) == 1); /* 1 master */ + dictInitIterator(&di, nodes); + node = dictGetEntryVal(dictNext(&di)); + assert(strcmp(node->addr, "127.0.0.1:30001") == 0); + assert(strcmp(node->host, "127.0.0.1") == 0); + assert(node->port == 30001); + assert(node->role == VALKEY_ROLE_PRIMARY); + assert(listLength(node->slots) == 1); /* 1 slot range */ + slot = listNodeValue(listFirst(node->slots)); + assert(slot->start == 0); + assert(slot->end == 16383); + + /* Verify replicas. */ + assert(listLength(node->replicas) == 5); + listRewind(node->replicas, &li); + node = listNodeValue(listNext(&li)); + assert(strcmp(node->addr, "127.0.0.1:30004") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + node = listNodeValue(listNext(&li)); + assert(strcmp(node->addr, "127.0.0.1:30005") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + node = listNodeValue(listNext(&li)); + assert(strcmp(node->addr, "127.0.0.1:30006") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + node = listNodeValue(listNext(&li)); + assert(strcmp(node->addr, "127.0.0.1:30002") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + node = listNodeValue(listNext(&li)); + assert(strcmp(node->addr, "127.0.0.1:30003") == 0); + assert(node->role == VALKEY_ROLE_REPLICA); + + dictRelease(nodes); + valkeyClusterFree(cc); +} + int main(void) { test_parse_cluster_nodes(false /* replicas not parsed */); test_parse_cluster_nodes(true /* replicas parsed */); @@ -436,5 +697,11 @@ int main(void) { test_parse_cluster_nodes_with_multiple_replicas(); test_parse_cluster_nodes_with_parse_error(); test_parse_cluster_nodes_with_legacy_format(); + + test_parse_cluster_slots(false /* replicas not parsed */); + test_parse_cluster_slots(true /* replicas parsed */); + test_parse_cluster_slots_with_empty_ip(); + test_parse_cluster_slots_with_null_ip(); + test_parse_cluster_slots_with_multiple_replicas(); return 0; }