Skip to content

Commit

Permalink
formatting change
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeep-ctds committed Nov 4, 2024
1 parent dcd478c commit 8890105
Showing 1 changed file with 52 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
Expand All @@ -35,66 +35,66 @@
@Slf4j
public class TemporaryDestinationsNonAdminTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true")
.withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned")
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false");
@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "true")
.withEnv("PULSAR_PREFIX_allowAutoTopicCreationType", "non-partitioned")
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false");

@Test
public void allowTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
properties.put("jms.allowTemporaryTopicWithoutAdmin", "true");
useTemporaryDestinationNonAdminTest(properties, false);
}
@Test
public void allowTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
properties.put("jms.allowTemporaryTopicWithoutAdmin", "true");
useTemporaryDestinationNonAdminTest(properties, false);
}

@Test
public void forbidTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
useTemporaryDestinationNonAdminTest(properties, true);
}
@Test
public void forbidTemporaryTopicWithoutAdminTest() throws Exception {
Map<String, Object> properties = getJmsProperties();
useTemporaryDestinationNonAdminTest(properties, true);
}

@NotNull
private static Map<String, Object> getJmsProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.forceDeleteTemporaryDestinations", "true");
properties.put("jms.usePulsarAdmin", "false");
return properties;
}
@NotNull
private static Map<String, Object> getJmsProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.forceDeleteTemporaryDestinations", "true");
properties.put("jms.usePulsarAdmin", "false");
return properties;
}

private void useTemporaryDestinationNonAdminTest(Map<String, Object> properties, boolean expectAdminErrors)
throws Exception {
private void useTemporaryDestinationNonAdminTest(
Map<String, Object> properties, boolean expectAdminErrors) throws Exception {

try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession()) {
if (expectAdminErrors) {
assertThrows(JMSException.class, session::createTemporaryTopic);
return;
}
Destination clientAddress = session.createTemporaryTopic();
testProducerAndConsumer(session, clientAddress);
}
}
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties)) {
try (Connection connection = factory.createConnection()) {
connection.start();
try (Session session = connection.createSession()) {
if (expectAdminErrors) {
assertThrows(JMSException.class, session::createTemporaryTopic);
return;
}
Destination clientAddress = session.createTemporaryTopic();
testProducerAndConsumer(session, clientAddress);
}
}
}
}

private static void testProducerAndConsumer(Session session, Destination clientAddress)
throws JMSException {
try (MessageProducer producerClient = session.createProducer(clientAddress)) {
// subscribe on the temporary queue
try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) {
private static void testProducerAndConsumer(Session session, Destination clientAddress)
throws JMSException {
try (MessageProducer producerClient = session.createProducer(clientAddress)) {
// subscribe on the temporary queue
try (MessageConsumer consumerClient = session.createConsumer(clientAddress)) {

String testMessage = "message";
// produce a message
producerClient.send(session.createTextMessage(testMessage));
String testMessage = "message";
// produce a message
producerClient.send(session.createTextMessage(testMessage));

// on the consumer receive the message
Message theResponse = consumerClient.receive();
assertEquals(testMessage, theResponse.getBody(String.class));
}
}
// on the consumer receive the message
Message theResponse = consumerClient.receive();
assertEquals(testMessage, theResponse.getBody(String.class));
}
}
}
}

0 comments on commit 8890105

Please sign in to comment.