diff --git a/src/mqtt/protocol/mqtt/mqtt_client.c b/src/mqtt/protocol/mqtt/mqtt_client.c index 935dc2983..d640a0161 100644 --- a/src/mqtt/protocol/mqtt/mqtt_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_client.c @@ -40,6 +40,7 @@ static void mqtt_sock_recv(void *arg, nni_aio *aio); static void mqtt_send_cb(void *arg); static void mqtt_recv_cb(void *arg); static void mqtt_timer_cb(void *arg); +static void mqtt_batch_cb(void *arg); static int mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s); static void mqtt_pipe_fini(void *arg); @@ -77,7 +78,8 @@ struct mqtt_pipe_s { nni_id_map recv_unack; // recv messages unacknowledged nni_aio send_aio; // send aio to the underlying transport nni_aio recv_aio; // recv aio to the underlying transport - nni_aio time_aio; // timer aio to resend unack msg + nni_aio time_aio; // timer aio to trigger batch_aio and ping + nni_aio batch_aio; // batch aio to resend unack msg nni_lmq recv_messages; // recv messages queue nni_lmq send_messages; // send messages queue uint16_t rid; // index of resending packet id @@ -97,6 +99,10 @@ struct mqtt_sock_s { nni_duration retry; nni_duration keepalive; // mqtt keepalive nni_duration timeleft; // left time to send next ping + uint16_t batchsz; // resend qos in batchs + uint16_t batchcnt; + nni_duration batchtmo; // interval of batch sending + uint16_t lastpid; nni_mtx mtx; // more fine grained mutual exclusion mqtt_ctx_t master; // to which we delegate send/recv calls mqtt_pipe_t *mqtt_pipe; @@ -134,6 +140,9 @@ mqtt_sock_init(void *arg, nni_sock *sock) s->retry = NNI_SECOND * 5; s->keepalive = NNI_SECOND * 10; // default mqtt keepalive s->timeleft = NNI_SECOND * 10; + s->batchcnt = 0; + s->batchsz = 8; + s->batchtmo = 10; // Interval of batch sending (ms) nni_mtx_init(&s->mtx); mqtt_ctx_init(&s->master, s); @@ -287,7 +296,7 @@ mqtt_sock_get_next_packet_id(mqtt_sock_t *s) do { packet_id = nni_atomic_get(&s->next_packet_id); /* PROTOCOL ERROR: when packet_id == 0 */ - while (packet_id & 0xFFFF == 0) { + while ((packet_id & 0xFFFF) == 0) { while(!nni_atomic_cas(&s->next_packet_id, packet_id, packet_id + 1)) { @@ -325,6 +334,7 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s) nni_aio_init(&p->send_aio, mqtt_send_cb, p); nni_aio_init(&p->recv_aio, mqtt_recv_cb, p); nni_aio_init(&p->time_aio, mqtt_timer_cb, p); + nni_aio_init(&p->batch_aio, mqtt_batch_cb, p); // Packet IDs are 16 bits // We start at a random point, to minimize likelihood of // accidental collision across restarts. @@ -367,6 +377,7 @@ mqtt_pipe_fini(void *arg) nni_aio_fini(&p->send_aio); nni_aio_fini(&p->recv_aio); nni_aio_fini(&p->time_aio); + nni_aio_fini(&p->batch_aio); nni_id_map_fini(&p->sent_unack); nni_id_map_fini(&p->recv_unack); @@ -450,15 +461,20 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) packet_id = nni_mqtt_msg_get_packet_id(msg); taio = nni_id_get(&p->sent_unack, packet_id); if (taio != NULL) { - nni_plat_printf("Warning : msg %d lost due to " - "packetID duplicated!", - packet_id); + log_warn("msg %d lost due to packetID duplicated!", packet_id); nni_aio_finish_error(taio, NNG_ECANCELED); + if ((tmsg = nni_aio_get_msg(taio)) != NULL) + nni_msg_free(tmsg); + nni_aio_set_msg(taio, NULL); nni_id_remove(&p->sent_unack, packet_id); } + nni_msg_set_timestamp(msg, nni_timestamp()); + nni_msg_clone(msg); // clone for resend, will free when ack received if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) { - nni_plat_printf("Warning : aio caching failed"); + log_warn("aio caching failed"); nni_aio_finish_error(aio, NNG_ECANCELED); + nni_aio_set_msg(aio, NULL); + nni_msg_free(msg); } break; @@ -483,7 +499,7 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg) return; } if (nni_lmq_full(&p->send_messages)) { - log_error("rhack: pipe is busy and lmq is full\n"); + log_warn("pipe is busy and lmq is full, drop a msg\n"); (void) nni_lmq_get(&p->send_messages, &tmsg); nni_msg_free(tmsg); } @@ -535,6 +551,7 @@ mqtt_pipe_stop(void *arg) nni_aio_stop(&p->send_aio); nni_aio_stop(&p->recv_aio); nni_aio_stop(&p->time_aio); + nni_aio_stop(&p->batch_aio); } static int @@ -549,6 +566,7 @@ mqtt_pipe_close(void *arg) nni_aio_close(&p->send_aio); nni_aio_close(&p->recv_aio); nni_aio_close(&p->time_aio); + nni_aio_close(&p->batch_aio); #if defined(NNG_SUPP_SQLITE) // flush to disk @@ -607,6 +625,70 @@ mqtt_pipe_close(void *arg) return 0; } +static void +mqtt_batch_cb(void *arg) +{ + mqtt_pipe_t *p = arg; + mqtt_sock_t *s = p->mqtt_sock; + nni_msg *msg = NULL; + uint16_t pid; + uint16_t ptype; + nni_aio *aio = NULL; + + if (nng_aio_result(&p->batch_aio) != 0) { + s->batchcnt = 0; + log_error("Batch aio error!"); + return; + } + nni_mtx_lock(&s->mtx); + if (NULL == p || nni_atomic_get_bool(&p->closed)) { + s->batchcnt = 0; + nni_mtx_unlock(&s->mtx); + return; + } + + // If batchcnt > 0. Batch sending was started. + if (s->batchcnt > 0) { + if (s->batchcnt < s->batchsz) { + pid = ++ s->lastpid; + aio = nni_id_get_min(&p->sent_unack, &pid); + } else { + // This batch sending finished + s->batchcnt = 0; + nni_mtx_unlock(&s->mtx); + return; + } + } else { + // The first run of this batch + aio = nni_id_get_min(&p->sent_unack, &pid); + } + s->batchcnt ++; + if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) { + s->lastpid = pid; + if (nni_timestamp() - nni_msg_get_timestamp(msg) <= 10 * NNI_SECOND) { + log_debug("NO.%d id%d Time short than 10s %ld", s->batchcnt, pid, nni_timestamp() - nni_msg_get_timestamp(msg)); + goto next; + } + nni_msg_clone(msg); + log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg); + ptype = nni_mqtt_msg_get_packet_type(msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->pipe, &p->send_aio); + } else { + nni_lmq_put(&p->send_messages, msg); + } + } else { + log_debug("NO.%d Batch sending missing", s->batchcnt); + } +next: + nni_mtx_unlock(&s->mtx); + nni_sleep_aio(s->batchtmo, &p->batch_aio); +} + // Timer callback, we use it for retransmitting. static void mqtt_timer_cb(void *arg) @@ -615,7 +697,7 @@ mqtt_timer_cb(void *arg) mqtt_sock_t *s = p->mqtt_sock; if (nng_aio_result(&p->time_aio) != 0) { - log_info("Timer aio error!"); + log_error("Timer aio error!"); return; } nni_mtx_lock(&s->mtx); @@ -649,38 +731,12 @@ mqtt_timer_cb(void *arg) return; } - // start message resending - // msg = nni_id_get_min(&p->sent_unack, &pid); - // if (msg != NULL) { - // uint16_t ptype; - // ptype = nni_mqtt_msg_get_packet_type(msg); - // if (ptype == NNG_MQTT_PUBLISH) { - // nni_mqtt_msg_set_publish_dup(msg, true); - // } - // if (!p->busy) { - // p->busy = true; - // nni_msg_clone(msg); - // aio = nni_mqtt_msg_get_aio(msg); - // if (aio) { - // nni_aio_bump_count(aio, - // nni_msg_header_len(msg) + - // nni_msg_len(msg)); - // nni_aio_set_msg(aio, NULL); - // } - // nni_aio_set_msg(&p->send_aio, msg); - // nni_pipe_send(p->pipe, &p->send_aio); - - // nni_mtx_unlock(&s->mtx); - // nni_sleep_aio(s->retry, &p->time_aio); - // return; - // } else { - // nni_msg_clone(msg); - // nni_lmq_put(&p->send_messages, msg); - // } - // } + if (s->batchcnt == 0) + nni_aio_finish(&p->batch_aio, 0, 0); // start batch resend + #if defined(NNG_SUPP_SQLITE) + nni_msg *msg = NULL; if (!p->busy) { - nni_msg *msg = NULL; nni_mqtt_sqlite_option *sqlite = mqtt_sock_get_sqlite_option(s); if (sqlite_is_enabled(sqlite)) { @@ -904,7 +960,7 @@ mqtt_recv_cb(void *arg) } if (rv != MQTT_SUCCESS) { - nni_plat_printf("Error in encoding CONNACK.\n"); + log_error("Error in encoding CONNACK.\n"); } conn_param_clone(p->cparam); if ((ctx = nni_list_first(&s->recv_queue)) == NULL) { @@ -939,10 +995,12 @@ mqtt_recv_cb(void *arg) // FALLTHROUGH case NNG_MQTT_UNSUBACK: // we have received a UNSUBACK, successful unsubscription - packet_id = nni_mqtt_msg_get_packet_id(msg); + packet_id = nni_mqtt_msg_get_packet_id(msg); p->rid ++; user_aio = nni_id_get(&p->sent_unack, packet_id); if (user_aio != NULL) { + if (nni_aio_get_msg(user_aio) != NULL) + nni_msg_free(nni_aio_get_msg(user_aio)); nni_id_remove(&p->sent_unack, packet_id); if (packet_type == NNG_MQTT_SUBACK || packet_type == NNG_MQTT_UNSUBACK) {