diff --git a/pulsar-jms-cli/src/main/java/com/datastax/oss/pulsar/jms/cli/Main.java b/pulsar-jms-cli/src/main/java/com/datastax/oss/pulsar/jms/cli/Main.java index 85998385..9a66e160 100644 --- a/pulsar-jms-cli/src/main/java/com/datastax/oss/pulsar/jms/cli/Main.java +++ b/pulsar-jms-cli/src/main/java/com/datastax/oss/pulsar/jms/cli/Main.java @@ -204,7 +204,7 @@ public void run() throws Exception { PulsarConnectionFactory factory = getFactory(); String topicName = factory.getPulsarTopicName(destination); log.info("JMS Destination {} maps to Pulsar Topic {}", destination, topicName); - PulsarAdmin pulsarAdmin = factory.getPulsarAdmin(); + PulsarAdmin pulsarAdmin = factory.getPulsarAdmin().getPulsarAdmin(); try { TopicStats stats = pulsarAdmin.topics().getStats(topicName); Map subscriptions = stats.getSubscriptions(); @@ -250,7 +250,7 @@ public void run() throws Exception { PulsarConnectionFactory factory = getFactory(); String topicName = factory.getPulsarTopicName(destination); log.info("JMS Destination {} maps to Pulsar Topic {}", destination, topicName); - PulsarAdmin pulsarAdmin = factory.getPulsarAdmin(); + PulsarAdmin pulsarAdmin = factory.getPulsarAdmin().getPulsarAdmin(); TopicStats stats = pulsarAdmin.topics().getStats(topicName); Map subscriptions = stats.getSubscriptions(); if (subscriptions.isEmpty()) { diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index d381f4cf..15a86e7c 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index d9b6b1a4..9d953203 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -155,10 +155,10 @@ copy filters - - - - + + + + diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarAdminWrapper.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarAdminWrapper.java new file mode 100644 index 00000000..e039e8ac --- /dev/null +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarAdminWrapper.java @@ -0,0 +1,71 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import java.util.List; +import java.util.Map; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.TopicStats; + +public interface PulsarAdminWrapper { + + void close(); + + void createSubscription( + String fullQualifiedTopicName, String subscriptionName, MessageId earliest) + throws PulsarAdminException; + + Map getSubscriptionProperties( + String fullQualifiedTopicName, String subscriptionName) throws PulsarAdminException; + + PartitionedTopicMetadata getPartitionedTopicMetadata(String fullQualifiedTopicName) + throws PulsarAdminException; + + List> peekMessages( + String fullQualifiedTopicName, String queueSubscriptionName, int i) + throws PulsarAdminException; + + void deleteSubscription(String fullQualifiedTopicName, String name, boolean b) + throws PulsarAdminException; + + List getTopicList(String systemNamespace) throws PulsarAdminException; + + List getSubscriptions(String topic) throws PulsarAdminException; + + T getPulsarAdmin(); + + void createNonPartitionedTopic(String name) throws PulsarAdminException; + + List getPartitionedTopicList(String systemNamespace) throws PulsarAdminException; + + TopicStats getStats(String fullQualifiedTopicName) throws PulsarAdminException; + + void deletePartitionedTopic( + String fullQualifiedTopicName, boolean forceDeleteTemporaryDestinations) + throws PulsarAdminException; + + void deleteTopic(String fullQualifiedTopicName, boolean forceDeleteTemporaryDestinations) + throws PulsarAdminException; + + void createPartitionedTopic(String topicName, int i) throws PulsarAdminException; + + PartitionedTopicStats getPartitionedTopicStats(String topicName, boolean b) + throws PulsarAdminException; +} diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java index cd83bc3c..d5e67d71 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java @@ -990,7 +990,7 @@ private ConnectionConsumer buildConnectionConsumer( private void createPulsarTemporaryTopic(String name) throws JMSException { try { - factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); + factory.getPulsarAdmin().createNonPartitionedTopic(name); } catch (IllegalStateException err) { if (!factory.isAllowTemporaryTopicWithoutAdmin()) { throw Utils.handleException(err); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 326bab0a..abf0c80f 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -68,7 +68,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -123,7 +122,7 @@ public class PulsarConnectionFactory private final transient List> readers = new CopyOnWriteArrayList<>(); private transient PulsarClient pulsarClient; - private transient PulsarAdmin pulsarAdmin; + private transient PulsarAdminWrapper pulsarAdmin; private transient Map producerConfiguration; private transient ConsumerConfiguration defaultConsumerConfiguration; private transient String systemNamespace = "public/default"; @@ -466,7 +465,7 @@ private synchronized void ensureInitialized(String connectUsername, String conne String brokenServiceUrl = getAndRemoveString("brokerServiceUrl", "", configurationCopy); PulsarClient pulsarClient = null; - PulsarAdmin pulsarAdmin = null; + PulsarAdminWrapper pulsarAdmin = null; try { // must be the same as @@ -515,17 +514,18 @@ private synchronized void ensureInitialized(String connectUsername, String conne getAndRemoveString("tlsTrustStorePassword", "", configurationCopy); pulsarAdmin = - PulsarAdmin.builder() - .serviceHttpUrl(webServiceUrl) - .allowTlsInsecureConnection(tlsAllowInsecureConnection) - .enableTlsHostnameVerification(tlsEnableHostnameVerification) - .tlsTrustCertsFilePath(tlsTrustCertsFilePath) - .useKeyStoreTls(useKeyStoreTls) - .tlsTrustStoreType(tlsTrustStoreType) - .tlsTrustStorePath(tlsTrustStorePath) - .tlsTrustStorePassword(tlsTrustStorePassword) - .authentication(authentication) - .build(); + usePulsarAdmin + ? RealPulsarAdminWrapperFactory.createPulsarAdmin( + webServiceUrl, + tlsAllowInsecureConnection, + tlsEnableHostnameVerification, + tlsTrustCertsFilePath, + useKeyStoreTls, + tlsTrustStoreType, + tlsTrustStorePath, + tlsTrustStorePassword, + authentication) + : null; ClientBuilder clientBuilder = PulsarClient.builder() @@ -642,7 +642,7 @@ public synchronized PulsarClient getPulsarClient() { return pulsarClient; } - public synchronized PulsarAdmin getPulsarAdmin() throws jakarta.jms.IllegalStateException { + public synchronized PulsarAdminWrapper getPulsarAdmin() throws jakarta.jms.IllegalStateException { if (!usePulsarAdmin) { throw new jakarta.jms.IllegalStateException( "jms.usePulsarAdmin is set to false, this feature is not available"); @@ -1158,7 +1158,6 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc try { if (isUsePulsarAdmin()) { getPulsarAdmin() - .topics() .createSubscription(fullQualifiedTopicName, subscriptionName, MessageId.earliest); } else { // if we cannot use PulsarAdmin, @@ -1432,9 +1431,7 @@ public String downloadServerSideFilter( while (true) { try { Map subscriptionPropertiesFromBroker = - pulsarAdmin - .topics() - .getSubscriptionProperties(fullQualifiedTopicName, subscriptionName); + pulsarAdmin.getSubscriptionProperties(fullQualifiedTopicName, subscriptionName); if (subscriptionPropertiesFromBroker != null) { log.debug("subscriptionPropertiesFromBroker {}", subscriptionPropertiesFromBroker); boolean filtering = "true".equals(subscriptionPropertiesFromBroker.get("jms.filtering")); @@ -1509,7 +1506,7 @@ public List> createReadersForBrowser( try { PartitionedTopicMetadata partitionedTopicMetadata = - getPulsarAdmin().topics().getPartitionedTopicMetadata(fullQualifiedTopicName); + getPulsarAdmin().getPartitionedTopicMetadata(fullQualifiedTopicName); List> readers = new ArrayList<>(); if (partitionedTopicMetadata.partitions == 0) { Reader readerForBrowserForNonPartitionedTopic = @@ -1550,7 +1547,7 @@ private Reader createReaderForBrowserForNonPartitionedTopic( // peekMessages works only for non-partitioned topics List> messages = - getPulsarAdmin().topics().peekMessages(fullQualifiedTopicName, queueSubscriptionName, 1); + getPulsarAdmin().peekMessages(fullQualifiedTopicName, queueSubscriptionName, 1); MessageId seekMessageId; if (messages.isEmpty()) { @@ -1614,7 +1611,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) String fullQualifiedTopicName = getPulsarTopicName(destination); log.info("deleteSubscription topic {} name {}", fullQualifiedTopicName, name); try { - pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true); + pulsarAdmin.deleteSubscription(fullQualifiedTopicName, name, true); somethingDone = true; } catch (PulsarAdminException.NotFoundException notFound) { log.error("Cannot unsubscribe {} from {}: not found", name, fullQualifiedTopicName); @@ -1622,7 +1619,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) } if (!somethingDone) { // required for TCK, scan for all subscriptions - List allTopics = pulsarAdmin.topics().getList(systemNamespace); + List allTopics = pulsarAdmin.getTopicList(systemNamespace); for (String topic : allTopics) { if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) { // skip Transaction related system topics @@ -1632,7 +1629,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) log.info("Scanning topic {}", topic); List subscriptions; try { - subscriptions = pulsarAdmin.topics().getSubscriptions(topic); + subscriptions = pulsarAdmin.getSubscriptions(topic); log.info("Subscriptions {}", subscriptions); } catch (PulsarAdminException.NotFoundException notFound) { log.error("Skipping topic {}", topic); @@ -1642,7 +1639,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) log.info("Found subscription {} ", subscription); if (subscription.equals(name)) { log.info("deleteSubscription topic {} name {}", topic, name); - pulsarAdmin.topics().deleteSubscription(topic, name, true); + pulsarAdmin.deleteSubscription(topic, name, true); somethingDone = true; } } @@ -1858,7 +1855,7 @@ PulsarClient ensureClient() throws JMSException { return pulsarClient; } - PulsarAdmin ensurePulsarAdmin() throws JMSException { + PulsarAdminWrapper ensurePulsarAdmin() throws JMSException { createConnection().close(); if (pulsarAdmin == null) { throw new IllegalStateException( diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java index 68d411f6..080c45e9 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java @@ -109,7 +109,7 @@ public JMSDestinationMetadata describe(Destination dest) throws JMSException { private JMSDestinationMetadata describeDestination(PulsarDestination destination) throws JMSException { - PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin(); + PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin(); String pulsarTopic = factory.getPulsarTopicName(destination); String queueSubscription; if (destination.isQueue()) { @@ -335,7 +335,8 @@ public void createSubscription( properties.put("jms.selector", selector); } String topicName = factory.getPulsarTopicName(dest); - Topics topics = factory.ensurePulsarAdmin().topics(); + PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin(); + Topics topics = pulsarAdmin.topics(); topics.createSubscription( topicName, subscriptionName, @@ -358,7 +359,8 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); String topicName = factory.getPulsarTopicName(dest); - Topics topics = factory.ensurePulsarAdmin().topics(); + PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin(); + Topics topics = pulsarAdmin.topics(); boolean exists = false; try { PartitionedTopicMetadata partitionedTopicMetadata = @@ -409,7 +411,8 @@ public void createTopic(Topic destination, int partitions) throws JMSException { destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); String topicName = factory.getPulsarTopicName(dest); - Topics topics = factory.ensurePulsarAdmin().topics(); + PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin(); + Topics topics = pulsarAdmin.topics(); try { PartitionedTopicMetadata partitionedTopicMetadata = topics.getPartitionedTopicMetadata(topicName); @@ -454,7 +457,8 @@ private void doUpdateSubscriptionSelector( boolean enableFilters, String selector, String topicName, String subscriptionName) throws JMSException, PulsarAdminException { validateSelector(enableFilters, selector); - Topics topics = factory.ensurePulsarAdmin().topics(); + PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin(); + Topics topics = pulsarAdmin.topics(); Map currentProperties = new HashMap<>(); try { currentProperties = topics.getSubscriptionProperties(topicName, subscriptionName); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index 4c4321b6..515abd3a 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -19,7 +19,6 @@ import jakarta.jms.InvalidDestinationException; import jakarta.jms.JMSException; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.TopicStats; @@ -46,7 +45,7 @@ public final void delete() throws JMSException { log.info("Deleting {}", this); String topicName = getInternalTopicName(); String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName); - PulsarAdmin pulsarAdmin; + PulsarAdminWrapper pulsarAdmin; try { pulsarAdmin = session.getFactory().getPulsarAdmin(); } catch (IllegalStateException err) { @@ -59,7 +58,7 @@ public final void delete() throws JMSException { err); return; } - TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName); + TopicStats stats = pulsarAdmin.getStats(fullQualifiedTopicName); log.info("Stats {}", stats); int numConsumers = @@ -68,25 +67,20 @@ public final void delete() throws JMSException { throw new JMSException("Cannot delete a temporary destination with active consumers"); } - if (session - .getFactory() - .getPulsarAdmin() - .topics() + if (pulsarAdmin .getPartitionedTopicList(session.getFactory().getSystemNamespace()) .stream() .anyMatch(t -> t.equals(fullQualifiedTopicName))) { session .getFactory() .getPulsarAdmin() - .topics() .deletePartitionedTopic( fullQualifiedTopicName, session.getFactory().isForceDeleteTemporaryDestinations()); } else { session .getFactory() .getPulsarAdmin() - .topics() - .delete( + .deleteTopic( fullQualifiedTopicName, session.getFactory().isForceDeleteTemporaryDestinations()); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/RealPulsarAdminWrapperFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/RealPulsarAdminWrapperFactory.java new file mode 100644 index 00000000..fd800985 --- /dev/null +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/RealPulsarAdminWrapperFactory.java @@ -0,0 +1,183 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import java.util.List; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.TopicStats; + +public class RealPulsarAdminWrapperFactory { + + public static PulsarAdminWrapper createPulsarAdmin( + String webServiceUrl, + boolean tlsAllowInsecureConnection, + boolean tlsEnableHostnameVerification, + String tlsTrustCertsFilePath, + boolean useKeyStoreTls, + String tlsTrustStoreType, + String tlsTrustStorePath, + String tlsTrustStorePassword, + Authentication authentication) + throws PulsarClientException { + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(PulsarAdmin.class.getClassLoader()); + return new RealPulsarAdminWrapper( + webServiceUrl, + tlsAllowInsecureConnection, + tlsEnableHostnameVerification, + tlsTrustCertsFilePath, + useKeyStoreTls, + tlsTrustStoreType, + tlsTrustStorePath, + tlsTrustStorePassword, + authentication); + } finally { + Thread.currentThread().setContextClassLoader(classLoader); + } + } + + private static class RealPulsarAdminWrapper implements PulsarAdminWrapper { + private final PulsarAdmin pulsarAdmin; + + private RealPulsarAdminWrapper( + String webServiceUrl, + boolean tlsAllowInsecureConnection, + boolean tlsEnableHostnameVerification, + String tlsTrustCertsFilePath, + boolean useKeyStoreTls, + String tlsTrustStoreType, + String tlsTrustStorePath, + String tlsTrustStorePassword, + Authentication authentication) + throws PulsarClientException { + this.pulsarAdmin = + PulsarAdmin.builder() + .serviceHttpUrl(webServiceUrl) + .allowTlsInsecureConnection(tlsAllowInsecureConnection) + .enableTlsHostnameVerification(tlsEnableHostnameVerification) + .tlsTrustCertsFilePath(tlsTrustCertsFilePath) + .useKeyStoreTls(useKeyStoreTls) + .tlsTrustStoreType(tlsTrustStoreType) + .tlsTrustStorePath(tlsTrustStorePath) + .tlsTrustStorePassword(tlsTrustStorePassword) + .authentication(authentication) + .build(); + } + + @Override + public void close() { + pulsarAdmin.close(); + } + + @Override + public void createSubscription( + String fullQualifiedTopicName, + String subscriptionName, + org.apache.pulsar.client.api.MessageId earliest) + throws PulsarAdminException { + pulsarAdmin.topics().createSubscription(fullQualifiedTopicName, subscriptionName, earliest); + } + + @Override + public java.util.Map getSubscriptionProperties( + String fullQualifiedTopicName, String subscriptionName) throws PulsarAdminException { + return pulsarAdmin + .topics() + .getSubscriptionProperties(fullQualifiedTopicName, subscriptionName); + } + + @Override + public org.apache.pulsar.common.partition.PartitionedTopicMetadata getPartitionedTopicMetadata( + String fullQualifiedTopicName) throws PulsarAdminException { + return pulsarAdmin.topics().getPartitionedTopicMetadata(fullQualifiedTopicName); + } + + @Override + public java.util.List> peekMessages( + String fullQualifiedTopicName, String queueSubscriptionName, int i) + throws PulsarAdminException { + return pulsarAdmin.topics().peekMessages(fullQualifiedTopicName, queueSubscriptionName, i); + } + + @Override + public void deleteSubscription(String fullQualifiedTopicName, String name, boolean b) + throws PulsarAdminException { + pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, b); + } + + @Override + public java.util.List getTopicList(String systemNamespace) throws PulsarAdminException { + return pulsarAdmin.topics().getList(systemNamespace); + } + + @Override + public java.util.List getSubscriptions(String topic) throws PulsarAdminException { + return pulsarAdmin.topics().getSubscriptions(topic); + } + + @Override + public void createNonPartitionedTopic(String name) throws PulsarAdminException { + pulsarAdmin.topics().createNonPartitionedTopic(name); + } + + @Override + public List getPartitionedTopicList(String systemNamespace) + throws PulsarAdminException { + return pulsarAdmin.topics().getPartitionedTopicList(systemNamespace); + } + + @Override + public TopicStats getStats(String fullQualifiedTopicName) throws PulsarAdminException { + return pulsarAdmin.topics().getStats(fullQualifiedTopicName); + } + + @Override + public void deletePartitionedTopic( + String fullQualifiedTopicName, boolean forceDeleteTemporaryDestinations) + throws PulsarAdminException { + pulsarAdmin + .topics() + .deletePartitionedTopic(fullQualifiedTopicName, forceDeleteTemporaryDestinations); + } + + @Override + public void deleteTopic(String fullQualifiedTopicName, boolean forceDeleteTemporaryDestinations) + throws PulsarAdminException { + pulsarAdmin.topics().delete(fullQualifiedTopicName, forceDeleteTemporaryDestinations); + } + + @Override + public void createPartitionedTopic(String topicName, int i) throws PulsarAdminException { + pulsarAdmin.topics().createPartitionedTopic(topicName, i); + } + + @Override + public PartitionedTopicStats getPartitionedTopicStats(String topicName, boolean b) + throws PulsarAdminException { + return pulsarAdmin.topics().getPartitionedStats(topicName, b); + } + + public T getPulsarAdmin() { + return (T) pulsarAdmin; + } + } +} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java index 8ea68997..cc18bf3c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java @@ -32,9 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; @@ -183,11 +181,10 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { // PreconditionFailedException - PulsarAdmin original = factory.getPulsarAdmin(); - PulsarAdmin mockPulsarAdmin = mock(PulsarAdmin.class); - Topics topics = mock(Topics.class); + PulsarAdminWrapper original = factory.getPulsarAdmin(); + PulsarAdminWrapper mockPulsarAdmin = mock(PulsarAdminWrapper.class); AtomicBoolean done = new AtomicBoolean(); - when(topics.getSubscriptionProperties(anyString(), anyString())) + when(mockPulsarAdmin.getSubscriptionProperties(anyString(), anyString())) .thenAnswer( i -> { done.set(true); @@ -197,7 +194,6 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception { throw new PulsarAdminException.PreconditionFailedException( new Exception(), "", 404); }); - when(mockPulsarAdmin.topics()).thenReturn(topics); Whitebox.setInternalState(factory, "pulsarAdmin", mockPulsarAdmin); try (PulsarMessageConsumer consumer1 = diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java index 6f51d12e..edbaad15 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java @@ -176,7 +176,7 @@ void testManyMessagesWithPartitions() throws Exception { try (PulsarConnection connection = factory.createConnection()) { connection.start(); try (PulsarSession session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { - factory.getPulsarAdmin().topics().createPartitionedTopic(topicName, 20); + factory.getPulsarAdmin().createPartitionedTopic(topicName, 20); Queue destination = session.createQueue(topicName); @@ -227,7 +227,7 @@ void testManyMessagesWithPartitions() throws Exception { .untilAsserted( () -> { PartitionedTopicStats partitionedInternalStats = - factory.getPulsarAdmin().topics().getPartitionedStats(topicName, true); + factory.getPulsarAdmin().getPartitionedTopicStats(topicName, true); AtomicLong sum = new AtomicLong(); partitionedInternalStats .getPartitions() diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java index f163d4b7..34bf69ab 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsNonAdminTest.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; -import java.util.Map; import jakarta.jms.Connection; import jakarta.jms.Destination; import jakarta.jms.JMSException; @@ -27,6 +26,7 @@ import jakarta.jms.MessageConsumer; import jakarta.jms.MessageProducer; import jakarta.jms.Session; +import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index 6ae989e9..a53b63f9 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -1224,7 +1224,7 @@ public void sendMessageWithPartitionStickKeyTest() throws Exception { connection.start(); // create a topic with 4 partitions - factory.getPulsarAdmin().topics().createPartitionedTopic(topicName, 4); + factory.getPulsarAdmin().createPartitionedTopic(topicName, 4); try (Session consumerSession = connection.createSession(); ) { Destination destination = consumerSession.createQueue(topicName); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java index fe5862d7..98246acd 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java @@ -87,9 +87,9 @@ public void testMultiTopic(int numPartitions, boolean useRegExp) throws Exceptio for (int i = 0; i < 4; i++) { String topicName = "test-" + prefix + "-" + i; if (numPartitions > 0) { - factory.getPulsarAdmin().topics().createPartitionedTopic(topicName, numPartitions); + factory.getPulsarAdmin().createPartitionedTopic(topicName, numPartitions); } else { - factory.getPulsarAdmin().topics().createNonPartitionedTopic(topicName); + factory.getPulsarAdmin().createNonPartitionedTopic(topicName); } destinationsToWrite.add(session.createTopic(topicName)); } @@ -424,7 +424,7 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti List destinationsToWrite = new ArrayList<>(); for (int i = 0; i < 4; i++) { String topicName = "test-" + prefix + "-" + nextDestinationId; - factory.getPulsarAdmin().topics().createNonPartitionedTopic(topicName); + factory.getPulsarAdmin().createNonPartitionedTopic(topicName); destinationsToWrite.add(session.createTopic(topicName)); nextDestinationId = i + 1; } @@ -459,7 +459,7 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti // add a new topic matching the pattern // the new topic has server side filters on the jms-queue subscription String topicName = "test-" + prefix + "-" + nextDestinationId; - factory.getPulsarAdmin().topics().createNonPartitionedTopic(topicName); + factory.getPulsarAdmin().createNonPartitionedTopic(topicName); // await that the consumer session creates the subscription, then we update it Awaitility.await() .untilAsserted( diff --git a/tck-executor/src/main/java/com/datastax/oss/pulsar/jms/tests/JNDIInitialContextFactory.java b/tck-executor/src/main/java/com/datastax/oss/pulsar/jms/tests/JNDIInitialContextFactory.java index 95c9f679..23dcb6d7 100644 --- a/tck-executor/src/main/java/com/datastax/oss/pulsar/jms/tests/JNDIInitialContextFactory.java +++ b/tck-executor/src/main/java/com/datastax/oss/pulsar/jms/tests/JNDIInitialContextFactory.java @@ -28,6 +28,7 @@ import javax.naming.Context; import javax.naming.NamingException; import javax.naming.spi.InitialContextFactory; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; public class JNDIInitialContextFactory implements InitialContextFactory { @@ -128,7 +129,8 @@ public Object lookup(String name) throws Exception { PulsarConnectionFactory tmp = getAdminConnectionFactory(); System.out.println(this.getClass() + " Cleaning up QUEUE " + topicName); try { - tmp.getPulsarAdmin().topics().delete(topicName, true, true); + PulsarAdmin pulsarAdmin = tmp.getPulsarAdmin().getPulsarAdmin(); + pulsarAdmin.topics().delete(topicName, true, true); } catch (PulsarAdminException.NotFoundException ok) { // Since Pulsar 3.0 we get a 404 when deleting a non existing topic System.out.println(" Cleaning up QUEUE " + topicName + " - not found, ignoring");