From 1584445280c9c696fe9af0db0cd5fada33981113 Mon Sep 17 00:00:00 2001 From: wilesduan Date: Tue, 8 Jan 2019 17:29:09 +0800 Subject: [PATCH 1/3] 1.shuffly command 'cluster route'; 2. only close error nodes when reset cluster; 3. 'cluster route' only when 'MOVED' or connection failed; 4. connect 1time in 5s, 'cluster route' 1time in 10s --- hircluster.c | 166 +++++++++++++++++++++++++++++---------------------- hircluster.h | 2 + 2 files changed, 98 insertions(+), 70 deletions(-) diff --git a/hircluster.c b/hircluster.c index b051936..84b7bd4 100644 --- a/hircluster.c +++ b/hircluster.c @@ -270,6 +270,8 @@ static int cluster_node_init(cluster_node *node) node->acon = NULL; node->slots = NULL; node->failure_count = 0; + node->conn_failed = 0; + node->last_conn_time = 0; node->data = NULL; node->migrating = NULL; node->importing = NULL; @@ -1030,7 +1032,7 @@ parse_cluster_nodes(redisClusterContext *cc, } //add master node - if(role_len >= 6 && memcmp(role, "master", 6) == 0){ + if(role_len >= 6 && memcmp(role, "master", 6) == 0 && memcmp(role, "master,fail", 11) != 0){ if(count_part < 8){ __redisClusterSetError(cc,REDIS_ERR_OTHER, "Master node parts number error: less than 8."); @@ -1883,11 +1885,20 @@ cluster_update_route_with_nodes_old(redisClusterContext *cc, int cluster_update_route(redisClusterContext *cc) { + uint64_t now = hi_msec_now(); + uint64_t last_update = cc->update_route_time; + if(now < last_update + 5000){ + return REDIS_OK; + } + cc->update_route_time = now; + + cc->retry_count = 0; int ret; int flag_err_not_set = 1; cluster_node *node; dictIterator *it; dictEntry *de; + uint32_t start, idx, real_cnt; if(cc == NULL) { @@ -1905,7 +1916,7 @@ cluster_update_route(redisClusterContext *cc) flag_err_not_set = 0; } - if(cc->nodes == NULL) + if(cc->nodes == NULL || cc->nodes->used == 0) { if(flag_err_not_set) { @@ -1915,9 +1926,19 @@ cluster_update_route(redisClusterContext *cc) return REDIS_ERR; } + start = rand() % cc->nodes->used; + idx = 0; + real_cnt = 0; +start_point: it = dictGetIterator(cc->nodes); - while ((de = dictNext(it)) != NULL) + while ((de = dictNext(it)) != NULL && real_cnt < cc->nodes->used) { + if(idx++ < start){ + continue; + } + + printf("[REDIS FIX]%s:%d start point:%u:%u:%d\n", __FILE__, __LINE__, start, idx, cc->nodes->used); + ++real_cnt; node = dictGetEntryVal(de); if(node == NULL || node->host == NULL || node->port < 0) { @@ -1933,15 +1954,22 @@ cluster_update_route(redisClusterContext *cc) memset(cc->errstr, '\0', strlen(cc->errstr)); } + printf("[REDIS FIX]%s:%d update route success:%s:%d\n", __FILE__, __LINE__, node->host, node->port); dictReleaseIterator(it); return REDIS_OK; } + printf("[REDIS FIX]%s:%d failed to update route:%s:%d\n", __FILE__, __LINE__, node->host, node->port); flag_err_not_set = 0; } dictReleaseIterator(it); + if(real_cnt < cc->nodes->used){ + printf("[REDIS FIX]%s:%d retry update route\n", __FILE__, __LINE__); + goto start_point; + } + printf("[REDIS FIX]%s:%d failed to update route\n", __FILE__, __LINE__); if(flag_err_not_set) { __redisClusterSetError(cc, REDIS_ERR_OTHER, "no valid server address"); @@ -2408,7 +2436,7 @@ int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval t di = dictGetIterator(cc->nodes); - while (de=dictNext(di)) + while ((de=dictNext(di))) { node = dictGetEntryVal(de); if (node->con && node->con->flags&REDIS_CONNECTED && node->con->err == 0) @@ -2423,7 +2451,7 @@ int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval t listNode *ln; li = listGetIterator(node->slaves, AL_START_HEAD); - while (ln = listNext(li)) + while ((ln = listNext(li))) { slave = listNodeValue(ln); if (slave->con && slave->con->flags&REDIS_CONNECTED && slave->con->err == 0) @@ -2469,47 +2497,49 @@ int redisClusterConnect2(redisClusterContext *cc) redisContext *ctx_get_by_node(redisClusterContext *cc, cluster_node *node) { redisContext *c = NULL; - if(node == NULL) - { + if(node == NULL){ return NULL; } c = node->con; - if(c != NULL) - { - if(c->err) - { - redisReconnect(c); - - if (cc->timeout && c->err == 0) { - redisSetTimeout(c, *cc->timeout); - } - } - - return c; - } - - if(node->host == NULL || node->port <= 0) - { - return NULL; - } - - if(cc->connect_timeout) - { + if(!c && (node->host == NULL || node->port <= 0)){ + return NULL; + } + + if(c && !(c->err)){ + return c; + } + + uint64_t now = hi_msec_now(); + if(now < node->last_conn_time + 5000){ + return NULL; + } + node->last_conn_time = now; + + if(c){ + redisReconnect(c); + }else if(cc->connect_timeout){ c = redisConnectWithTimeout(node->host, node->port, *cc->connect_timeout); - } - else - { + }else{ c = redisConnect(node->host, node->port); - } + } + + if((!c || c->err) && (++(node->conn_failed)) > cc->max_redirect_count){ + cc->need_update_route = 1; + printf("[REDIS FIX]%s:%d node reconnect failed up to:%d:%d\n", __FILE__, __LINE__, node->conn_failed, cc->max_redirect_count); + node->conn_failed = 0; + } + + if(c && !(c->err)){ + node->conn_failed = 0; + } if (cc->timeout && c != NULL && c->err == 0) { redisSetTimeout(c, *cc->timeout); } - node->con = c; - return c; + return c; } static cluster_node *node_get_by_slot(redisClusterContext *cc, uint32_t slot_num) @@ -2593,6 +2623,11 @@ static cluster_node *node_get_by_table(redisClusterContext *cc, uint32_t slot_nu return NULL; } + if(!(cc->table[slot_num])){ + cc->need_update_route = (++cc->retry_count>cc->max_redirect_count)?1:0; + printf("[REDIS FIX]%s:%d null context need update route to:%d:%d>%d\n", __FILE__, __LINE__, cc->need_update_route, cc->retry_count, cc->max_redirect_count); + } + return cc->table[slot_num]; } @@ -2875,15 +2910,6 @@ static int __redisClusterGetReply(redisClusterContext *cc, int slot_num, void ** } else if(c->err) { - if(cc->need_update_route == 0) - { - cc->retry_count ++; - if(cc->retry_count > cc->max_redirect_count) - { - cc->need_update_route = 1; - cc->retry_count = 0; - } - } __redisClusterSetError(cc, c->err, c->errstr); return REDIS_ERR; } @@ -4030,7 +4056,7 @@ static int redisCLusterSendAll(redisClusterContext *cc) return REDIS_OK; } -static int redisCLusterClearAll(redisClusterContext *cc) +static int redisCLusterClearErrNode(redisClusterContext *cc) { dictIterator *di; dictEntry *de; @@ -4059,13 +4085,15 @@ static int redisCLusterClearAll(redisClusterContext *cc) } c = node->con; - if(c == NULL) + if(c == NULL || c->err == REDIS_OK) { continue; } + printf("[REDIS FIX]%s:%d close host:%s:%d\n", __FILE__, __LINE__, node->host, node->port); redisFree(c); node->con = NULL; + node->conn_failed = 0; } dictReleaseIterator(di); @@ -4179,21 +4207,17 @@ void redisClusterReset(redisClusterContext *cc) return; } + redisCLusterSendAll(cc); + do { + status = redisClusterGetReply(cc, &reply); + if (status == REDIS_OK) { + freeReplyObject(reply); + } + } while(reply != NULL); + if (cc->err) { - redisCLusterClearAll(cc); - } else { - redisCLusterSendAll(cc); - - do { - status = redisClusterGetReply(cc, &reply); - if (status == REDIS_OK) { - freeReplyObject(reply); - } else { - redisCLusterClearAll(cc); - break; - } - } while(reply != NULL); - } + redisCLusterClearErrNode(cc); + } if(cc->requests) { @@ -4201,17 +4225,19 @@ void redisClusterReset(redisClusterContext *cc) cc->requests = NULL; } - if(cc->need_update_route) - { - status = cluster_update_route(cc); - if(status != REDIS_OK) - { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "route update error, please recreate redisClusterContext!"); - return; - } - cc->need_update_route = 0; - } + if(!cc->need_update_route){ + return; + } + + printf("[REDIS FIX]%s:%d need to update route\n", __FILE__, __LINE__); + status = cluster_update_route(cc); + if(status != REDIS_OK){ + printf("[REDIS FIX]%s,%d clear udpate route failed:%d\n", __FILE__, __LINE__, status); + __redisClusterSetError(cc, REDIS_ERR_OTHER, + "route update error, please recreate redisClusterContext!"); + return; + } + cc->need_update_route = 0; } /*############redis cluster async############*/ diff --git a/hircluster.h b/hircluster.h index 95585c9..69a0338 100644 --- a/hircluster.h +++ b/hircluster.h @@ -46,6 +46,8 @@ typedef struct cluster_node struct hilist *slots; struct hilist *slaves; int failure_count; + int conn_failed; + uint64_t last_conn_time; void *data; /* Not used by hiredis */ struct hiarray *migrating; /* copen_slot[] */ struct hiarray *importing; /* copen_slot[] */ From 00f199f3401f29f7d58474a51ccea79945970e00 Mon Sep 17 00:00:00 2001 From: wilesduan Date: Wed, 30 Jan 2019 10:37:19 +0800 Subject: [PATCH 2/3] fix bug --- hircluster.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hircluster.c b/hircluster.c index 84b7bd4..31bb709 100644 --- a/hircluster.c +++ b/hircluster.c @@ -4045,7 +4045,8 @@ static int redisCLusterSendAll(redisClusterContext *cc) if (redisBufferWrite(c,&wdone) == REDIS_ERR) { dictReleaseIterator(di); - return REDIS_ERR; + c->err = REDIS_ERR; + break; } } while (!wdone); } From ae3439889cfac7ed16ac380ab34ae3f76b860a32 Mon Sep 17 00:00:00 2001 From: duanlijun Date: Mon, 25 Feb 2019 07:22:53 +0800 Subject: [PATCH 3/3] fix bug --- hircluster.c | 8 ++++++-- hircluster.h | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hircluster.c b/hircluster.c index 31bb709..c3c3398 100644 --- a/hircluster.c +++ b/hircluster.c @@ -4011,8 +4011,12 @@ int redisClusterAppendCommandArgv(redisClusterContext *cc, return ret; } -static int redisCLusterSendAll(redisClusterContext *cc) +int redisCLusterSendAll(redisClusterContext *cc) { + if(!cc){ + return REDIS_ERR; + } + dictIterator *di; dictEntry *de; struct cluster_node *node; @@ -4044,7 +4048,7 @@ static int redisCLusterSendAll(redisClusterContext *cc) do { if (redisBufferWrite(c,&wdone) == REDIS_ERR) { - dictReleaseIterator(di); + //dictReleaseIterator(di); c->err = REDIS_ERR; break; } diff --git a/hircluster.h b/hircluster.h index 69a0338..cda6507 100644 --- a/hircluster.h +++ b/hircluster.h @@ -143,6 +143,7 @@ int cluster_update_route(redisClusterContext *cc); int test_cluster_update_route(redisClusterContext *cc); struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags); struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags); +int redisCLusterSendAll(redisClusterContext *cc); /*############redis cluster async############*/