diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 94d3532dfc..611633f370 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -94,7 +94,7 @@ void moduleCallClusterReceivers(const char *sender_id, const char *clusterGetMessageTypeString(int type); void removeChannelsInSlot(unsigned int slot); unsigned int countChannelsInSlot(unsigned int hashslot); -unsigned int delKeysInSlot(unsigned int hashslot); +unsigned int delKeysInSlot(unsigned int hashslot, int lazy); void clusterAddNodeToShard(const char *shard_id, clusterNode *node); list *clusterLookupNodeListByShardId(const char *shard_id); void clusterRemoveNodeFromShard(clusterNode *node); @@ -123,6 +123,7 @@ int verifyClusterNodeId(const char *name, int length); sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now); +void clusterCommandFlushslot(client *c); /* Only primaries that own slots have voting rights. * Returns 1 if the node has voting rights, otherwise returns 0. */ @@ -2764,7 +2765,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc for (int j = 0; j < dirty_slots_count; j++) { serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j], myself->name, myself->human_nodename, myself->shard_id); - delKeysInSlot(dirty_slots[j]); + delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del); } } } @@ -5750,7 +5751,7 @@ int verifyClusterConfigWithData(void) { server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id); } - delKeysInSlot(j); + delKeysInSlot(j, server.lazyfree_lazy_server_del); } } if (update_config) clusterSaveConfigOrDie(1); @@ -6337,13 +6338,14 @@ void removeChannelsInSlot(unsigned int slot) { /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ -unsigned int delKeysInSlot(unsigned int hashslot) { +unsigned int delKeysInSlot(unsigned int hashslot, int lazy) { if (!countKeysInSlot(hashslot)) return 0; /* We may lose a slot during the pause. We need to track this * state so that we don't assert in propagateNow(). */ server.server_del_keys_in_slot = 1; unsigned int j = 0; + int before_execution_nesting = server.execution_nesting; kvstoreHashtableIterator *kvs_di = NULL; void *next; @@ -6353,8 +6355,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) { enterExecutionUnit(1, 0); sds sdskey = objectGetKey(valkey); robj *key = createStringObject(sdskey, sdslen(sdskey)); + if (lazy) { + dbAsyncDelete(&server.db[0], key); + } else { + dbSyncDelete(&server.db[0], key); + } dbDelete(&server.db[0], key); - propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); + propagateDeletion(&server.db[0], key, lazy); signalModifiedKey(NULL, &server.db[0], key); /* The keys are not actually logically deleted from the database, just moved to another node. * The modules needs to know that these keys are no longer available locally, so just send the @@ -6369,7 +6376,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) { kvstoreReleaseHashtableIterator(kvs_di); server.server_del_keys_in_slot = 0; - serverAssert(server.execution_nesting == 0); + serverAssert(server.execution_nesting == before_execution_nesting); return j; } @@ -7115,6 +7122,9 @@ int clusterCommandSpecial(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) { /* CLUSTER LINKS */ addReplyClusterLinksDescription(c); + } else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) { + /* CLUSTER FLUSHSLOT [ASYNC|SYNC] */ + clusterCommandFlushslot(c); } else { return 0; } @@ -7314,3 +7324,21 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) { return C_OK; } + +void clusterCommandFlushslot(client *c) { + int slot; + int lazy = server.lazyfree_lazy_user_flush; + if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return; + if (c->argc == 4) { + if (!strcasecmp(c->argv[3]->ptr, "async")) { + lazy = 1; + } else if (!strcasecmp(c->argv[3]->ptr, "sync")) { + lazy = 0; + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } + } + delKeysInSlot(slot, lazy); + addReply(c, shared.ok); +} diff --git a/src/commands.def b/src/commands.def index cd919a80e1..c383871c5d 100644 --- a/src/commands.def +++ b/src/commands.def @@ -518,6 +518,35 @@ struct COMMAND_ARG CLUSTER_FAILOVER_Args[] = { {MAKE_ARG("options",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FAILOVER_options_Subargs}, }; +/********** CLUSTER FLUSHSLOT ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLUSTER FLUSHSLOT history */ +#define CLUSTER_FLUSHSLOT_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLUSTER FLUSHSLOT tips */ +#define CLUSTER_FLUSHSLOT_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLUSTER FLUSHSLOT key specs */ +#define CLUSTER_FLUSHSLOT_Keyspecs NULL +#endif + +/* CLUSTER FLUSHSLOT flush_type argument table */ +struct COMMAND_ARG CLUSTER_FLUSHSLOT_flush_type_Subargs[] = { +{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)}, +}; + +/* CLUSTER FLUSHSLOT argument table */ +struct COMMAND_ARG CLUSTER_FLUSHSLOT_Args[] = { +{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FLUSHSLOT_flush_type_Subargs}, +}; + /********** CLUSTER FLUSHSLOTS ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1012,6 +1041,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("delslots","Sets hash slots as unbound for a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTS_History,0,CLUSTER_DELSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTS_Args}, {MAKE_CMD("delslotsrange","Sets hash slot ranges as unbound for a node.","O(N) where N is the total number of the slots between the start slot and end slot arguments.","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTSRANGE_History,0,CLUSTER_DELSLOTSRANGE_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTSRANGE_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTSRANGE_Args}, {MAKE_CMD("failover","Forces a replica to perform a manual failover of its primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args}, +{MAKE_CMD("flushslot","Remove all keys from the target slot.","O(N) where N is the number of keys in the target slot","8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOT_History,0,CLUSTER_FLUSHSLOT_Tips,0,clusterCommand,-3,CMD_WRITE,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,CLUSTER_FLUSHSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_FLUSHSLOT_Args}, {MAKE_CMD("flushslots","Deletes all slots information from a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOTS_History,0,CLUSTER_FLUSHSLOTS_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FLUSHSLOTS_Keyspecs,0,NULL,0)}, {MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args}, {MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args}, diff --git a/src/commands/cluster-flushslot.json b/src/commands/cluster-flushslot.json new file mode 100644 index 0000000000..89d29417bf --- /dev/null +++ b/src/commands/cluster-flushslot.json @@ -0,0 +1,46 @@ +{ + "FLUSHSLOT": { + "summary": "Remove all keys from the target slot.", + "complexity": "O(N) where N is the number of keys in the target slot", + "group": "cluster", + "since": "8.1.0", + "arity": -3, + "container": "CLUSTER", + "function": "clusterCommand", + "command_flags": [ + "WRITE" + ], + "acl_categories": [ + "KEYSPACE", + "DANGEROUS" + ], + "reply_schema": { + "const": "OK" + }, + "arguments": [ + { + "name": "slot", + "type": "integer" + }, + { + "name": "flush-type", + "type": "oneof", + "optional": true, + "arguments": [ + { + "name": "async", + "type": "pure-token", + "token": "ASYNC", + "since": "8.1.0" + }, + { + "name": "sync", + "type": "pure-token", + "token": "SYNC", + "since": "8.1.0" + } + ] + } + ] + } +} diff --git a/tests/unit/cluster/cluster-flush-slot.tcl b/tests/unit/cluster/cluster-flush-slot.tcl new file mode 100644 index 0000000000..f16578f2d1 --- /dev/null +++ b/tests/unit/cluster/cluster-flush-slot.tcl @@ -0,0 +1,34 @@ +start_cluster 2 2 {tags {external:skip cluster}} { + test "SYNC Flush slot command" { + set key_slot [R 0 CLUSTER KEYSLOT FC] + set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] + + # set key + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{FC}-$i" "value" + } + set after_keys_num [expr {$slot_keys_num + 1000}] + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num + + # flush slot key + R 0 CLUSTER FLUSHSLOT $key_slot SYNC + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 + } + + test "ASYNC Flush slot command" { + set key_slot [R 0 CLUSTER KEYSLOT FC] + set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] + + # set key + for {set i 0} {$i < 1000} {incr i} { + R 0 set "{FC}-$i" "value" + } + set after_keys_num [expr {$slot_keys_num + 1000}] + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num + + # flush slot key + R 0 CLUSTER FLUSHSLOT $key_slot ASYNC + assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0 + } +} +