Skip to content

Commit

Permalink
tune surefire
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Apr 8, 2024
1 parent a9212ec commit ddeb917
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 145 deletions.
13 changes: 11 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<slf4j.version>1.7.30</slf4j.version>
<logback.version>1.2.3</logback.version>
<junit.version>5.7.1</junit.version>
<surefire.version>3.0.0-M5</surefire.version>
<surefire.version>3.1.0</surefire.version>
<jackson.version>2.14.2</jackson.version>
<gson.version>2.8.9</gson.version>
<commons-compress.version>1.21</commons-compress.version>
Expand Down Expand Up @@ -316,7 +316,16 @@ limitations under the License.]]></inlineHeader>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire.version}</version>
<configuration>
<argLine>${test.additional.args}</argLine>
<forkCount>1</forkCount>
<argLine>
-XX:+ExitOnOutOfMemoryError -Xmx2G -XX:+UseZGC
-Dpulsar.allocator.pooled=true
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
-Dpulsar.allocator.out_of_memory_policy=FallbackToHeap
-Dio.netty.tryReflectionSetAccessible=true
${test.additional.args}
</argLine>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<logback.configurationFile>${project.basedir}/src/test/resources/logback-test.xml</logback.configurationFile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testAUTO_ACKNOWLEDGE() throws Exception {
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is automatically acknowledged on receive,
// but, as we are not setting ackReceiptEnabled, the acknowledgement does not wait
// for the server to return success or failure
Expand Down Expand Up @@ -129,7 +129,7 @@ public void onException(Message message, Exception e) {}
// that this test is notably faster
try (MessageConsumer consumer = session.createConsumer(destination); ) {
for (int i = 0; i < 1000; i++) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
log.info("ack {}", i);
// message is automatically acknowledged on receive
}
Expand Down Expand Up @@ -160,7 +160,7 @@ public void testADUPS_OK_ACKNOWLEDGE() throws Exception {
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is automatically acknowledged on receive, but best effort and async
}
// give time for the async ack
Expand Down Expand Up @@ -196,18 +196,18 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception {
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is not automatically acknowledged on receive

// closing the consumer
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
// receive and ack
Message receive = consumer.receive();
Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));

Message receive2 = consumer.receive();
Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));

// ack only message1, this automatically acks all the other messages
Expand Down Expand Up @@ -246,18 +246,18 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception {
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is not automatically acknowledged on receive

// closing the consumer
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
// receive and ack
Message receive = consumer.receive();
Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));

Message receive2 = consumer.receive();
Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));

// ack only message1,
Expand All @@ -267,7 +267,7 @@ public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception {
try (MessageConsumer consumer = session.createConsumer(destination); ) {

// message2 is still there
Message receive2 = consumer.receive();
Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));

assertNull(consumer.receive(100));
Expand Down Expand Up @@ -338,20 +338,20 @@ public void onException(Message message, Exception e) {}
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
assertEquals("foo", consumer.receive().getStringProperty("test"));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getStringProperty("test"));
// message is not automatically acknowledged on receive

// closing the consumer
}

try (MessageConsumer consumer = session.createConsumer(destination); ) {
// receive and ack
PulsarMessage receive = (PulsarMessage) consumer.receive();
PulsarMessage receive = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));
assertTrue(
receive.getReceivedPulsarMessage().getMessageId() instanceof BatchMessageIdImpl);

PulsarMessage receive2 = (PulsarMessage) consumer.receive();
PulsarMessage receive2 = (PulsarMessage) consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));
assertTrue(
receive2.getReceivedPulsarMessage().getMessageId() instanceof BatchMessageIdImpl);
Expand All @@ -367,7 +367,7 @@ public void onException(Message message, Exception e) {}
// see PIP-54

// message2 is still there
Message receive2 = consumer.receive();
Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));

