Skip to content

Commit

Permalink
Upgrade/Downgrade test
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Jun 25, 2024
1 parent 2da4ee8 commit 58dcddc
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize))
.withEnv("ledgerDirectories", "data/bookkeeper/" + name + "/ledgers")
.withEnv("journalDirectory", "data/bookkeeper/" + name + "/journal");
if (spec.bookkeeperEnvs != null) {
bookieContainer.withEnv(spec.bookkeeperEnvs);
}
Expand Down Expand Up @@ -266,6 +268,7 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
if (zkContainer != null) {
zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
}
csContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);

bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
Expand Down Expand Up @@ -323,6 +326,10 @@ public Map<String, GenericContainer<?>> getExternalServices() {
}

public void start() throws Exception {
start(true);
}

public void start(boolean doInit) throws Exception {

if (!spec.enableOxia) {
// start the local zookeeper
Expand All @@ -338,7 +345,7 @@ public void start() throws Exception {
oxiaContainer.start();
}

{
if (doInit) {
// Run cluster metadata initialization
@Cleanup
PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ protected void beforeStartCluster() throws Exception {
}

protected void setupCluster(PulsarClusterSpec spec) throws Exception {
setupCluster(spec, true);
}

protected void setupCluster(PulsarClusterSpec spec, boolean doInit) throws Exception {
incrementSetupNumber();
log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
Expand All @@ -150,7 +154,7 @@ protected void setupCluster(PulsarClusterSpec spec) throws Exception {

beforeStartCluster();

pulsarCluster.start();
pulsarCluster.start(doInit);

pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.upgrade;

import com.google.common.collect.ImmutableMap;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;
import java.util.stream.Stream;
import static java.util.stream.Collectors.joining;
import static org.testng.Assert.assertEquals;

/**
* Test upgrading/downgrading Pulsar cluster.
*/
@Slf4j
public class PulsarUpgradeDowngradeTest extends PulsarClusterTestBase {

@Test(timeOut=600_000)
public void test_210() throws Exception {
testUpgradeDowngrade("pulsar2.10-upgrade", "apachepulsar/pulsar:2.10.0", PulsarContainer.DEFAULT_IMAGE_NAME);
}

@Test(timeOut=600_000)
public void test_300() throws Exception {
testUpgradeDowngrade("pulsar3.0-upgrade", "apachepulsar/pulsar:3.0.5", PulsarContainer.DEFAULT_IMAGE_NAME);
}

private void testUpgradeDowngrade(String path, String imageOld, String imageNew) throws Exception {
final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
.filter(s -> !s.isEmpty())
.collect(joining("-"));
String topicName = generateTopicName("testupdown", true);

PulsarClusterSpec specOld = PulsarClusterSpec.builder()
.numBookies(2)
.numBrokers(1)
.clusterName(clusterName)
.classPathVolumeMounts(
ImmutableMap.<String, String> builder()
.put(path, "/pulsar/data")
.build())
.pulsarTestImage(imageOld)
.build();

PulsarClusterSpec specNew = PulsarClusterSpec.builder()
.numBookies(2)
.numBrokers(1)
.clusterName(clusterName)
.classPathVolumeMounts(
ImmutableMap.<String, String> builder()
.put(path, "/pulsar/data")
.build())
.pulsarTestImage(imageNew)
.build();

log.info("Setting up OLD cluster {} with {} bookies, {} brokers",
specOld.clusterName(), specOld.numBookies(), specOld.numBrokers());

setupCluster(specOld, true);

try {
log.info("setting retention");
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-retention", "--size", "100M", "--time", "100m", "public/default");

publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 10);
} finally {
pulsarCluster.stop();
}

log.info("Upgrading to NEW cluster {} with {} bookies, {} brokers",
specNew.clusterName(), specNew.numBookies(), specNew.numBrokers());

pulsarCluster = PulsarCluster.forSpec(specNew);
pulsarCluster.start(false);

try {
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 20);
} finally {
pulsarCluster.stop();
}

log.info("Downgrading to OLD cluster {} with {} bookies, {} brokers",
specOld.clusterName(), specOld.numBookies(), specOld.numBrokers());

pulsarCluster = PulsarCluster.forSpec(specOld);
pulsarCluster.start(false);

try {
publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 30);
} finally {
//pulsarCluster.stop();
tearDownCluster();
}
}

private void publishAndConsume(String topicName, String serviceUrl, int numProduce, int numConsume) throws Exception {
log.info("publishAndConsume: topic name: {}", topicName);

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName)
.create();

log.info("Publishing {} messages", numProduce);
for (int i = numConsume - numProduce; i < numConsume; i++) {
log.info("Publishing message: {}", "smoke-message-" + i);
producer.send("smoke-message-" + i);
}

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
consumer.seek(MessageId.earliest);

log.info("Consuming {} messages", numConsume);
for (int i = 0; i < numConsume; i++) {
log.info("Waiting for message: {}", i);
Message<String> m = consumer.receive();
log.info("Received message: {}", m.getValue());
assertEquals("smoke-message-" + i, m.getValue());
}
}
}
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-upgrade.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<test name="pulsar-upgrade-test-suite" preserve-order="true" >
<classes>
<class name="org.apache.pulsar.tests.integration.upgrade.PulsarZKDowngradeTest" />
<class name="org.apache.pulsar.tests.integration.upgrade.PulsarUpgradeDowngradeTest" />
</classes>
</test>
</suite>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
to use for the upgrade test
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
to use for the upgrade test

0 comments on commit 58dcddc

Please sign in to comment.