From 097fea4e9b1d93e9ad5181d222ca23be62de0453 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Mon, 1 Jul 2024 03:35:26 -0400 Subject: [PATCH 1/9] * FIX [mqtt/proto] Fix the unfinished aio when send sub/unsub. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 935dc2983..8e5d92c2c 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -287,7 +287,7 @@ mqtt_sock_get_next_packet_id(mqtt_sock_t *s) do { packet_id = nni_atomic_get(&s->next_packet_id); /* PROTOCOL ERROR: when packet_id == 0 */ - while (packet_id & 0xFFFF == 0) { + while ((packet_id & 0xFFFF) == 0) { while(!nni_atomic_cas(&s->next_packet_id, packet_id, packet_id + 1)) { @@ -407,6 +407,7 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) nni_msg * msg = NULL; nni_msg * tmsg = NULL; nni_aio * taio = NULL; + uint16_t ttype = 0; if (p == NULL || nni_atomic_get_bool(&p->closed) || aio == NULL) { //pipe closed, should never gets here @@ -450,14 +451,12 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) packet_id = nni_mqtt_msg_get_packet_id(msg); taio = nni_id_get(&p->sent_unack, packet_id); if (taio != NULL) { - nni_plat_printf("Warning : msg %d lost due to " - "packetID duplicated!", - packet_id); + log_warn("msg %d lost due to packetID duplicated!", packet_id); nni_aio_finish_error(taio, NNG_ECANCELED); nni_id_remove(&p->sent_unack, packet_id); } if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) { - nni_plat_printf("Warning : aio caching failed"); + log_warn("aio caching failed"); nni_aio_finish_error(aio, NNG_ECANCELED); } break; @@ -483,8 +482,19 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) return; } if (nni_lmq_full(&p->send_messages)) { - log_error("rhack: pipe is busy and lmq is full\n"); + log_warn("pipe is busy and lmq is full, drop a msg\n"); (void) nni_lmq_get(&p->send_messages, &tmsg); + ttype = nni_mqtt_msg_get_packet_type(msg); + if (ttype == NNG_MQTT_SUBSCRIBE || ttype == NNG_MQTT_UNSUBSCRIBE) { + packet_id = nni_mqtt_msg_get_packet_id(tmsg); + taio = nni_id_get(&p->sent_unack, packet_id); // Also the user aio + if (taio != NULL) { + log_warn("msg of packetid %d lost due to full lmq!", packet_id); + nni_aio_finish_error(taio, NNG_ECANCELED); + nni_id_remove(&p->sent_unack, packet_id); + nni_mqtt_msg_set_aio(tmsg, NULL); + } + } nni_msg_free(tmsg); } if (0 != nni_lmq_put(&p->send_messages, msg)) { @@ -904,7 +914,7 @@ mqtt_recv_cb(void *arg) } if (rv != MQTT_SUCCESS) { - nni_plat_printf("Error in encoding CONNACK.\n"); + log_error("Error in encoding CONNACK.\n"); } conn_param_clone(p->cparam); if ((ctx = nni_list_first(&s->recv_queue)) == NULL) { From 0026910bbbd0d329ef48e51f14e6d2e4475bc169 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 2 Jul 2024 00:30:31 -0400 Subject: [PATCH 2/9] * NEW [proto/mqtt] Batch send is supported. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 112 +++++++++++++++++---------- 1 file changed, 71 insertions(+), 41 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 8e5d92c2c..92f8635df 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -97,6 +97,10 @@ struct mqtt_sock_s { nni_duration retry; nni_duration keepalive; // mqtt keepalive nni_duration timeleft; // left time to send next ping + uint16_t batchsz; // resend qos in batchs + uint16_t batchcnt; + nni_duration batchtmo; // interval of batch sending + uint16_t lastpid; nni_mtx mtx; // more fine grained mutual exclusion mqtt_ctx_t master; // to which we delegate send/recv calls mqtt_pipe_t *mqtt_pipe; @@ -134,6 +138,9 @@ mqtt_sock_init(void *arg, nni_sock *sock) s->retry = NNI_SECOND * 5; s->keepalive = NNI_SECOND * 10; // default mqtt keepalive s->timeleft = NNI_SECOND * 10; + s->batchcnt = 0; + s->batchsz = 8; + s->batchtmo = 10; // Interval of batch sending (ms) nni_mtx_init(&s->mtx); mqtt_ctx_init(&s->master, s); @@ -407,7 +414,6 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) nni_msg * msg = NULL; nni_msg * tmsg = NULL; nni_aio * taio = NULL; - uint16_t ttype = 0; if (p == NULL || nni_atomic_get_bool(&p->closed) || aio == NULL) { //pipe closed, should never gets here @@ -453,11 +459,14 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) if (taio != NULL) { log_warn("msg %d lost due to packetID duplicated!", packet_id); nni_aio_finish_error(taio, NNG_ECANCELED); + nni_aio_set_msg(taio, NULL); nni_id_remove(&p->sent_unack, packet_id); } + log_info("IN <%d,%p>", packet_id, aio); if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) { log_warn("aio caching failed"); nni_aio_finish_error(aio, NNG_ECANCELED); + nni_aio_set_msg(aio, NULL); } break; @@ -484,16 +493,13 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) if (nni_lmq_full(&p->send_messages)) { log_warn("pipe is busy and lmq is full, drop a msg\n"); (void) nni_lmq_get(&p->send_messages, &tmsg); - ttype = nni_mqtt_msg_get_packet_type(msg); - if (ttype == NNG_MQTT_SUBSCRIBE || ttype == NNG_MQTT_UNSUBSCRIBE) { - packet_id = nni_mqtt_msg_get_packet_id(tmsg); - taio = nni_id_get(&p->sent_unack, packet_id); // Also the user aio - if (taio != NULL) { - log_warn("msg of packetid %d lost due to full lmq!", packet_id); - nni_aio_finish_error(taio, NNG_ECANCELED); - nni_id_remove(&p->sent_unack, packet_id); - nni_mqtt_msg_set_aio(tmsg, NULL); - } + packet_id = nni_mqtt_msg_get_packet_id(tmsg); + taio = nni_id_get(&p->sent_unack, packet_id); // Also the user aio + if (taio != NULL) { + log_warn("msg of packetid %d lost due to full lmq!", packet_id); + nni_aio_finish_error(taio, NNG_ECANCELED); + nni_id_remove(&p->sent_unack, packet_id); + nni_mqtt_msg_set_aio(tmsg, NULL); } nni_msg_free(tmsg); } @@ -623,6 +629,10 @@ mqtt_timer_cb(void *arg) { mqtt_pipe_t *p = arg; mqtt_sock_t *s = p->mqtt_sock; + nni_msg *msg = NULL; + uint16_t pid; + uint16_t ptype; + nni_aio *aio = NULL; if (nng_aio_result(&p->time_aio) != 0) { log_info("Timer aio error!"); @@ -634,6 +644,34 @@ mqtt_timer_cb(void *arg) return; } + // If batchcnt > 0. Batch sending was started. So handle batch first. + if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { + pid = s->lastpid + 1; + aio = nni_id_get_min(&p->sent_unack, &pid); + s->batchcnt ++; + } + if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { + log_info("clonemsg %p", msg); + nni_msg_clone(msg); + s->lastpid = pid; + log_info("NO.%d Batch sending id %d", s->batchcnt-1, pid); + ptype = nni_mqtt_msg_get_packet_type(msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->pipe, &p->send_aio); + } else { + nni_lmq_put(&p->send_messages, msg); + } + } + if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { + nni_mtx_unlock(&s->mtx); + nni_sleep_aio(s->batchtmo, &p->time_aio); + return; + } + if (p->pingcnt > 1) { log_warn("MQTT Timeout and disconnect"); nni_mtx_unlock(&s->mtx); @@ -660,37 +698,29 @@ mqtt_timer_cb(void *arg) } // start message resending - // msg = nni_id_get_min(&p->sent_unack, &pid); - // if (msg != NULL) { - // uint16_t ptype; - // ptype = nni_mqtt_msg_get_packet_type(msg); - // if (ptype == NNG_MQTT_PUBLISH) { - // nni_mqtt_msg_set_publish_dup(msg, true); - // } - // if (!p->busy) { - // p->busy = true; - // nni_msg_clone(msg); - // aio = nni_mqtt_msg_get_aio(msg); - // if (aio) { - // nni_aio_bump_count(aio, - // nni_msg_header_len(msg) + - // nni_msg_len(msg)); - // nni_aio_set_msg(aio, NULL); - // } - // nni_aio_set_msg(&p->send_aio, msg); - // nni_pipe_send(p->pipe, &p->send_aio); - - // nni_mtx_unlock(&s->mtx); - // nni_sleep_aio(s->retry, &p->time_aio); - // return; - // } else { - // nni_msg_clone(msg); - // nni_lmq_put(&p->send_messages, msg); - // } - // } + aio = nni_id_get_min(&p->sent_unack, &pid); + if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { + nni_msg_clone(msg); + s->lastpid = pid; + s->batchcnt ++; + log_info("Batch sending started id%d", pid); + ptype = nni_mqtt_msg_get_packet_type(msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->pipe, &p->send_aio); + + nni_mtx_unlock(&s->mtx); + nni_sleep_aio(s->batchtmo, &p->time_aio); + return; + } else { + nni_lmq_put(&p->send_messages, msg); + } + } #if defined(NNG_SUPP_SQLITE) if (!p->busy) { - nni_msg *msg = NULL; nni_mqtt_sqlite_option *sqlite = mqtt_sock_get_sqlite_option(s); if (sqlite_is_enabled(sqlite)) { @@ -949,7 +979,7 @@ mqtt_recv_cb(void *arg) // FALLTHROUGH case NNG_MQTT_UNSUBACK: // we have received a UNSUBACK, successful unsubscription - packet_id = nni_mqtt_msg_get_packet_id(msg); + packet_id = nni_mqtt_msg_get_packet_id(msg); p->rid ++; user_aio = nni_id_get(&p->sent_unack, packet_id); if (user_aio != NULL) { From 803770b2be46d1faf831b567ad1d748fb0ff54d1 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 2 Jul 2024 00:42:13 -0400 Subject: [PATCH 3/9] * FIX [mqtt/proto] Fix the heap use after free of qos msg. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 92f8635df..e4e767e31 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -459,14 +459,17 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) if (taio != NULL) { log_warn("msg %d lost due to packetID duplicated!", packet_id); nni_aio_finish_error(taio, NNG_ECANCELED); + if ((tmsg = nni_aio_get_msg(taio)) != NULL) + nni_msg_free(tmsg); nni_aio_set_msg(taio, NULL); nni_id_remove(&p->sent_unack, packet_id); } - log_info("IN <%d,%p>", packet_id, aio); + nni_msg_clone(msg); // clone for resend, will free when ack received if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) { log_warn("aio caching failed"); nni_aio_finish_error(aio, NNG_ECANCELED); nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); } break; @@ -493,14 +496,6 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) if (nni_lmq_full(&p->send_messages)) { log_warn("pipe is busy and lmq is full, drop a msg\n"); (void) nni_lmq_get(&p->send_messages, &tmsg); - packet_id = nni_mqtt_msg_get_packet_id(tmsg); - taio = nni_id_get(&p->sent_unack, packet_id); // Also the user aio - if (taio != NULL) { - log_warn("msg of packetid %d lost due to full lmq!", packet_id); - nni_aio_finish_error(taio, NNG_ECANCELED); - nni_id_remove(&p->sent_unack, packet_id); - nni_mqtt_msg_set_aio(tmsg, NULL); - } nni_msg_free(tmsg); } if (0 != nni_lmq_put(&p->send_messages, msg)) { @@ -651,10 +646,9 @@ mqtt_timer_cb(void *arg) s->batchcnt ++; } if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { - log_info("clonemsg %p", msg); nni_msg_clone(msg); s->lastpid = pid; - log_info("NO.%d Batch sending id %d", s->batchcnt-1, pid); + log_info("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); @@ -703,7 +697,7 @@ mqtt_timer_cb(void *arg) nni_msg_clone(msg); s->lastpid = pid; s->batchcnt ++; - log_info("Batch sending started id%d", pid); + log_info("Batch sending started id%d msg%p", pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); @@ -983,6 +977,8 @@ mqtt_recv_cb(void *arg) p->rid ++; user_aio = nni_id_get(&p->sent_unack, packet_id); if (user_aio != NULL) { + if (nni_aio_get_msg(user_aio) != NULL) + nni_msg_free(nni_aio_get_msg(user_aio)); nni_id_remove(&p->sent_unack, packet_id); if (packet_type == NNG_MQTT_SUBACK || packet_type == NNG_MQTT_UNSUBACK) { From 50f88d8b2ee7223b49693270de3a0b594fe9353d Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 2 Jul 2024 06:00:17 -0400 Subject: [PATCH 4/9] * FIX [mqtt/proto] Update the counter of msgs will be sent in a batch. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index e4e767e31..d22680adc 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -660,9 +660,13 @@ mqtt_timer_cb(void *arg) nni_lmq_put(&p->send_messages, msg); } } - if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { + if (s->batchcnt > 0) { + s->batchcnt %= s->batchsz; nni_mtx_unlock(&s->mtx); - nni_sleep_aio(s->batchtmo, &p->time_aio); + if (s->batchcnt == 0) + nni_sleep_aio(s->retry, &p->time_aio); + else + nni_sleep_aio(s->batchtmo, &p->time_aio); return; } From e857d811e844d1340931a553310418cdcadcb188 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 2 Jul 2024 06:07:13 -0400 Subject: [PATCH 5/9] * FIX [mqtt/proto] Update the level of logs. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index d22680adc..53452cbdb 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -648,7 +648,7 @@ mqtt_timer_cb(void *arg) if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { nni_msg_clone(msg); s->lastpid = pid; - log_info("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); + log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); @@ -701,7 +701,7 @@ mqtt_timer_cb(void *arg) nni_msg_clone(msg); s->lastpid = pid; s->batchcnt ++; - log_info("Batch sending started id%d msg%p", pid, msg); + log_debug("Batch sending started id%d msg%p", pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); From 1ac64c318484a6ee7f7cbc4833c84699a008f37e Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Fri, 12 Jul 2024 02:17:05 -0400 Subject: [PATCH 6/9] * NEW [mqtt/proto] Move batch sending to batch aio of batch_cb. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 85 +++++++++++++++------------- 1 file changed, 47 insertions(+), 38 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 53452cbdb..049c31ed9 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -77,7 +77,8 @@ struct mqtt_pipe_s { nni_id_map recv_unack; // recv messages unacknowledged nni_aio send_aio; // send aio to the underlying transport nni_aio recv_aio; // recv aio to the underlying transport - nni_aio time_aio; // timer aio to resend unack msg + nni_aio time_aio; // timer aio to trigger batch_aio and ping + nni_aio batch_aio; // batch aio to resend unack msg nni_lmq recv_messages; // recv messages queue nni_lmq send_messages; // send messages queue uint16_t rid; // index of resending packet id @@ -332,6 +333,7 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s) nni_aio_init(&p->send_aio, mqtt_send_cb, p); nni_aio_init(&p->recv_aio, mqtt_recv_cb, p); nni_aio_init(&p->time_aio, mqtt_timer_cb, p); + nni_aio_init(&p->batch_aio, mqtt_batch_cb, p); // Packet IDs are 16 bits // We start at a random point, to minimize likelihood of // accidental collision across restarts. @@ -374,6 +376,7 @@ mqtt_pipe_fini(void *arg) nni_aio_fini(&p->send_aio); nni_aio_fini(&p->recv_aio); nni_aio_fini(&p->time_aio); + nni_aio_fini(&p->batch_aio); nni_id_map_fini(&p->sent_unack); nni_id_map_fini(&p->recv_unack); @@ -546,6 +549,7 @@ mqtt_pipe_stop(void *arg) nni_aio_stop(&p->send_aio); nni_aio_stop(&p->recv_aio); nni_aio_stop(&p->time_aio); + nni_aio_stop(&p->batch_aio); } static int @@ -560,6 +564,7 @@ mqtt_pipe_close(void *arg) nni_aio_close(&p->send_aio); nni_aio_close(&p->recv_aio); nni_aio_close(&p->time_aio); + nni_aio_close(&p->batch_aio); #if defined(NNG_SUPP_SQLITE) // flush to disk @@ -618,9 +623,8 @@ mqtt_pipe_close(void *arg) return 0; } -// Timer callback, we use it for retransmitting. static void -mqtt_timer_cb(void *arg) +mqtt_batch_cb(void *arg) { mqtt_pipe_t *p = arg; mqtt_sock_t *s = p->mqtt_sock; @@ -629,8 +633,8 @@ mqtt_timer_cb(void *arg) uint16_t ptype; nni_aio *aio = NULL; - if (nng_aio_result(&p->time_aio) != 0) { - log_info("Timer aio error!"); + if (nng_aio_result(&p->batch_aio) != 0) { + log_error("Batch aio error!"); return; } nni_mtx_lock(&s->mtx); @@ -639,16 +643,26 @@ mqtt_timer_cb(void *arg) return; } - // If batchcnt > 0. Batch sending was started. So handle batch first. - if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { - pid = s->lastpid + 1; + // If batchcnt > 0. Batch sending was started. + if (s->batchcnt > 0) { + if (s->batchcnt < s->batchsz) { + pid = ++ s->lastpid; + aio = nni_id_get_min(&p->sent_unack, &pid); + } else { + // This batch sending finished + s->batchcnt = 0; + nni_mtx_unlock(&s->mtx); + return; + } + } else { + // The first run of this batch aio = nni_id_get_min(&p->sent_unack, &pid); - s->batchcnt ++; } + s->batchcnt ++; if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { nni_msg_clone(msg); s->lastpid = pid; - log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); + log_info("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); @@ -660,13 +674,28 @@ mqtt_timer_cb(void *arg) nni_lmq_put(&p->send_messages, msg); } } - if (s->batchcnt > 0) { - s->batchcnt %= s->batchsz; + nni_mtx_unlock(&s->mtx); + nni_sleep_aio(s->batchtmo, &p->batch_aio); +} + +// Timer callback, we use it for retransmitting. +static void +mqtt_timer_cb(void *arg) +{ + mqtt_pipe_t *p = arg; + mqtt_sock_t *s = p->mqtt_sock; + nni_msg *msg = NULL; + uint16_t pid; + uint16_t ptype; + nni_aio *aio = NULL; + + if (nng_aio_result(&p->time_aio) != 0) { + log_error("Timer aio error!"); + return; + } + nni_mtx_lock(&s->mtx); + if (NULL == p || nni_atomic_get_bool(&p->closed)) { nni_mtx_unlock(&s->mtx); - if (s->batchcnt == 0) - nni_sleep_aio(s->retry, &p->time_aio); - else - nni_sleep_aio(s->batchtmo, &p->time_aio); return; } @@ -695,28 +724,8 @@ mqtt_timer_cb(void *arg) return; } - // start message resending - aio = nni_id_get_min(&p->sent_unack, &pid); - if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { - nni_msg_clone(msg); - s->lastpid = pid; - s->batchcnt ++; - log_debug("Batch sending started id%d msg%p", pid, msg); - ptype = nni_mqtt_msg_get_packet_type(msg); - if (ptype == NNG_MQTT_PUBLISH) - nni_mqtt_msg_set_publish_dup(msg, true); - if (!p->busy) { - p->busy = true; - nni_aio_set_msg(&p->send_aio, msg); - nni_pipe_send(p->pipe, &p->send_aio); - - nni_mtx_unlock(&s->mtx); - nni_sleep_aio(s->batchtmo, &p->time_aio); - return; - } else { - nni_lmq_put(&p->send_messages, msg); - } - } + nni_aio_finish(&p->batch_aio, 0, 0); // start batch resend + // #if defined(NNG_SUPP_SQLITE) if (!p->busy) { nni_mqtt_sqlite_option *sqlite = From 41882662766612177666094989c100c80378e459 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Fri, 12 Jul 2024 02:19:46 -0400 Subject: [PATCH 7/9] * FIX [mqtt/proto] Fix the errors in compiling. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 049c31ed9..db4cd46d1 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -40,6 +40,7 @@ static void mqtt_sock_recv(void *arg, nni_aio *aio); static void mqtt_send_cb(void *arg); static void mqtt_recv_cb(void *arg); static void mqtt_timer_cb(void *arg); +static void mqtt_batch_cb(void *arg); static int mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s); static void mqtt_pipe_fini(void *arg); @@ -684,10 +685,6 @@ mqtt_timer_cb(void *arg) { mqtt_pipe_t *p = arg; mqtt_sock_t *s = p->mqtt_sock; - nni_msg *msg = NULL; - uint16_t pid; - uint16_t ptype; - nni_aio *aio = NULL; if (nng_aio_result(&p->time_aio) != 0) { log_error("Timer aio error!"); @@ -727,6 +724,7 @@ mqtt_timer_cb(void *arg) nni_aio_finish(&p->batch_aio, 0, 0); // start batch resend // #if defined(NNG_SUPP_SQLITE) + nni_msg *msg = NULL; if (!p->busy) { nni_mqtt_sqlite_option *sqlite = mqtt_sock_get_sqlite_option(s); From 9a260ebbd98a54e329bc41b887d6c9048c3ae691 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Fri, 12 Jul 2024 04:35:50 -0400 Subject: [PATCH 8/9] * NEW [mqtt/proto] Don't run batch aio when batch aio is busy. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index db4cd46d1..7ff330615 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -721,8 +721,9 @@ mqtt_timer_cb(void *arg) return; } - nni_aio_finish(&p->batch_aio, 0, 0); // start batch resend - // + if (s->batchcnt == 0) + nni_aio_finish(&p->batch_aio, 0, 0); // start batch resend + #if defined(NNG_SUPP_SQLITE) nni_msg *msg = NULL; if (!p->busy) { From 2d9dd3119037c338898d72e9b47ced6ac4633c43 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Fri, 12 Jul 2024 05:33:11 -0400 Subject: [PATCH 9/9] * NEW [mqtt/proto] Only send qos msgs which stay in unack over 10s. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_client.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 7ff330615..d640a0161 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -468,6 +468,7 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) nni_aio_set_msg(taio, NULL); nni_id_remove(&p->sent_unack, packet_id); } + nni_msg_set_timestamp(msg, nni_timestamp()); nni_msg_clone(msg); // clone for resend, will free when ack received if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) { log_warn("aio caching failed"); @@ -635,11 +636,13 @@ mqtt_batch_cb(void *arg) nni_aio *aio = NULL; if (nng_aio_result(&p->batch_aio) != 0) { + s->batchcnt = 0; log_error("Batch aio error!"); return; } nni_mtx_lock(&s->mtx); if (NULL == p || nni_atomic_get_bool(&p->closed)) { + s->batchcnt = 0; nni_mtx_unlock(&s->mtx); return; } @@ -661,9 +664,13 @@ mqtt_batch_cb(void *arg) } s->batchcnt ++; if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { - nni_msg_clone(msg); s->lastpid = pid; - log_info("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg); + if (nni_timestamp() - nni_msg_get_timestamp(msg) <= 10 * NNI_SECOND) { + log_debug("NO.%d id%d Time short than 10s %ld", s->batchcnt, pid, nni_timestamp() - nni_msg_get_timestamp(msg)); + goto next; + } + nni_msg_clone(msg); + log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); @@ -674,7 +681,10 @@ mqtt_batch_cb(void *arg) } else { nni_lmq_put(&p->send_messages, msg); } + } else { + log_debug("NO.%d Batch sending missing", s->batchcnt); } +next: nni_mtx_unlock(&s->mtx); nni_sleep_aio(s->batchtmo, &p->batch_aio); }