From ca695f113f81148a3054c18f9e7c112b8ff197cb Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 9 Apr 2024 15:02:21 +0200 Subject: [PATCH] Fix test and also wait for Pulsar to be ready --- .../oss/pulsar/jms/tests/DockerTest.java | 1 - .../datastax/oss/pulsar/jms/PriorityTest.java | 7 +++---- .../jms/VirtualDestinationsConsumerTest.java | 15 +++++++------- .../jms/utils/PulsarContainerExtension.java | 20 +++++++++++++++---- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java index e9b1c955..04b936e2 100644 --- a/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java +++ b/pulsar-jms-integration-tests/src/test/java/com/datastax/oss/pulsar/jms/tests/DockerTest.java @@ -31,7 +31,6 @@ import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.auth.AuthenticationToken; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import javax.jms.Connection; diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java index 2dadad43..13d5d5d2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java @@ -22,7 +22,6 @@ import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashSet; @@ -69,12 +68,12 @@ public class PriorityTest { @Override @SneakyThrows public void accept(PulsarContainerExtension pulsarContainerExtension) { + Set clusters = + new HashSet<>(pulsarContainerExtension.getAdmin().clusters().getClusters()); pulsarContainerExtension .getAdmin() .tenants() - .createTenant( - "foo", - TenantInfo.builder().allowedClusters(ImmutableSet.of("pulsar")).build()); + .createTenant("foo", TenantInfo.builder().allowedClusters(clusters).build()); pulsarContainerExtension .getAdmin() .namespaces() diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java index d612d3a8..72efca81 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java @@ -43,7 +43,6 @@ import javax.jms.TextMessage; import javax.jms.Topic; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; import org.awaitility.Awaitility; @@ -463,10 +462,10 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti factory.getPulsarAdmin().topics().createNonPartitionedTopic(topicName); // await that the consumer session creates the subscription, then we update it Awaitility.await() - .untilAsserted(() -> { - List subs = pulsarContainer - .getAdmin() - .topics().getSubscriptions(topicName); + .untilAsserted( + () -> { + List subs = + pulsarContainer.getAdmin().topics().getSubscriptions(topicName); assertEquals(subs.size(), 1); assertTrue(subs.contains("jms-queue")); }); @@ -474,9 +473,9 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti subscriptionProperties.put("jms.selector", "keepme=TRUE"); subscriptionProperties.put("jms.filtering", "true"); pulsarContainer - .getAdmin() - .topics() - .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties); + .getAdmin() + .topics() + .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties); Queue newDestination = session.createQueue(topicName); TextMessage nextMessage = session.createTextMessage("new"); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java index f9124b92..1a2e2b00 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java @@ -15,8 +15,12 @@ */ package com.datastax.oss.pulsar.jms.utils; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -67,20 +71,28 @@ public void afterAll(ExtensionContext extensionContext) { @SneakyThrows public void beforeAll(ExtensionContext extensionContext) { network = Network.newNetwork(); + CountDownLatch pulsarReady = new CountDownLatch(1); pulsarContainer = new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE)) .withNetwork(network) .withEnv(env) .withLogConsumer( - outputFrame -> log.debug("pulsar> {}", outputFrame.getUtf8String().trim())) + (f) -> { + String text = f.getUtf8String().trim(); + if (text.contains("messaging service is ready")) { + pulsarReady.countDown(); + } + System.out.println(text); + }) .withCopyFileToContainer( MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters"); // start Pulsar and wait for it to be ready to accept requests pulsarContainer.start(); + assertTrue(pulsarReady.await(1, TimeUnit.MINUTES)); admin = - PulsarAdmin.builder() - .serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080)) - .build(); + PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080)) + .build(); if (onContainerReady != null) { onContainerReady.accept(this); }