Skip to content

Commit

Permalink
fix incorrect delete.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed May 29, 2024
1 parent 82025b8 commit dec66dc
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,7 @@ void recover(final VoidCallback callback) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {

updateCursorLedgerStat(info, stat);
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Recover cursor last active to [{}]", ledger.getName(), name, lastActive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -1313,6 +1314,67 @@ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() thro

}

@Test
public void testCheckInactiveSubscriptionWhenNoMessageToAck() throws Exception {
String namespace = "prop/testInactiveSubscriptionWhenNoMessageToAck";

try {
admin.namespaces().createNamespace(namespace);
} catch (PulsarAdminException.ConflictException e) {
// Ok.. (if test fails intermittently and namespace is already created)
}
// set enable subscription expiration.
admin.namespaces().setSubscriptionExpirationTime(namespace, 1);

String topic = "persistent://" + namespace + "/my-topic";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producer.send("test".getBytes());
producer.close();

// create consumer to consume all messages
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
consumer.acknowledge(consumer.receive());

Optional<Topic> topicOptional = pulsar.getBrokerService().getTopic(topic, true).get();
assertTrue(topicOptional.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();

// wait for 1min, but consumer is still connected all the time.
// so subscription should not be deleted.
Thread.sleep(60000);
persistentTopic.checkInactiveSubscriptions();
assertTrue(persistentTopic.getSubscriptions().containsKey("sub1"));
PersistentSubscription sub = persistentTopic.getSubscription("sub1");

// shutdown pulsar ungracefully
// disable the updateLastActive method to simulate the ungraceful shutdown
ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
ManagedCursorImpl spyCursor = Mockito.spy(cursor);
doNothing().when(spyCursor).updateLastActive();
Field cursorField = PersistentSubscription.class.getDeclaredField("cursor");
cursorField.setAccessible(true);
cursorField.set(sub, spyCursor);

// restart pulsar
consumer.close();
restartBroker();

admin.lookups().lookupTopic(topic);
topicOptional = pulsar.getBrokerService().getTopic(topic, true).get();
assertTrue(topicOptional.isPresent());
persistentTopic = (PersistentTopic) topicOptional.get();
persistentTopic.checkInactiveSubscriptions();
// wait for two seconds to complete the async task
Thread.sleep(2000);

// check if subscription is still present
assertTrue(persistentTopic.getSubscriptions().containsKey("sub1"));
sub = (PersistentSubscription) persistentTopic.getSubscription("sub1");
assertNotNull(sub);
}


/**
* Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.
Expand Down

0 comments on commit dec66dc

Please sign in to comment.