Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize #99

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 104 additions & 73 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ static int cluster_node_init(cluster_node *node)
node->acon = NULL;
node->slots = NULL;
node->failure_count = 0;
node->conn_failed = 0;
node->last_conn_time = 0;
node->data = NULL;
node->migrating = NULL;
node->importing = NULL;
Expand Down Expand Up @@ -1030,7 +1032,7 @@ parse_cluster_nodes(redisClusterContext *cc,
}

//add master node
if(role_len >= 6 && memcmp(role, "master", 6) == 0){
if(role_len >= 6 && memcmp(role, "master", 6) == 0 && memcmp(role, "master,fail", 11) != 0){
if(count_part < 8){
__redisClusterSetError(cc,REDIS_ERR_OTHER,
"Master node parts number error: less than 8.");
Expand Down Expand Up @@ -1883,11 +1885,20 @@ cluster_update_route_with_nodes_old(redisClusterContext *cc,
int
cluster_update_route(redisClusterContext *cc)
{
uint64_t now = hi_msec_now();
uint64_t last_update = cc->update_route_time;
if(now < last_update + 5000){
return REDIS_OK;
}
cc->update_route_time = now;

cc->retry_count = 0;
int ret;
int flag_err_not_set = 1;
cluster_node *node;
dictIterator *it;
dictEntry *de;
uint32_t start, idx, real_cnt;

if(cc == NULL)
{
Expand All @@ -1905,7 +1916,7 @@ cluster_update_route(redisClusterContext *cc)
flag_err_not_set = 0;
}

if(cc->nodes == NULL)
if(cc->nodes == NULL || cc->nodes->used == 0)
{
if(flag_err_not_set)
{
Expand All @@ -1915,9 +1926,19 @@ cluster_update_route(redisClusterContext *cc)
return REDIS_ERR;
}

start = rand() % cc->nodes->used;
idx = 0;
real_cnt = 0;
start_point:
it = dictGetIterator(cc->nodes);
while ((de = dictNext(it)) != NULL)
while ((de = dictNext(it)) != NULL && real_cnt < cc->nodes->used)
{
if(idx++ < start){
continue;
}

printf("[REDIS FIX]%s:%d start point:%u:%u:%d\n", __FILE__, __LINE__, start, idx, cc->nodes->used);
++real_cnt;
node = dictGetEntryVal(de);
if(node == NULL || node->host == NULL || node->port < 0)
{
Expand All @@ -1933,15 +1954,22 @@ cluster_update_route(redisClusterContext *cc)
memset(cc->errstr, '\0', strlen(cc->errstr));
}

printf("[REDIS FIX]%s:%d update route success:%s:%d\n", __FILE__, __LINE__, node->host, node->port);
dictReleaseIterator(it);
return REDIS_OK;
}

printf("[REDIS FIX]%s:%d failed to update route:%s:%d\n", __FILE__, __LINE__, node->host, node->port);
flag_err_not_set = 0;
}

dictReleaseIterator(it);
if(real_cnt < cc->nodes->used){
printf("[REDIS FIX]%s:%d retry update route\n", __FILE__, __LINE__);
goto start_point;
}

printf("[REDIS FIX]%s:%d failed to update route\n", __FILE__, __LINE__);
if(flag_err_not_set)
{
__redisClusterSetError(cc, REDIS_ERR_OTHER, "no valid server address");
Expand Down Expand Up @@ -2408,7 +2436,7 @@ int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval t

di = dictGetIterator(cc->nodes);

while (de=dictNext(di))
while ((de=dictNext(di)))
{
node = dictGetEntryVal(de);
if (node->con && node->con->flags&REDIS_CONNECTED && node->con->err == 0)
Expand All @@ -2423,7 +2451,7 @@ int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval t
listNode *ln;

li = listGetIterator(node->slaves, AL_START_HEAD);
while (ln = listNext(li))
while ((ln = listNext(li)))
{
slave = listNodeValue(ln);
if (slave->con && slave->con->flags&REDIS_CONNECTED && slave->con->err == 0)
Expand Down Expand Up @@ -2469,47 +2497,49 @@ int redisClusterConnect2(redisClusterContext *cc)
redisContext *ctx_get_by_node(redisClusterContext *cc, cluster_node *node)
{
redisContext *c = NULL;
if(node == NULL)
{
if(node == NULL){
return NULL;
}

c = node->con;
if(c != NULL)
{
if(c->err)
{
redisReconnect(c);

if (cc->timeout && c->err == 0) {
redisSetTimeout(c, *cc->timeout);
}
}

return c;
}

if(node->host == NULL || node->port <= 0)
{
return NULL;
}

if(cc->connect_timeout)
{
if(!c && (node->host == NULL || node->port <= 0)){
return NULL;
}

if(c && !(c->err)){
return c;
}

uint64_t now = hi_msec_now();
if(now < node->last_conn_time + 5000){
return NULL;
}
node->last_conn_time = now;

if(c){
redisReconnect(c);
}else if(cc->connect_timeout){
c = redisConnectWithTimeout(node->host, node->port, *cc->connect_timeout);
}
else
{
}else{
c = redisConnect(node->host, node->port);
}
}

if((!c || c->err) && (++(node->conn_failed)) > cc->max_redirect_count){
cc->need_update_route = 1;
printf("[REDIS FIX]%s:%d node reconnect failed up to:%d:%d\n", __FILE__, __LINE__, node->conn_failed, cc->max_redirect_count);
node->conn_failed = 0;
}

if(c && !(c->err)){
node->conn_failed = 0;
}

if (cc->timeout && c != NULL && c->err == 0) {
redisSetTimeout(c, *cc->timeout);
}

node->con = c;

return c;
return c;
}

static cluster_node *node_get_by_slot(redisClusterContext *cc, uint32_t slot_num)
Expand Down Expand Up @@ -2593,6 +2623,11 @@ static cluster_node *node_get_by_table(redisClusterContext *cc, uint32_t slot_nu
return NULL;
}

if(!(cc->table[slot_num])){
cc->need_update_route = (++cc->retry_count>cc->max_redirect_count)?1:0;
printf("[REDIS FIX]%s:%d null context need update route to:%d:%d>%d\n", __FILE__, __LINE__, cc->need_update_route, cc->retry_count, cc->max_redirect_count);
}

return cc->table[slot_num];

}
Expand Down Expand Up @@ -2875,15 +2910,6 @@ static int __redisClusterGetReply(redisClusterContext *cc, int slot_num, void **
}
else if(c->err)
{
if(cc->need_update_route == 0)
{
cc->retry_count ++;
if(cc->retry_count > cc->max_redirect_count)
{
cc->need_update_route = 1;
cc->retry_count = 0;
}
}
__redisClusterSetError(cc, c->err, c->errstr);
return REDIS_ERR;
}
Expand Down Expand Up @@ -3985,8 +4011,12 @@ int redisClusterAppendCommandArgv(redisClusterContext *cc,
return ret;
}

static int redisCLusterSendAll(redisClusterContext *cc)
int redisCLusterSendAll(redisClusterContext *cc)
{
if(!cc){
return REDIS_ERR;
}

dictIterator *di;
dictEntry *de;
struct cluster_node *node;
Expand Down Expand Up @@ -4018,8 +4048,9 @@ static int redisCLusterSendAll(redisClusterContext *cc)
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
{
dictReleaseIterator(di);
return REDIS_ERR;
//dictReleaseIterator(di);
c->err = REDIS_ERR;
break;
}
} while (!wdone);
}
Expand All @@ -4030,7 +4061,7 @@ static int redisCLusterSendAll(redisClusterContext *cc)
return REDIS_OK;
}

static int redisCLusterClearAll(redisClusterContext *cc)
static int redisCLusterClearErrNode(redisClusterContext *cc)
{
dictIterator *di;
dictEntry *de;
Expand Down Expand Up @@ -4059,13 +4090,15 @@ static int redisCLusterClearAll(redisClusterContext *cc)
}

c = node->con;
if(c == NULL)
if(c == NULL || c->err == REDIS_OK)
{
continue;
}

printf("[REDIS FIX]%s:%d close host:%s:%d\n", __FILE__, __LINE__, node->host, node->port);
redisFree(c);
node->con = NULL;
node->conn_failed = 0;
}

dictReleaseIterator(di);
Expand Down Expand Up @@ -4179,39 +4212,37 @@ void redisClusterReset(redisClusterContext *cc)
return;
}

redisCLusterSendAll(cc);
do {
status = redisClusterGetReply(cc, &reply);
if (status == REDIS_OK) {
freeReplyObject(reply);
}
} while(reply != NULL);

if (cc->err) {
redisCLusterClearAll(cc);
} else {
redisCLusterSendAll(cc);

do {
status = redisClusterGetReply(cc, &reply);
if (status == REDIS_OK) {
freeReplyObject(reply);
} else {
redisCLusterClearAll(cc);
break;
}
} while(reply != NULL);
}
redisCLusterClearErrNode(cc);
}

if(cc->requests)
{
listRelease(cc->requests);
cc->requests = NULL;
}

if(cc->need_update_route)
{
status = cluster_update_route(cc);
if(status != REDIS_OK)
{
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"route update error, please recreate redisClusterContext!");
return;
}
cc->need_update_route = 0;
}
if(!cc->need_update_route){
return;
}

printf("[REDIS FIX]%s:%d need to update route\n", __FILE__, __LINE__);
status = cluster_update_route(cc);
if(status != REDIS_OK){
printf("[REDIS FIX]%s,%d clear udpate route failed:%d\n", __FILE__, __LINE__, status);
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"route update error, please recreate redisClusterContext!");
return;
}
cc->need_update_route = 0;
}

/*############redis cluster async############*/
Expand Down
3 changes: 3 additions & 0 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ typedef struct cluster_node
struct hilist *slots;
struct hilist *slaves;
int failure_count;
int conn_failed;
uint64_t last_conn_time;
void *data; /* Not used by hiredis */
struct hiarray *migrating; /* copen_slot[] */
struct hiarray *importing; /* copen_slot[] */
Expand Down Expand Up @@ -141,6 +143,7 @@ int cluster_update_route(redisClusterContext *cc);
int test_cluster_update_route(redisClusterContext *cc);
struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags);
struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags);
int redisCLusterSendAll(redisClusterContext *cc);


/*############redis cluster async############*/
Expand Down