From ddeb9171a9d647a19b6408d51322b19c1268a5b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 8 Apr 2024 19:33:10 +0200 Subject: [PATCH] tune surefire --- pom.xml | 13 +++- .../pulsar/jms/AcknowledgementModeTest.java | 32 +++++----- .../pulsar/jms/BasicServerSideFilterTest.java | 2 +- .../oss/pulsar/jms/ConnectionPausedTest.java | 6 +- .../oss/pulsar/jms/DeadLetterQueueTest.java | 8 +-- .../jms/NoAutoCreateSubscriptionTest.java | 2 +- .../datastax/oss/pulsar/jms/NoLocalTest.java | 10 +-- .../OverrideConsumerConfigurationTest.java | 12 ++-- .../datastax/oss/pulsar/jms/PriorityTest.java | 10 +-- .../oss/pulsar/jms/PulsarInteropTest.java | 10 +-- .../datastax/oss/pulsar/jms/QueueTest.java | 16 ++--- .../oss/pulsar/jms/SelectorsTestsBase.java | 34 +++++----- .../SerializableConnectionFactoryTest.java | 6 +- .../datastax/oss/pulsar/jms/SimpleTest.java | 24 +++---- .../pulsar/jms/TemporaryDestinationsTest.java | 4 +- .../oss/pulsar/jms/TimeToLiveTest.java | 2 +- .../datastax/oss/pulsar/jms/TopicTest.java | 22 +++---- .../oss/pulsar/jms/TransactionsTest.java | 62 +++++++++---------- .../oss/pulsar/jms/UnsubscribeTest.java | 8 +-- .../jms/VirtualDestinationsConsumerTest.java | 10 +-- .../oss/pulsar/jms/utils/PulsarCluster.java | 7 ++- 21 files changed, 155 insertions(+), 145 deletions(-) diff --git a/pom.xml b/pom.xml index 29afd8c5..d227aacf 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ 1.7.30 1.2.3 5.7.1 - 3.0.0-M5 + 3.1.0 2.14.2 2.8.9 1.21 @@ -316,7 +316,16 @@ limitations under the License.]]> maven-surefire-plugin ${surefire.version} - ${test.additional.args} + 1 + + -XX:+ExitOnOutOfMemoryError -Xmx2G -XX:+UseZGC + -Dpulsar.allocator.pooled=true + -Dpulsar.allocator.leak_detection=Advanced + -Dpulsar.allocator.exit_on_oom=false + -Dpulsar.allocator.out_of_memory_policy=FallbackToHeap + -Dio.netty.tryReflectionSetAccessible=true + ${test.additional.args} + false ${project.basedir}/src/test/resources/logback-test.xml diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java index d6828730..cc01d58e 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/AcknowledgementModeTest.java @@ -81,7 +81,7 @@ public void testAUTO_ACKNOWLEDGE() throws Exception { } try (MessageConsumer consumer = session.createConsumer(destination); ) { - assertEquals("foo", consumer.receive().getStringProperty("test")); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test")); // message is automatically acknowledged on receive, // but, as we are not setting ackReceiptEnabled, the acknowledgement does not wait // for the server to return success or failure @@ -129,7 +129,7 @@ public void onException(Message message, Exception e) {} // that this test is notably faster try (MessageConsumer consumer = session.createConsumer(destination); ) { for (int i = 0; i < 1000; i++) { - assertEquals("foo", consumer.receive().getStringProperty("test")); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test")); log.info("ack {}", i); // message is automatically acknowledged on receive } @@ -160,7 +160,7 @@ public void testADUPS_OK_ACKNOWLEDGE() throws Exception { } try (MessageConsumer consumer = session.createConsumer(destination); ) { - assertEquals("foo", consumer.receive().getStringProperty("test")); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test")); // message is automatically acknowledged on receive, but best effort and async } // give time for the async ack @@ -196,7 +196,7 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception { } try (MessageConsumer consumer = session.createConsumer(destination); ) { - assertEquals("foo", consumer.receive().getStringProperty("test")); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test")); // message is not automatically acknowledged on receive // closing the consumer @@ -204,10 +204,10 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception { try (MessageConsumer consumer = session.createConsumer(destination); ) { // receive and ack - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getStringProperty("test")); - Message receive2 = consumer.receive(); + Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getStringProperty("test")); // ack only message1, this automatically acks all the other messages @@ -246,7 +246,7 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception { } try (MessageConsumer consumer = session.createConsumer(destination); ) { - assertEquals("foo", consumer.receive().getStringProperty("test")); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test")); // message is not automatically acknowledged on receive // closing the consumer @@ -254,10 +254,10 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception { try (MessageConsumer consumer = session.createConsumer(destination); ) { // receive and ack - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getStringProperty("test")); - Message receive2 = consumer.receive(); + Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getStringProperty("test")); // ack only message1, @@ -267,7 +267,7 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception { try (MessageConsumer consumer = session.createConsumer(destination); ) { // message2 is still there - Message receive2 = consumer.receive(); + Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getStringProperty("test")); assertNull(consumer.receive(100)); @@ -338,7 +338,7 @@ public void onException(Message message, Exception e) {} } try (MessageConsumer consumer = session.createConsumer(destination); ) { - assertEquals("foo", consumer.receive().getStringProperty("test")); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test")); // message is not automatically acknowledged on receive // closing the consumer @@ -346,12 +346,12 @@ public void onException(Message message, Exception e) {} try (MessageConsumer consumer = session.createConsumer(destination); ) { // receive and ack - PulsarMessage receive = (PulsarMessage) consumer.receive(); + PulsarMessage receive = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getStringProperty("test")); assertTrue( receive.getReceivedPulsarMessage().getMessageId() instanceof BatchMessageIdImpl); - PulsarMessage receive2 = (PulsarMessage) consumer.receive(); + PulsarMessage receive2 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getStringProperty("test")); assertTrue( receive2.getReceivedPulsarMessage().getMessageId() instanceof BatchMessageIdImpl); @@ -367,7 +367,7 @@ public void onException(Message message, Exception e) {} // see PIP-54 // message2 is still there - Message receive2 = consumer.receive(); + Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getStringProperty("test")); assertNull(consumer.receive(100)); @@ -376,11 +376,11 @@ public void onException(Message message, Exception e) {} receive2.acknowledge(); } else { // message1 is still there, because we haven't fully acknowledged the Batch - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getStringProperty("test")); // message2 is still there - Message receive2 = consumer.receive(); + Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getStringProperty("test")); assertNull(consumer.receive(100)); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java index 57bec93c..f336cb6c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/BasicServerSideFilterTest.java @@ -305,6 +305,6 @@ private void produce(PulsarSession session, Destination destination) throws JMSE } private void consume(MessageConsumer consumer) throws JMSException { - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java index d4f6b730..67f4f2fe 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionPausedTest.java @@ -94,7 +94,7 @@ public void pausedConnectionTest() throws Exception { // block until the connection is started // the connection will be started in 5 seconds and the test won't be stuck - assertEquals("foo", consumer.receive().getBody(String.class)); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class)); connection.stop(); @@ -107,7 +107,7 @@ public void pausedConnectionTest() throws Exception { // now we are able to receive all of the remaining messages assertEquals("foo", consumer.receive(2000).getBody(String.class)); assertEquals("foo", consumer.receiveNoWait().getBody(String.class)); - assertEquals("foo", consumer.receive().getBody(String.class)); + assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class)); } finally { executeLater.shutdown(); @@ -144,7 +144,7 @@ public void stopConnectionMustWaitForPendingReceive() throws Exception { // no message in the topic, so this consumer will hang beforeReceive.countDown(); log.info("receiving..."); - consumerResult.complete(consumer.receive()); + consumerResult.complete(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } catch (Throwable err) { consumerResult.completeExceptionally(err); } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java index 6f147a73..2bc01ba0 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/DeadLetterQueueTest.java @@ -199,18 +199,18 @@ private void performTest( producer.send(session.createTextMessage("foo")); } - Message message = consumer1.receive(); + Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(1, message.getIntProperty("JMSXDeliveryCount")); assertFalse(message.getJMSRedelivered()); // message is re-delivered again after ackTimeoutMillis - message = consumer1.receive(); + message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(2, message.getIntProperty("JMSXDeliveryCount")); assertTrue(message.getJMSRedelivered()); - message = consumerDeadLetter.receive(); + message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); log.info("DLQ MESSAGE {}", message); // this is another topic, and the JMSXDeliveryCount is only handled on the client side @@ -335,7 +335,7 @@ public void onException(Message message, Exception e) {} // receive all the messages, without acknowledging them. // they will be redelivered multiple-times while (true) { - PulsarMessage message = (PulsarMessage) consumer1.receive(); + PulsarMessage message = (PulsarMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); int _JMSXDeliveryCount = message.getIntProperty("JMSXDeliveryCount"); counterByMessageId.put(message.getJMSMessageID(), _JMSXDeliveryCount); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java index 18220099..f42dd78a 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoAutoCreateSubscriptionTest.java @@ -102,7 +102,7 @@ public void doNotPrecreateQueueSubscriptionTest() throws Exception { try (MessageConsumer consumer1 = session.createConsumer(destinationWithSubscription)) { for (int i = 0; i < 10; i++) { - assertNotNull(consumer1.receive()); + assertNotNull(consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } // verify that we have 1 subscription diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java index 185cd15f..4f370c4c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/NoLocalTest.java @@ -109,7 +109,7 @@ public void sendMessageReceiveFromQueueWithNoLocal( } // we must be able to receive the message from the second connection - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("test", textMessage.getText()); } } @@ -157,7 +157,7 @@ public void sendMessageReceiveFromTopicWithNoLocal( } // we must be able to receive the message from the second connection - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("test", textMessage.getText()); } } @@ -206,7 +206,7 @@ public void sendMessageReceiveFromExclusiveSubscriptionWithSelector( } // we must be able to receive the message from the second connection - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("test", textMessage.getText()); } } @@ -254,7 +254,7 @@ public void sendMessageReceiveFromSharedSubscriptionWithNoLocal( } // we must be able to receive the message from the second connection - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("test", textMessage.getText()); } } @@ -318,7 +318,7 @@ public void acknowledgeRejectedMessagesTest( try (MessageConsumer consumerAllowLocal = session.createConsumer(destination, null, false); ) { for (int i = 0; i < 10; i++) { - assertNotNull(consumerAllowLocal.receive()); + assertNotNull(consumerAllowLocal.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } } } else { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java index f94d2501..b9b34fa5 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/OverrideConsumerConfigurationTest.java @@ -89,13 +89,13 @@ public void overrideDQLConfigurationWithJMSContext() throws Exception { primaryContext.createProducer().send(destination, "foo"); - Message message = consumerWithDLQConfiguration.receive(); + Message message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(1, message.getIntProperty("JMSXDeliveryCount")); assertFalse(message.getJMSRedelivered()); // message is re-delivered again after ackTimeoutMillis - message = consumerWithDLQConfiguration.receive(); + message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(2, message.getIntProperty("JMSXDeliveryCount")); assertTrue(message.getJMSRedelivered()); @@ -103,7 +103,7 @@ public void overrideDQLConfigurationWithJMSContext() throws Exception { try (JMSConsumer consumerDeadLetter = primaryContext.createSharedConsumer(destinationDeadletter, "dqlsub"); ) { - message = consumerDeadLetter.receive(); + message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); log.info("DLQ MESSAGE {}", message); @@ -143,13 +143,13 @@ public void overrideDQLConfigurationWithSession() throws Exception { primarySession.createProducer(destination).send(primarySession.createTextMessage("foo")); - Message message = consumerWithDLQConfiguration.receive(); + Message message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(1, message.getIntProperty("JMSXDeliveryCount")); assertFalse(message.getJMSRedelivered()); // message is re-delivered again after ackTimeoutMillis - message = consumerWithDLQConfiguration.receive(); + message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(2, message.getIntProperty("JMSXDeliveryCount")); assertTrue(message.getJMSRedelivered()); @@ -157,7 +157,7 @@ public void overrideDQLConfigurationWithSession() throws Exception { try (MessageConsumer consumerDeadLetter = primarySession.createSharedConsumer(destinationDeadletter, "dqlsub"); ) { - message = consumerDeadLetter.receive(); + message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); log.info("DLQ MESSAGE {}", message); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java index d2bf8983..c98407c7 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java @@ -157,7 +157,7 @@ public void basicTest(int numPartitions, String mapping) throws Exception { List received = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info( "got msg {} prio {} from {} actually {}", msg.getText(), @@ -272,7 +272,7 @@ public void onException(Message message, Exception e) { Set receivedTexts = new HashSet<>(); List received = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); received.add(msg.getJMSPriority()); if (msg.getJMSPriority() == LOW_PRIORITY) { @@ -442,7 +442,7 @@ private static void testMultiTopicConsumer(Session session, int numMessages, Que List received = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info( "got msg {} prio {} from {} actually {}", msg.getText(), @@ -497,7 +497,7 @@ public void basicPriorityJMSContextTest() throws Exception { List received = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("got msg {} prio {}", msg.getText(), msg.getJMSPriority()); received.add(msg); } @@ -574,7 +574,7 @@ public void onException(Message message, Exception e) { List received = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); if (i == 0) { // await all messages in the consumer receive queue ConsumerBase consumerBase = ((PulsarMessageConsumer) consumer1).getConsumer(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java index abd0937e..98cc720e 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java @@ -123,7 +123,7 @@ public void sendFromPulsarClientReceiveWithJMS() throws Exception { producer.newMessage().value("foo").key("bar").send(); // the JMS client reads raw messages always as BytesMessage - BytesMessage message = (BytesMessage) consumer.receive(); + BytesMessage message = (BytesMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertArrayEquals( "foo".getBytes(StandardCharsets.UTF_8), message.getBody(byte[].class)); assertEquals("bar", message.getStringProperty("JMSXGroupID")); @@ -158,7 +158,7 @@ public void stringSchemaTest() throws Exception { producer.newMessage().value("foo").key("bar").send(); // the JMS client reads Schema String as TextMessage - TextMessage message = (TextMessage) consumer.receive(); + TextMessage message = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getText()); assertEquals("bar", message.getStringProperty("JMSXGroupID")); } @@ -191,7 +191,7 @@ public void longSchemaTest() throws Exception { producer.newMessage().value(23432424L).key("bar").send(); // the JMS client reads Schema INT64 as ObjectMessage - ObjectMessage message = (ObjectMessage) consumer.receive(); + ObjectMessage message = (ObjectMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(23432424L, message.getObject()); assertEquals("bar", message.getStringProperty("JMSXGroupID")); } @@ -249,7 +249,7 @@ public void avroSchemaTest() throws Exception { producer.newMessage().value(pojo).key("bar").send(); // the JMS client reads Schema AVRO as TextMessage - MapMessage message = (MapMessage) consumer.receive(); + MapMessage message = (MapMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getString("name")); Map nestedValue = (Map) message.getObject("nested"); assertEquals(23, nestedValue.get("age")); @@ -316,7 +316,7 @@ public void avroKeyValueSchemaTest() throws Exception { producer.newMessage().value(keyValue).send(); // the JMS client reads Schema AVRO as TextMessage - MapMessage message = (MapMessage) consumer.receive(); + MapMessage message = (MapMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); Map key = (Map) message.getObject("key"); assertEquals(23, key.get("age")); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java index 957c6446..e08987e2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/QueueTest.java @@ -129,7 +129,7 @@ public void sendJMSRedeliveryCountTest() throws Exception { } try (MessageConsumer consumer1 = session.createConsumer(destination); ) { - Message message = consumer1.receive(); + Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); assertEquals(1, message.getIntProperty("JMSXDeliveryCount")); assertFalse(message.getJMSRedelivered()); @@ -138,7 +138,7 @@ public void sendJMSRedeliveryCountTest() throws Exception { // close consumer, message not acked, so it must be redelivered try (MessageConsumer consumer1 = session.createConsumer(destination); ) { - Message message = consumer1.receive(); + Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", message.getBody(String.class)); // Unfortunately Pulsar does not set properly the redelivery count @@ -223,7 +223,7 @@ public void testQueueBrowsers() throws Exception { try (MessageConsumer consumer1 = session.createConsumer(destination); ) { // consume half queue for (int i = 0; i < numMessages / 2; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consume {}", msg); assertEquals("foo-" + i, msg.getText()); } @@ -242,7 +242,7 @@ public void testQueueBrowsers() throws Exception { try (MessageConsumer consumer1 = session.createConsumer(destination); ) { // consume half queue for (int i = numMessages / 2; i < numMessages; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consume2 {} {}", msg, msg.getJMSMessageID()); assertEquals("foo-" + i, msg.getText()); } @@ -342,7 +342,7 @@ public void useQueueWithoutPulsarAdmin() throws Exception { // even without using PulsarAdmin try (MessageConsumer consumer1 = session.createConsumer(destination); ) { for (int i = 0; i < 10; i++) { - assertEquals("foo-" + i, consumer1.receive().getBody(String.class)); + assertEquals("foo-" + i, consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class)); } // no more messages @@ -389,9 +389,9 @@ public void customSubscriptionName() throws Exception { // assert that all the consumers receive all the messages for (int i = 0; i < 10; i++) { - assertNotNull(consumer1.receive()); - assertNotNull(consumer2.receive()); - assertNotNull(consumer3.receive()); + assertNotNull(consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); + assertNotNull(consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); + assertNotNull(consumer3.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } // verify that we have 3 different subscriptions, with the expected names diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java index 91a2da4d..81104d0c 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java @@ -136,7 +136,7 @@ public void sendMessageReceiveFromQueue() throws Exception { } } - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-9", textMessage.getText()); if (useServerSideFiltering) { @@ -192,7 +192,7 @@ public void sendMessageReceiveFromTopicWithSelector() throws Exception { } } - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-9", textMessage.getText()); if (useServerSideFiltering) { @@ -244,7 +244,7 @@ public void sendMessageReceiveFromExclusiveSubscriptionWithSelector() throws Exc } } - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-9", textMessage.getText()); if (useServerSideFiltering) { @@ -294,7 +294,7 @@ public void sendMessageReceiveFromSharedSubscriptionWithSelector() throws Except } } - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-9", textMessage.getText()); if (useServerSideFiltering) { @@ -369,7 +369,7 @@ public void onException(Message message, Exception e) { CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])).get(); for (String text : expected) { - PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(); + PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(text, textMessage.getText()); // ensure that it is a batch message @@ -473,7 +473,7 @@ public void onException(Message message, Exception e) { CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])).get(); for (String text : expected) { - PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(); + PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(text, textMessage.getText()); // ensure that it is a batch message @@ -581,7 +581,7 @@ public void onException(Message message, Exception e) { while (!expected1.isEmpty()) { log.info( "{} messages left for consumer1: {}", expected1.size(), expected1); - PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(); + PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info( "consumer1 received {} {}", textMessage.getText(), @@ -614,7 +614,7 @@ public void onException(Message message, Exception e) { while (!expected2.isEmpty()) { log.info( "{} messages left for consumer2: {}", expected2.size(), expected2); - PulsarTextMessage textMessage = (PulsarTextMessage) consumer2.receive(); + PulsarTextMessage textMessage = (PulsarTextMessage) consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info( "consumer2 received {} {}", textMessage.getText(), @@ -707,7 +707,7 @@ public void onException(Message message, Exception e) { CompletableFuture.allOf(handles.toArray(new CompletableFuture[0])).get(); for (String text : expected) { - PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(); + PulsarTextMessage textMessage = (PulsarTextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(text, textMessage.getText()); // ensure that it is a batch message @@ -811,7 +811,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForTopic(int // with a non partitioned topic we can expect some order (even if it is not required) for (int i = 0; i < 10; i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-" + i, textMessage.getText()); } } @@ -820,7 +820,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForTopic(int List received = new ArrayList<>(); for (int i = 0; i < 10; i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); received.add(textMessage.getText()); } } @@ -941,7 +941,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int // with a non partitioned topic we can expect some order (even if it is not required) for (int i = 0; i < 10; i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-" + i, textMessage.getText()); } } @@ -950,7 +950,7 @@ private void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue(int List received = new ArrayList<>(); for (int i = 0; i < 10; i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); received.add(textMessage.getText()); } } @@ -1078,7 +1078,7 @@ public void onException(Message message, Exception e) { } for (int i = 0; i < 20; i++) { if ((i % 2 == 0) && (i % 3 == 0)) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info( "received {} {}", textMessage.getText(), @@ -1160,7 +1160,7 @@ public void chunkingTest() throws Exception { } } - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(hugePayload + "-9", textMessage.getText()); if (useServerSideFiltering) { @@ -1247,7 +1247,7 @@ public void sendHugeFilterOnServerSideSubscription() throws Exception { for (int i = 0; i < 1000; i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-" + i, textMessage.getText()); } } @@ -1316,7 +1316,7 @@ public void sendHugeFilterOnConsumerMetadata() throws Exception { for (int i = 0; i < 1000; i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-" + i, textMessage.getText()); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java index 0b3e5628..726837a5 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SerializableConnectionFactoryTest.java @@ -82,7 +82,7 @@ public void test() throws Exception { try (MessageConsumer consumer = session2.createConsumer(queue)) { connection2.start(); - assertEquals("foo0", consumer.receive().getBody(String.class)); + assertEquals("foo0", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class)); } // serialise @@ -103,7 +103,7 @@ public void test() throws Exception { con.start(); // consume from previously created Queue - assertEquals("foo" + i, consumer.receive().getBody(String.class)); + assertEquals("foo" + i, consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class)); i++; // use the Pulsar Producer @@ -113,7 +113,7 @@ public void test() throws Exception { // use Pulsar Consumer try (MessageConsumer consumer2 = session.createConsumer(queue2); ) { - assertEquals("bar", consumer2.receive().getBody(String.class)); + assertEquals("bar", consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class)); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java index 159a491c..04482fa2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SimpleTest.java @@ -259,18 +259,18 @@ public void sendMessageReceive() throws Exception { producer.send(simpleMessage2, DeliveryMode.PERSISTENT, 3, 0); } - TextMessage msg = (TextMessage) consumer.receive(); + TextMessage msg = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", msg.getText()); - ObjectMessage msg2 = (ObjectMessage) consumer.receive(); + ObjectMessage msg2 = (ObjectMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("bar", msg2.getObject()); - BytesMessage msg3 = (BytesMessage) consumer.receive(); + BytesMessage msg3 = (BytesMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(1234, msg3.readInt()); - StreamMessage msg4 = (StreamMessage) consumer.receive(); + StreamMessage msg4 = (StreamMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(1234l, msg4.readLong()); - MapMessage msg5 = (MapMessage) consumer.receive(); + MapMessage msg5 = (MapMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(true, msg5.getBoolean("foo")); assertEquals("test", msg5.getString("bar")); - Message msg6 = consumer.receive(); + Message msg6 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals((byte) 1, msg6.getByteProperty("a")); assertEquals(123232323233L, msg6.getLongProperty("b")); @@ -290,7 +290,7 @@ public void sendMessageReceive() throws Exception { // we are serializing Object properties as strings assertEquals(1.3d, msg6.getObjectProperty("i")); - Message msg7 = consumer.receive(); + Message msg7 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals(DeliveryMode.PERSISTENT, msg7.getJMSDeliveryMode()); assertArrayEquals(new byte[] {1, 2, 3}, msg7.getJMSCorrelationIDAsBytes()); @@ -342,7 +342,7 @@ public void sendMessageReceiveJMSContext() throws Exception { consumer.receiveBody(StringBuffer.class, 1000).toString()); assertArrayEquals(new byte[] {1, 2, 3}, consumer.receiveBody(byte[].class)); assertEquals(Collections.singletonMap("a", "b"), consumer.receiveBody(Map.class)); - Message msg6 = consumer.receive(); + Message msg6 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals((byte) 1, msg6.getByteProperty("a")); assertEquals(123232323233L, msg6.getLongProperty("b")); assertEquals(1232323, msg6.getIntProperty("c")); @@ -376,7 +376,7 @@ public void sendMessageReceiveJMSContext() throws Exception { TextMessage expTextMessage = context.createTextMessage(message); expTextMessage.setStringProperty("COM_SUN_JMS_TESTNAME", "queueReceiveTests"); producer.send(destination, expTextMessage); - TextMessage actTextMessage = (TextMessage) consumer.receive(); + TextMessage actTextMessage = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertNotNull(actTextMessage); ; assertEquals(actTextMessage.getText(), expTextMessage.getText()); @@ -391,7 +391,7 @@ public void sendMessageReceiveJMSContext() throws Exception { producer.send(destination, expTextMessage); actTextMessage = (TextMessage) consumer.receiveNoWait(); if (actTextMessage == null) { - actTextMessage = (TextMessage) consumer.receive(); + actTextMessage = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); } assertNotNull(actTextMessage); assertEquals(actTextMessage.getText(), expTextMessage.getText()); @@ -611,9 +611,9 @@ public void systemNameSpaceTest() throws Exception { } try (MessageConsumer consumer = session.createConsumer(destination); ) { - TextMessage msg1 = (TextMessage) consumer.receive(); + TextMessage msg1 = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo1", msg1.getText()); - TextMessage msg2 = (TextMessage) consumer.receive(); + TextMessage msg2 = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", msg2.getText()); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java index c02fcb4a..07c0af8d 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TemporaryDestinationsTest.java @@ -115,7 +115,7 @@ private void useTemporaryDestinationTest(Function temporar // on the server, receive the request try (MessageConsumer serverSideConsumer = session.createConsumer(serverAddress)) { - Message message = serverSideConsumer.receive(); + Message message = serverSideConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("request", message.getBody(String.class)); Destination jmsReplyTo = message.getJMSReplyTo(); @@ -129,7 +129,7 @@ private void useTemporaryDestinationTest(Function temporar } // on the client receive the response - Message theResponse = consumerClient.receive(); + Message theResponse = consumerClient.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("response", theResponse.getBody(String.class)); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java index a1a158f6..893d7f29 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TimeToLiveTest.java @@ -202,7 +202,7 @@ public void sendMessageReceiveFromTopicWithTimeToLive( // only foo-1, foo-3, foo-5... can be received for (int i = 0; i < 5; i++) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo-" + (i * 2 + 1), textMessage.getText()); } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java index 51e7e76c..17db0190 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TopicTest.java @@ -100,13 +100,13 @@ public void sendMessageReceiveFromTopic() throws Exception { // all of the two consumers receive all of the messages, in order for (int i = 0; i < 10; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer1, msg.getText()); assertEquals("foo-" + i, msg.getText()); } for (int i = 0; i < 10; i++) { - TextMessage msg = (TextMessage) consumer2.receive(); + TextMessage msg = (TextMessage) consumer2.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer2, msg.getText()); assertEquals("foo-" + i, msg.getText()); } @@ -163,14 +163,14 @@ public void useTopicSubscriberApiWithSharedSubscription() throws Exception { // consumer1 receives all messages, in order for (int i = 0; i < 10; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer1, msg.getText()); assertEquals("foo-" + i, msg.getText()); } // let consumer2a receive the first half of the message for (int i = 0; i < 5; i++) { - TextMessage msg = (TextMessage) consumer2a.receive(); + TextMessage msg = (TextMessage) consumer2a.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer2a, msg.getText()); assertEquals("foo-" + i, msg.getText()); } @@ -182,7 +182,7 @@ public void useTopicSubscriberApiWithSharedSubscription() throws Exception { try (TopicSubscriber consumer2b = session.createDurableSubscriber(destination, "subscription2")) { for (int i = 5; i < 10; i++) { - TextMessage msg = (TextMessage) consumer2b.receive(); + TextMessage msg = (TextMessage) consumer2b.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer2b, msg.getText()); assertEquals("foo-" + i, msg.getText()); } @@ -267,14 +267,14 @@ private void testSharedDurableConsumer(SubscriptionType subscriptionType) throws // consumer1 receives all messages, in order for (int i = 0; i < 10; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer1, msg.getText()); assertEquals("foo-" + i, msg.getText()); } // consumer3, receive a few messages, then close the consumer for (int i = 0; i < 5; i++) { - TextMessage msg = (TextMessage) consumer3.receive(); + TextMessage msg = (TextMessage) consumer3.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer3, msg.getText()); assertEquals("foo-" + i, msg.getText()); } @@ -284,7 +284,7 @@ private void testSharedDurableConsumer(SubscriptionType subscriptionType) throws try (MessageConsumer consumer3b = session.createSharedDurableConsumer(destination, "subscription3"); ) { for (int i = 5; i < 10; i++) { - TextMessage msg = (TextMessage) consumer3b.receive(); + TextMessage msg = (TextMessage) consumer3b.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer3b, msg.getText()); assertEquals("foo-" + i, msg.getText()); } @@ -357,14 +357,14 @@ private void testSharedNonDurableConsumer(SubscriptionType subscriptionType) thr // consumer1 receives all messages, in order for (int i = 0; i < 10; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer1, msg.getText()); assertEquals("foo-" + i, msg.getText()); } // consumer3, receive a few messages, then close the consumer for (int i = 0; i < 5; i++) { - TextMessage msg = (TextMessage) consumer3.receive(); + TextMessage msg = (TextMessage) consumer3.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer1, msg.getText()); assertEquals("foo-" + i, msg.getText()); } @@ -428,7 +428,7 @@ public void testUseKeySharedSubscriptionTypeforTopicConsumer() throws Exception // consumer1 receives all messages, in order for (int i = 0; i < 10; i++) { - TextMessage msg = (TextMessage) consumer1.receive(); + TextMessage msg = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("consumer {} received {}", consumer1, msg.getText()); assertEquals("foo-" + i, msg.getText()); } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index bda6b025..1be2350b 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -103,8 +103,8 @@ public void sendMessageTest() throws Exception { transaction.commit(); // message is now visible to consumers - assertNotNull(consumer.receive()); - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } } } @@ -206,7 +206,7 @@ public void consumeTransactionTest() throws Exception { producer.send(textMsg); } - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getBody(String.class)); } @@ -245,11 +245,11 @@ public void multiCommitTest() throws Exception { producer.send(producerSession.createTextMessage("foo1")); } - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo0", receive.getBody(String.class)); transaction.commit(); - receive = consumer.receive(); + receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo1", receive.getBody(String.class)); transaction.commit(); } @@ -290,7 +290,7 @@ public void consumeRollbackTransactionTest() throws Exception { producer.send(textMsg); } - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getBody(String.class)); } @@ -300,7 +300,7 @@ public void consumeRollbackTransactionTest() throws Exception { // the consumer rolledback the transaction, now we can receive the message from // another client try (MessageConsumer consumer = producerSession.createConsumer(destination); ) { - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } } } @@ -334,7 +334,7 @@ public void consumeRollbackTransaction2Test() throws Exception { producer.send(textMsg); } - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getBody(String.class)); // rollback before closing Consumer @@ -344,7 +344,7 @@ public void consumeRollbackTransaction2Test() throws Exception { // the consumer rolledback the transaction, now we can receive the message from // another client try (MessageConsumer consumer = producerSession.createConsumer(destination); ) { - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } } } @@ -390,7 +390,7 @@ public void consumeAutoRollbackTransactionTestWithQueueBrowser() throws Exceptio } // transactional consumer, receives but it does not commit - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo0", receive.getBody(String.class)); // the QueueBrowser still sees the message @@ -411,7 +411,7 @@ public void consumeAutoRollbackTransactionTestWithQueueBrowser() throws Exceptio try (Session secondSession = connection2.createSession(); MessageConsumer consumer = secondSession.createConsumer(destination); ) { - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); // it looks like peekMessage is not following the subscription in realtime Thread.sleep(2000); @@ -462,18 +462,18 @@ public void rollbackReceivedMessages() throws Exception { } } - TextMessage receive = (TextMessage) consumer.receive(); + TextMessage receive = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("receive and commit {}", receive.getText()); assertEquals(numMessages, countMessages(producerSession, destination)); transaction.commit(); assertEquals(numMessages - 1, countMessages(producerSession, destination)); - receive = (TextMessage) consumer.receive(); + receive = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("receive and rollback {}", receive.getText()); transaction.rollback(); assertEquals(numMessages - 1, countMessages(producerSession, destination)); - receive = (TextMessage) consumer.receive(); + receive = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info("receive {}", receive.getText()); assertEquals(numMessages - 1, countMessages(producerSession, destination)); @@ -537,7 +537,7 @@ public void consumeRollbackTransactionTestWithQueueBrowser() throws Exception { } // transactional consumer, receives but it does not commit - Message receive = consumer.receive(); + Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo0", receive.getBody(String.class)); // the QueueBrowser still sees the message @@ -558,7 +558,7 @@ public void consumeRollbackTransactionTestWithQueueBrowser() throws Exception { try (Session secondSession = connection2.createSession(); MessageConsumer consumer = secondSession.createConsumer(destination); ) { - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); // it looks like peekMessage is not following the subscription in realtime Thread.sleep(2000); @@ -682,13 +682,13 @@ public void onException(Message message, Exception e) {} // message is now visible to consumers // verify that the two messages are part of the same batch - PulsarMessage message1 = (PulsarMessage) consumer.receive(); + PulsarMessage message1 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); org.apache.pulsar.client.api.Message receivedPulsarMessage1 = message1.getReceivedPulsarMessage(); BatchMessageIdImpl messageId1 = (BatchMessageIdImpl) receivedPulsarMessage1.getMessageId(); - PulsarMessage message2 = (PulsarMessage) consumer.receive(); + PulsarMessage message2 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); org.apache.pulsar.client.api.Message receivedPulsarMessage2 = message2.getReceivedPulsarMessage(); BatchMessageIdImpl messageId2 = @@ -731,8 +731,8 @@ public void emulatedTransactionsTest() throws Exception { transaction.commit(); - assertNotNull(consumer.receive()); - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); consumerSession.commit(); } @@ -989,7 +989,7 @@ public void consumeProduceScenario() throws Exception { transaction.commit(); } - Message receive = transactionConsumer.receive(); + Message receive = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receive.getBody(String.class)); log.info("received {}", receive.getJMSMessageID()); assertEquals(sentMessageID, receive.getJMSMessageID()); @@ -997,7 +997,7 @@ public void consumeProduceScenario() throws Exception { ; transaction.commit(); - Message receiveOtherSession = consumerOtherSession.receive(); + Message receiveOtherSession = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo", receiveOtherSession.getBody(String.class)); log.info("receivedOtherSession {}", receiveOtherSession.getJMSMessageID()); assertEquals(sentMessageID, receiveOtherSession.getJMSMessageID()); @@ -1059,12 +1059,12 @@ public void testMixedProducesScenario() throws Exception { transaction.commit(); } - Message receive = transactionConsumer.receive(); + Message receive = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo1", receive.getBody(String.class)); log.info("received {}", receive.getJMSMessageID()); assertEquals(sentMessageID, receive.getJMSMessageID()); - Message receive2 = transactionConsumer.receive(); + Message receive2 = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receive2.getBody(String.class)); log.info("received {}", receive2.getJMSMessageID()); assertEquals(sentMessageID2, receive2.getJMSMessageID()); @@ -1072,12 +1072,12 @@ public void testMixedProducesScenario() throws Exception { transaction.commit(); - Message receiveOtherSession = consumerOtherSession.receive(); + Message receiveOtherSession = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo1", receiveOtherSession.getBody(String.class)); log.info("receivedOtherSession {}", receiveOtherSession.getJMSMessageID()); assertEquals(sentMessageID, receiveOtherSession.getJMSMessageID()); - receiveOtherSession = consumerOtherSession.receive(); + receiveOtherSession = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("foo2", receiveOtherSession.getBody(String.class)); log.info("receivedOtherSession {}", receiveOtherSession.getJMSMessageID()); assertEquals(sentMessageID2, receiveOtherSession.getJMSMessageID()); @@ -1129,8 +1129,8 @@ public void testMixedConsumersOnSharedSubscription() throws Exception { // the message is automatically negativelity acknoledged // and the broker dispatches the message again // so eventually the message will be received by the first consumer - Message receive = transactionConsumer.receive(); - Message receive2 = consumerOtherSession.receive(); + Message receive = transactionConsumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); + Message receive2 = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); receive2.acknowledge(); transaction.rollback(); @@ -1138,7 +1138,7 @@ public void testMixedConsumersOnSharedSubscription() throws Exception { transactionConsumer.close(); // the first message one is to be delivered again - Message receive3 = consumerOtherSession.receive(); + Message receive3 = consumerOtherSession.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); receive3.acknowledge(); assertEquals(receive.getBody(String.class), receive3.getBody(String.class)); @@ -1195,7 +1195,7 @@ public void sendMessageWithPartitionStickKeyTest() throws Exception { Map> messagesByPartition = new HashMap<>(); for (int i = 0; i < 10 * 4; i++) { - PulsarMessage message = (PulsarMessage) consumer.receive(); + PulsarMessage message = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); String receivedTopicName = message.getReceivedPulsarMessage().getTopicName(); log.info("message {} {}", receivedTopicName, message); messagesByPartition @@ -1233,7 +1233,7 @@ public void sendMessageWithPartitionStickKeyTest() throws Exception { } for (int i = 0; i < 8; i++) { - PulsarMessage message = (PulsarMessage) consumer.receive(); + PulsarMessage message = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); String receivedTopicName = message.getReceivedPulsarMessage().getTopicName(); log.info("messageAfter {} {}", receivedTopicName, message); messagesByPartition diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java index 63de7bbb..4aff5d71 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UnsubscribeTest.java @@ -71,13 +71,13 @@ public void unsubscribeTest() throws Exception { producer.send(textMsg); producer.send(textMsg); producer.send(textMsg); - assertNotNull(consumer.receive()); - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); // leave two messages to be consumed } try (MessageConsumer consumer = session.createSharedDurableConsumer(destination, "sub1")) { - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } // // deleting the subscription session.unsubscribe("sub1"); @@ -89,7 +89,7 @@ public void unsubscribeTest() throws Exception { assertNull(consumer.receive(1000)); producer.send(textMsg); - assertNotNull(consumer.receive()); + assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT)); } } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java index cc17d6e3..71f7ef43 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/VirtualDestinationsConsumerTest.java @@ -143,7 +143,7 @@ public void testMultiTopic(int numPartitions, boolean useRegExp) throws Exceptio try (MessageConsumer consumer = session.createConsumer(destination); ) { for (int i = 0; i < count; i++) { - String payload = consumer.receive().getBody(String.class); + String payload = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class); log.info("Received {}, remaining {}", payload, payloads.size()); assertFalse(payloads.remove(payload)); } @@ -272,7 +272,7 @@ public void sendUsingExistingPulsarSubscriptionWithServerSideFilterForQueue( List received = new ArrayList<>(); for (int i = 0; i < 10 * destinationsToWrite.size(); i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); log.info( "Received {} from {}", textMessage.getText(), textMessage.getJMSDestination()); received.add(textMessage.getText()); @@ -401,7 +401,7 @@ public void sendUsingExistingPulsarSubscriptionWithClientSideFilterForPartitione List received = new ArrayList<>(); for (int i = 0; i < 10 * destinationsToWrite.size(); i++) { if (i % 2 == 0) { - TextMessage textMessage = (TextMessage) consumer1.receive(); + TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); received.add(textMessage.getText()); totalReceived++; } @@ -481,7 +481,7 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti try (MessageConsumer consumer = session.createConsumer(destination); ) { for (int i = 0; i < count; i++) { - String payload = consumer.receive().getBody(String.class); + String payload = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class); log.info("Received {}, remaining {}", payload, payloads.size()); assertFalse(payloads.remove(payload)); } @@ -508,7 +508,7 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti producer.send(newDestination, nextMessage); log.info("id: {}", nextMessage.getJMSMessageID()); - TextMessage received = (TextMessage) consumer.receive(); + TextMessage received = (TextMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT); assertEquals("new", received.getText()); Field selectorSupportOnSubscriptions = diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java index b51e88e3..055fce94 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarCluster.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.bookkeeper.util.PortManager; import org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup; @@ -29,6 +30,8 @@ /** Pulsar cluster. */ public class PulsarCluster implements AutoCloseable { + + public static final long DEFAULT_RECEIVE_TIMEOUT = TimeUnit.MINUTES.toMillis(2); private final PulsarService service; private final BookKeeperCluster bookKeeperCluster; @@ -85,9 +88,7 @@ public void start() throws Exception { .clusters() .createCluster( "localhost", - ClusterData.builder() - .brokerServiceUrl(service.getBrokerServiceUrl()) - .build()); + ClusterData.builder().brokerServiceUrl(service.getBrokerServiceUrl()).build()); service .getAdminClient() .tenants()