add cluster flushslot command. #1384

Status: Open; wants to merge 4 commits into base: unstable
40 changes: 34 additions & 6 deletions src/cluster_legacy.c
@@ -94,7 +94,7 @@ void moduleCallClusterReceivers(const char *sender_id,
const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot, int lazy);
void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
list *clusterLookupNodeListByShardId(const char *shard_id);
void clusterRemoveNodeFromShard(clusterNode *node);
@@ -123,6 +123,7 @@ int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);
void clusterCommandFlushslot(client *c);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
@@ -2764,7 +2765,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
for (int j = 0; j < dirty_slots_count; j++) {
serverLog(LL_NOTICE, "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", dirty_slots[j],
myself->name, myself->human_nodename, myself->shard_id);
delKeysInSlot(dirty_slots[j]);
delKeysInSlot(dirty_slots[j], server.lazyfree_lazy_server_del);
}
}
}
@@ -5750,7 +5751,7 @@ int verifyClusterConfigWithData(void) {
server.cluster->importing_slots_from[j]->shard_id, j, server.cluster->slots[j]->name,
server.cluster->slots[j]->human_nodename, server.cluster->slots[j]->shard_id);
}
delKeysInSlot(j);
delKeysInSlot(j, server.lazyfree_lazy_server_del);
}
}
if (update_config) clusterSaveConfigOrDie(1);
@@ -6337,13 +6338,14 @@ void removeChannelsInSlot(unsigned int slot) {

/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int delKeysInSlot(unsigned int hashslot, int lazy) {
if (!countKeysInSlot(hashslot)) return 0;

/* We may lose a slot during the pause. We need to track this
* state so that we don't assert in propagateNow(). */
server.server_del_keys_in_slot = 1;
unsigned int j = 0;
int before_execution_nesting = server.execution_nesting;

kvstoreHashtableIterator *kvs_di = NULL;
void *next;
@@ -6353,8 +6355,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
enterExecutionUnit(1, 0);
sds sdskey = objectGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
if (lazy) {
dbAsyncDelete(&server.db[0], key);
} else {
dbSyncDelete(&server.db[0], key);
Contributor:

Here, the deleted keys are replicated as multiple DEL commands. For comparison, FLUSHDB is replicated as a single FLUSHDB command, which generates much less traffic than many DEL commands. If we want to replicate FLUSHSLOT as a single FLUSHSLOT command, we should probably check that the replicas support it, though. We already have the replica's version stored in c->repl_data->replica_version (REPLCONF VERSION is sent by the replica since 8.0), so we could check that it's >= 8.1 for all connected replicas. It's also fine to skip this; we can do it later as a separate optimization.
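
To make the version check suggested above concrete, here is a minimal sketch and not the server's actual code. The `replica_info` struct, the `VERSION_NUM` macro and `all_replicas_support` are hypothetical names, and the packed `(major << 16) | (minor << 8) | patch` encoding of the version is an assumption made for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for per-replica state. In the server the version
 * is said to live in c->repl_data->replica_version. */
typedef struct {
    int replica_version; /* assumed packed version; 0 if the replica never sent one */
} replica_info;

/* Assumed encoding: one byte each for major, minor and patch. */
#define VERSION_NUM(major, minor, patch) (((major) << 16) | ((minor) << 8) | (patch))

/* Returns 1 if every connected replica reports at least min_version,
 * 0 otherwise. With no replicas connected the answer is trivially 1. */
int all_replicas_support(const replica_info *replicas, size_t count, int min_version) {
    assert(replicas != NULL || count == 0);
    for (size_t i = 0; i < count; i++) {
        if (replicas[i].replica_version < min_version) return 0;
    }
    return 1;
}
```

A caller could propagate a single FLUSHSLOT only when `all_replicas_support(replicas, n, VERSION_NUM(8, 1, 0))` returns 1, and fall back to per-key DEL propagation otherwise.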

}
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
propagateDeletion(&server.db[0], key, lazy);
signalModifiedKey(NULL, &server.db[0], key);
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
Contributor:

This code comment about keyspace notifications:

        /* The keys are not actually logically deleted from the database, just moved to another node.
         * The modules needs to know that these keys are no longer available locally, so just send the
         * keyspace notification to the modules, but not to clients. */

delKeysInSlot is used for slot migrations, where the keys are considered not deleted but migrated to another node.

This is not necessarily the case for CLUSTER FLUSHSLOT. I think we should treat CLUSTER FLUSHSLOT more like FLUSHDB. For FLUSHDB, which keyspace notifications are sent? A lot of DEL events or one FLUSHDB event?

Maybe we shouldn't use delKeysInSlot here, or we should add an extra argument or flag to indicate whether it's a migration or we're just deleting the keys.
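
One way the "extra argument or flag" idea could look, as a rough sketch only. The enum, the `NOTIFY_TARGET_*` bits and `slot_del_notify_targets` are all hypothetical names invented for illustration. Notifying clients in the flush case is an assumption: the thread leaves open which events a flush should actually emit.

```c
#include <assert.h>

/* Hypothetical reason codes for why a slot's keys are being removed. */
typedef enum {
    SLOT_DEL_MIGRATION, /* keys moved to another node, not logically deleted */
    SLOT_DEL_FLUSH      /* keys explicitly deleted, e.g. by CLUSTER FLUSHSLOT */
} slot_del_reason;

/* Hypothetical bit flags naming who receives the keyspace notification. */
#define NOTIFY_TARGET_MODULES (1 << 0)
#define NOTIFY_TARGET_CLIENTS (1 << 1)

/* Decide the notification audience from the deletion reason. Migration
 * keeps the behaviour described in the quoted code comment (modules only).
 * The flush branch is an assumed policy, not a confirmed one. */
int slot_del_notify_targets(slot_del_reason reason) {
    assert(reason == SLOT_DEL_MIGRATION || reason == SLOT_DEL_FLUSH);
    if (reason == SLOT_DEL_MIGRATION) return NOTIFY_TARGET_MODULES;
    return NOTIFY_TARGET_MODULES | NOTIFY_TARGET_CLIENTS;
}
```

Passing a reason rather than a bare boolean keeps existing migration call sites unchanged in meaning and leaves room for the flush policy to be decided separately.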

@@ -6369,7 +6376,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
kvstoreReleaseHashtableIterator(kvs_di);

server.server_del_keys_in_slot = 0;
serverAssert(server.execution_nesting == 0);
serverAssert(server.execution_nesting == before_execution_nesting);
return j;
}

@@ -7115,6 +7122,9 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */
clusterCommandFlushslot(c);
} else {
return 0;
}
@@ -7314,3 +7324,21 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {

return C_OK;
}

void clusterCommandFlushslot(client *c) {
int slot;
int lazy = server.lazyfree_lazy_user_flush;
if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return;
if (c->argc == 4) {
if (!strcasecmp(c->argv[3]->ptr, "async")) {
lazy = 1;
} else if (!strcasecmp(c->argv[3]->ptr, "sync")) {
lazy = 0;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
Member:

We should fail the c->argc > 4 case, I think.

Author:

The call site of clusterCommandFlushslot already checks the number of arguments. Is it still necessary to validate it again inside the function?

 else if (!strcasecmp(c->argv[1]->ptr, "flushslot") && (c->argc == 3 || c->argc == 4)) {
        /* CLUSTER FLUSHSLOT <slot> [ASYNC|SYNC] */
        clusterCommandFlushslot(c);
    }

Contributor:

It's a bit odd that the arity check is split across multiple places, but I don't mind too much because many commands already do that.

But I think this should be in cluster.c instead of cluster_legacy.c. cluster_legacy.c is about the cluster bus, and we would replace this file if we replace the cluster bus for cluster V2. At least, that's the idea. This command doesn't use the cluster bus at all, so I think it should be in cluster.c and be called directly from clusterCommand().

delKeysInSlot(slot, lazy);
addReply(c, shared.ok);
}
30 changes: 30 additions & 0 deletions src/commands.def
@@ -518,6 +518,35 @@ struct COMMAND_ARG CLUSTER_FAILOVER_Args[] = {
{MAKE_ARG("options",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FAILOVER_options_Subargs},
};

/********** CLUSTER FLUSHSLOT ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER FLUSHSLOT history */
#define CLUSTER_FLUSHSLOT_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER FLUSHSLOT tips */
#define CLUSTER_FLUSHSLOT_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER FLUSHSLOT key specs */
#define CLUSTER_FLUSHSLOT_Keyspecs NULL
#endif

/* CLUSTER FLUSHSLOT flush_type argument table */
struct COMMAND_ARG CLUSTER_FLUSHSLOT_flush_type_Subargs[] = {
{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER FLUSHSLOT argument table */
struct COMMAND_ARG CLUSTER_FLUSHSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_FLUSHSLOT_flush_type_Subargs},
};

/********** CLUSTER FLUSHSLOTS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
@@ -1012,6 +1041,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("delslots","Sets hash slots as unbound for a node.","O(N) where N is the total number of hash slot arguments","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTS_History,0,CLUSTER_DELSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTS_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTS_Args},
{MAKE_CMD("delslotsrange","Sets hash slot ranges as unbound for a node.","O(N) where N is the total number of the slots between the start slot and end slot arguments.","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_DELSLOTSRANGE_History,0,CLUSTER_DELSLOTSRANGE_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_DELSLOTSRANGE_Keyspecs,0,NULL,1),.args=CLUSTER_DELSLOTSRANGE_Args},
{MAKE_CMD("failover","Forces a replica to perform a manual failover of its primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FAILOVER_History,0,CLUSTER_FAILOVER_Tips,0,clusterCommand,-2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FAILOVER_Keyspecs,0,NULL,1),.args=CLUSTER_FAILOVER_Args},
{MAKE_CMD("flushslot","Remove all keys from the target slot.","O(N) where N is the number of keys in the target slot","8.1.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOT_History,0,CLUSTER_FLUSHSLOT_Tips,0,clusterCommand,-3,CMD_WRITE,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,CLUSTER_FLUSHSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_FLUSHSLOT_Args},
{MAKE_CMD("flushslots","Deletes all slots information from a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FLUSHSLOTS_History,0,CLUSTER_FLUSHSLOTS_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FLUSHSLOTS_Keyspecs,0,NULL,0)},
{MAKE_CMD("forget","Removes a node from the nodes table.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_FORGET_History,0,CLUSTER_FORGET_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_FORGET_Keyspecs,0,NULL,1),.args=CLUSTER_FORGET_Args},
{MAKE_CMD("getkeysinslot","Returns the key names in a hash slot.","O(N) where N is the number of requested keys","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_GETKEYSINSLOT_History,0,CLUSTER_GETKEYSINSLOT_Tips,1,clusterCommand,4,CMD_STALE,0,CLUSTER_GETKEYSINSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_GETKEYSINSLOT_Args},
46 changes: 46 additions & 0 deletions src/commands/cluster-flushslot.json
@@ -0,0 +1,46 @@
{
"FLUSHSLOT": {
"summary": "Remove all keys from the target slot.",
"complexity": "O(N) where N is the number of keys in the target slot",
"group": "cluster",
"since": "8.1.0",
"arity": -3,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"WRITE"
],
"acl_categories": [
"KEYSPACE",
"DANGEROUS"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"name": "slot",
"type": "integer"
},
{
"name": "flush-type",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "async",
"type": "pure-token",
"token": "ASYNC",
"since": "8.1.0"
},
{
"name": "sync",
"type": "pure-token",
"token": "SYNC",
"since": "8.1.0"
}
]
}
]
}
}
34 changes: 34 additions & 0 deletions tests/unit/cluster/cluster-flush-slot.tcl
@@ -0,0 +1,34 @@
start_cluster 2 2 {tags {external:skip cluster}} {
test "SYNC Flush slot command" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]

# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num

# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot SYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0
}

test "ASYNC Flush slot command" {
set key_slot [R 0 CLUSTER KEYSLOT FC]
set slot_keys_num [R 0 CLUSTER COUNTKEYSINSLOT $key_slot]

# set key
for {set i 0} {$i < 1000} {incr i} {
R 0 set "{FC}-$i" "value"
}
set after_keys_num [expr {$slot_keys_num + 1000}]
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] $after_keys_num

# flush slot key
R 0 CLUSTER FLUSHSLOT $key_slot ASYNC
assert_equal [R 0 CLUSTER COUNTKEYSINSLOT $key_slot] 0
}
}
