-
Notifications
You must be signed in to change notification settings - Fork 704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add cluster flushslot command. #1384
base: unstable
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,7 +94,7 @@ void moduleCallClusterReceivers(const char *sender_id, | |
const char *clusterGetMessageTypeString(int type); | ||
void removeChannelsInSlot(unsigned int slot); | ||
unsigned int countChannelsInSlot(unsigned int hashslot); | ||
unsigned int delKeysInSlot(unsigned int hashslot); | ||
unsigned int delKeysInSlot(unsigned int hashslot, int lazy); | ||
void clusterAddNodeToShard(const char *shard_id, clusterNode *node); | ||
list *clusterLookupNodeListByShardId(const char *shard_id); | ||
void clusterRemoveNodeFromShard(clusterNode *node); | ||
|
@@ -123,6 +123,7 @@ int verifyClusterNodeId(const char *name, int length); | |
sds clusterEncodeOpenSlotsAuxField(int rdbflags); | ||
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); | ||
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now); | ||
void clusterCommandFlushslot(client *c); | ||
|
||
/* Only primaries that own slots have voting rights. | ||
* Returns 1 if the node has voting rights, otherwise returns 0. */ | ||
|
@@ -2764,7 +2765,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc | |
for (int j = 0; j < dirty_slots_count; j++) { | ||
serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j], | ||
myself->name, myself->human_nodename, myself->shard_id); | ||
delKeysInSlot(dirty_slots[j]); | ||
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del); | ||
} | ||
} | ||
} | ||
|
@@ -5750,7 +5751,7 @@ int verifyClusterConfigWithData(void) { | |
server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name, | ||
server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id); | ||
} | ||
delKeysInSlot(j); | ||
delKeysInSlot(j, server.lazyfree_lazy_server_del); | ||
} | ||
} | ||
if (update_config) clusterSaveConfigOrDie(1); | ||
|
@@ -6337,13 +6338,14 @@ void removeChannelsInSlot(unsigned int slot) { | |
|
||
/* Remove all the keys in the specified hash slot. | ||
* The number of removed items is returned. */ | ||
unsigned int delKeysInSlot(unsigned int hashslot) { | ||
unsigned int delKeysInSlot(unsigned int hashslot, int lazy) { | ||
if (!countKeysInSlot(hashslot)) return 0; | ||
|
||
/* We may lose a slot during the pause. We need to track this | ||
* state so that we don't assert in propagateNow(). */ | ||
server.server_del_keys_in_slot = 1; | ||
unsigned int j = 0; | ||
int before_execution_nesting = server.execution_nesting; | ||
|
||
kvstoreHashtableIterator *kvs_di = NULL; | ||
void *next; | ||
|
@@ -6353,8 +6355,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) { | |
enterExecutionUnit(1, 0); | ||
sds sdskey = objectGetKey(valkey); | ||
robj *key = createStringObject(sdskey, sdslen(sdskey)); | ||
if (lazy) { | ||
dbAsyncDelete(&server.db[0], key); | ||
} else { | ||
dbSyncDelete(&server.db[0], key); | ||
} | ||
dbDelete(&server.db[0], key); | ||
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); | ||
propagateDeletion(&server.db[0], key, lazy); | ||
signalModifiedKey(NULL, &server.db[0], key); | ||
/* The keys are not actually logically deleted from the database, just moved to another node. | ||
* The modules needs to know that these keys are no longer available locally, so just send the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code comment about keyspace notifications:
This is not necessarily the case for CLUSTER FLUSHSLOT. I think we should treat CLUSTER FLUSHSLOT more like FLUSHDB. For FLUSHDB, what are the keyspace notifications sent? A lot of DEL events or one FLUSHDB event? Maybe we shouldn't use |
||
|
@@ -6369,7 +6376,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { | |
kvstoreReleaseHashtableIterator(kvs_di); | ||
|
||
server.server_del_keys_in_slot = 0; | ||
serverAssert(server.execution_nesting == 0); | ||
serverAssert(server.execution_nesting == before_execution_nesting); | ||
return j; | ||
} | ||
|
||
|
@@ -7115,6 +7122,9 @@ int clusterCommandSpecial(client *c) { | |
} else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) { | ||
/* CLUSTER LINKS */ | ||
addReplyClusterLinksDescription(c); | ||
} else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) { | ||
/* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */ | ||
clusterCommandFlushslot(c); | ||
} else { | ||
return 0; | ||
} | ||
|
@@ -7314,3 +7324,21 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) { | |
|
||
return C_OK; | ||
} | ||
|
||
void clusterCommandFlushslot(client *c) { | ||
int slot; | ||
int lazy = server.lazyfree_lazy_user_flush; | ||
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return; | ||
if (c->argc == 4) { | ||
if (!strcasecmp(c->argv[3]->ptr, "async")) { | ||
lazy = 1; | ||
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) { | ||
lazy = 0; | ||
} else { | ||
addReplyErrorObject(c, shared.syntaxerr); | ||
return; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should fail the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the call site of else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */
clusterCommandFlushslot(c);
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bit weird that the arity check is split into multiple places. But I don't mind too much because it's like that for many commands already. But I think this should be in cluster.c instead of cluster_legacy.c. Cluster_legacy is about the cluster bus, and we would replace this file if we replace the cluster bus for cluster V2. At least, that's the idea. This command doesn't use the cluster bus at all, so I think it should be in cluster.c and be called directly from |
||
delKeysInSlot(slot, lazy); | ||
addReply(c, shared.ok); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
{ | ||
"FLUSHSLOT": { | ||
"summary": "Remove all keys from the target slot.", | ||
"complexity": "O(N) where N is the number of keys in the target slot", | ||
"group": "cluster", | ||
"since": "8.1.0", | ||
"arity": -3, | ||
"container": "CLUSTER", | ||
"function": "clusterCommand", | ||
"command_flags": [ | ||
"WRITE" | ||
], | ||
"acl_categories": [ | ||
"KEYSPACE", | ||
"DANGEROUS" | ||
], | ||
"reply_schema": { | ||
"const": "OK" | ||
}, | ||
"arguments": [ | ||
{ | ||
"name": "slot", | ||
"type": "integer" | ||
}, | ||
{ | ||
"name": "flush-type", | ||
"type": "oneof", | ||
"optional": true, | ||
"arguments": [ | ||
{ | ||
"name": "async", | ||
"type": "pure-token", | ||
"token": "ASYNC", | ||
"since": "8.1.0" | ||
}, | ||
{ | ||
"name": "sync", | ||
"type": "pure-token", | ||
"token": "SYNC", | ||
"since": "8.1.0" | ||
} | ||
] | ||
} | ||
] | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
start_cluster 2 2 {tags {external:skip cluster}} { | ||
test "SYNC Flush slot command" { | ||
set key_slot [R 0 CLUSTER KEYSLOT FC] | ||
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] | ||
|
||
# set key | ||
for {set i 0} {$i < 1000} {incr i} { | ||
R 0 set "{FC}-$i" "value" | ||
} | ||
set after_keys_num [expr {$slot_keys_num + 1000}] | ||
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num | ||
|
||
# flush slot key | ||
R 0 CLUSTER FLUSHSLOT $key_slot SYNC | ||
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 | ||
} | ||
|
||
test "ASYNC Flush slot command" { | ||
set key_slot [R 0 CLUSTER KEYSLOT FC] | ||
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] | ||
|
||
# set key | ||
for {set i 0} {$i < 1000} {incr i} { | ||
R 0 set "{FC}-$i" "value" | ||
} | ||
set after_keys_num [expr {$slot_keys_num + 1000}] | ||
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num | ||
|
||
# flush slot key | ||
R 0 CLUSTER FLUSHSLOT $key_slot ASYNC | ||
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 | ||
wuranxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, the deleted keys are replicated just by multiple DEL commands. For comparison, FLUSHDB is replicated as a single FLUSHDB command, which is much less traffic than many DEL commands. If we want to replicate FLUSHSLOT as a single FLUSHSLOT command, then we should probably check that the replicas supports this though. We already have the replica's version stored in
c->repl_data->replica_version
(REPLCONF VERSION is sent by the replica since 8.0) so we could check that it's >= 8.1 for all connected replicas. It's also fine to skip this. We can do it later as a separate optimization.