diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 8decde1c999ca..94a936ded4b3d 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -603,6 +603,9 @@ jobs: - name: Metrics group: METRICS + - name: Upgrade + group: UPGRADE + steps: - name: checkout uses: actions/checkout@v4 @@ -648,6 +651,18 @@ jobs: run: | $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-java-test-image + - name: Pull apachepulsar/pulsar:2.10.6 + run: | + docker pull apachepulsar/pulsar:2.10.6 + + - name: Pull apachepulsar/pulsar:3.0.5 + run: | + docker pull apachepulsar/pulsar:3.0.5 + + - name: Pull alpine:3.20.1 + run: | + docker pull alpine:3.20.1 + - name: Run setup commands if: ${{ matrix.setup }} run: | diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index 2d82fce08878d..63b92d4e0a798 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -177,6 +177,10 @@ test_group_standalone() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-standalone.xml -DintegrationTests } +test_group_upgrade() { + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-upgrade.xml -DintegrationTests +} + test_group_transaction() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 90f08a9639471..35fb453c4bb8e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -72,22 +72,28 @@ public class PulsarCluster { * @return the built pulsar cluster */ public static PulsarCluster forSpec(PulsarClusterSpec spec) { + return forSpec(spec, Network.newNetwork()); + } + + public static PulsarCluster forSpec(PulsarClusterSpec spec, Network network) { + checkArgument(network != null, "Network should not be null"); CSContainer csContainer = null; if (!spec.enableOxia) { csContainer = new CSContainer(spec.clusterName) - .withNetwork(Network.newNetwork()) + .withNetwork(network) .withNetworkAliases(CSContainer.NAME); } - return new PulsarCluster(spec, csContainer, false); + return new PulsarCluster(spec, network, csContainer, false); } public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) { - return new PulsarCluster(spec, csContainer, true); + return new PulsarCluster(spec, csContainer.getNetwork(), csContainer, true); } @Getter private final PulsarClusterSpec spec; + public boolean closeNetworkOnExit = true; @Getter private final String clusterName; private final Network network; @@ -108,19 +114,18 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContai private final String metadataStoreUrl; private final String configurationMetadataStoreUrl; - private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) { - + private PulsarCluster(PulsarClusterSpec spec, Network network, CSContainer csContainer, boolean sharedCsContainer) { this.spec = spec; this.sharedCsContainer = sharedCsContainer; this.clusterName = spec.clusterName(); - if (csContainer != null ) { + if (network != null) { + this.network = network; + } else if (csContainer != null) { this.network = csContainer.getNetwork(); } else { this.network = Network.newNetwork(); } - - if (spec.enableOxia) { this.zkContainer = null; this.oxiaContainer = new OxiaContainer(clusterName); @@ -203,7 +208,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95") .withEnv("diskUsageThreshold", "0.99") .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97") - .withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize)); + .withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize)) + .withEnv("ledgerDirectories", "data/bookkeeper/" + name + "/ledgers") + .withEnv("journalDirectory", "data/bookkeeper/" + name + "/journal"); if (spec.bookkeeperEnvs != null) { bookieContainer.withEnv(spec.bookkeeperEnvs); } @@ -262,10 +269,27 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s } )); + if (spec.dataContainer != null) { + if (!sharedCsContainer && csContainer != null) { + csContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE); + } + if (zkContainer != null) { + zkContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE); + } + proxyContainer.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE); + + bookieContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE)); + brokerContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE)); + workerContainers.values().forEach(c -> c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE)); + } + spec.classPathVolumeMounts.forEach((key, value) -> { if (zkContainer != null) { zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE); } + if (!sharedCsContainer && csContainer != null) { + csContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE); + } proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE); bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE)); @@ -323,6 +347,10 @@ public Map> getExternalServices() { } public void start() throws Exception { + start(true); + } + + public void start(boolean doInit) throws Exception { if (!spec.enableOxia) { // start the local zookeeper @@ -338,7 +366,7 @@ public void start() throws Exception { oxiaContainer.start(); } - { + if (doInit) { // Run cluster metadata initialization @Cleanup PulsarInitMetadataContainer init = new PulsarInitMetadataContainer( @@ -453,10 +481,12 @@ public synchronized void stop() { oxiaContainer.stop(); } - try { - network.close(); - } catch (Exception e) { - log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e); + if (closeNetworkOnExit) { + try { + network.close(); + } catch (Exception e) { + log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e); + } } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index 8a991be49fad0..ca45c9b7c9b82 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -124,6 +124,12 @@ public class PulsarClusterSpec { @Builder.Default Map classPathVolumeMounts = new TreeMap<>(); + /** + * Data container + */ + @Builder.Default + GenericContainer dataContainer = null; + /** * Pulsar Test Image Name * diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java index 93e2221ab2493..8b99f21373560 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java @@ -142,6 +142,10 @@ protected void beforeStartCluster() throws Exception { } protected void setupCluster(PulsarClusterSpec spec) throws Exception { + setupCluster(spec, true); + } + + protected void setupCluster(PulsarClusterSpec spec, boolean doInit) throws Exception { incrementSetupNumber(); log.info("Setting up cluster {} with {} bookies, {} brokers", spec.clusterName(), spec.numBookies(), spec.numBrokers()); @@ -150,7 +154,7 @@ protected void setupCluster(PulsarClusterSpec spec) throws Exception { beforeStartCluster(); - pulsarCluster.start(); + pulsarCluster.start(doInit); pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java new file mode 100644 index 0000000000000..8aa99ece51ba3 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.upgrade; + +import com.github.dockerjava.api.model.Bind; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testng.annotations.Test; +import java.util.stream.Stream; +import static java.util.stream.Collectors.joining; +import static org.testng.Assert.assertEquals; + +/** + * Test upgrading/downgrading Pulsar cluster from major releases. + */ +@Slf4j +public class PulsarUpgradeDowngradeTest extends PulsarClusterTestBase { + + @Test(timeOut=600_000) + public void upgradeFrom_2_10_6() throws Exception { + testUpgradeDowngrade("apachepulsar/pulsar:2.10.6", PulsarContainer.DEFAULT_IMAGE_NAME); + } + + @Test(timeOut=600_000) + public void upgradeFrom_3_0_5() throws Exception { + testUpgradeDowngrade("apachepulsar/pulsar:3.0.5", PulsarContainer.DEFAULT_IMAGE_NAME); + } + + private void testUpgradeDowngrade(String imageOld, String imageNew) throws Exception { + final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) + .filter(s -> !s.isEmpty()) + .collect(joining("-")); + String topicName = generateTopicName("testupdown", true); + + @Cleanup + Network network = Network.newNetwork(); + @Cleanup + GenericContainer alpine = new GenericContainer<>("alpine:3.20.1") + .withExposedPorts(80) + .withNetwork(network) + .withNetworkAliases("shared-storage") + .withEnv("MAGIC_NUMBER", "42") + .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd + .getHostConfig() + .withBinds(Bind.parse("/pulsar/data:/pulsar/data"))) + .withCommand("/bin/sh", "-c", + "mkdir -p /pulsar/data && " + + "chmod -R ug+rwx /pulsar/data && " + + "chown -R 10000:0 /pulsar/data && " + + "rm -rf /pulsar/data/* && " + + "while true; do echo \"$MAGIC_NUMBER\" | nc -l -p 80; done"); + alpine.start(); + + PulsarClusterSpec specOld = PulsarClusterSpec.builder() + .numBookies(2) + .numBrokers(1) + .clusterName(clusterName) + .dataContainer(alpine) + .pulsarTestImage(imageOld) + .build(); + + PulsarClusterSpec specNew = PulsarClusterSpec.builder() + .numBookies(2) + .numBrokers(1) + .clusterName(clusterName) + .dataContainer(alpine) + .pulsarTestImage(imageNew) + .build(); + + log.info("Setting up OLD cluster {} with {} bookies, {} brokers using {}", + specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld); + + pulsarCluster = PulsarCluster.forSpec(specNew, network); + pulsarCluster.closeNetworkOnExit = false; + pulsarCluster.start(true); + + try { + log.info("setting retention"); + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", "public/default"); + + publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 10); + } finally { + pulsarCluster.stop(); + } + + log.info("Upgrading to NEW cluster {} with {} bookies, {} brokers using {}", + specNew.clusterName(), specNew.numBookies(), specNew.numBrokers(), imageNew); + + pulsarCluster = PulsarCluster.forSpec(specNew, network); + pulsarCluster.closeNetworkOnExit = false; + pulsarCluster.start(false); + + try { + publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 20); + } finally { + pulsarCluster.stop(); + } + + log.info("Downgrading to OLD cluster {} with {} bookies, {} brokers using {}", + specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld); + + pulsarCluster = PulsarCluster.forSpec(specOld, network); + pulsarCluster.closeNetworkOnExit = false; + pulsarCluster.start(false); + + try { + publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 30); + } finally { + pulsarCluster.stop(); + alpine.stop(); + network.close(); + } + } + + private void publishAndConsume(String topicName, String serviceUrl, int numProduce, int numConsume) throws Exception { + log.info("publishAndConsume: topic name: {}", topicName); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl) + .build(); + + @Cleanup + Producer producer = client.newProducer(Schema.STRING) + .topic(topicName) + .create(); + + log.info("Publishing {} messages", numProduce); + for (int i = numConsume - numProduce; i < numConsume; i++) { + log.info("Publishing message: {}", "smoke-message-" + i); + producer.send("smoke-message-" + i); + } + + @Cleanup + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("my-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + consumer.seek(MessageId.earliest); + + log.info("Consuming {} messages", numConsume); + for (int i = 0; i < numConsume; i++) { + log.info("Waiting for message: {}", i); + Message m = consumer.receive(); + log.info("Received message: {}", m.getValue()); + assertEquals("smoke-message-" + i, m.getValue()); + } + } +} diff --git a/tests/integration/src/test/resources/pulsar-upgrade.xml b/tests/integration/src/test/resources/pulsar-upgrade.xml index a52db54753372..dc966b160ba17 100644 --- a/tests/integration/src/test/resources/pulsar-upgrade.xml +++ b/tests/integration/src/test/resources/pulsar-upgrade.xml @@ -22,7 +22,7 @@ - +