Skip to content

Commit

Permalink
Added the java4.x client cluster filter pull test content (#51)
Browse files Browse the repository at this point in the history
* Added the java4.x client cluster filter pull test content

* Improved the test content, adding the test content of batchproducer, transactions, loadbacing, offset, retry and other scenarios

* Refine old tests and add retry test scenarios

* Refine old tests and normalize non-standard code
  • Loading branch information
Zintoki authored Oct 20, 2023
1 parent b035dd0 commit 51a01c1
Show file tree
Hide file tree
Showing 48 changed files with 6,310 additions and 345 deletions.
7 changes: 6 additions & 1 deletion java/e2e-v4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>logback-core</artifactId>
<version>1.3.0-beta0</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -144,7 +149,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
<argLine>-Xmx1024m</argLine>
<rerunFailingTestsCount>1</rerunFailingTestsCount>
<trimStackTrace>false</trimStackTrace>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,78 +17,156 @@

package org.apache.rocketmq.client.rmq;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.AbstractMQConsumer;
import org.apache.rocketmq.listener.AbstractListener;
import org.apache.rocketmq.listener.rmq.concurrent.RMQIdempotentListener;
import org.apache.rocketmq.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.listener.rmq.concurrent.RMQOrderListener;
import org.apache.rocketmq.utils.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RMQNormalConsumer extends AbstractMQConsumer {
private static Logger logger = LoggerFactory.getLogger(RMQNormalConsumer.class);

private DefaultMQPushConsumer consumer;
private DefaultMQPushConsumer pushConsumer;
private DefaultLitePullConsumer litePullConsumer;
private DefaultMQPullConsumer pullConsumer;
private AbstractListener listener = null;

public RMQNormalConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
this.pushConsumer = consumer;
}

public RMQNormalConsumer(DefaultLitePullConsumer consumer) {
this.litePullConsumer = consumer;
}

public RMQNormalConsumer(DefaultMQPullConsumer consumer) {
this.pullConsumer = consumer;
}

public void subscribeAndStart(String topic, String tag, RMQNormalListener listener) {
Assertions.assertNotNull(pushConsumer);
this.listener = listener;
try {
consumer.subscribe(topic, tag);
consumer.setMessageListener(listener);
consumer.start();
pushConsumer.subscribe(topic, tag);
pushConsumer.setMessageListener(listener);
pushConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQPushConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQPushConsumer started - topic: {}, tag: {}", topic, tag);
}

public void subscribeAndStart(String topic, String tag, RMQIdempotentListener listener) {
Assertions.assertNotNull(pushConsumer);
this.listener = listener;
try {
pushConsumer.subscribe(topic, tag);
pushConsumer.setMessageListener(listener);
pushConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQPushConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQPushConsumer started - topic: {}, tag: {}", topic, tag);
}

public void subscribeAndStartLitePull(String topic, String tag) {
Assertions.assertNotNull(litePullConsumer);
try {
litePullConsumer.subscribe(topic, tag);
litePullConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQLitePullConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQLitePullConsumer started - topic: {}, tag: {}", topic, tag);
}

public void startLitePullAssignMode() {
Assertions.assertNotNull(litePullConsumer);
try {
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQLitePullConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQLitePullConsumer Assign Mode started");
}

public void startDefaultPull() {
Assertions.assertNotNull(pullConsumer);
try {
pullConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQPullConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQPullConsumer started");
}

public void subscribeAndStartLitePull(String topic, MessageSelector messageSelector) {
Assertions.assertNotNull(litePullConsumer);
try {
litePullConsumer.subscribe(topic, messageSelector);
litePullConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQPushConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQLitePullConsumer started - topic: {}, sql: {}", topic, messageSelector.getExpression());
}

public void subscribeAndStart(String topic, String tag, RMQOrderListener listener) {
Assertions.assertNotNull(pushConsumer);
this.listener = listener;
try {
consumer.subscribe(topic, tag);
consumer.setMessageListener(listener);
consumer.start();
pushConsumer.subscribe(topic, tag);
pushConsumer.setMessageListener(listener);
pushConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQPushConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQPushConsumer started - topic: {}, tag: {}", topic, tag);
}

public void subscribeAndStart(String topic, MessageSelector messageSelector, RMQNormalListener listener) {
Assertions.assertNotNull(pushConsumer);
this.listener = listener;
try {
consumer.subscribe(topic, messageSelector);
consumer.setMessageListener(listener);
consumer.start();
pushConsumer.subscribe(topic, messageSelector);
pushConsumer.setMessageListener(listener);
pushConsumer.start();
} catch (MQClientException e) {
logger.info("Start DefaultMQPushConsumer failed, {}", e.getMessage());
}
logger.info("DefaultMQPushConsumer started - topic: {}, messageSelector: {}", topic, messageSelector.getExpression());
logger.info("DefaultMQPushConsumer started - topic: {}, messageSelector: {}", topic,
messageSelector.getExpression());
TestUtils.waitForSeconds(5);
}

@Override
public void shutdown() {
if (consumer != null) {
consumer.shutdown();
if (pushConsumer != null) {
pushConsumer.shutdown();
logger.info("DefaultMQPushConsumer shutdown !!!");
}
if (litePullConsumer != null) {
litePullConsumer.shutdown();
logger.info("DefaultLitePullConsumer shutdown !!!");
}
}

public DefaultMQPushConsumer getConsumer() {
return consumer;
public DefaultMQPushConsumer getPushConsumer() {
return pushConsumer;
}

public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
public void setPushConsumer(DefaultMQPushConsumer consumer) {
this.pushConsumer = consumer;
}

public AbstractListener getListener() {
Expand All @@ -98,4 +176,20 @@ public AbstractListener getListener() {
public void setListener(AbstractListener listener) {
this.listener = listener;
}

public DefaultLitePullConsumer getLitePullConsumer() {
return litePullConsumer;
}

public void setLitePullConsumer(DefaultLitePullConsumer consumer) {
this.litePullConsumer = consumer;
}

public DefaultMQPullConsumer getPullConsumer() {
return pullConsumer;
}

public void setPullConsumer(DefaultMQPullConsumer consumer) {
this.pullConsumer = consumer;
}
}
Loading

0 comments on commit 51a01c1

Please sign in to comment.