diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java index 7975b19c..7264456c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java @@ -104,6 +104,9 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception { } } + // wait for the filters to be processed in background + Thread.sleep(5000); + TextMessage textMessage = (TextMessage) consumer1.receive(); assertEquals("foo-9", textMessage.getText()); @@ -113,23 +116,30 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception { // no more messages assertNull(consumer1.receiveNoWait()); - // ensure that the filter didn't reject any message while dispatching to the consumer - // because the filter has been already applied on the write path - TopicStats stats = getPulsarContainer().getAdmin().topics().getStats(topicName); - SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); - if (transacted) { - // when we enable transactions the stats are not updated correctly - // it seems that the transaction marker is counted as "processed by filters" - // but actually it is not processed by the JMSFilter at all - assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 2); - assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0); - assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1); - session.commit(); - } else { - assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1); - assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0); - assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1); - } + Awaitility.await() + .untilAsserted( + () -> { + // ensure that the filter didn't reject any message while dispatching to the + // consumer + // because the filter has been already applied on the write path + TopicStats stats = + getPulsarContainer().getAdmin().topics().getStats(topicName); + SubscriptionStats subscriptionStats = + stats.getSubscriptions().get("jms-queue"); + if (transacted) { + // when we enable transactions the stats are not updated correctly + // it seems that the transaction marker is counted as "processed by filters" + // but actually it is not processed by the JMSFilter at all + assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 2); + assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0); + assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1); + session.commit(); + } else { + assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1); + assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0); + assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1); + } + }); } // create a message that doesn't match the filter @@ -139,9 +149,15 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception { TextMessage textMessage = session.createTextMessage("backlog"); producer.send(textMessage); - TopicStats stats = getPulsarContainer().getAdmin().topics().getStats(topicName); - SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); - assertEquals(0, subscriptionStats.getMsgBacklog()); + Awaitility.await() + .untilAsserted( + () -> { + TopicStats stats = + getPulsarContainer().getAdmin().topics().getStats(topicName); + SubscriptionStats subscriptionStats = + stats.getSubscriptions().get("jms-queue"); + assertEquals(0, subscriptionStats.getMsgBacklog()); + }); if (transacted) { session.commit();