diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java index 593bfdf3..9ed65c20 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java @@ -912,11 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce checkNotClosed(); String name = "persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID(); - try { - factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); - } catch (Exception err) { - throw Utils.handleException(err); - } + createPulsarTemporaryTopic(name); PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session); temporaryDestinations.add(res); return res; @@ -926,11 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce checkNotClosed(); String name = "persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID(); - try { - factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); - } catch (Exception err) { - throw Utils.handleException(err); - } + createPulsarTemporaryTopic(name); PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session); temporaryDestinations.add(res); return res; @@ -996,6 +988,22 @@ private ConnectionConsumer buildConnectionConsumer( return connectionConsumer; } + private void createPulsarTemporaryTopic(String name) throws JMSException { + try { + factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); + } catch (IllegalStateException err) { + if (!factory.isAllowTemporaryTopicWithoutAdmin()) { + throw Utils.handleException(err); + } + log.warn( + "Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true", + name, + err); + } catch (Exception err) { + throw Utils.handleException(err); + } + } + void refreshServerSideSelectors() { sessions.forEach( s -> { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 8cbc0bca..df306905 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -148,6 +148,7 @@ public class PulsarConnectionFactory private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared; private transient long waitForServerStartupTimeout = 60000; private transient boolean usePulsarAdmin = true; + private transient boolean allowTemporaryTopicWithoutAdmin = false; private transient boolean precreateQueueSubscription = true; private transient int precreateQueueSubscriptionConsumerQueueSize = 0; private transient boolean initialized; @@ -330,6 +331,11 @@ private synchronized void ensureInitialized(String connectUsername, String conne this.usePulsarAdmin = Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy)); + this.allowTemporaryTopicWithoutAdmin = + Boolean.parseBoolean( + getAndRemoveString( + "jms.allowTemporaryTopicWithoutAdmin", "false", configurationCopy)); + this.precreateQueueSubscription = Boolean.parseBoolean( getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy)); @@ -1726,6 +1732,10 @@ public boolean isAcknowledgeRejectedMessages() { return acknowledgeRejectedMessages; } + public boolean isAllowTemporaryTopicWithoutAdmin() { + return allowTemporaryTopicWithoutAdmin; + } + public synchronized boolean isClosed() { return closed; } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index 7d766202..9015c75a 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -15,9 +15,11 @@ */ package com.datastax.oss.pulsar.jms; +import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.TopicStats; @@ -44,8 +46,20 @@ public final void delete() throws JMSException { log.info("Deleting {}", this); String topicName = getInternalTopicName(); String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName); - TopicStats stats = - session.getFactory().getPulsarAdmin().topics().getStats(fullQualifiedTopicName); + PulsarAdmin pulsarAdmin; + try { + pulsarAdmin = session.getFactory().getPulsarAdmin(); + } catch (IllegalStateException err) { + if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) { + throw Utils.handleException(err); + } + log.warn( + "Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true", + this, + err); + return; + } + TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName); log.info("Stats {}", stats); int numConsumers = diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java new file mode 100644 index 00000000..87965adc --- /dev/null +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java @@ -0,0 +1,100 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import java.util.Map; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +@Slf4j +public class TemporaryDestinationsNonAdminTest { + + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true") + .withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned") + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false"); + + @Test + public void allowTemporaryTopicWithoutAdminTest() throws Exception { + Map properties = getJmsProperties(); + properties.put("jms.allowTemporaryTopicWithoutAdmin", "true"); + useTemporaryDestinationNonAdminTest(properties, false); + } + + @Test + public void forbidTemporaryTopicWithoutAdminTest() throws Exception { + Map properties = getJmsProperties(); + useTemporaryDestinationNonAdminTest(properties, true); + } + + @NotNull + private static Map getJmsProperties() { + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("jms.forceDeleteTemporaryDestinations", "true"); + properties.put("jms.usePulsarAdmin", "false"); + return properties; + } + + private void useTemporaryDestinationNonAdminTest( + Map properties, boolean expectAdminErrors) throws Exception { + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) { + try (Connection connection = factory.createConnection()) { + connection.start(); + try (Session session = connection.createSession()) { + if (expectAdminErrors) { + assertThrows(JMSException.class, session::createTemporaryTopic); + return; + } + Destination clientAddress = session.createTemporaryTopic(); + testProducerAndConsumer(session, clientAddress); + } + } + } + } + + private static void testProducerAndConsumer(Session session, Destination clientAddress) + throws JMSException { + try (MessageProducer producerClient = session.createProducer(clientAddress)) { + // subscribe on the temporary queue + try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) { + + String testMessage = "message"; + // produce a message + producerClient.send(session.createTextMessage(testMessage)); + + // on the consumer receive the message + Message theResponse = consumerClient.receive(); + assertEquals(testMessage, theResponse.getBody(String.class)); + } + } + } +}