diff --git a/pom.xml b/pom.xml
index 29afd8c5..d227aacf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
1.7.30
1.2.3
5.7.1
- 3.0.0-M5
+ 3.1.0
2.14.2
2.8.9
1.21
@@ -316,7 +316,16 @@ limitations under the License.]]>
maven-surefire-plugin
${surefire.version}
- ${test.additional.args}
+ 1
+
+ -XX:+ExitOnOutOfMemoryError -Xmx2G -XX:+UseZGC
+ -Dpulsar.allocator.pooled=true
+ -Dpulsar.allocator.leak_detection=Advanced
+ -Dpulsar.allocator.exit_on_oom=false
+ -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
+ -Dio.netty.tryReflectionSetAccessible=true
+ ${test.additional.args}
+
false
${project.basedir}/src/test/resources/logback-test.xml
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java
index d6828730..cc01d58e 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java
@@ -81,7 +81,7 @@ public void testAUTO_ACKNOWLEDGE() throws Exception {
}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
- assertEquals("foo", consumer.receive().getStringProperty("test"));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is automatically acknowledged on receive,
// but, as we are not setting ackReceiptEnabled, the acknowledgement does not wait
// for the server to return success or failure
@@ -129,7 +129,7 @@ public void onException(Message message, Exception e) {}
// that this test is notably faster
try (MessageConsumer consumer = session.createConsumer(destination); ) {
for (int i = 0; i < 1000; i++) {
- assertEquals("foo", consumer.receive().getStringProperty("test"));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
log.info("ack {}", i);
// message is automatically acknowledged on receive
}
@@ -160,7 +160,7 @@ public void testADUPS_OK_ACKNOWLEDGE() throws Exception {
}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
- assertEquals("foo", consumer.receive().getStringProperty("test"));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is automatically acknowledged on receive, but best effort and async
}
// give time for the async ack
@@ -196,7 +196,7 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception {
}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
- assertEquals("foo", consumer.receive().getStringProperty("test"));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is not automatically acknowledged on receive
// closing the consumer
@@ -204,10 +204,10 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception {
try (MessageConsumer consumer = session.createConsumer(destination); ) {
// receive and ack
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));
- Message receive2 = consumer.receive();
+ Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
// ack only message1, this automatically acks all the other messages
@@ -246,7 +246,7 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception {
}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
- assertEquals("foo", consumer.receive().getStringProperty("test"));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is not automatically acknowledged on receive
// closing the consumer
@@ -254,10 +254,10 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception {
try (MessageConsumer consumer = session.createConsumer(destination); ) {
// receive and ack
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));
- Message receive2 = consumer.receive();
+ Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
// ack only message1,
@@ -267,7 +267,7 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception {
try (MessageConsumer consumer = session.createConsumer(destination); ) {
// message2 is still there
- Message receive2 = consumer.receive();
+ Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
assertNull(consumer.receive(100));
@@ -338,7 +338,7 @@ public void onException(Message message, Exception e) {}
}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
- assertEquals("foo", consumer.receive().getStringProperty("test"));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is not automatically acknowledged on receive
// closing the consumer
@@ -346,12 +346,12 @@ public void onException(Message message, Exception e) {}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
// receive and ack
- PulsarMessage receive = (PulsarMessage) consumer.receive();
+ PulsarMessage receive = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));
assertTrue(
receive.getReceivedPulsarMessage().getMessageId() instanceof BatchMessageIdImpl);
- PulsarMessage receive2 = (PulsarMessage) consumer.receive();
+ PulsarMessage receive2 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
assertTrue(
receive2.getReceivedPulsarMessage().getMessageId() instanceof BatchMessageIdImpl);
@@ -367,7 +367,7 @@ public void onException(Message message, Exception e) {}
// see PIP-54
// message2 is still there
- Message receive2 = consumer.receive();
+ Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
assertNull(consumer.receive(100));
@@ -376,11 +376,11 @@ public void onException(Message message, Exception e) {}
receive2.acknowledge();
} else {
// message1 is still there, because we haven't fully acknowledged the Batch
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));
// message2 is still there
- Message receive2 = consumer.receive();
+ Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
assertNull(consumer.receive(100));
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java
index 57bec93c..f336cb6c 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java
@@ -305,6 +305,6 @@ private void produce(PulsarSession session, Destination destination) throws JMSE
}
private void consume(MessageConsumer consumer) throws JMSException {
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java
index d4f6b730..67f4f2fe 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java
@@ -94,7 +94,7 @@ public void pausedConnectionTest() throws Exception {
// block until the connection is started
// the connection will be started in 5 seconds and the test won't be stuck
- assertEquals("foo", consumer.receive().getBody(String.class));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));
connection.stop();
@@ -107,7 +107,7 @@ public void pausedConnectionTest() throws Exception {
// now we are able to receive all of the remaining messages
assertEquals("foo", consumer.receive(2000).getBody(String.class));
assertEquals("foo", consumer.receiveNoWait().getBody(String.class));
- assertEquals("foo", consumer.receive().getBody(String.class));
+ assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));
} finally {
executeLater.shutdown();
@@ -144,7 +144,7 @@ public void stopConnectionMustWaitForPendingReceive() throws Exception {
// no message in the topic, so this consumer will hang
beforeReceive.countDown();
log.info("receiving...");
- consumerResult.complete(consumer.receive());
+ consumerResult.complete(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
} catch (Throwable err) {
consumerResult.completeExceptionally(err);
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java
index 6f147a73..2bc01ba0 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java
@@ -199,18 +199,18 @@ private void performTest(
producer.send(session.createTextMessage("foo"));
}
- Message message = consumer1.receive();
+ Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());
// message is re-delivered again after ackTimeoutMillis
- message = consumer1.receive();
+ message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(2, message.getIntProperty("JMSXDeliveryCount"));
assertTrue(message.getJMSRedelivered());
- message = consumerDeadLetter.receive();
+ message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
log.info("DLQ MESSAGE {}", message);
// this is another topic, and the JMSXDeliveryCount is only handled on the client side
@@ -335,7 +335,7 @@ public void onException(Message message, Exception e) {}
// receive all the messages, without acknowledging them.
// they will be redelivered multiple-times
while (true) {
- PulsarMessage message = (PulsarMessage) consumer1.receive();
+ PulsarMessage message = (PulsarMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
int _JMSXDeliveryCount = message.getIntProperty("JMSXDeliveryCount");
counterByMessageId.put(message.getJMSMessageID(), _JMSXDeliveryCount);
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java
index 18220099..f42dd78a 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java
@@ -102,7 +102,7 @@ public void doNotPrecreateQueueSubscriptionTest() throws Exception {
try (MessageConsumer consumer1 = session.createConsumer(destinationWithSubscription)) {
for (int i = 0; i < 10; i++) {
- assertNotNull(consumer1.receive());
+ assertNotNull(consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
// verify that we have 1 subscription
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java
index 185cd15f..4f370c4c 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java
@@ -109,7 +109,7 @@ public void sendMessageReceiveFromQueueWithNoLocal(
}
// we must be able to receive the message from the second connection
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
@@ -157,7 +157,7 @@ public void sendMessageReceiveFromTopicWithNoLocal(
}
// we must be able to receive the message from the second connection
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
@@ -206,7 +206,7 @@ public void sendMessageReceiveFromExclusiveSubscriptionWithSelector(
}
// we must be able to receive the message from the second connection
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
@@ -254,7 +254,7 @@ public void sendMessageReceiveFromSharedSubscriptionWithNoLocal(
}
// we must be able to receive the message from the second connection
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
@@ -318,7 +318,7 @@ public void acknowledgeRejectedMessagesTest(
try (MessageConsumer consumerAllowLocal =
session.createConsumer(destination, null, false); ) {
for (int i = 0; i < 10; i++) {
- assertNotNull(consumerAllowLocal.receive());
+ assertNotNull(consumerAllowLocal.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
} else {
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java
index f94d2501..b9b34fa5 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java
@@ -89,13 +89,13 @@ public void overrideDQLConfigurationWithJMSContext() throws Exception {
primaryContext.createProducer().send(destination, "foo");
- Message message = consumerWithDLQConfiguration.receive();
+ Message message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());
// message is re-delivered again after ackTimeoutMillis
- message = consumerWithDLQConfiguration.receive();
+ message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(2, message.getIntProperty("JMSXDeliveryCount"));
assertTrue(message.getJMSRedelivered());
@@ -103,7 +103,7 @@ public void overrideDQLConfigurationWithJMSContext() throws Exception {
try (JMSConsumer consumerDeadLetter =
primaryContext.createSharedConsumer(destinationDeadletter, "dqlsub"); ) {
- message = consumerDeadLetter.receive();
+ message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
log.info("DLQ MESSAGE {}", message);
@@ -143,13 +143,13 @@ public void overrideDQLConfigurationWithSession() throws Exception {
primarySession.createProducer(destination).send(primarySession.createTextMessage("foo"));
- Message message = consumerWithDLQConfiguration.receive();
+ Message message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());
// message is re-delivered again after ackTimeoutMillis
- message = consumerWithDLQConfiguration.receive();
+ message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(2, message.getIntProperty("JMSXDeliveryCount"));
assertTrue(message.getJMSRedelivered());
@@ -157,7 +157,7 @@ public void overrideDQLConfigurationWithSession() throws Exception {
try (MessageConsumer consumerDeadLetter =
primarySession.createSharedConsumer(destinationDeadletter, "dqlsub"); ) {
- message = consumerDeadLetter.receive();
+ message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
log.info("DLQ MESSAGE {}", message);
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java
index d2bf8983..c98407c7 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java
@@ -157,7 +157,7 @@ public void basicTest(int numPartitions, String mapping) throws Exception {
List received = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info(
"got msg {} prio {} from {} actually {}",
msg.getText(),
@@ -272,7 +272,7 @@ public void onException(Message message, Exception e) {
Set receivedTexts = new HashSet<>();
List received = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
received.add(msg.getJMSPriority());
if (msg.getJMSPriority() == LOW_PRIORITY) {
@@ -442,7 +442,7 @@ private static void testMultiTopicConsumer(Session session, int numMessages, Que
List received = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info(
"got msg {} prio {} from {} actually {}",
msg.getText(),
@@ -497,7 +497,7 @@ public void basicPriorityJMSContextTest() throws Exception {
List received = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("got msg {} prio {}", msg.getText(), msg.getJMSPriority());
received.add(msg);
}
@@ -574,7 +574,7 @@ public void onException(Message message, Exception e) {
List received = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
if (i == 0) {
// await all messages in the consumer receive queue
ConsumerBase> consumerBase = ((PulsarMessageConsumer) consumer1).getConsumer();
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java
index abd0937e..98cc720e 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java
@@ -123,7 +123,7 @@ public void sendFromPulsarClientReceiveWithJMS() throws Exception {
producer.newMessage().value("foo").key("bar").send();
// the JMS client reads raw messages always as BytesMessage
- BytesMessage message = (BytesMessage) consumer.receive();
+ BytesMessage message = (BytesMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertArrayEquals(
"foo".getBytes(StandardCharsets.UTF_8), message.getBody(byte[].class));
assertEquals("bar", message.getStringProperty("JMSXGroupID"));
@@ -158,7 +158,7 @@ public void stringSchemaTest() throws Exception {
producer.newMessage().value("foo").key("bar").send();
// the JMS client reads Schema String as TextMessage
- TextMessage message = (TextMessage) consumer.receive();
+ TextMessage message = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getText());
assertEquals("bar", message.getStringProperty("JMSXGroupID"));
}
@@ -191,7 +191,7 @@ public void longSchemaTest() throws Exception {
producer.newMessage().value(23432424L).key("bar").send();
// the JMS client reads Schema INT64 as ObjectMessage
- ObjectMessage message = (ObjectMessage) consumer.receive();
+ ObjectMessage message = (ObjectMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(23432424L, message.getObject());
assertEquals("bar", message.getStringProperty("JMSXGroupID"));
}
@@ -249,7 +249,7 @@ public void avroSchemaTest() throws Exception {
producer.newMessage().value(pojo).key("bar").send();
// the JMS client reads Schema AVRO as TextMessage
- MapMessage message = (MapMessage) consumer.receive();
+ MapMessage message = (MapMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getString("name"));
Map nestedValue = (Map) message.getObject("nested");
assertEquals(23, nestedValue.get("age"));
@@ -316,7 +316,7 @@ public void avroKeyValueSchemaTest() throws Exception {
producer.newMessage().value(keyValue).send();
// the JMS client reads Schema AVRO as TextMessage
- MapMessage message = (MapMessage) consumer.receive();
+ MapMessage message = (MapMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
Map key = (Map) message.getObject("key");
assertEquals(23, key.get("age"));
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java
index 957c6446..e08987e2 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java
@@ -129,7 +129,7 @@ public void sendJMSRedeliveryCountTest() throws Exception {
}
try (MessageConsumer consumer1 = session.createConsumer(destination); ) {
- Message message = consumer1.receive();
+ Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());
@@ -138,7 +138,7 @@ public void sendJMSRedeliveryCountTest() throws Exception {
// close consumer, message not acked, so it must be redelivered
try (MessageConsumer consumer1 = session.createConsumer(destination); ) {
- Message message = consumer1.receive();
+ Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
// Unfortunately Pulsar does not set properly the redelivery count
@@ -223,7 +223,7 @@ public void testQueueBrowsers() throws Exception {
try (MessageConsumer consumer1 = session.createConsumer(destination); ) {
// consume half queue
for (int i = 0; i < numMessages / 2; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consume {}", msg);
assertEquals("foo-" + i, msg.getText());
}
@@ -242,7 +242,7 @@ public void testQueueBrowsers() throws Exception {
try (MessageConsumer consumer1 = session.createConsumer(destination); ) {
// consume half queue
for (int i = numMessages / 2; i < numMessages; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consume2 {} {}", msg, msg.getJMSMessageID());
assertEquals("foo-" + i, msg.getText());
}
@@ -342,7 +342,7 @@ public void useQueueWithoutPulsarAdmin() throws Exception {
// even without using PulsarAdmin
try (MessageConsumer consumer1 = session.createConsumer(destination); ) {
for (int i = 0; i < 10; i++) {
- assertEquals("foo-" + i, consumer1.receive().getBody(String.class));
+ assertEquals("foo-" + i, consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));
}
// no more messages
@@ -389,9 +389,9 @@ public void customSubscriptionName() throws Exception {
// assert that all the consumers receive all the messages
for (int i = 0; i < 10; i++) {
- assertNotNull(consumer1.receive());
- assertNotNull(consumer2.receive());
- assertNotNull(consumer3.receive());
+ assertNotNull(consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
+ assertNotNull(consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
+ assertNotNull(consumer3.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
// verify that we have 3 different subscriptions, with the expected names
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java
index 91a2da4d..81104d0c 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java
@@ -136,7 +136,7 @@ public void sendMessageReceiveFromQueue() throws Exception {
}
}
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-9", textMessage.getText());
if (useServerSideFiltering) {
@@ -192,7 +192,7 @@ public void sendMessageReceiveFromTopicWithSelector() throws Exception {
}
}
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-9", textMessage.getText());
if (useServerSideFiltering) {
@@ -244,7 +244,7 @@ public void sendMessageReceiveFromExclusiveSubscriptionWithSelector() throws Exc
}
}
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-9", textMessage.getText());
if (useServerSideFiltering) {
@@ -294,7 +294,7 @@ public void sendMessageReceiveFromSharedSubscriptionWithSelector() throws Except
}
}
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-9", textMessage.getText());
if (useServerSideFiltering) {
@@ -369,7 +369,7 @@ public void onException(Message message, Exception e) {
CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])).get();
for (String text : expected) {
- PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive();
+ PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(text, textMessage.getText());
// ensure that it is a batch message
@@ -473,7 +473,7 @@ public void onException(Message message, Exception e) {
CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])).get();
for (String text : expected) {
- PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive();
+ PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(text, textMessage.getText());
// ensure that it is a batch message
@@ -581,7 +581,7 @@ public void onException(Message message, Exception e) {
while (!expected1.isEmpty()) {
log.info(
"{} messages left for consumer1: {}", expected1.size(), expected1);
- PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive();
+ PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info(
"consumer1 received {} {}",
textMessage.getText(),
@@ -614,7 +614,7 @@ public void onException(Message message, Exception e) {
while (!expected2.isEmpty()) {
log.info(
"{} messages left for consumer2: {}", expected2.size(), expected2);
- PulsarTextMessage textMessage = (PulsarTextMessage) consumer2.receive();
+ PulsarTextMessage textMessage = (PulsarTextMessage) consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info(
"consumer2 received {} {}",
textMessage.getText(),
@@ -707,7 +707,7 @@ public void onException(Message message, Exception e) {
CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])).get();
for (String text : expected) {
- PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive();
+ PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(text, textMessage.getText());
// ensure that it is a batch message
@@ -811,7 +811,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForTopic(int
// with a non partitioned topic we can expect some order (even if it is not required)
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-" + i, textMessage.getText());
}
}
@@ -820,7 +820,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForTopic(int
List received = new ArrayList<>();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
received.add(textMessage.getText());
}
}
@@ -941,7 +941,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int
// with a non partitioned topic we can expect some order (even if it is not required)
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-" + i, textMessage.getText());
}
}
@@ -950,7 +950,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int
List received = new ArrayList<>();
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
received.add(textMessage.getText());
}
}
@@ -1078,7 +1078,7 @@ public void onException(Message message, Exception e) {
}
for (int i = 0; i < 20; i++) {
if ((i % 2 == 0) && (i % 3 == 0)) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info(
"received {} {}",
textMessage.getText(),
@@ -1160,7 +1160,7 @@ public void chunkingTest() throws Exception {
}
}
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(hugePayload + "-9", textMessage.getText());
if (useServerSideFiltering) {
@@ -1247,7 +1247,7 @@ public void sendHugeFilterOnServerSideSubscription() throws Exception {
for (int i = 0; i < 1000; i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-" + i, textMessage.getText());
}
}
@@ -1316,7 +1316,7 @@ public void sendHugeFilterOnConsumerMetadata() throws Exception {
for (int i = 0; i < 1000; i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-" + i, textMessage.getText());
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java
index 0b3e5628..726837a5 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java
@@ -82,7 +82,7 @@ public void test() throws Exception {
try (MessageConsumer consumer = session2.createConsumer(queue)) {
connection2.start();
- assertEquals("foo0", consumer.receive().getBody(String.class));
+ assertEquals("foo0", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));
}
// serialise
@@ -103,7 +103,7 @@ public void test() throws Exception {
con.start();
// consume from previously created Queue
- assertEquals("foo" + i, consumer.receive().getBody(String.class));
+ assertEquals("foo" + i, consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));
i++;
// use the Pulsar Producer
@@ -113,7 +113,7 @@ public void test() throws Exception {
// use Pulsar Consumer
try (MessageConsumer consumer2 = session.createConsumer(queue2); ) {
- assertEquals("bar", consumer2.receive().getBody(String.class));
+ assertEquals("bar", consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java
index 159a491c..04482fa2 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java
@@ -259,18 +259,18 @@ public void sendMessageReceive() throws Exception {
producer.send(simpleMessage2, DeliveryMode.PERSISTENT, 3, 0);
}
- TextMessage msg = (TextMessage) consumer.receive();
+ TextMessage msg = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", msg.getText());
- ObjectMessage msg2 = (ObjectMessage) consumer.receive();
+ ObjectMessage msg2 = (ObjectMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("bar", msg2.getObject());
- BytesMessage msg3 = (BytesMessage) consumer.receive();
+ BytesMessage msg3 = (BytesMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(1234, msg3.readInt());
- StreamMessage msg4 = (StreamMessage) consumer.receive();
+ StreamMessage msg4 = (StreamMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(1234l, msg4.readLong());
- MapMessage msg5 = (MapMessage) consumer.receive();
+ MapMessage msg5 = (MapMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(true, msg5.getBoolean("foo"));
assertEquals("test", msg5.getString("bar"));
- Message msg6 = consumer.receive();
+ Message msg6 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals((byte) 1, msg6.getByteProperty("a"));
assertEquals(123232323233L, msg6.getLongProperty("b"));
@@ -290,7 +290,7 @@ public void sendMessageReceive() throws Exception {
// we are serializing Object properties as strings
assertEquals(1.3d, msg6.getObjectProperty("i"));
- Message msg7 = consumer.receive();
+ Message msg7 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(DeliveryMode.PERSISTENT, msg7.getJMSDeliveryMode());
assertArrayEquals(new byte[] {1, 2, 3}, msg7.getJMSCorrelationIDAsBytes());
@@ -342,7 +342,7 @@ public void sendMessageReceiveJMSContext() throws Exception {
consumer.receiveBody(StringBuffer.class, 1000).toString());
assertArrayEquals(new byte[] {1, 2, 3}, consumer.receiveBody(byte[].class));
assertEquals(Collections.singletonMap("a", "b"), consumer.receiveBody(Map.class));
- Message msg6 = consumer.receive();
+ Message msg6 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals((byte) 1, msg6.getByteProperty("a"));
assertEquals(123232323233L, msg6.getLongProperty("b"));
assertEquals(1232323, msg6.getIntProperty("c"));
@@ -376,7 +376,7 @@ public void sendMessageReceiveJMSContext() throws Exception {
TextMessage expTextMessage = context.createTextMessage(message);
expTextMessage.setStringProperty("COM_SUN_JMS_TESTNAME", "queueReceiveTests");
producer.send(destination, expTextMessage);
- TextMessage actTextMessage = (TextMessage) consumer.receive();
+ TextMessage actTextMessage = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertNotNull(actTextMessage);
;
assertEquals(actTextMessage.getText(), expTextMessage.getText());
@@ -391,7 +391,7 @@ public void sendMessageReceiveJMSContext() throws Exception {
producer.send(destination, expTextMessage);
actTextMessage = (TextMessage) consumer.receiveNoWait();
if (actTextMessage == null) {
- actTextMessage = (TextMessage) consumer.receive();
+ actTextMessage = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
}
assertNotNull(actTextMessage);
assertEquals(actTextMessage.getText(), expTextMessage.getText());
@@ -611,9 +611,9 @@ public void systemNameSpaceTest() throws Exception {
}
try (MessageConsumer consumer = session.createConsumer(destination); ) {
- TextMessage msg1 = (TextMessage) consumer.receive();
+ TextMessage msg1 = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo1", msg1.getText());
- TextMessage msg2 = (TextMessage) consumer.receive();
+ TextMessage msg2 = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", msg2.getText());
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java
index c02fcb4a..07c0af8d 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java
@@ -115,7 +115,7 @@ private void useTemporaryDestinationTest(Function temporar
// on the server, receive the request
try (MessageConsumer serverSideConsumer = session.createConsumer(serverAddress)) {
- Message message = serverSideConsumer.receive();
+ Message message = serverSideConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("request", message.getBody(String.class));
Destination jmsReplyTo = message.getJMSReplyTo();
@@ -129,7 +129,7 @@ private void useTemporaryDestinationTest(Function temporar
}
// on the client receive the response
- Message theResponse = consumerClient.receive();
+ Message theResponse = consumerClient.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("response", theResponse.getBody(String.class));
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java
index a1a158f6..893d7f29 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java
@@ -202,7 +202,7 @@ public void sendMessageReceiveFromTopicWithTimeToLive(
// only foo-1, foo-3, foo-5... can be received
for (int i = 0; i < 5; i++) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo-" + (i * 2 + 1), textMessage.getText());
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java
index 51e7e76c..17db0190 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java
@@ -100,13 +100,13 @@ public void sendMessageReceiveFromTopic() throws Exception {
// all of the two consumers receive all of the messages, in order
for (int i = 0; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer1, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
for (int i = 0; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer2.receive();
+ TextMessage msg = (TextMessage) consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer2, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
@@ -163,14 +163,14 @@ public void useTopicSubscriberApiWithSharedSubscription() throws Exception {
// consumer1 receives all messages, in order
for (int i = 0; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer1, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
// let consumer2a receive the first half of the message
for (int i = 0; i < 5; i++) {
- TextMessage msg = (TextMessage) consumer2a.receive();
+ TextMessage msg = (TextMessage) consumer2a.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer2a, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
@@ -182,7 +182,7 @@ public void useTopicSubscriberApiWithSharedSubscription() throws Exception {
try (TopicSubscriber consumer2b =
session.createDurableSubscriber(destination, "subscription2")) {
for (int i = 5; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer2b.receive();
+ TextMessage msg = (TextMessage) consumer2b.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer2b, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
@@ -267,14 +267,14 @@ private void testSharedDurableConsumer(SubscriptionType subscriptionType) throws
// consumer1 receives all messages, in order
for (int i = 0; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer1, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
// consumer3, receive a few messages, then close the consumer
for (int i = 0; i < 5; i++) {
- TextMessage msg = (TextMessage) consumer3.receive();
+ TextMessage msg = (TextMessage) consumer3.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer3, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
@@ -284,7 +284,7 @@ private void testSharedDurableConsumer(SubscriptionType subscriptionType) throws
try (MessageConsumer consumer3b =
session.createSharedDurableConsumer(destination, "subscription3"); ) {
for (int i = 5; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer3b.receive();
+ TextMessage msg = (TextMessage) consumer3b.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer3b, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
@@ -357,14 +357,14 @@ private void testSharedNonDurableConsumer(SubscriptionType subscriptionType) thr
// consumer1 receives all messages, in order
for (int i = 0; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer1, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
// consumer3, receive a few messages, then close the consumer
for (int i = 0; i < 5; i++) {
- TextMessage msg = (TextMessage) consumer3.receive();
+ TextMessage msg = (TextMessage) consumer3.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer1, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
@@ -428,7 +428,7 @@ public void testUseKeySharedSubscriptionTypeforTopicConsumer() throws Exception
// consumer1 receives all messages, in order
for (int i = 0; i < 10; i++) {
- TextMessage msg = (TextMessage) consumer1.receive();
+ TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("consumer {} received {}", consumer1, msg.getText());
assertEquals("foo-" + i, msg.getText());
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java
index bda6b025..1be2350b 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java
@@ -103,8 +103,8 @@ public void sendMessageTest() throws Exception {
transaction.commit();
// message is now visible to consumers
- assertNotNull(consumer.receive());
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
}
@@ -206,7 +206,7 @@ public void consumeTransactionTest() throws Exception {
producer.send(textMsg);
}
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getBody(String.class));
}
@@ -245,11 +245,11 @@ public void multiCommitTest() throws Exception {
producer.send(producerSession.createTextMessage("foo1"));
}
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo0", receive.getBody(String.class));
transaction.commit();
- receive = consumer.receive();
+ receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo1", receive.getBody(String.class));
transaction.commit();
}
@@ -290,7 +290,7 @@ public void consumeRollbackTransactionTest() throws Exception {
producer.send(textMsg);
}
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getBody(String.class));
}
@@ -300,7 +300,7 @@ public void consumeRollbackTransactionTest() throws Exception {
// the consumer rolledback the transaction, now we can receive the message from
// another client
try (MessageConsumer consumer = producerSession.createConsumer(destination); ) {
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
}
@@ -334,7 +334,7 @@ public void consumeRollbackTransaction2Test() throws Exception {
producer.send(textMsg);
}
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getBody(String.class));
// rollback before closing Consumer
@@ -344,7 +344,7 @@ public void consumeRollbackTransaction2Test() throws Exception {
// the consumer rolledback the transaction, now we can receive the message from
// another client
try (MessageConsumer consumer = producerSession.createConsumer(destination); ) {
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
}
@@ -390,7 +390,7 @@ public void consumeAutoRollbackTransactionTestWithQueueBrowser() throws Exceptio
}
// transactional consumer, receives but it does not commit
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo0", receive.getBody(String.class));
// the QueueBrowser still sees the message
@@ -411,7 +411,7 @@ public void consumeAutoRollbackTransactionTestWithQueueBrowser() throws Exceptio
try (Session secondSession = connection2.createSession();
MessageConsumer consumer = secondSession.createConsumer(destination); ) {
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
// it looks like peekMessage is not following the subscription in realtime
Thread.sleep(2000);
@@ -462,18 +462,18 @@ public void rollbackReceivedMessages() throws Exception {
}
}
- TextMessage receive = (TextMessage) consumer.receive();
+ TextMessage receive = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("receive and commit {}", receive.getText());
assertEquals(numMessages, countMessages(producerSession, destination));
transaction.commit();
assertEquals(numMessages - 1, countMessages(producerSession, destination));
- receive = (TextMessage) consumer.receive();
+ receive = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("receive and rollback {}", receive.getText());
transaction.rollback();
assertEquals(numMessages - 1, countMessages(producerSession, destination));
- receive = (TextMessage) consumer.receive();
+ receive = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info("receive {}", receive.getText());
assertEquals(numMessages - 1, countMessages(producerSession, destination));
@@ -537,7 +537,7 @@ public void consumeRollbackTransactionTestWithQueueBrowser() throws Exception {
}
// transactional consumer, receives but it does not commit
- Message receive = consumer.receive();
+ Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo0", receive.getBody(String.class));
// the QueueBrowser still sees the message
@@ -558,7 +558,7 @@ public void consumeRollbackTransactionTestWithQueueBrowser() throws Exception {
try (Session secondSession = connection2.createSession();
MessageConsumer consumer = secondSession.createConsumer(destination); ) {
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
// it looks like peekMessage is not following the subscription in realtime
Thread.sleep(2000);
@@ -682,13 +682,13 @@ public void onException(Message message, Exception e) {}
// message is now visible to consumers
// verify that the two messages are part of the same batch
- PulsarMessage message1 = (PulsarMessage) consumer.receive();
+ PulsarMessage message1 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
org.apache.pulsar.client.api.Message> receivedPulsarMessage1 =
message1.getReceivedPulsarMessage();
BatchMessageIdImpl messageId1 =
(BatchMessageIdImpl) receivedPulsarMessage1.getMessageId();
- PulsarMessage message2 = (PulsarMessage) consumer.receive();
+ PulsarMessage message2 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
org.apache.pulsar.client.api.Message> receivedPulsarMessage2 =
message2.getReceivedPulsarMessage();
BatchMessageIdImpl messageId2 =
@@ -731,8 +731,8 @@ public void emulatedTransactionsTest() throws Exception {
transaction.commit();
- assertNotNull(consumer.receive());
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
consumerSession.commit();
}
@@ -989,7 +989,7 @@ public void consumeProduceScenario() throws Exception {
transaction.commit();
}
- Message receive = transactionConsumer.receive();
+ Message receive = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getBody(String.class));
log.info("received {}", receive.getJMSMessageID());
assertEquals(sentMessageID, receive.getJMSMessageID());
@@ -997,7 +997,7 @@ public void consumeProduceScenario() throws Exception {
;
transaction.commit();
- Message receiveOtherSession = consumerOtherSession.receive();
+ Message receiveOtherSession = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receiveOtherSession.getBody(String.class));
log.info("receivedOtherSession {}", receiveOtherSession.getJMSMessageID());
assertEquals(sentMessageID, receiveOtherSession.getJMSMessageID());
@@ -1059,12 +1059,12 @@ public void testMixedProducesScenario() throws Exception {
transaction.commit();
}
- Message receive = transactionConsumer.receive();
+ Message receive = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo1", receive.getBody(String.class));
log.info("received {}", receive.getJMSMessageID());
assertEquals(sentMessageID, receive.getJMSMessageID());
- Message receive2 = transactionConsumer.receive();
+ Message receive2 = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getBody(String.class));
log.info("received {}", receive2.getJMSMessageID());
assertEquals(sentMessageID2, receive2.getJMSMessageID());
@@ -1072,12 +1072,12 @@ public void testMixedProducesScenario() throws Exception {
transaction.commit();
- Message receiveOtherSession = consumerOtherSession.receive();
+ Message receiveOtherSession = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo1", receiveOtherSession.getBody(String.class));
log.info("receivedOtherSession {}", receiveOtherSession.getJMSMessageID());
assertEquals(sentMessageID, receiveOtherSession.getJMSMessageID());
- receiveOtherSession = consumerOtherSession.receive();
+ receiveOtherSession = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receiveOtherSession.getBody(String.class));
log.info("receivedOtherSession {}", receiveOtherSession.getJMSMessageID());
assertEquals(sentMessageID2, receiveOtherSession.getJMSMessageID());
@@ -1129,8 +1129,8 @@ public void testMixedConsumersOnSharedSubscription() throws Exception {
// the message is automatically negativelity acknoledged
// and the broker dispatches the message again
// so eventually the message will be received by the first consumer
- Message receive = transactionConsumer.receive();
- Message receive2 = consumerOtherSession.receive();
+ Message receive = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
+ Message receive2 = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
receive2.acknowledge();
transaction.rollback();
@@ -1138,7 +1138,7 @@ public void testMixedConsumersOnSharedSubscription() throws Exception {
transactionConsumer.close();
// the first message one is to be delivered again
- Message receive3 = consumerOtherSession.receive();
+ Message receive3 = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
receive3.acknowledge();
assertEquals(receive.getBody(String.class), receive3.getBody(String.class));
@@ -1195,7 +1195,7 @@ public void sendMessageWithPartitionStickKeyTest() throws Exception {
Map> messagesByPartition = new HashMap<>();
for (int i = 0; i < 10 * 4; i++) {
- PulsarMessage message = (PulsarMessage) consumer.receive();
+ PulsarMessage message = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
String receivedTopicName = message.getReceivedPulsarMessage().getTopicName();
log.info("message {} {}", receivedTopicName, message);
messagesByPartition
@@ -1233,7 +1233,7 @@ public void sendMessageWithPartitionStickKeyTest() throws Exception {
}
for (int i = 0; i < 8; i++) {
- PulsarMessage message = (PulsarMessage) consumer.receive();
+ PulsarMessage message = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
String receivedTopicName = message.getReceivedPulsarMessage().getTopicName();
log.info("messageAfter {} {}", receivedTopicName, message);
messagesByPartition
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java
index 63de7bbb..4aff5d71 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java
@@ -71,13 +71,13 @@ public void unsubscribeTest() throws Exception {
producer.send(textMsg);
producer.send(textMsg);
producer.send(textMsg);
- assertNotNull(consumer.receive());
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
// leave two messages to be consumed
}
try (MessageConsumer consumer =
session.createSharedDurableConsumer(destination, "sub1")) {
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
// // deleting the subscription
session.unsubscribe("sub1");
@@ -89,7 +89,7 @@ public void unsubscribeTest() throws Exception {
assertNull(consumer.receive(1000));
producer.send(textMsg);
- assertNotNull(consumer.receive());
+ assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java
index cc17d6e3..71f7ef43 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java
@@ -143,7 +143,7 @@ public void testMultiTopic(int numPartitions, boolean useRegExp) throws Exceptio
try (MessageConsumer consumer = session.createConsumer(destination); ) {
for (int i = 0; i < count; i++) {
- String payload = consumer.receive().getBody(String.class);
+ String payload = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class);
log.info("Received {}, remaining {}", payload, payloads.size());
assertFalse(payloads.remove(payload));
}
@@ -272,7 +272,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(
List received = new ArrayList<>();
for (int i = 0; i < 10 * destinationsToWrite.size(); i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
log.info(
"Received {} from {}", textMessage.getText(), textMessage.getJMSDestination());
received.add(textMessage.getText());
@@ -401,7 +401,7 @@ public void sendUsingExistingPulsarSubscriptionWithClientSideFilterForPartitione
List received = new ArrayList<>();
for (int i = 0; i < 10 * destinationsToWrite.size(); i++) {
if (i % 2 == 0) {
- TextMessage textMessage = (TextMessage) consumer1.receive();
+ TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
received.add(textMessage.getText());
totalReceived++;
}
@@ -481,7 +481,7 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti
try (MessageConsumer consumer = session.createConsumer(destination); ) {
for (int i = 0; i < count; i++) {
- String payload = consumer.receive().getBody(String.class);
+ String payload = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class);
log.info("Received {}, remaining {}", payload, payloads.size());
assertFalse(payloads.remove(payload));
}
@@ -508,7 +508,7 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti
producer.send(newDestination, nextMessage);
log.info("id: {}", nextMessage.getJMSMessageID());
- TextMessage received = (TextMessage) consumer.receive();
+ TextMessage received = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("new", received.getText());
Field selectorSupportOnSubscriptions =
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java
index b51e88e3..055fce94 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup;
@@ -29,6 +30,8 @@
/** Pulsar cluster. */
public class PulsarCluster implements AutoCloseable {
+
+ public static final long DEFAULT_RECEIVE_TIMEOUT = TimeUnit.MINUTES.toMillis(2);
private final PulsarService service;
private final BookKeeperCluster bookKeeperCluster;
@@ -85,9 +88,7 @@ public void start() throws Exception {
.clusters()
.createCluster(
"localhost",
- ClusterData.builder()
- .brokerServiceUrl(service.getBrokerServiceUrl())
- .build());
+ ClusterData.builder().brokerServiceUrl(service.getBrokerServiceUrl()).build());
service
.getAdminClient()
.tenants()