assertNull(consumer.receive(100));
Expand All @@ -376,11 +376,11 @@ public void onException(Message message, Exception e) {}
receive2.acknowledge();
} else {
// message1 is still there, because we haven't fully acknowledged the Batch
Message receive = consumer.receive();
Message receive = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", receive.getStringProperty("test"));

// message2 is still there
Message receive2 = consumer.receive();
Message receive2 = consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo2", receive2.getStringProperty("test"));

assertNull(consumer.receive(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,6 @@ private void produce(PulsarSession session, Destination destination) throws JMSE
}

private void consume(MessageConsumer consumer) throws JMSException {
assertNotNull(consumer.receive());
assertNotNull(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void pausedConnectionTest() throws Exception {

// block until the connection is started
// the connection will be started in 5 seconds and the test won't be stuck
assertEquals("foo", consumer.receive().getBody(String.class));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));

connection.stop();

Expand All @@ -107,7 +107,7 @@ public void pausedConnectionTest() throws Exception {
// now we are able to receive all of the remaining messages
assertEquals("foo", consumer.receive(2000).getBody(String.class));
assertEquals("foo", consumer.receiveNoWait().getBody(String.class));
assertEquals("foo", consumer.receive().getBody(String.class));
assertEquals("foo", consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT).getBody(String.class));

} finally {
executeLater.shutdown();
Expand Down Expand Up @@ -144,7 +144,7 @@ public void stopConnectionMustWaitForPendingReceive() throws Exception {
// no message in the topic, so this consumer will hang
beforeReceive.countDown();
log.info("receiving...");
consumerResult.complete(consumer.receive());
consumerResult.complete(consumer.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
} catch (Throwable err) {
consumerResult.completeExceptionally(err);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,18 +199,18 @@ private void performTest(
producer.send(session.createTextMessage("foo"));
}

Message message = consumer1.receive();
Message message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());

// message is re-delivered again after ackTimeoutMillis
message = consumer1.receive();
message = consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(2, message.getIntProperty("JMSXDeliveryCount"));
assertTrue(message.getJMSRedelivered());

message = consumerDeadLetter.receive();
message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
log.info("DLQ MESSAGE {}", message);
// this is another topic, and the JMSXDeliveryCount is only handled on the client side
Expand Down Expand Up @@ -335,7 +335,7 @@ public void onException(Message message, Exception e) {}
// receive all the messages, without acknowledging them.
// they will be redelivered multiple-times
while (true) {
PulsarMessage message = (PulsarMessage) consumer1.receive();
PulsarMessage message = (PulsarMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
int _JMSXDeliveryCount = message.getIntProperty("JMSXDeliveryCount");
counterByMessageId.put(message.getJMSMessageID(), _JMSXDeliveryCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void doNotPrecreateQueueSubscriptionTest() throws Exception {

try (MessageConsumer consumer1 = session.createConsumer(destinationWithSubscription)) {
for (int i = 0; i < 10; i++) {
assertNotNull(consumer1.receive());
assertNotNull(consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}

// verify that we have 1 subscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void sendMessageReceiveFromQueueWithNoLocal(
}

// we must be able to receive the message from the second connection
TextMessage textMessage = (TextMessage) consumer1.receive();
TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
Expand Down Expand Up @@ -157,7 +157,7 @@ public void sendMessageReceiveFromTopicWithNoLocal(
}

// we must be able to receive the message from the second connection
TextMessage textMessage = (TextMessage) consumer1.receive();
TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
Expand Down Expand Up @@ -206,7 +206,7 @@ public void sendMessageReceiveFromExclusiveSubscriptionWithSelector(
}

// we must be able to receive the message from the second connection
TextMessage textMessage = (TextMessage) consumer1.receive();
TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
Expand Down Expand Up @@ -254,7 +254,7 @@ public void sendMessageReceiveFromSharedSubscriptionWithNoLocal(
}

// we must be able to receive the message from the second connection
TextMessage textMessage = (TextMessage) consumer1.receive();
TextMessage textMessage = (TextMessage) consumer1.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("test", textMessage.getText());
}
}
Expand Down Expand Up @@ -318,7 +318,7 @@ public void acknowledgeRejectedMessagesTest(
try (MessageConsumer consumerAllowLocal =
session.createConsumer(destination, null, false); ) {
for (int i = 0; i < 10; i++) {
assertNotNull(consumerAllowLocal.receive());
assertNotNull(consumerAllowLocal.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT));
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,21 @@ public void overrideDQLConfigurationWithJMSContext() throws Exception {

primaryContext.createProducer().send(destination, "foo");

Message message = consumerWithDLQConfiguration.receive();
Message message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());

// message is re-delivered again after ackTimeoutMillis
message = consumerWithDLQConfiguration.receive();
message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(2, message.getIntProperty("JMSXDeliveryCount"));
assertTrue(message.getJMSRedelivered());

try (JMSConsumer consumerDeadLetter =
primaryContext.createSharedConsumer(destinationDeadletter, "dqlsub"); ) {

message = consumerDeadLetter.receive();
message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
log.info("DLQ MESSAGE {}", message);

Expand Down Expand Up @@ -143,21 +143,21 @@ public void overrideDQLConfigurationWithSession() throws Exception {

primarySession.createProducer(destination).send(primarySession.createTextMessage("foo"));

Message message = consumerWithDLQConfiguration.receive();
Message message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(1, message.getIntProperty("JMSXDeliveryCount"));
assertFalse(message.getJMSRedelivered());

// message is re-delivered again after ackTimeoutMillis
message = consumerWithDLQConfiguration.receive();
message = consumerWithDLQConfiguration.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
assertEquals(2, message.getIntProperty("JMSXDeliveryCount"));
assertTrue(message.getJMSRedelivered());

try (MessageConsumer consumerDeadLetter =
primarySession.createSharedConsumer(destinationDeadletter, "dqlsub"); ) {

message = consumerDeadLetter.receive();
message = consumerDeadLetter.receive(com.datastax.oss.pulsar.jms.utils.PulsarCluster.DEFAULT_RECEIVE_TIMEOUT);
assertEquals("foo", message.getBody(String.class));
log.info("DLQ MESSAGE {}", message);

Expand Down
Loading

0 comments on commit ddeb917

Please sign in to comment.