Skip to content

Commit

Permalink
tests: use pulsar server in docker
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Apr 9, 2024
1 parent bcb6fc6 commit a703872
Show file tree
Hide file tree
Showing 30 changed files with 660 additions and 1,413 deletions.
17 changes: 9 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@
<hawtbuf.version>1.11</hawtbuf.version>
<curator.version>5.1.0</curator.version>
<slf4j.version>1.7.30</slf4j.version>
<logback.version>1.2.3</logback.version>
<junit.version>5.7.1</junit.version>
<surefire.version>3.0.0-M5</surefire.version>
<surefire.version>3.1.0</surefire.version>
<jackson.version>2.14.2</jackson.version>
<gson.version>2.8.9</gson.version>
<commons-compress.version>1.21</commons-compress.version>
<awaitility.version>4.0.3</awaitility.version>
<maven.antrun.plugin.version>3.1.0</maven.antrun.plugin.version>
<snakeyaml.version>2.0</snakeyaml.version>
<testcontainers.version>1.18.3</testcontainers.version>
<!-- required for running tests on JDK11+ -->
<test.additional.args>--add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED
<!--Mockito-->--add-opens java.base/java.io=ALL-UNNAMED
Expand Down Expand Up @@ -214,6 +214,13 @@
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>${testcontainers.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
Expand Down Expand Up @@ -320,19 +327,13 @@ limitations under the License.]]></inlineHeader>
<configuration>
<argLine>${test.additional.args}</argLine>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<logback.configurationFile>${project.basedir}/src/test/resources/logback-test.xml</logback.configurationFile>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire.version}</version>
<configuration>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<logback.configurationFile>${project.basedir}/src/test/resources/logback-test.xml</logback.configurationFile>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
Expand Down
21 changes: 13 additions & 8 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,28 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>${pulsar.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import com.datastax.oss.pulsar.jms.utils.PulsarCluster;
import java.nio.file.Path;
import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand All @@ -38,36 +37,20 @@
import javax.jms.TextMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.extension.RegisterExtension;

@Timeout(30)
@Slf4j
public class AcknowledgementModeTest {

@TempDir public static Path tempDir;
private static PulsarCluster cluster;

@BeforeAll
public static void before() throws Exception {
cluster = new PulsarCluster(tempDir);
cluster.start();
}

@AfterAll
public static void after() throws Exception {
if (cluster != null) {
cluster.close();
}
}
@RegisterExtension
static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension();

@Test
public void testAUTO_ACKNOWLEDGE() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();
Expand Down Expand Up @@ -97,8 +80,7 @@ public void testAUTO_ACKNOWLEDGE() throws Exception {

@Test
public void testAUTO_ACKNOWLEDGE_ackReceipt() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("ackReceiptEnabled", true);
properties.put("consumerConfig", consumerConfig);
Expand Down Expand Up @@ -145,8 +127,7 @@ public void onException(Message message, Exception e) {}

@Test
public void testADUPS_OK_ACKNOWLEDGE() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();
Expand Down Expand Up @@ -176,8 +157,7 @@ public void testADUPS_OK_ACKNOWLEDGE() throws Exception {

@Test()
public void testACLIENT_ACKNOWLEDGE() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();
Expand Down Expand Up @@ -225,8 +205,7 @@ public void testACLIENT_ACKNOWLEDGE() throws Exception {

@Test
public void testINDIVIDUAL_ACKNOWLEDGE() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (Connection connection = factory.createConnection()) {
connection.start();
Expand Down Expand Up @@ -290,8 +269,7 @@ public void testINDIVIDUAL_ACKNOWLEDGEWithBatchingWithoutBatchIndexAckEnabled()

private void testINDIVIDUAL_ACKNOWLEDGEWithBatching(boolean batchIndexAckEnabled)
throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("batchingEnabled", "true");
producerConfig.put("batchingMaxPublishDelayMicros", "1000000");
Expand Down Expand Up @@ -403,8 +381,7 @@ public void onException(Message message, Exception e) {}

@Test
public void testAutoNackWrongType() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (JMSContext session = factory.createContext()) {
Queue destination =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;

import com.datastax.oss.pulsar.jms.utils.PulsarCluster;
import java.nio.file.Path;
import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
Expand All @@ -40,45 +39,24 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.internal.util.reflection.Whitebox;

@Slf4j
public class BasicServerSideFilterTest {

@TempDir public static Path tempDir;
private static PulsarCluster cluster;

public BasicServerSideFilterTest() {}

@BeforeAll
public static void before() throws Exception {
cluster =
new PulsarCluster(
tempDir,
(config) -> {
config.setTransactionCoordinatorEnabled(false);
config.setAllowAutoTopicCreation(false);
});
cluster.start();
}

@AfterAll
public static void after() throws Exception {
if (cluster != null) {
cluster.close();
}
}
@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false")
.withEnv("PULSAR_PREFIX_allowAutoTopicCreation", "false");

static int refreshServerSideFiltersPeriod = 10;

private Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put("webServiceUrl", cluster.getAddress());
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();

properties.put("jms.useServerSideFiltering", "true");
properties.put("jms.refreshServerSideFiltersPeriod", refreshServerSideFiltersPeriod);
Expand All @@ -105,15 +83,11 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception {
String topicName = "topic-with-sub-" + UUID.randomUUID();
String topicName2 = "topic-with-sub-" + UUID.randomUUID();
if (numPartitions > 0) {
cluster
.getService()
.getAdminClient()
.topics()
.createPartitionedTopic(topicName, numPartitions);

cluster
.getService()
.getAdminClient()

pulsarContainer.getAdmin().topics().createPartitionedTopic(topicName, numPartitions);

pulsarContainer
.getAdmin()
.namespaces()
.setAutoTopicCreation(
"public/default",
Expand All @@ -123,10 +97,9 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception {
.allowAutoTopicCreation(true)
.build());
} else {
cluster.getService().getAdminClient().topics().createNonPartitionedTopic(topicName);
cluster
.getService()
.getAdminClient()
pulsarContainer.getAdmin().topics().createNonPartitionedTopic(topicName);
pulsarContainer
.getAdmin()
.namespaces()
.setAutoTopicCreation(
"public/default",
Expand All @@ -144,9 +117,8 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception {
subscriptionProperties.put("jms.filtering", "true");

// create a Subscription with a selector
cluster
.getService()
.getAdminClient()
pulsarContainer
.getAdmin()
.topics()
.createSubscription(
topicName, subscriptionName, MessageId.earliest, false, subscriptionProperties);
Expand All @@ -172,7 +144,7 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception {
}

// unload the topic
cluster.getService().getAdminClient().topics().unload(topicName);
pulsarContainer.getAdmin().topics().unload(topicName);

try (PulsarMessageConsumer consumer1 =
session.createSharedDurableConsumer(destination, subscriptionName, null); ) {
Expand Down Expand Up @@ -252,9 +224,8 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception {
subscriptionProperties.put("jms.selector", newSelector);
subscriptionProperties.put("jms.filtering", "true");

cluster
.getService()
.getAdminClient()
pulsarContainer
.getAdmin()
.topics()
.updateSubscriptionProperties(topicName, subscriptionName, subscriptionProperties);

Expand All @@ -275,9 +246,8 @@ public void downloadSubscriptionProperties(int numPartitions) throws Exception {
subscriptionProperties.put("jms.selector", newSelector);
subscriptionProperties.put("jms.filtering", "false");

cluster
.getService()
.getAdminClient()
pulsarContainer
.getAdmin()
.topics()
.updateSubscriptionProperties(topicName, subscriptionName, subscriptionProperties);

Expand Down
Loading

0 comments on commit a703872

Please sign in to comment.