Skip to content

Commit

Permalink
Fix test and also wait for Pulsar to be ready
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Apr 9, 2024
1 parent d790331 commit ca695f1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.impl.auth.AuthenticationToken;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
Expand Down Expand Up @@ -69,12 +68,12 @@ public class PriorityTest {
@Override
@SneakyThrows
public void accept(PulsarContainerExtension pulsarContainerExtension) {
Set<String> clusters =
new HashSet<>(pulsarContainerExtension.getAdmin().clusters().getClusters());
pulsarContainerExtension
.getAdmin()
.tenants()
.createTenant(
"foo",
TenantInfo.builder().allowedClusters(ImmutableSet.of("pulsar")).build());
.createTenant("foo", TenantInfo.builder().allowedClusters(clusters).build());
pulsarContainerExtension
.getAdmin()
.namespaces()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -463,20 +462,20 @@ public void testPatternConsumerAddingTopicWithServerSideFilters() throws Excepti
factory.getPulsarAdmin().topics().createNonPartitionedTopic(topicName);
// await that the consumer session creates the subscription, then we update it
Awaitility.await()
.untilAsserted(() -> {
List<String> subs = pulsarContainer
.getAdmin()
.topics().getSubscriptions(topicName);
.untilAsserted(
() -> {
List<String> subs =
pulsarContainer.getAdmin().topics().getSubscriptions(topicName);
assertEquals(subs.size(), 1);
assertTrue(subs.contains("jms-queue"));
});
Map<String, String> subscriptionProperties = new HashMap<>();
subscriptionProperties.put("jms.selector", "keepme=TRUE");
subscriptionProperties.put("jms.filtering", "true");
pulsarContainer
.getAdmin()
.topics()
.updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties);
.getAdmin()
.topics()
.updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties);

Queue newDestination = session.createQueue(topicName);
TextMessage nextMessage = session.createTextMessage("new");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
*/
package com.datastax.oss.pulsar.jms.utils;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -67,20 +71,28 @@ public void afterAll(ExtensionContext extensionContext) {
@SneakyThrows
public void beforeAll(ExtensionContext extensionContext) {
network = Network.newNetwork();
CountDownLatch pulsarReady = new CountDownLatch(1);
pulsarContainer =
new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE))
.withNetwork(network)
.withEnv(env)
.withLogConsumer(
outputFrame -> log.debug("pulsar> {}", outputFrame.getUtf8String().trim()))
(f) -> {
String text = f.getUtf8String().trim();
if (text.contains("messaging service is ready")) {
pulsarReady.countDown();
}
System.out.println(text);
})
.withCopyFileToContainer(
MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters");
// start Pulsar and wait for it to be ready to accept requests
pulsarContainer.start();
assertTrue(pulsarReady.await(1, TimeUnit.MINUTES));
admin =
PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080))
.build();
PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080))
.build();
if (onContainerReady != null) {
onContainerReady.accept(this);
}
Expand Down

0 comments on commit ca695f1

Please sign in to comment.