diff --git a/pom.xml b/pom.xml index a60bd2fc..49f509b8 100644 --- a/pom.xml +++ b/pom.xml @@ -61,15 +61,15 @@ 1.11 5.1.0 1.7.30 - 1.2.3 5.7.1 - 3.0.0-M5 + 3.1.0 2.14.2 2.8.9 1.21 4.0.3 3.1.0 2.0 + 1.18.3 --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED @@ -214,6 +214,13 @@ snakeyaml ${snakeyaml.version} + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + @@ -320,9 +327,6 @@ limitations under the License.]]> ${test.additional.args} false - - ${project.basedir}/src/test/resources/logback-test.xml - @@ -330,9 +334,6 @@ limitations under the License.]]> ${surefire.version} false - - ${project.basedir}/src/test/resources/logback-test.xml - diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 8670b1af..d2f7c25b 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -88,23 +88,28 @@ test - ${pulsar.groupId} - pulsar-broker + org.awaitility + awaitility test - org.apache.curator - curator-test + org.mockito + mockito-core test - org.awaitility - awaitility + org.slf4j + slf4j-simple test - org.mockito - mockito-core + org.testcontainers + junit-jupiter + test + + + org.testcontainers + pulsar test diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java index d6828730..e2858488 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java @@ -20,8 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -38,36 +37,20 @@ import javax.jms.TextMessage; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Timeout(30) @Slf4j public class AcknowledgementModeTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void testAUTO_ACKNOWLEDGE() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -97,8 +80,7 @@ public void testAUTO_ACKNOWLEDGE() throws Exception { @Test public void testAUTO_ACKNOWLEDGE_ackReceipt() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); consumerConfig.put("ackReceiptEnabled", true); properties.put("consumerConfig", consumerConfig); @@ -145,8 +127,7 @@ public void onException(Message message, Exception e) {} @Test public void testADUPS_OK_ACKNOWLEDGE() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -176,8 +157,7 @@ public void testADUPS_OK_ACKNOWLEDGE() throws Exception { @Test() public void testACLIENT_ACKNOWLEDGE() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -225,8 +205,7 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception { @Test public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -290,8 +269,7 @@ public void testINDIVIDUAL_ACKNOWLEDGEWithBatchingWithoutBatchIndexAckEnabled() private void testINDIVIDUAL_ACKNOWLEDGEWithBatching(boolean batchIndexAckEnabled) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map producerConfig = new HashMap<>(); producerConfig.put("batchingEnabled", "true"); producerConfig.put("batchingMaxPublishDelayMicros", "1000000"); @@ -403,8 +381,7 @@ public void onException(Message message, Exception e) {} @Test public void testAutoNackWrongType() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext session = factory.createContext()) { Queue destination = diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java index 57bec93c..c27d90c0 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java @@ -20,8 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -40,9 +39,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.internal.util.reflection.Whitebox; @@ -50,35 +47,16 @@ @Slf4j public class BasicServerSideFilterTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - public BasicServerSideFilterTest() {} - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - (config) -> { - config.setTransactionCoordinatorEnabled(false); - config.setAllowAutoTopicCreation(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "false"); static int refreshServerSideFiltersPeriod = 10; private Map buildProperties() { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.useServerSideFiltering", "true"); properties.put("jms.refreshServerSideFiltersPeriod", refreshServerSideFiltersPeriod); @@ -105,15 +83,11 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { String topicName = "topic-with-sub-" + UUID.randomUUID(); String topicName2 = "topic-with-sub-" + UUID.randomUUID(); if (numPartitions > 0) { - cluster - .getService() - .getAdminClient() - .topics() - .createPartitionedTopic(topicName, numPartitions); - - cluster - .getService() - .getAdminClient() + + pulsarContainer.getAdmin().topics().createPartitionedTopic(topicName, numPartitions); + + pulsarContainer + .getAdmin() .namespaces() .setAutoTopicCreation( "public/default", @@ -123,10 +97,9 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { .allowAutoTopicCreation(true) .build()); } else { - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); - cluster - .getService() - .getAdminClient() + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); + pulsarContainer + .getAdmin() .namespaces() .setAutoTopicCreation( "public/default", @@ -144,9 +117,8 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { subscriptionProperties.put("jms.filtering", "true"); // create a Subscription with a selector - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription( topicName, subscriptionName, MessageId.earliest, false, subscriptionProperties); @@ -172,7 +144,7 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { } // unload the topic - cluster.getService().getAdminClient().topics().unload(topicName); + pulsarContainer.getAdmin().topics().unload(topicName); try (PulsarMessageConsumer consumer1 = session.createSharedDurableConsumer(destination, subscriptionName, null); ) { @@ -252,9 +224,8 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { subscriptionProperties.put("jms.selector", newSelector); subscriptionProperties.put("jms.filtering", "true"); - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .updateSubscriptionProperties(topicName, subscriptionName, subscriptionProperties); @@ -275,9 +246,8 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { subscriptionProperties.put("jms.selector", newSelector); subscriptionProperties.put("jms.filtering", "false"); - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .updateSubscriptionProperties(topicName, subscriptionName, subscriptionProperties); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java index bb39673a..4cff3c2b 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java @@ -17,43 +17,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.UUID; import javax.jms.Queue; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; public class ConfigurationTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void customizeProducerTest() throws Exception { Map producerConfig = new HashMap<>(); producerConfig.put("producerName", "the-name"); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("producerConfig", producerConfig); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); PulsarConnection connection = factory.createConnection(); ) { @@ -67,8 +49,7 @@ public void customizeProducerTest() throws Exception { public void customizeConsumerTest() throws Exception { Map consumerConfig = new HashMap<>(); consumerConfig.put("consumerName", "the-consumer-name"); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("consumerConfig", consumerConfig); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); PulsarConnection connection = factory.createConnection(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java index 6a19194a..e50a858b 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java @@ -20,12 +20,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -53,36 +50,17 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.internal.util.reflection.Whitebox; @Slf4j public class ConnectionConsumerTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - c -> { - c.setTransactionCoordinatorEnabled(false); - c.setEntryFilterNames(Collections.emptyList()); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withEnv("PULSAR_PREFIX_entryFilterNames", ""); private interface ConnectionConsumerBuilder { ConnectionConsumer build( @@ -189,8 +167,7 @@ private void executeTest( selector, maxMessages, maxMessagesLimitParallelism); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", true); properties.put("jms.maxMessagesLimitsParallelism", maxMessagesLimitParallelism); @@ -276,8 +253,7 @@ public void onException(Message message, Exception e) { Awaitility.await() .untilAsserted( () -> { - TopicStats stats = - cluster.getService().getAdminClient().topics().getStats(topicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); assertEquals(0, stats.getBacklogSize()); }); @@ -311,8 +287,7 @@ public void onException(Message message, Exception e) { public void testStopTimeoutWithMessageDrivenOnMessageCallbackStuck() throws Exception { long start = System.currentTimeMillis(); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", true); properties.put("jms.maxMessagesLimitsParallelism", false); // we should not be stuck even if connectionConsumerStopTimeout is 0 @@ -392,8 +367,7 @@ public ConnectionConsumer build( public void testStopTimeoutWithSpoolThreadStuck() throws Exception { long start = System.currentTimeMillis(); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", true); properties.put("jms.maxMessagesLimitsParallelism", false); properties.put("jms.connectionConsumerStopTimeout", 2000); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java index d4f6b730..5660419b 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java @@ -18,9 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; -import java.util.HashMap; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -38,37 +36,21 @@ import javax.jms.Topic; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class ConnectionPausedTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test @Timeout(60) public void pausedConnectionTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { // DO NOT START THE CONNECTION @@ -122,8 +104,7 @@ public void pausedConnectionTest() throws Exception { @Test @Timeout(60) public void stopConnectionMustWaitForPendingReceive() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); CountDownLatch beforeReceive = new CountDownLatch(1); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java index 6f147a73..5781a79d 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java @@ -19,9 +19,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.lang.reflect.Field; -import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -44,31 +43,18 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @Slf4j public class DeadLetterQueueTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir, config -> config.setTransactionCoordinatorEnabled(false)); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); @Test public void deadLetterTestForQueue() throws Exception { @@ -80,8 +66,7 @@ public void deadLetterTestForQueue() throws Exception { String queueSubscriptionName = "thesub"; String deadLetterTopic = topic + "-" + queueSubscriptionName + "-DLQ"; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.queueSubscriptionName", queueSubscriptionName); Map consumerConfig = new HashMap<>(); @@ -109,8 +94,7 @@ public void deadLetterTestForTopic() throws Exception { String topicSubscriptionName = "thesub"; String deadLetterTopic = topic + "-" + topicSubscriptionName + "-DLQ"; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); @@ -135,8 +119,7 @@ public void deadLetterConfigTest() throws Exception { String topic = "persistent://public/default/test-" + UUID.randomUUID(); String deadLetterTopic = "persistent://public/default/test-dlq-" + UUID.randomUUID(); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); @@ -249,7 +232,7 @@ public void batchingTest(int numPartitions) throws Exception { String deadLetterTopic; if (numPartitions > 0) { - cluster.getService().getAdminClient().topics().createPartitionedTopic(topic, numPartitions); + pulsarContainer.getAdmin().topics().createPartitionedTopic(topic, numPartitions); // multi:topic1,topic2.... deadLetterTopic = @@ -265,8 +248,7 @@ public void batchingTest(int numPartitions) throws Exception { } log.info("deadLetterTopic {}", deadLetterTopic); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.queueSubscriptionName", queueSubscriptionName); Map producerConfig = new HashMap<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSAdminTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSAdminTest.java index 1062e4d7..12e4d21f 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSAdminTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSAdminTest.java @@ -25,8 +25,7 @@ import com.datastax.oss.pulsar.jms.api.JMSAdmin; import com.datastax.oss.pulsar.jms.api.JMSDestinationMetadata; import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -36,42 +35,21 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; public class JMSAdminTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - c -> { - c.setAllowAutoTopicCreation(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension().withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "false"); @ParameterizedTest(name = "numPartitions {0}") @ValueSource(ints = {0, 4}) public void adminApiForQueues(int numPartitions) throws Exception { Map producerConfig = new HashMap<>(); producerConfig.put("producerName", "the-name"); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("producerConfig", producerConfig); String topic = "test-" + UUID.randomUUID(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); @@ -221,8 +199,7 @@ public void adminApiForQueues(int numPartitions) throws Exception { public void adminApiForTopic(int numPartitions) throws Exception { Map producerConfig = new HashMap<>(); producerConfig.put("producerName", "the-name"); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("producerConfig", producerConfig); String topic = "test-" + UUID.randomUUID(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); @@ -377,8 +354,7 @@ public void adminApiForTopic(int numPartitions) throws Exception { @ValueSource(ints = {0, 4}) public void describeProducers(int numPartitions) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); String topic = "test-" + UUID.randomUUID(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { JMSAdmin admin = factory.getAdmin(); @@ -471,8 +447,7 @@ private static void verifyProducersMetadata( @ValueSource(ints = {0, 4}) public void describeConsumers(int numPartitions) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); String topic = "test-" + UUID.randomUUID(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { JMSAdmin admin = factory.getAdmin(); @@ -561,8 +536,7 @@ private static void verifyConsumersMetadata( private static Map buildProducerProperties( boolean priority, String priorityMapping) { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableJMSPriority", priority); properties.put("enableTransaction", true); properties.put("jms.priorityMapping", priorityMapping); @@ -571,8 +545,7 @@ private static Map buildProducerProperties( private static Map buildConsumerProperties(String name, boolean priority) { Map consumerConfig = new HashMap<>(); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("consumerConfig", consumerConfig); properties.put("jms.enableJMSPriority", priority); return properties; diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JNDITest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JNDITest.java index 4de4b3b0..ce03622c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JNDITest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JNDITest.java @@ -25,8 +25,7 @@ import com.datastax.oss.pulsar.jms.jndi.PulsarContext; import com.datastax.oss.pulsar.jms.jndi.PulsarInitialContextFactory; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -43,29 +42,14 @@ import javax.naming.InvalidNameException; import javax.naming.Name; import org.apache.pulsar.client.api.Producer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; public class JNDITest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @ParameterizedTest(name = "autoCloseConnectionFactory {0}") @ValueSource(strings = {"true", "false", ""}) @@ -74,7 +58,7 @@ public void basicJDNITest(String autoCloseConnectionFactory) throws Exception { Properties properties = new Properties(); properties.setProperty( Context.INITIAL_CONTEXT_FACTORY, PulsarInitialContextFactory.class.getName()); - properties.setProperty(Context.PROVIDER_URL, cluster.getAddress()); + properties.setProperty(Context.PROVIDER_URL, pulsarContainer.getHttpServiceUrl()); if (!autoCloseConnectionFactory.isEmpty()) { properties.setProperty( @@ -83,7 +67,8 @@ public void basicJDNITest(String autoCloseConnectionFactory) throws Exception { Map producerConfig = new HashMap<>(); producerConfig.put("producerName", "the-name"); - properties.put("webServiceUrl", cluster.getAddress()); + properties.put("webServiceUrl", pulsarContainer.getHttpServiceUrl()); + properties.put("brokerServiceUrl", pulsarContainer.getBrokerUrl()); properties.put("producerConfig", producerConfig); String queueName = "test-" + UUID.randomUUID(); @@ -153,7 +138,7 @@ public void testSharedContext(String autoCloseConnectionFactory) throws Exceptio Properties properties = new Properties(); properties.setProperty( Context.INITIAL_CONTEXT_FACTORY, PulsarInitialContextFactory.class.getName()); - properties.setProperty(Context.PROVIDER_URL, cluster.getAddress()); + properties.setProperty(Context.PROVIDER_URL, pulsarContainer.getBrokerUrl()); properties.setProperty(PulsarContext.USE_SHARED_JNDICONTEXT, "true"); properties.setProperty(PulsarContext.AUTOCLOSE_CONNECTION_FACTORY, autoCloseConnectionFactory); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/MessageListenerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/MessageListenerTest.java index 991fb4d1..cc6a60f8 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/MessageListenerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/MessageListenerTest.java @@ -23,9 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; -import java.util.HashMap; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.List; import java.util.Map; import java.util.UUID; @@ -49,37 +47,21 @@ import javax.jms.Session; import javax.jms.TextMessage; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @Slf4j public class MessageListenerTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @ParameterizedTest(name = "sessionListenersThreads {0}") @ValueSource(ints = {0, 4}) public void receiveWithListener(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -114,8 +96,7 @@ public void receiveWithListener(int sessionListenersThreads) throws Exception { @ValueSource(ints = {0, 4}) public void listenerForbiddenMethods(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -186,8 +167,7 @@ public void onMessage(Message message) { @ValueSource(ints = {0, 4}) public void multipleListenersSameSession(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -233,8 +213,7 @@ public void multipleListenersSameSession(int sessionListenersThreads) throws Exc @ValueSource(ints = {0, 4}) public void testJMSContextWithListener(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { @@ -265,8 +244,7 @@ public void testJMSContextWithListener(int sessionListenersThreads) throws Excep @ValueSource(ints = {0, 4}) public void testJMSContextWithListenerBadMethods(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -336,8 +314,7 @@ public void onMessage(Message message) { public void testJMSContextAsyncCompletionListenerBadMethods(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -413,8 +390,7 @@ public void onException(Message message, Exception e) { @ValueSource(ints = {0, 4}) public void queueSendRecvMessageListenerTest(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -451,8 +427,7 @@ public void queueSendRecvMessageListenerTest(int sessionListenersThreads) throws @ValueSource(ints = {0, 4}) public void closeConsumerOnMessageListener(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -496,8 +471,7 @@ public void onMessage(Message message) { @ValueSource(ints = {0, 4}) public void messageListenerInternalError(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -534,8 +508,7 @@ public void onMessage(Message message) { @ValueSource(ints = {0, 4}) public void closeSessionMessageListenerStops(int sessionListenersThreads) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.sessionListenersThreads", sessionListenersThreads); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); Connection connection = factory.createConnection(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java index 18220099..3cf0fa43 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java @@ -18,9 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; -import java.util.HashMap; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.Map; import java.util.UUID; import javax.jms.Connection; @@ -32,41 +30,22 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.policies.data.TopicStats; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class NoAutoCreateSubscriptionTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - config -> { - config.setAllowAutoTopicCreation(false); - config.setAllowAutoSubscriptionCreation(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "false") + .withEnv("PULSAR_PREFIX_allowAutoSubscriptionCreation", "false"); @Test public void doNotPrecreateQueueSubscriptionTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.queueSubscriptionName", "default-sub-name"); properties.put("jms.precreateQueueSubscription", "false"); properties.put("operationTimeoutMs", "5000"); @@ -77,7 +56,7 @@ public void doNotPrecreateQueueSubscriptionTest() throws Exception { try (Session session = connection.createSession(); ) { String shortTopicName = "test-" + UUID.randomUUID(); - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(shortTopicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(shortTopicName); Queue destinationWithSubscription = session.createQueue(shortTopicName + ":sub1"); @@ -94,9 +73,8 @@ public void doNotPrecreateQueueSubscriptionTest() throws Exception { } // manually create the subscription topic:sub1 - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription(shortTopicName, "sub1", MessageId.earliest); @@ -106,8 +84,7 @@ public void doNotPrecreateQueueSubscriptionTest() throws Exception { } // verify that we have 1 subscription - TopicStats stats = - cluster.getService().getAdminClient().topics().getStats(shortTopicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(shortTopicName); log.info("Subscriptions {}", stats.getSubscriptions().keySet()); assertNotNull(stats.getSubscriptions().get("sub1")); } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java index 185cd15f..0fc474b2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java @@ -20,8 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -36,31 +35,15 @@ import javax.jms.Topic; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.SubscriptionType; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @Slf4j public class NoLocalTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); private static Stream combinations() { return Stream.of( @@ -75,8 +58,7 @@ private static Stream combinations() { public void sendMessageReceiveFromQueueWithNoLocal( boolean useServerSideFiltering, boolean enableBatching) throws Exception { useServerSideFiltering = false; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); @@ -122,8 +104,7 @@ public void sendMessageReceiveFromQueueWithNoLocal( public void sendMessageReceiveFromTopicWithNoLocal( boolean useServerSideFiltering, boolean enableBatching) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); @@ -170,8 +151,7 @@ public void sendMessageReceiveFromTopicWithNoLocal( public void sendMessageReceiveFromExclusiveSubscriptionWithSelector( boolean useServerSideFiltering, boolean enableBatching) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); @@ -218,8 +198,7 @@ public void sendMessageReceiveFromExclusiveSubscriptionWithSelector( @MethodSource("combinations") public void sendMessageReceiveFromSharedSubscriptionWithNoLocal( boolean useServerSideFiltering, boolean enableBatching) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); @@ -282,8 +261,7 @@ public void acknowledgeRejectedMessagesTest( boolean useServerSideFiltering, boolean enableBatching, boolean acknowledgeRejectedMessages) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java index f94d2501..df37a2f8 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java @@ -19,9 +19,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; -import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -34,40 +33,21 @@ import javax.jms.Session; import javax.jms.Topic; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class OverrideConsumerConfigurationTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - (config) -> { - config.setTransactionCoordinatorEnabled(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); @Test public void overrideDQLConfigurationWithJMSContext() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); PulsarJMSContext primaryContext = (PulsarJMSContext) factory.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) { @@ -119,8 +99,7 @@ public void overrideDQLConfigurationWithJMSContext() throws Exception { @Test public void overrideDQLConfigurationWithSession() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); PulsarConnection connection = factory.createConnection(); PulsarSession primarySession = connection.createSession(Session.CLIENT_ACKNOWLEDGE)) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java index aab9dfcc..2dadad43 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java @@ -20,33 +20,30 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Enumeration; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.jms.*; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -60,41 +57,30 @@ public class PriorityTest { static int LOW_PRIORITY = 4; static int HIGH_PRIORITY = 9; - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - config -> { - config.setAllowAutoTopicCreation(true); - config.setAllowAutoTopicCreationType(TopicType.PARTITIONED); - config.setDefaultNumPartitions(10); - config.setTransactionCoordinatorEnabled(false); - }); - cluster.start(); - - cluster - .getService() - .getAdminClient() - .tenants() - .createTenant( - "foo", - TenantInfo.builder() - .allowedClusters( - ImmutableSet.of(cluster.getService().getConfiguration().getClusterName())) - .build()); - cluster.getService().getAdminClient().namespaces().createNamespace(SYSTEM_NAMESPACE_OVERRIDDEN); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true") + .withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "partitioned") + .withEnv("PULSAR_PREFIX_defaultNumPartitions", "10") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withOnContainerReady( + new Consumer() { + @Override + @SneakyThrows + public void accept(PulsarContainerExtension pulsarContainerExtension) { + pulsarContainerExtension + .getAdmin() + .tenants() + .createTenant( + "foo", + TenantInfo.builder().allowedClusters(ImmutableSet.of("pulsar")).build()); + pulsarContainerExtension + .getAdmin() + .namespaces() + .createNamespace(SYSTEM_NAMESPACE_OVERRIDDEN); + } + }); private static Stream combinations() { return Stream.of( @@ -108,8 +94,7 @@ private static Stream combinations() { @MethodSource("combinations") public void basicTest(int numPartitions, String mapping) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableJMSPriority", true); properties.put("jms.priorityMapping", mapping); properties.put("jms.systemNamespace", SYSTEM_NAMESPACE_OVERRIDDEN); @@ -128,9 +113,8 @@ public void basicTest(int numPartitions, String mapping) throws Exception { try (Session session = connection.createSession(); ) { Queue destination = session.createQueue(topicName); - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createPartitionedTopic(factory.getPulsarTopicName(destination), numPartitions); @@ -204,8 +188,7 @@ public void basicTest(int numPartitions, String mapping) throws Exception { @ValueSource(strings = {"linear", "non-linear"}) public void basicPriorityBigBacklogTest(String mapping) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableJMSPriority", true); properties.put("jms.priorityMapping", mapping); properties.put( @@ -221,9 +204,8 @@ public void basicPriorityBigBacklogTest(String mapping) throws Exception { try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { Queue destination = session.createQueue("test-" + UUID.randomUUID()); - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createPartitionedTopic(factory.getPulsarTopicName(destination), 10); @@ -280,11 +262,7 @@ public void onException(Message message, Exception e) { if (!receivedTexts.add(msg.getText())) { String topicName = factory.getPulsarTopicName(destination); PartitionedTopicStats partitionedStats = - cluster - .getService() - .getAdminClient() - .topics() - .getPartitionedStats(topicName, true); + pulsarContainer.getAdmin().topics().getPartitionedStats(topicName, true); log.info("topicName {}", topicName); log.info( "stats {}", @@ -292,7 +270,7 @@ public void onException(Message message, Exception e) { for (int j = 0; j < 10; j++) { String partition = topicName + "-partition-" + j; PersistentTopicInternalStats internalStats = - cluster.getService().getAdminClient().topics().getInternalStats(partition); + pulsarContainer.getAdmin().topics().getInternalStats(partition); log.info("partition {}", partition); log.info( @@ -362,8 +340,7 @@ private static void verifyPriorities(List received) throws JMSException @Test public void basicPriorityMultiTopicTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableJMSPriority", true); properties.put("jms.systemNamespace", SYSTEM_NAMESPACE_OVERRIDDEN); properties.put("consumerConfig", ImmutableMap.of("receiverQueueSize", 10)); @@ -456,8 +433,7 @@ private static void testMultiTopicConsumer(Session session, int numMessages, Que @Test public void basicPriorityJMSContextTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableJMSPriority", true); properties.put("consumerConfig", ImmutableMap.of("receiverQueueSize", 10)); properties.put( diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java index abd0937e..bfe2dd33 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java @@ -18,9 +18,8 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -44,56 +43,43 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; public class PulsarInteropTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void sendFromJMSReceiveFromPulsarClientTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { try (Session session = connection.createSession(); ) { String topic = "persistent://public/default/test-" + UUID.randomUUID(); Destination destination = session.createTopic(topic); - PulsarClient client = - cluster - .getService() - .getClient(); // do not close this client, it is internal to the broker - - try (Consumer consumer = - client.newConsumer(Schema.STRING).subscriptionName("test").topic(topic).subscribe()) { - - try (MessageProducer producer = session.createProducer(destination); ) { - TextMessage textMsg = session.createTextMessage("foo"); - textMsg.setStringProperty("JMSXGroupID", "bar"); - producer.send(textMsg); - - Message receivedMessage = consumer.receive(); - assertEquals("foo", receivedMessage.getValue()); - assertEquals("bar", receivedMessage.getKey()); + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); ) { + + try (Consumer consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test") + .topic(topic) + .subscribe()) { + + try (MessageProducer producer = session.createProducer(destination); ) { + TextMessage textMsg = session.createTextMessage("foo"); + textMsg.setStringProperty("JMSXGroupID", "bar"); + producer.send(textMsg); + + Message receivedMessage = consumer.receive(); + assertEquals("foo", receivedMessage.getValue()); + assertEquals("bar", receivedMessage.getKey()); + } } } } @@ -104,29 +90,27 @@ public void sendFromJMSReceiveFromPulsarClientTest() throws Exception { @Test public void sendFromPulsarClientReceiveWithJMS() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { String topic = "persistent://public/default/test-" + UUID.randomUUID(); Destination destination = context.createTopic(topic); - PulsarClient client = - cluster - .getService() - .getClient(); // do not close this client, it is internal to the broker - try (JMSConsumer consumer = context.createConsumer(destination)) { - - try (Producer producer = - client.newProducer(Schema.STRING).topic(topic).create(); ) { - producer.newMessage().value("foo").key("bar").send(); - - // the JMS client reads raw messages always as BytesMessage - BytesMessage message = (BytesMessage) consumer.receive(); - assertArrayEquals( - "foo".getBytes(StandardCharsets.UTF_8), message.getBody(byte[].class)); - assertEquals("bar", message.getStringProperty("JMSXGroupID")); + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); ) { + try (JMSConsumer consumer = context.createConsumer(destination)) { + + try (Producer producer = + client.newProducer(Schema.STRING).topic(topic).create(); ) { + producer.newMessage().value("foo").key("bar").send(); + + // the JMS client reads raw messages always as BytesMessage + BytesMessage message = (BytesMessage) consumer.receive(); + assertArrayEquals( + "foo".getBytes(StandardCharsets.UTF_8), message.getBody(byte[].class)); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + } } } } @@ -136,8 +120,7 @@ public void sendFromPulsarClientReceiveWithJMS() throws Exception { @Test public void stringSchemaTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); consumerConfig.put("useSchema", true); @@ -148,19 +131,18 @@ public void stringSchemaTest() throws Exception { String topic = "persistent://public/default/test-" + UUID.randomUUID(); Destination destination = context.createTopic(topic); - PulsarClient client = - cluster - .getService() - .getClient(); // do not close this client, it is internal to the broker - try (JMSConsumer consumer = context.createConsumer(destination)) { - try (Producer producer = - client.newProducer(Schema.STRING).topic(topic).create(); ) { - producer.newMessage().value("foo").key("bar").send(); - - // the JMS client reads Schema String as TextMessage - TextMessage message = (TextMessage) consumer.receive(); - assertEquals("foo", message.getText()); - assertEquals("bar", message.getStringProperty("JMSXGroupID")); + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); ) { + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer producer = + client.newProducer(Schema.STRING).topic(topic).create(); ) { + producer.newMessage().value("foo").key("bar").send(); + + // the JMS client reads Schema String as TextMessage + TextMessage message = (TextMessage) consumer.receive(); + assertEquals("foo", message.getText()); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + } } } } @@ -170,8 +152,7 @@ public void stringSchemaTest() throws Exception { @Test public void longSchemaTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); consumerConfig.put("useSchema", true); @@ -182,18 +163,18 @@ public void longSchemaTest() throws Exception { String topic = "persistent://public/default/test-" + UUID.randomUUID(); Destination destination = context.createTopic(topic); - PulsarClient client = - cluster - .getService() - .getClient(); // do not close this client, it is internal to the broker - try (JMSConsumer consumer = context.createConsumer(destination)) { - try (Producer producer = client.newProducer(Schema.INT64).topic(topic).create(); ) { - producer.newMessage().value(23432424L).key("bar").send(); - - // the JMS client reads Schema INT64 as ObjectMessage - ObjectMessage message = (ObjectMessage) consumer.receive(); - assertEquals(23432424L, message.getObject()); - assertEquals("bar", message.getStringProperty("JMSXGroupID")); + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); ) { + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer producer = + client.newProducer(Schema.INT64).topic(topic).create(); ) { + producer.newMessage().value(23432424L).key("bar").send(); + + // the JMS client reads Schema INT64 as ObjectMessage + ObjectMessage message = (ObjectMessage) consumer.receive(); + assertEquals(23432424L, message.getObject()); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + } } } } @@ -216,8 +197,7 @@ static final class Pojo { @Test public void avroSchemaTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); consumerConfig.put("useSchema", true); @@ -228,42 +208,41 @@ public void avroSchemaTest() throws Exception { String topic = "persistent://public/default/test-" + UUID.randomUUID(); Destination destination = context.createTopic(topic); - PulsarClient client = - cluster - .getService() - .getClient(); // do not close this client, it is internal to the broker - try (JMSConsumer consumer = context.createConsumer(destination)) { - try (Producer producer = - client.newProducer(Schema.AVRO(Pojo.class)).topic(topic).create(); ) { - Pojo pojo = new Pojo(); - pojo.setName("foo"); - Nested nested = new Nested(); - nested.setAge(23); - - Pojo pojo2 = new Pojo(); - pojo2.setName("foo2"); - nested.setPojo(pojo2); - - pojo.setNested(nested); - pojo.setNestedList(Arrays.asList(nested)); - producer.newMessage().value(pojo).key("bar").send(); - - // the JMS client reads Schema AVRO as TextMessage - MapMessage message = (MapMessage) consumer.receive(); - assertEquals("foo", message.getString("name")); - Map nestedValue = (Map) message.getObject("nested"); - assertEquals(23, nestedValue.get("age")); - assertEquals("bar", message.getStringProperty("JMSXGroupID")); - Map nestedPojo = (Map) nestedValue.get("pojo"); - assertEquals("foo2", nestedPojo.get("name")); - - List> nestedValueList = - (List>) message.getObject("nestedList"); - nestedValue = nestedValueList.get(0); - assertEquals(23, nestedValue.get("age")); - assertEquals("bar", message.getStringProperty("JMSXGroupID")); - nestedPojo = (Map) nestedValue.get("pojo"); - assertEquals("foo2", nestedPojo.get("name")); + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); ) { + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer producer = + client.newProducer(Schema.AVRO(Pojo.class)).topic(topic).create(); ) { + Pojo pojo = new Pojo(); + pojo.setName("foo"); + Nested nested = new Nested(); + nested.setAge(23); + + Pojo pojo2 = new Pojo(); + pojo2.setName("foo2"); + nested.setPojo(pojo2); + + pojo.setNested(nested); + pojo.setNestedList(Arrays.asList(nested)); + producer.newMessage().value(pojo).key("bar").send(); + + // the JMS client reads Schema AVRO as TextMessage + MapMessage message = (MapMessage) consumer.receive(); + assertEquals("foo", message.getString("name")); + Map nestedValue = (Map) message.getObject("nested"); + assertEquals(23, nestedValue.get("age")); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + Map nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + + List> nestedValueList = + (List>) message.getObject("nestedList"); + nestedValue = nestedValueList.get(0); + assertEquals(23, nestedValue.get("age")); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + } } } } @@ -273,8 +252,7 @@ public void avroSchemaTest() throws Exception { @Test public void avroKeyValueSchemaTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); consumerConfig.put("useSchema", true); @@ -285,56 +263,55 @@ public void avroKeyValueSchemaTest() throws Exception { String topic = "persistent://public/default/test-" + UUID.randomUUID(); Destination destination = context.createTopic(topic); - PulsarClient client = - cluster - .getService() - .getClient(); // do not close this client, it is internal to the broker - try (JMSConsumer consumer = context.createConsumer(destination)) { - try (Producer> producer = - client - .newProducer( - Schema.KeyValue( - Schema.AVRO(Nested.class), - Schema.AVRO(Pojo.class), - KeyValueEncodingType.INLINE)) - .topic(topic) - .create(); ) { - Pojo pojo = new Pojo(); - pojo.setName("foo"); - Nested nested = new Nested(); - nested.setAge(23); - - Pojo pojo2 = new Pojo(); - pojo2.setName("foo2"); - nested.setPojo(pojo2); - - pojo.setNested(nested); - pojo.setNestedList(Arrays.asList(nested)); - - KeyValue keyValue = new KeyValue<>(nested, pojo); - - producer.newMessage().value(keyValue).send(); - - // the JMS client reads Schema AVRO as TextMessage - MapMessage message = (MapMessage) consumer.receive(); - - Map key = (Map) message.getObject("key"); - assertEquals(23, key.get("age")); - - Map value = (Map) message.getObject("value"); - - assertEquals("foo", value.get("name")); - Map nestedValue = (Map) value.get("nested"); - assertEquals(23, nestedValue.get("age")); - Map nestedPojo = (Map) nestedValue.get("pojo"); - assertEquals("foo2", nestedPojo.get("name")); - - List> nestedValueList = - (List>) value.get("nestedList"); - nestedValue = nestedValueList.get(0); - assertEquals(23, nestedValue.get("age")); - nestedPojo = (Map) nestedValue.get("pojo"); - assertEquals("foo2", nestedPojo.get("name")); + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); ) { + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer> producer = + client + .newProducer( + Schema.KeyValue( + Schema.AVRO(Nested.class), + Schema.AVRO(Pojo.class), + KeyValueEncodingType.INLINE)) + .topic(topic) + .create(); ) { + Pojo pojo = new Pojo(); + pojo.setName("foo"); + Nested nested = new Nested(); + nested.setAge(23); + + Pojo pojo2 = new Pojo(); + pojo2.setName("foo2"); + nested.setPojo(pojo2); + + pojo.setNested(nested); + pojo.setNestedList(Arrays.asList(nested)); + + KeyValue keyValue = new KeyValue<>(nested, pojo); + + producer.newMessage().value(keyValue).send(); + + // the JMS client reads Schema AVRO as TextMessage + MapMessage message = (MapMessage) consumer.receive(); + + Map key = (Map) message.getObject("key"); + assertEquals(23, key.get("age")); + + Map value = (Map) message.getObject("value"); + + assertEquals("foo", value.get("name")); + Map nestedValue = (Map) value.get("nested"); + assertEquals(23, nestedValue.get("age")); + Map nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + + List> nestedValueList = + (List>) value.get("nestedList"); + nestedValue = nestedValueList.get(0); + assertEquals(23, nestedValue.get("age")); + nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + } } } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueryStringTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueryStringTest.java index 6b2f8540..dbe1b28e 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueryStringTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueryStringTest.java @@ -17,52 +17,30 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.UUID; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.internal.util.reflection.Whitebox; @Slf4j public class QueryStringTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - c -> { - c.setEntryFilterNames(Collections.emptyList()); - c.setTransactionCoordinatorEnabled(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withEnv("PULSAR_PREFIX_entryFilterNames", ""); @Test public void testOverrideReceiverQueueSize() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("consumerConfig", ImmutableMap.of("receiverQueueSize", 18)); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (PulsarConnection connection = factory.createConnection(); ) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java index 957c6446..5e8bd7c0 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java @@ -22,11 +22,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.ArrayList; import java.util.Enumeration; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -41,35 +39,19 @@ import javax.jms.TextMessage; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.TopicStats; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class QueueTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void sendMessageReceiveFromQueue() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -115,8 +97,7 @@ public void sendMessageReceiveFromQueue() throws Exception { @Test public void sendJMSRedeliveryCountTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -154,8 +135,7 @@ public void sendJMSRedeliveryCountTest() throws Exception { @Test public void testQueueBrowsers() throws Exception { int numMessages = 20; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", "false"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -287,7 +267,7 @@ public void testQueueBrowsers() throws Exception { // browse a brand new empty queue String name = "persistent://public/default/test-" + UUID.randomUUID(); - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(name); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(name); Queue destinationEmpty = session.createQueue(name); try (QueueBrowser browser = session.createBrowser(destinationEmpty)) { Enumeration en = browser.getEnumeration(); @@ -301,7 +281,7 @@ public void testQueueBrowsers() throws Exception { // browse a brand new empty partitioned queue String namePartitioned = "persistent://public/default/test-" + UUID.randomUUID(); - cluster.getService().getAdminClient().topics().createPartitionedTopic(namePartitioned, 4); + pulsarContainer.getAdmin().topics().createPartitionedTopic(namePartitioned, 4); Queue destinationEmptyPartitioned = session.createQueue(name); try (QueueBrowser browser = session.createBrowser(destinationEmptyPartitioned)) { Enumeration en = browser.getEnumeration(); @@ -320,8 +300,7 @@ public void testQueueBrowsers() throws Exception { @Test public void useQueueWithoutPulsarAdmin() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.usePulsarAdmin", "false"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -356,8 +335,7 @@ public void useQueueWithoutPulsarAdmin() throws Exception { @Test public void customSubscriptionName() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.queueSubscriptionName", "default-sub-name"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -395,8 +373,7 @@ public void customSubscriptionName() throws Exception { } // verify that we have 3 different subscriptions, with the expected names - TopicStats stats = - cluster.getService().getAdminClient().topics().getStats(fullTopicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(fullTopicName); log.info("Subscriptions {}", stats.getSubscriptions().keySet()); assertNotNull(stats.getSubscriptions().get("default-sub-name")); assertNotNull(stats.getSubscriptions().get("sub1")); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java index 91a2da4d..507d485c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java @@ -24,8 +24,7 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue; import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -43,28 +42,25 @@ import javax.jms.Topic; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.SubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class SelectorsTestsBase { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); private final boolean useServerSideFiltering; private final boolean enableBatching; @@ -74,24 +70,8 @@ public SelectorsTestsBase(boolean useServerSideFiltering, boolean enableBatching this.enableBatching = enableBatching; } - @BeforeAll - public void before() throws Exception { - cluster = - new PulsarCluster(tempDir, (config) -> config.setTransactionCoordinatorEnabled(false)); - cluster.start(); - } - - @AfterAll - public void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } - private Map buildProperties() { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); - + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.useServerSideFiltering", useServerSideFiltering); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); @@ -169,11 +149,7 @@ public void sendMessageReceiveFromTopicWithSelector() throws Exception { Topic destination = session.createTopic("persistent://public/default/test-" + UUID.randomUUID()); - cluster - .getService() - .getAdminClient() - .topics() - .createNonPartitionedTopic(destination.getTopicName()); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(destination.getTopicName()); try (PulsarMessageConsumer consumer1 = session.createConsumer(destination, "lastMessage=TRUE"); ) { @@ -401,11 +377,7 @@ public void onException(Message message, Exception e) { // no individuallyDeletedMessages PersistentTopicInternalStats internalStats = - cluster - .getService() - .getAdminClient() - .topics() - .getInternalStats(destination.getQueueName()); + pulsarContainer.getAdmin().topics().getInternalStats(destination.getQueueName()); assertEquals(1, internalStats.cursors.size()); ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.values().iterator().next(); @@ -761,13 +733,9 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForTopic(int String topicName = "topic-with-sub-" + useServerSideFiltering + "_" + enableBatching + "_" + numPartitions; if (numPartitions > 0) { - cluster - .getService() - .getAdminClient() - .topics() - .createPartitionedTopic(topicName, numPartitions); + pulsarContainer.getAdmin().topics().createPartitionedTopic(topicName, numPartitions); } else { - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); } String subscriptionName = "the-sub"; @@ -778,9 +746,8 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForTopic(int subscriptionProperties.put("jms.filtering", "true"); // create a Subscription with a selector - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription( topicName, subscriptionName, MessageId.earliest, false, subscriptionProperties); @@ -875,13 +842,9 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int + "_" + numPartitions; if (numPartitions > 0) { - cluster - .getService() - .getAdminClient() - .topics() - .createPartitionedTopic(topicName, numPartitions); + pulsarContainer.getAdmin().topics().createPartitionedTopic(topicName, numPartitions); } else { - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); } String subscriptionName = "the-sub"; @@ -892,9 +855,8 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int subscriptionProperties.put("jms.filtering", "true"); // create a Subscription with a selector - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription( topicName, @@ -974,11 +936,11 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int if (numPartitions == 0) { // ensure subscription exists - TopicStats stats = cluster.getService().getAdminClient().topics().getStats(topicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); assertNotNull(stats.getSubscriptions().get(subscriptionName)); } else { PartitionedTopicStats stats = - cluster.getService().getAdminClient().topics().getPartitionedStats(topicName, false); + pulsarContainer.getAdmin().topics().getPartitionedStats(topicName, false); assertNotNull(stats.getSubscriptions().get(subscriptionName)); } } @@ -1009,7 +971,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int String topicName = "sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueueAndAdditionalLocalSelector_" + enableBatching; - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); String subscriptionName = "the-sub"; String selectorOnSubscription = "keepme = TRUE"; @@ -1020,17 +982,18 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int subscriptionProperties.put("jms.filtering", "true"); // create a Subscription with a selector - try (Consumer dummy = - cluster - .getService() - .getClient() - .newConsumer() - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Shared) - .subscriptionMode(SubscriptionMode.Durable) - .subscriptionProperties(subscriptionProperties) - .topic(topicName) - .subscribe()) { + + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); + Consumer dummy = + client + .newConsumer() + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionMode(SubscriptionMode.Durable) + .subscriptionProperties(subscriptionProperties) + .topic(topicName) + .subscribe()) { // in 2.10 there is no PulsarAdmin API to set subscriptions properties // the only way is to create a dummy Consumer } @@ -1116,7 +1079,7 @@ public void onException(Message message, Exception e) { } // ensure subscription exists - TopicStats stats = cluster.getService().getAdminClient().topics().getStats(topicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); assertNotNull(stats.getSubscriptions().get(subscriptionName)); } } @@ -1146,8 +1109,7 @@ public void chunkingTest() throws Exception { ((PulsarMessageConsumer) consumer1).getSubscriptionType()); assertEquals("lastMessage=TRUE", consumer1.getMessageSelector()); - int sizeForChunking = - cluster.getService().getConfiguration().getMaxMessageSize() + 1024; + int sizeForChunking = (1024 * 1024) + 1024; String hugePayload = StringUtils.repeat("a", sizeForChunking); try (MessageProducer producer = session.createProducer(destination); ) { @@ -1191,7 +1153,7 @@ public void sendHugeFilterOnServerSideSubscription() throws Exception { properties.put("jms.enableClientSideEmulation", "false"); String topicName = "sendHugeFilterOnServerSideSubscription_" + enableBatching; - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); String subscriptionName = "the-sub"; StringBuilder huge = new StringBuilder("prop1 IN ("); @@ -1208,17 +1170,17 @@ public void sendHugeFilterOnServerSideSubscription() throws Exception { subscriptionProperties.put("jms.filtering", "true"); // create a Subscription with a selector - try (Consumer dummy = - cluster - .getService() - .getClient() - .newConsumer() - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Shared) - .subscriptionMode(SubscriptionMode.Durable) - .subscriptionProperties(subscriptionProperties) - .topic(topicName) - .subscribe()) { + try (PulsarClient client = + PulsarClient.builder().serviceUrl(pulsarContainer.getBrokerUrl()).build(); + Consumer dummy = + client + .newConsumer() + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionMode(SubscriptionMode.Durable) + .subscriptionProperties(subscriptionProperties) + .topic(topicName) + .subscribe()) { // in 2.10 there is no PulsarAdmin API to set subscriptions properties // the only way is to create a dummy Consumer } @@ -1264,7 +1226,7 @@ public void sendHugeFilterOnServerSideSubscription() throws Exception { } // ensure subscription exists - TopicStats stats = cluster.getService().getAdminClient().topics().getStats(topicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); assertNotNull(stats.getSubscriptions().get(subscriptionName)); } } @@ -1279,7 +1241,7 @@ public void sendHugeFilterOnConsumerMetadata() throws Exception { properties.put("jms.enableClientSideEmulation", "false"); String topicName = "sendHugeFilterOnConsumerMetadata_" + enableBatching; - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); String subscriptionName = "the-sub"; StringBuilder huge = new StringBuilder("prop1 IN ("); @@ -1330,7 +1292,7 @@ public void sendHugeFilterOnConsumerMetadata() throws Exception { } // ensure subscription exists - TopicStats stats = cluster.getService().getAdminClient().topics().getStats(topicName); + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); assertNotNull(stats.getSubscriptions().get(subscriptionName)); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java index 0b3e5628..fef0afd2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java @@ -17,13 +17,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.nio.file.Path; -import java.util.HashMap; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -32,34 +30,19 @@ import javax.jms.Queue; import javax.jms.Session; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class SerializableConnectionFactoryTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster(tempDir, (config) -> config.setTransactionCoordinatorEnabled(false)); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); @Test public void test() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory1 = new PulsarConnectionFactory(properties); PulsarConnectionFactory factory2 = new PulsarConnectionFactory(properties); ) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java index 159a491c..5e6a4dd9 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java @@ -22,15 +22,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.EOFException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.UUID; @@ -55,36 +53,20 @@ import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Timeout(value = 1, unit = TimeUnit.MINUTES) public class SimpleTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void sendMessageTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { try (Session session = connection.createSession(); ) { @@ -145,8 +127,7 @@ public void onException(Message message, Exception exception) { @Test public void sendMessageTestJMSContext() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { Destination destination = @@ -214,8 +195,8 @@ public void onException(Message message, Exception exception) { @Test public void sendMessageReceive() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -303,8 +284,7 @@ public void sendMessageReceive() throws Exception { @Test public void sendMessageReceiveJMSContext() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { Destination destination = @@ -361,8 +341,7 @@ public void sendMessageReceiveJMSContext() throws Exception { @Test public void sendMessageReceiveJMSContext2ìMultipleTimes() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { Destination destination = @@ -406,8 +385,7 @@ public void sendMessageReceiveJMSContext() throws Exception { @Test public void sendMessageTestWithDeliveryDelayJMSContext() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { Queue destination = @@ -430,8 +408,7 @@ public void sendMessageTestWithDeliveryDelayJMSContext() throws Exception { @Test public void createSubContextTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext()) { context.createContext(JMSContext.AUTO_ACKNOWLEDGE).close(); @@ -444,8 +421,7 @@ public void createSubContextTest() throws Exception { @Test public void tckTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext(JMSContext.AUTO_ACKNOWLEDGE)) { Destination destination = @@ -534,8 +510,7 @@ public void tckTest() throws Exception { @Test public void tckUsernamePasswordTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.tckUsername", "bob"); properties.put("jms.tckPassword", "shhhh"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -586,8 +561,7 @@ public void tckUsernamePasswordTest() throws Exception { public void systemNameSpaceTest() throws Exception { String simpleName = "test-" + UUID.randomUUID().toString(); - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.systemNamespace", "pulsar/system"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (PulsarConnection connection = factory.createConnection()) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java index c02fcb4a..4b243f05 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java @@ -19,9 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; -import java.util.HashMap; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.List; import java.util.Map; import java.util.UUID; @@ -34,35 +32,17 @@ import javax.jms.Queue; import javax.jms.Session; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class TemporaryDestinationsTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - (config) -> { - config.setTransactionCoordinatorEnabled(false); - config.setAllowAutoTopicCreation(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "false") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); @Test public void useTemporaryQueueTest() throws Exception { @@ -78,8 +58,7 @@ private void useTemporaryDestinationTest(Function temporar throws Exception { String temporaryDestinationName; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.forceDeleteTemporaryDestinations", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -88,7 +67,7 @@ private void useTemporaryDestinationTest(Function temporar String name = "persistent://public/default/test-" + UUID.randomUUID(); Queue serverAddress = session.createQueue(name); - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(name); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(name); try (MessageProducer producerClient = session.createProducer(serverAddress); ) { @@ -98,9 +77,8 @@ private void useTemporaryDestinationTest(Function temporar // verify topic exists assertTrue( - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .getList("public/default") .contains(temporaryDestinationName)); @@ -137,7 +115,7 @@ private void useTemporaryDestinationTest(Function temporar } } - List topics = cluster.getService().getAdminClient().topics().getList("public/default"); + List topics = pulsarContainer.getAdmin().topics().getList("public/default"); log.info("Topics {}", topics); // verify topic does not exist anymore, as it is deleted on Connection close() diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java index a1a158f6..bc127efc 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java @@ -19,8 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -33,31 +32,15 @@ import javax.jms.Topic; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.SubscriptionType; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @Slf4j public class TimeToLiveTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); private static Stream combinations() { return Stream.of( @@ -72,8 +55,7 @@ private static Stream combinations() { public void sendMessageReceiveFromQueueWithTimeToLive( boolean useServerSideFiltering, boolean enableBatching) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); @@ -146,8 +128,7 @@ public void sendMessageReceiveFromQueueWithTimeToLive( public void sendMessageReceiveFromTopicWithTimeToLive( boolean useServerSideFiltering, boolean enableBatching) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.enableClientSideEmulation", !useServerSideFiltering); properties.put("jms.useServerSideFiltering", useServerSideFiltering); Map producerConfig = new HashMap<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java index 51e7e76c..cb856314 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java @@ -24,8 +24,7 @@ import static org.junit.jupiter.api.Assertions.fail; import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -51,35 +50,18 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class TopicTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void sendMessageReceiveFromTopic() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); @@ -123,8 +105,7 @@ public void sendMessageReceiveFromTopic() throws Exception { @Test public void useTopicSubscriberApiWithSharedSubscription() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.clientId", "the-id"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -200,8 +181,7 @@ public void useTopicSubscriberApiWithSharedSubscription() throws Exception { @Test public void simpleDurableConsumerTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context1 = factory.createContext(); JMSContext context2 = factory.createContext()) { @@ -235,8 +215,7 @@ public void testKeySharedDurableConsumer() throws Exception { } private void testSharedDurableConsumer(SubscriptionType subscriptionType) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.topicSharedSubscriptionType", subscriptionType.name()); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -326,8 +305,7 @@ public void testKeySharedNonDurableConsumer() throws Exception { } private void testSharedNonDurableConsumer(SubscriptionType subscriptionType) throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.topicSharedSubscriptionType", subscriptionType.name()); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -403,8 +381,7 @@ private void testSharedNonDurableConsumer(SubscriptionType subscriptionType) thr @Test public void testUseKeySharedSubscriptionTypeforTopicConsumer() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.useExclusiveSubscriptionsForSimpleConsumers", "false"); properties.put("jms.topicSharedSubscriptionType", SubscriptionType.Key_Shared); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -440,8 +417,7 @@ public void testUseKeySharedSubscriptionTypeforTopicConsumer() throws Exception @Test public void testKeySharedWithBatching() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.topicSharedSubscriptionType", SubscriptionType.Key_Shared); Map producerConfig = new HashMap<>(); producerConfig.put("batcherBuilder", "KEY_BASED"); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index bda6b025..89f47c99 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -20,9 +20,8 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -51,35 +50,19 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j public class TransactionsTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void sendMessageTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -115,8 +98,8 @@ public void sendMessageTest() throws Exception { @Test public void autoRollbackTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -150,8 +133,8 @@ public void autoRollbackTest() throws Exception { @Test public void rollbackProduceTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -186,8 +169,8 @@ public void rollbackProduceTest() throws Exception { @Test public void consumeTransactionTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -225,8 +208,8 @@ public void consumeTransactionTest() throws Exception { @Test public void multiCommitTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -267,8 +250,8 @@ public void multiCommitTest() throws Exception { @Test public void consumeRollbackTransactionTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); Map consumerConfig = new HashMap<>(); consumerConfig.put("ackReceiptEnabled", true); @@ -311,8 +294,8 @@ public void consumeRollbackTransactionTest() throws Exception { @Test public void consumeRollbackTransaction2Test() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); Map consumerConfig = new HashMap<>(); consumerConfig.put("ackReceiptEnabled", true); @@ -356,8 +339,8 @@ public void consumeRollbackTransaction2Test() throws Exception { public void consumeAutoRollbackTransactionTestWithQueueBrowser() throws Exception { int numMessages = 10; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection(); @@ -438,8 +421,8 @@ public void consumeAutoRollbackTransactionTestWithQueueBrowser() throws Exceptio public void rollbackReceivedMessages() throws Exception { int numMessages = 10; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -503,8 +486,8 @@ private static int countMessages(Session producerSession, Queue destination) thr public void consumeRollbackTransactionTestWithQueueBrowser() throws Exception { int numMessages = 10; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection(); @@ -584,8 +567,8 @@ public void consumeRollbackTransactionTestWithQueueBrowser() throws Exception { @Test public void sendMessageJMSContextTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (JMSContext context = factory.createContext(JMSContext.SESSION_TRANSACTED)) { @@ -632,8 +615,8 @@ public void sendMessageJMSContextTest() throws Exception { @Test public void sendMessageWithBatchingTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); Map producerConfig = new HashMap<>(); producerConfig.put("batchingEnabled", true); @@ -708,8 +691,8 @@ public void onException(Message message, Exception e) {} @Test public void emulatedTransactionsTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "false"); properties.put("jms.emulateTransactions", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -744,8 +727,8 @@ public void emulatedTransactionsTest() throws Exception { @Test public void messageListenerTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -808,8 +791,8 @@ public void onMessage(Message message) { @Test public void commitInsideMessageListenerTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { @@ -890,8 +873,8 @@ public void onMessage(Message message) { @Test public void messageListenerWithEmulatedTransactionsTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "false"); properties.put("jms.emulateTransactions", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -955,8 +938,8 @@ public void onMessage(Message message) { @Test public void consumeProduceScenario() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); properties.put("jms.clientId", "my-id"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -1015,8 +998,8 @@ public void consumeProduceScenario() throws Exception { @Test public void testMixedProducesScenario() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); properties.put("jms.clientId", "my-id"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -1090,8 +1073,8 @@ public void testMixedProducesScenario() throws Exception { @Test public void testMixedConsumersOnSharedSubscription() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); properties.put("jms.clientId", "my-id"); properties.put( @@ -1152,8 +1135,8 @@ public void testMixedConsumersOnSharedSubscription() throws Exception { @Test public void sendMessageWithPartitionStickKeyTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("enableTransaction", "true"); properties.put("jms.transactionsStickyPartitions", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java index 63de7bbb..e3a11769 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java @@ -18,9 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; -import java.nio.file.Path; -import java.util.HashMap; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import java.util.Map; import java.util.UUID; import javax.jms.Connection; @@ -29,34 +27,17 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; public class UnsubscribeTest { - - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = new PulsarCluster(tempDir); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void unsubscribeTest() throws Exception { - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (Connection connection = factory.createConnection()) { connection.start(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java index cc17d6e3..a84d2da0 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java @@ -21,10 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.datastax.oss.pulsar.jms.selectors.SelectorSupport; -import com.datastax.oss.pulsar.jms.utils.PulsarCluster; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import com.google.common.collect.ImmutableMap; import java.lang.reflect.Field; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -46,10 +45,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -57,27 +54,11 @@ @Slf4j public class VirtualDestinationsConsumerTest { - @TempDir public static Path tempDir; - private static PulsarCluster cluster; - - @BeforeAll - public static void before() throws Exception { - cluster = - new PulsarCluster( - tempDir, - serviceConfiguration -> { - serviceConfiguration.setTransactionCoordinatorEnabled(false); - serviceConfiguration.setAllowAutoTopicCreation(false); - }); - cluster.start(); - } - - @AfterAll - public static void after() throws Exception { - if (cluster != null) { - cluster.close(); - } - } + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "false"); private static Stream combinationsTestMultiTopic() { return Stream.of( @@ -91,8 +72,7 @@ private static Stream combinationsTestMultiTopic() { @MethodSource("combinationsTestMultiTopic") public void testMultiTopic(int numPartitions, boolean useRegExp) throws Exception { int numMessagesPerDestination = 10; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.usePulsarAdmin", "true"); try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { @@ -176,8 +156,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue( String subscriptionName = "the-sub"; String selector = "keepme = TRUE"; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.usePulsarAdmin", "true"); properties.put("jms.useServerSideFiltering", "true"); @@ -193,19 +172,14 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue( for (int i = 0; i < 4; i++) { String topicName = prefix + "-" + i; if (numPartitions > 0) { - cluster - .getService() - .getAdminClient() - .topics() - .createPartitionedTopic(topicName, numPartitions); + pulsarContainer.getAdmin().topics().createPartitionedTopic(topicName, numPartitions); } else { - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); } // create a Subscription with a selector - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription( topicName, subscriptionName, MessageId.earliest, false, subscriptionProperties); @@ -324,8 +298,7 @@ public void sendUsingExistingPulsarSubscriptionWithClientSideFilterForPartitione String subscriptionName = "the-sub"; String selector = "keepme = TRUE"; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.usePulsarAdmin", "true"); properties.put("jms.useServerSideFiltering", "false"); properties.put("jms.enableClientSideEmulation", "true"); @@ -336,19 +309,14 @@ public void sendUsingExistingPulsarSubscriptionWithClientSideFilterForPartitione for (int i = 0; i < 4; i++) { String topicName = prefix + "-" + i; if (numPartitions > 0) { - cluster - .getService() - .getAdminClient() - .topics() - .createPartitionedTopic(topicName, numPartitions); + pulsarContainer.getAdmin().topics().createPartitionedTopic(topicName, numPartitions); } else { - cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName); + pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName); } // create a Subscription with a selector - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription(topicName, subscriptionName, MessageId.earliest, false); destinationsToWrite.add(new PulsarQueue(topicName)); @@ -438,8 +406,7 @@ public void sendUsingExistingPulsarSubscriptionWithClientSideFilterForPartitione @Test public void testPatternConsumerAddingTopicWithServerSideFilters() throws Exception { int numMessagesPerDestination = 10; - Map properties = new HashMap<>(); - properties.put("webServiceUrl", cluster.getAddress()); + Map properties = pulsarContainer.buildJMSConnectionProperties(); properties.put("jms.usePulsarAdmin", "true"); properties.put("jms.useServerSideFiltering", true); // discover new topics every 5 seconds @@ -495,9 +462,8 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti Map subscriptionProperties = new HashMap<>(); subscriptionProperties.put("jms.selector", "keepme=TRUE"); subscriptionProperties.put("jms.filtering", "true"); - cluster - .getService() - .getAdminClient() + pulsarContainer + .getAdmin() .topics() .createSubscription( topicName, "jms-queue", MessageId.earliest, false, subscriptionProperties); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/BookKeeperCluster.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/BookKeeperCluster.java deleted file mode 100644 index 1185b556..00000000 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/BookKeeperCluster.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datastax.oss.pulsar.jms.utils; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.bookie.BookieException; -import org.apache.bookkeeper.bookie.Cookie; -import org.apache.bookkeeper.client.BookKeeperAdmin; -import org.apache.bookkeeper.client.api.BookKeeper; -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.server.EmbeddedServer; -import org.apache.bookkeeper.server.conf.BookieConfiguration; -import org.apache.bookkeeper.util.PortManager; -import org.apache.bookkeeper.zookeeper.ZooKeeperClient; -import org.apache.curator.test.InstanceSpec; -import org.apache.curator.test.TestingServer; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; - -@Slf4j -public final class BookKeeperCluster implements AutoCloseable { - - TestingServer zkServer; - List bookies = new ArrayList<>(); - Map configurations = new HashMap<>(); - Path path; - - public BookKeeperCluster(Path path, int zkPort) throws Exception { - InstanceSpec spec = new InstanceSpec(path.toFile(), zkPort, -1, -1, true, -1, 4000, -1); - zkServer = new TestingServer(spec, true); - // waiting for ZK to be reachable - CountDownLatch latch = new CountDownLatch(1); - ZooKeeper zk = - new ZooKeeper( - zkServer.getConnectString(), - getTimeout(), - (WatchedEvent event) -> { - if (event.getState() == KeeperState.SyncConnected) { - latch.countDown(); - } - }); - try { - if (!latch.await(getTimeout(), TimeUnit.MILLISECONDS)) { - log.info( - "ZK client did not connect withing {0} seconds, maybe the server did not start up", - getTimeout()); - } - } finally { - zk.close(1000); - } - this.path = path; - log.info("Started ZK cluster at " + getZooKeeperAddress()); - } - - public void startBookie() throws Exception { - startBookie(true); - } - - public void startBookie(boolean format) throws Exception { - if (!bookies.isEmpty() && format) { - throw new Exception("bookie already started"); - } - ServerConfiguration conf = new ServerConfiguration(); - conf.setBookiePort(PortManager.nextFreePort()); - conf.setUseHostNameAsBookieID(true); - conf.setAllowEphemeralPorts(true); - Path targetDir = path.resolve("bookie_data_" + bookies.size()); - conf.setMetadataServiceUri(getBookKeeperMetadataURI()); - conf.setLedgerDirNames(new String[] {targetDir.toAbsolutePath().toString()}); - conf.setJournalDirName(targetDir.toAbsolutePath().toString()); - // required for chunking test, this is the default value in bookkeeper.conf in Pulsar - conf.setNettyMaxFrameSizeBytes(5253120); - conf.setFlushInterval(10000); - conf.setGcWaitTime(5); - conf.setJournalFlushWhenQueueEmpty(true); - // conf.setJournalBufferedEntriesThreshold(1); - conf.setAutoRecoveryDaemonEnabled(false); - conf.setEnableLocalTransport(true); - conf.setJournalSyncData(false); - - conf.setAllowLoopback(true); - conf.setProperty("journalMaxGroupWaitMSec", 10); // default 200ms - - try (ZooKeeperClient zkc = - ZooKeeperClient.newBuilder() - .connectString(getZooKeeperAddress()) - .sessionTimeoutMs(getTimeout()) - .build()) { - - boolean rootExists = zkc.exists(getPath(), false) != null; - - if (!rootExists) { - zkc.create(getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } - - if (format) { - BookKeeperAdmin.initNewCluster(conf); - BookKeeperAdmin.format(conf, false, true); - } - - startBookie(conf); - } - - public String getZooKeeperAddress() { - return this.zkServer.getConnectString(); - } - - public int getTimeout() { - return 40000; - } - - public String getPath() { - return "/ledgers"; - } - - @Override - public void close() throws Exception { - for (EmbeddedServer bookie : bookies) { - bookie.getLifecycleComponentStack().close(); - } - try { - if (zkServer != null) { - zkServer.close(); - } - } catch (Throwable t) { - } - } - - public String getBookKeeperMetadataURI() { - return "zk+null://" + getZooKeeperAddress() + getPath(); - } - - public BookKeeper createClient() throws Exception { - ClientConfiguration conf = new ClientConfiguration(); - conf.setEnableDigestTypeAutodetection(true); - conf.setMetadataServiceUri(getBookKeeperMetadataURI()); - return BookKeeper.newBuilder(conf).build(); - } - - public ServerConfiguration getBookieConfiguration(String bookie1) { - return configurations.get(bookie1); - } - - void startBookie(ServerConfiguration conf) throws Exception { - EmbeddedServer bookie = EmbeddedServer.builder(new BookieConfiguration(conf)).build(); - bookie.getLifecycleComponentStack().start(); - bookies.add(bookie); - configurations.put(bookie.getBookieService().getServer().getBookieId().toString(), conf); - } - - private static void stampNewCookie( - Cookie masterCookie, List journalDirectories, List allLedgerDirs) - throws BookieException, IOException { - for (File journalDirectory : journalDirectories) { - System.out.println("STAMPING NEW COOKIE on " + journalDirectory); - masterCookie.writeToDirectory(journalDirectory); - } - for (File dir : allLedgerDirs) { - System.out.println("STAMPING NEW COOKIE on " + dir); - masterCookie.writeToDirectory(dir); - } - } -} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java deleted file mode 100644 index 2ed4ef45..00000000 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datastax.oss.pulsar.jms.utils; - -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; -import java.util.function.Consumer; -import org.apache.bookkeeper.util.PortManager; -import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfo; - -/** Pulsar cluster. */ -public class PulsarCluster implements AutoCloseable { - private final PulsarService service; - private final BookKeeperCluster bookKeeperCluster; - - public PulsarCluster(Path tempDir) throws Exception { - this(tempDir, (config) -> {}); - } - - public PulsarCluster(Path tempDir, Consumer configurationConsumer) - throws Exception { - this.bookKeeperCluster = new BookKeeperCluster(tempDir, PortManager.nextFreePort()); - ServiceConfiguration config = new ServiceConfiguration(); - config.setZookeeperServers(bookKeeperCluster.getZooKeeperAddress()); - config.setClusterName("localhost"); - config.setManagedLedgerDefaultEnsembleSize(1); - config.setManagedLedgerDefaultWriteQuorum(1); - config.setManagedLedgerDefaultAckQuorum(1); - config.setBrokerServicePort(Optional.of(PortManager.nextFreePort())); - config.setAllowAutoTopicCreation(true); - config.setWebSocketServiceEnabled(false); - config.setBrokerDeleteInactiveTopicsEnabled(false); - config.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false); - config.setSystemTopicEnabled(true); - config.setBookkeeperNumberOfChannelsPerBookie(1); - config.setBookkeeperExplicitLacIntervalInMills(500); - config.setTransactionCoordinatorEnabled(true); - config.setBookkeeperMetadataServiceUri(bookKeeperCluster.getBookKeeperMetadataURI()); - config.setWebServicePort(Optional.of(PortManager.nextFreePort())); - config.setBookkeeperUseV2WireProtocol(false); - config.setEntryFilterNames(Arrays.asList("jms")); - config.setEntryFiltersDirectory("target/classes/filters"); - config.setAcknowledgmentAtBatchIndexLevelEnabled(true); - config.setMaxConsumerMetadataSize(1024 * 1024); - configurationConsumer.accept(config); - service = new PulsarService(config); - } - - public PulsarService getService() { - return service; - } - - public String getAddress() { - return service.getWebServiceAddress(); - } - - public void start() throws Exception { - bookKeeperCluster.startBookie(); - service.start(); - service.getAdminClient().clusters().createCluster("localhost", ClusterData.builder().build()); - service - .getAdminClient() - .tenants() - .createTenant( - "public", - TenantInfo.builder() - .adminRoles(Collections.singleton("admin")) - .allowedClusters(Collections.singleton("localhost")) - .build()); - service.getAdminClient().namespaces().createNamespace("public/default"); - - service - .getAdminClient() - .tenants() - .createTenant( - "pulsar", - TenantInfo.builder() - .adminRoles(Collections.singleton("admin")) - .allowedClusters(Collections.singleton("localhost")) - .build()); - - if (service.getConfiguration().isTransactionCoordinatorEnabled()) { - - // run initialize-transaction-coordinator-metadata - PulsarTransactionCoordinatorMetadataSetup.main( - new String[] {"-c", "localhost", "-cs", bookKeeperCluster.getZooKeeperAddress()}); - - // pre-create __transaction_buffer_snapshot for public/default namespace - service - .getAdminClient() - .topics() - .createNonPartitionedTopic("persistent://public/default/__transaction_buffer_snapshot"); - } - } - - public void close() throws Exception { - service.close(); - bookKeeperCluster.close(); - } -} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java new file mode 100644 index 00000000..66c001fb --- /dev/null +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java @@ -0,0 +1,128 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +@Slf4j +public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback { + private PulsarContainer pulsarContainer; + private Consumer onContainerReady; + private Map env = new HashMap<>(); + + private Network network; + + private PulsarAdmin admin; + + public PulsarContainerExtension() { + env.put("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true"); + env.put("PULSAR_PREFIX_entryFiltersDirectory", "/pulsar/filters"); + env.put("PULSAR_PREFIX_entryFilterNames", "jms"); + env.put("PULSAR_PREFIX_maxConsumerMetadataSize", (1024 * 1024) + ""); + env.put("PULSAR_PREFIX_transactionCoordinatorEnabled", "true"); + env.put("PULSAR_PREFIX_brokerDeleteInactivePartitionedTopicMetadataEnabled", "false"); + env.put("PULSAR_PREFIX_brokerDeleteInactiveTopicsEnabled", "false"); + } + + @Override + public void afterAll(ExtensionContext extensionContext) { + if (admin != null) { + admin.close(); + } + if (pulsarContainer != null) { + pulsarContainer.close(); + } + if (network != null) { + network.close(); + } + } + + @Override + @SneakyThrows + public void beforeAll(ExtensionContext extensionContext) { + network = Network.newNetwork(); + pulsarContainer = + new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.0.0")) + .withNetwork(network) + .withEnv(env) + .withLogConsumer( + outputFrame -> log.debug("pulsar> {}", outputFrame.getUtf8String().trim())) + .withCopyFileToContainer( + MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters"); + // start Pulsar and wait for it to be ready to accept requests + pulsarContainer.start(); + if (onContainerReady != null) { + onContainerReady.accept(this); + } + admin = + PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080)) + .build(); + } + + public PulsarContainerExtension withOnContainerReady( + Consumer onContainerReady) { + this.onContainerReady = onContainerReady; + return this; + } + + public PulsarContainerExtension withEnv(String key, String value) { + this.env.put(key, value); + return this; + } + + public PulsarContainerExtension withEnv(Map env) { + this.env.putAll(env); + return this; + } + + protected void onContainerReady() {} + + public String getBrokerUrl() { + return pulsarContainer.getPulsarBrokerUrl(); + } + + public String getHttpServiceUrl() { + return pulsarContainer.getHttpServiceUrl(); + } + + public PulsarContainer getPulsarContainer() { + return pulsarContainer; + } + + public Map buildJMSConnectionProperties() { + Map properties = new HashMap<>(); + properties.put("webServiceUrl", getHttpServiceUrl()); + properties.put("brokerServiceUrl", getBrokerUrl()); + return properties; + } + + public PulsarAdmin getAdmin() { + return admin; + } +}