Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer PulsarAdmin creation to solve SpringBoot initialization problems #163

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void run() throws Exception {
PulsarConnectionFactory factory = getFactory();
String topicName = factory.getPulsarTopicName(destination);
log.info("JMS Destination {} maps to Pulsar Topic {}", destination, topicName);
PulsarAdmin pulsarAdmin = factory.getPulsarAdmin();
PulsarAdmin pulsarAdmin = factory.getPulsarAdmin().getPulsarAdmin();
try {
TopicStats stats = pulsarAdmin.topics().getStats(topicName);
Map<String, ? extends SubscriptionStats> subscriptions = stats.getSubscriptions();
Expand Down Expand Up @@ -250,7 +250,7 @@ public void run() throws Exception {
PulsarConnectionFactory factory = getFactory();
String topicName = factory.getPulsarTopicName(destination);
log.info("JMS Destination {} maps to Pulsar Topic {}", destination, topicName);
PulsarAdmin pulsarAdmin = factory.getPulsarAdmin();
PulsarAdmin pulsarAdmin = factory.getPulsarAdmin().getPulsarAdmin();
TopicStats stats = pulsarAdmin.topics().getStats(topicName);
Map<String, ? extends SubscriptionStats> subscriptions = stats.getSubscriptions();
if (subscriptions.isEmpty()) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
8 changes: 4 additions & 4 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<mkdir dir="${project.build.outputDirectory}/interceptors" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.TopicStats;

public interface PulsarAdminWrapper {

void close();

void createSubscription(
String fullQualifiedTopicName, String subscriptionName, MessageId earliest)
throws PulsarAdminException;

Map<String, String> getSubscriptionProperties(
String fullQualifiedTopicName, String subscriptionName) throws PulsarAdminException;

PartitionedTopicMetadata getPartitionedTopicMetadata(String fullQualifiedTopicName)
throws PulsarAdminException;

List<Message<byte[]>> peekMessages(
String fullQualifiedTopicName, String queueSubscriptionName, int i)
throws PulsarAdminException;

void deleteSubscription(String fullQualifiedTopicName, String name, boolean b)
throws PulsarAdminException;

List<String> getTopicList(String systemNamespace) throws PulsarAdminException;

List<String> getSubscriptions(String topic) throws PulsarAdminException;

<T> T getPulsarAdmin();

void createNonPartitionedTopic(String name) throws PulsarAdminException;

List<String> getPartitionedTopicList(String systemNamespace) throws PulsarAdminException;

TopicStats getStats(String fullQualifiedTopicName) throws PulsarAdminException;

void deletePartitionedTopic(
String fullQualifiedTopicName, boolean forceDeleteTemporaryDestinations)
throws PulsarAdminException;

void deleteTopic(String fullQualifiedTopicName, boolean forceDeleteTemporaryDestinations)
throws PulsarAdminException;

void createPartitionedTopic(String topicName, int i) throws PulsarAdminException;

PartitionedTopicStats getPartitionedTopicStats(String topicName, boolean b)
throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ private ConnectionConsumer buildConnectionConsumer(

private void createPulsarTemporaryTopic(String name) throws JMSException {
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
factory.getPulsarAdmin().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
Expand Down Expand Up @@ -123,7 +122,7 @@ public class PulsarConnectionFactory
private final transient List<Reader<?>> readers = new CopyOnWriteArrayList<>();

private transient PulsarClient pulsarClient;
private transient PulsarAdmin pulsarAdmin;
private transient PulsarAdminWrapper pulsarAdmin;
private transient Map<String, Object> producerConfiguration;
private transient ConsumerConfiguration defaultConsumerConfiguration;
private transient String systemNamespace = "public/default";
Expand Down Expand Up @@ -466,7 +465,7 @@ private synchronized void ensureInitialized(String connectUsername, String conne
String brokenServiceUrl = getAndRemoveString("brokerServiceUrl", "", configurationCopy);

PulsarClient pulsarClient = null;
PulsarAdmin pulsarAdmin = null;
PulsarAdminWrapper pulsarAdmin = null;
try {

// must be the same as
Expand Down Expand Up @@ -515,17 +514,18 @@ private synchronized void ensureInitialized(String connectUsername, String conne
getAndRemoveString("tlsTrustStorePassword", "", configurationCopy);

pulsarAdmin =
PulsarAdmin.builder()
.serviceHttpUrl(webServiceUrl)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsEnableHostnameVerification)
.tlsTrustCertsFilePath(tlsTrustCertsFilePath)
.useKeyStoreTls(useKeyStoreTls)
.tlsTrustStoreType(tlsTrustStoreType)
.tlsTrustStorePath(tlsTrustStorePath)
.tlsTrustStorePassword(tlsTrustStorePassword)
.authentication(authentication)
.build();
usePulsarAdmin
? RealPulsarAdminWrapperFactory.createPulsarAdmin(
webServiceUrl,
tlsAllowInsecureConnection,
tlsEnableHostnameVerification,
tlsTrustCertsFilePath,
useKeyStoreTls,
tlsTrustStoreType,
tlsTrustStorePath,
tlsTrustStorePassword,
authentication)
: null;

ClientBuilder clientBuilder =
PulsarClient.builder()
Expand Down Expand Up @@ -642,7 +642,7 @@ public synchronized PulsarClient getPulsarClient() {
return pulsarClient;
}

public synchronized PulsarAdmin getPulsarAdmin() throws jakarta.jms.IllegalStateException {
public synchronized PulsarAdminWrapper getPulsarAdmin() throws jakarta.jms.IllegalStateException {
if (!usePulsarAdmin) {
throw new jakarta.jms.IllegalStateException(
"jms.usePulsarAdmin is set to false, this feature is not available");
Expand Down Expand Up @@ -1158,7 +1158,6 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc
try {
if (isUsePulsarAdmin()) {
getPulsarAdmin()
.topics()
.createSubscription(fullQualifiedTopicName, subscriptionName, MessageId.earliest);
} else {
// if we cannot use PulsarAdmin,
Expand Down Expand Up @@ -1432,9 +1431,7 @@ public String downloadServerSideFilter(
while (true) {
try {
Map<String, String> subscriptionPropertiesFromBroker =
pulsarAdmin
.topics()
.getSubscriptionProperties(fullQualifiedTopicName, subscriptionName);
pulsarAdmin.getSubscriptionProperties(fullQualifiedTopicName, subscriptionName);
if (subscriptionPropertiesFromBroker != null) {
log.debug("subscriptionPropertiesFromBroker {}", subscriptionPropertiesFromBroker);
boolean filtering = "true".equals(subscriptionPropertiesFromBroker.get("jms.filtering"));
Expand Down Expand Up @@ -1509,7 +1506,7 @@ public List<Reader<?>> createReadersForBrowser(

try {
PartitionedTopicMetadata partitionedTopicMetadata =
getPulsarAdmin().topics().getPartitionedTopicMetadata(fullQualifiedTopicName);
getPulsarAdmin().getPartitionedTopicMetadata(fullQualifiedTopicName);
List<Reader<?>> readers = new ArrayList<>();
if (partitionedTopicMetadata.partitions == 0) {
Reader<?> readerForBrowserForNonPartitionedTopic =
Expand Down Expand Up @@ -1550,7 +1547,7 @@ private Reader<?> createReaderForBrowserForNonPartitionedTopic(

// peekMessages works only for non-partitioned topics
List<Message<byte[]>> messages =
getPulsarAdmin().topics().peekMessages(fullQualifiedTopicName, queueSubscriptionName, 1);
getPulsarAdmin().peekMessages(fullQualifiedTopicName, queueSubscriptionName, 1);

MessageId seekMessageId;
if (messages.isEmpty()) {
Expand Down Expand Up @@ -1614,15 +1611,15 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
String fullQualifiedTopicName = getPulsarTopicName(destination);
log.info("deleteSubscription topic {} name {}", fullQualifiedTopicName, name);
try {
pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true);
pulsarAdmin.deleteSubscription(fullQualifiedTopicName, name, true);
somethingDone = true;
} catch (PulsarAdminException.NotFoundException notFound) {
log.error("Cannot unsubscribe {} from {}: not found", name, fullQualifiedTopicName);
}
}
if (!somethingDone) {
// required for TCK, scan for all subscriptions
List<String> allTopics = pulsarAdmin.topics().getList(systemNamespace);
List<String> allTopics = pulsarAdmin.getTopicList(systemNamespace);
for (String topic : allTopics) {
if (topic.endsWith(PENDING_ACK_STORE_SUFFIX)) {
// skip Transaction related system topics
Expand All @@ -1632,7 +1629,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
log.info("Scanning topic {}", topic);
List<String> subscriptions;
try {
subscriptions = pulsarAdmin.topics().getSubscriptions(topic);
subscriptions = pulsarAdmin.getSubscriptions(topic);
log.info("Subscriptions {}", subscriptions);
} catch (PulsarAdminException.NotFoundException notFound) {
log.error("Skipping topic {}", topic);
Expand All @@ -1642,7 +1639,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
log.info("Found subscription {} ", subscription);
if (subscription.equals(name)) {
log.info("deleteSubscription topic {} name {}", topic, name);
pulsarAdmin.topics().deleteSubscription(topic, name, true);
pulsarAdmin.deleteSubscription(topic, name, true);
somethingDone = true;
}
}
Expand Down Expand Up @@ -1858,7 +1855,7 @@ PulsarClient ensureClient() throws JMSException {
return pulsarClient;
}

PulsarAdmin ensurePulsarAdmin() throws JMSException {
PulsarAdminWrapper ensurePulsarAdmin() throws JMSException {
createConnection().close();
if (pulsarAdmin == null) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public JMSDestinationMetadata describe(Destination dest) throws JMSException {

private JMSDestinationMetadata describeDestination(PulsarDestination destination)
throws JMSException {
PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin();
PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin();
String pulsarTopic = factory.getPulsarTopicName(destination);
String queueSubscription;
if (destination.isQueue()) {
Expand Down Expand Up @@ -335,7 +335,8 @@ public void createSubscription(
properties.put("jms.selector", selector);
}
String topicName = factory.getPulsarTopicName(dest);
Topics topics = factory.ensurePulsarAdmin().topics();
PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
topics.createSubscription(
topicName,
subscriptionName,
Expand All @@ -358,7 +359,8 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters
destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination");

String topicName = factory.getPulsarTopicName(dest);
Topics topics = factory.ensurePulsarAdmin().topics();
PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
boolean exists = false;
try {
PartitionedTopicMetadata partitionedTopicMetadata =
Expand Down Expand Up @@ -409,7 +411,8 @@ public void createTopic(Topic destination, int partitions) throws JMSException {
destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination");

String topicName = factory.getPulsarTopicName(dest);
Topics topics = factory.ensurePulsarAdmin().topics();
PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
try {
PartitionedTopicMetadata partitionedTopicMetadata =
topics.getPartitionedTopicMetadata(topicName);
Expand Down Expand Up @@ -454,7 +457,8 @@ private void doUpdateSubscriptionSelector(
boolean enableFilters, String selector, String topicName, String subscriptionName)
throws JMSException, PulsarAdminException {
validateSelector(enableFilters, selector);
Topics topics = factory.ensurePulsarAdmin().topics();
PulsarAdmin pulsarAdmin = factory.ensurePulsarAdmin().getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
Map<String, String> currentProperties = new HashMap<>();
try {
currentProperties = topics.getSubscriptionProperties(topicName, subscriptionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TopicStats;

Expand All @@ -46,7 +45,7 @@ public final void delete() throws JMSException {
log.info("Deleting {}", this);
String topicName = getInternalTopicName();
String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName);
PulsarAdmin pulsarAdmin;
PulsarAdminWrapper pulsarAdmin;
try {
pulsarAdmin = session.getFactory().getPulsarAdmin();
} catch (IllegalStateException err) {
Expand All @@ -59,7 +58,7 @@ public final void delete() throws JMSException {
err);
return;
}
TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName);
TopicStats stats = pulsarAdmin.getStats(fullQualifiedTopicName);
log.info("Stats {}", stats);

int numConsumers =
Expand All @@ -68,25 +67,20 @@ public final void delete() throws JMSException {
throw new JMSException("Cannot delete a temporary destination with active consumers");
}

if (session
.getFactory()
.getPulsarAdmin()
.topics()
if (pulsarAdmin
.getPartitionedTopicList(session.getFactory().getSystemNamespace())
.stream()
.anyMatch(t -> t.equals(fullQualifiedTopicName))) {
session
.getFactory()
.getPulsarAdmin()
.topics()
.deletePartitionedTopic(
fullQualifiedTopicName, session.getFactory().isForceDeleteTemporaryDestinations());
} else {
session
.getFactory()
.getPulsarAdmin()
.topics()
.delete(
.deleteTopic(
fullQualifiedTopicName, session.getFactory().isForceDeleteTemporaryDestinations());
}

Expand Down
Loading
Loading