Skip to content

Commit

Permalink
* NEW [mqtt/proto] Only send qos msgs which stay in unack over 10s.
Browse files Browse the repository at this point in the history
Signed-off-by: wanghaemq <[email protected]>
  • Loading branch information
wanghaEMQ committed Jul 12, 2024
1 parent 9a260eb commit 2d9dd31
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg)
nni_aio_set_msg(taio, NULL);
nni_id_remove(&p->sent_unack, packet_id);
}
nni_msg_set_timestamp(msg, nni_timestamp());
nni_msg_clone(msg); // clone for resend, will free when ack received
if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) {
log_warn("aio caching failed");
Expand Down Expand Up @@ -635,11 +636,13 @@ mqtt_batch_cb(void *arg)
nni_aio *aio = NULL;

if (nng_aio_result(&p->batch_aio) != 0) {
s->batchcnt = 0;
log_error("Batch aio error!");
return;
}
nni_mtx_lock(&s->mtx);
if (NULL == p || nni_atomic_get_bool(&p->closed)) {
s->batchcnt = 0;
nni_mtx_unlock(&s->mtx);
return;
}
Expand All @@ -661,9 +664,13 @@ mqtt_batch_cb(void *arg)
}
s->batchcnt ++;
if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) {
nni_msg_clone(msg);
s->lastpid = pid;
log_info("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg);
if (nni_timestamp() - nni_msg_get_timestamp(msg) <= 10 * NNI_SECOND) {
log_debug("NO.%d id%d Time short than 10s %ld", s->batchcnt, pid, nni_timestamp() - nni_msg_get_timestamp(msg));
goto next;
}
nni_msg_clone(msg);
log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg);
ptype = nni_mqtt_msg_get_packet_type(msg);
if (ptype == NNG_MQTT_PUBLISH)
nni_mqtt_msg_set_publish_dup(msg, true);
Expand All @@ -674,7 +681,10 @@ mqtt_batch_cb(void *arg)
} else {
nni_lmq_put(&p->send_messages, msg);
}
} else {
log_debug("NO.%d Batch sending missing", s->batchcnt);
}
next:
nni_mtx_unlock(&s->mtx);
nni_sleep_aio(s->batchtmo, &p->batch_aio);
}
Expand Down

0 comments on commit 2d9dd31

Please sign in to comment.