Skip to content

Commit

Permalink
[improvement] Do not throw error for Temporary Topic/Queue creation a…
Browse files Browse the repository at this point in the history
…nd deletion without admin privileges with "jms.allowTemporaryTopicWithoutAdmin" (#154)

* Added jms.allowTemporaryTopicWithoutAdmin parameter for allowing temporary topic/queue creation and deletion flow without admin privileges.

* Added tests for jms.allowTemporaryTopicWithoutAdmin parameter.

---------
  • Loading branch information
sandeep-ctds authored Nov 4, 2024
1 parent c051a55 commit 0664b59
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,11 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID();
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (Exception err) {
throw Utils.handleException(err);
}
createPulsarTemporaryTopic(name);
PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session);
temporaryDestinations.add(res);
return res;
Expand All @@ -926,11 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID();
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (Exception err) {
throw Utils.handleException(err);
}
createPulsarTemporaryTopic(name);
PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session);
temporaryDestinations.add(res);
return res;
Expand Down Expand Up @@ -996,6 +988,22 @@ private ConnectionConsumer buildConnectionConsumer(
return connectionConsumer;
}

private void createPulsarTemporaryTopic(String name) throws JMSException {
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true",
name,
err);
} catch (Exception err) {
throw Utils.handleException(err);
}
}

void refreshServerSideSelectors() {
sessions.forEach(
s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public class PulsarConnectionFactory
private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
private transient long waitForServerStartupTimeout = 60000;
private transient boolean usePulsarAdmin = true;
private transient boolean allowTemporaryTopicWithoutAdmin = false;
private transient boolean precreateQueueSubscription = true;
private transient int precreateQueueSubscriptionConsumerQueueSize = 0;
private transient boolean initialized;
Expand Down Expand Up @@ -330,6 +331,11 @@ private synchronized void ensureInitialized(String connectUsername, String conne
this.usePulsarAdmin =
Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy));

this.allowTemporaryTopicWithoutAdmin =
Boolean.parseBoolean(
getAndRemoveString(
"jms.allowTemporaryTopicWithoutAdmin", "false", configurationCopy));

this.precreateQueueSubscription =
Boolean.parseBoolean(
getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy));
Expand Down Expand Up @@ -1726,6 +1732,10 @@ public boolean isAcknowledgeRejectedMessages() {
return acknowledgeRejectedMessages;
}

public boolean isAllowTemporaryTopicWithoutAdmin() {
return allowTemporaryTopicWithoutAdmin;
}

public synchronized boolean isClosed() {
return closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.datastax.oss.pulsar.jms;

import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.TopicStats;

Expand All @@ -44,8 +46,20 @@ public final void delete() throws JMSException {
log.info("Deleting {}", this);
String topicName = getInternalTopicName();
String fullQualifiedTopicName = session.getFactory().applySystemNamespace(topicName);
TopicStats stats =
session.getFactory().getPulsarAdmin().topics().getStats(fullQualifiedTopicName);
PulsarAdmin pulsarAdmin;
try {
pulsarAdmin = session.getFactory().getPulsarAdmin();
} catch (IllegalStateException err) {
if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true",
this,
err);
return;
}
TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName);
log.info("Stats {}", stats);

int numConsumers =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

@Slf4j
public class TemporaryDestinationsNonAdminTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true")
.withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned")
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false");

@Test
public void allowTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
properties.put("jms.allowTemporaryTopicWithoutAdmin", "true");
useTemporaryDestinationNonAdminTest(properties, false);
}

@Test
public void forbidTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
useTemporaryDestinationNonAdminTest(properties, true);
}

@NotNull
private static Map<String, Object> getJmsProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.forceDeleteTemporaryDestinations", "true");
properties.put("jms.usePulsarAdmin", "false");
return properties;
}

private void useTemporaryDestinationNonAdminTest(
Map<String, Object> properties, boolean expectAdminErrors) throws Exception {

try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession()) {
if (expectAdminErrors) {
assertThrows(JMSException.class, session::createTemporaryTopic);
return;
}
Destination clientAddress = session.createTemporaryTopic();
testProducerAndConsumer(session, clientAddress);
}
}
}
}

private static void testProducerAndConsumer(Session session, Destination clientAddress)
throws JMSException {
try (MessageProducer producerClient = session.createProducer(clientAddress)) {
// subscribe on the temporary queue
try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) {

String testMessage = "message";
// produce a message
producerClient.send(session.createTextMessage(testMessage));

// on the consumer receive the message
Message theResponse = consumerClient.receive();
assertEquals(testMessage, theResponse.getBody(String.class));
}
}
}
}

0 comments on commit 0664b59

Please sign in to comment.