Skip to content

Commit

Permalink
Fix flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 13, 2024
1 parent c7754ee commit f99955c
Showing 1 changed file with 36 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
}
}

// wait for the filters to be processed in background
Thread.sleep(5000);

TextMessage textMessage = (TextMessage) consumer1.receive();
assertEquals("foo-9", textMessage.getText());

Expand All @@ -113,23 +116,30 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
// no more messages
assertNull(consumer1.receiveNoWait());

// ensure that the filter didn't reject any message while dispatching to the consumer
// because the filter has been already applied on the write path
TopicStats stats = getPulsarContainer().getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
if (transacted) {
// when we enable transactions the stats are not updated correctly
// it seems that the transaction marker is counted as "processed by filters"
// but actually it is not processed by the JMSFilter at all
assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 2);
assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
session.commit();
} else {
assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1);
assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
}
Awaitility.await()
.untilAsserted(
() -> {
// ensure that the filter didn't reject any message while dispatching to the
// consumer
// because the filter has been already applied on the write path
TopicStats stats =
getPulsarContainer().getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats =
stats.getSubscriptions().get("jms-queue");
if (transacted) {
// when we enable transactions the stats are not updated correctly
// it seems that the transaction marker is counted as "processed by filters"
// but actually it is not processed by the JMSFilter at all
assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 2);
assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
session.commit();
} else {
assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1);
assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
}
});
}

// create a message that doesn't match the filter
Expand All @@ -139,9 +149,15 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
TextMessage textMessage = session.createTextMessage("backlog");
producer.send(textMessage);

TopicStats stats = getPulsarContainer().getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
assertEquals(0, subscriptionStats.getMsgBacklog());
Awaitility.await()
.untilAsserted(
() -> {
TopicStats stats =
getPulsarContainer().getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats =
stats.getSubscriptions().get("jms-queue");
assertEquals(0, subscriptionStats.getMsgBacklog());
});

if (transacted) {
session.commit();
Expand Down

0 comments on commit f99955c

Please sign in to comment.