Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug Session#resendInflightNotAcked() fails due to handling a freed ByteBuf #464

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publi
sub.getClientId(), sub.getTopicFilter(), qos);
// we need to retain because duplicate only copy r/w indexes and don't retain() causing refCnt = 0
ByteBuf payload = origPayload.retainedDuplicate();
// without this retain(), Session.resendInflightNotAcked() fails because it tries to handle a freed ByteBuf
payload.retain();
targetSession.sendPublishOnSessionAtQos(topic, qos, payload);
} else {
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
Expand Down
5 changes: 4 additions & 1 deletion broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage message = inflightWindow.remove(ackPacketId);
if (message instanceof SessionRegistry.PublishedMessage) {
((SessionRegistry.PublishedMessage) message).payload.release();
}
inflightSlots.incrementAndGet();
drainQueueToConnection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import org.awaitility.Awaitility;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -159,4 +162,46 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup
m_willSubscriber.disconnect();
}

@Test
public void testResendNotAckedPublishes()
throws MqttException, InterruptedException
{
LOG.info("*** testResendNotAckedPublishes ***");
String topic = "/test";

MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber");
MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher");

try {
subscriber.connect();
publisher.connect();

AtomicBoolean isFirst = new AtomicBoolean(true);
CountDownLatch countDownLatch = new CountDownLatch(2);
subscriber.subscribe(topic, 1, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage message)
throws Exception
{
if (isFirst.getAndSet(false)) {
// wait to trigger resending PUBLISH
TimeUnit.SECONDS.sleep(12);
}
countDownLatch.countDown();
}
});

publisher.publish(topic, "hello".getBytes(), 1, false);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}
finally {
try {
subscriber.disconnect();
}
finally {
publisher.disconnect();
}
}
}

}