Skip to content

Commit

Permalink
[branch-2.10][improve][broker] Do not retain the data in the system t…
Browse files Browse the repository at this point in the history
…opic (#22031)

### Motivation

For some use case, the users need to store all the messages even though these message are acked by all subscription. So they set the retention policy of the namespace to infinite retention (setting both time and size limits to `-1`).  But the data in the system topic does not need for infinite retention.

### Modifications

For system topics, do not retain messages that have already been acknowledged.
  • Loading branch information
liangyepianzhou authored Feb 19, 2024
1 parent a693a26 commit c1d8630
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 15 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/ci-integration-backwards-compatibility.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,19 @@ jobs:
- name: clean disk
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
echo "::group::Available diskspace"
time df -BM / /mnt
echo "::endgroup::"
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL
echo "::group::Cleaning apt state"
time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove"
time df -BM / /mnt
echo "::endgroup::"
docker rmi $(docker images -q) -f
echo "::group::Available diskspace"
time df -BM / /mnt
echo "::endgroup::"
df -h
- name: run install by skip tests
Expand Down
13 changes: 11 additions & 2 deletions .github/workflows/ci-integration-pulsar-io-ora.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,19 @@ jobs:
- name: clean disk
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
echo "::group::Available diskspace"
time df -BM / /mnt
echo "::endgroup::"
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL
echo "::group::Cleaning apt state"
time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove"
time df -BM / /mnt
echo "::endgroup::"
docker rmi $(docker images -q) -f
echo "::group::Available diskspace"
time df -BM / /mnt
echo "::endgroup::"
df -h
- name: run install by skip tests
Expand Down
13 changes: 11 additions & 2 deletions .github/workflows/ci-integration-pulsar-io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,19 @@ jobs:
- name: clean disk
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
echo "::group::Available diskspace"
time df -BM / /mnt
echo "::endgroup::"
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL
echo "::group::Cleaning apt state"
time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove"
time df -BM / /mnt
echo "::endgroup::"
docker rmi $(docker images -q) -f
echo "::group::Available diskspace"
time df -BM / /mnt
echo "::endgroup::"
df -h
- name: run install by skip tests
Expand Down
3 changes: 1 addition & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ The Apache Software License, Version 2.0
- io.netty-netty-transport-4.1.100.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar
Expand Down Expand Up @@ -498,7 +497,7 @@ The Apache Software License, Version 2.0
- org.apache.curator-curator-framework-5.1.0.jar
- org.apache.curator-curator-recipes-5.1.0.jar
* Apache Yetus
- org.apache.yetus-audience-annotations-0.13.0.jar
- org.apache.yetus-audience-annotations-0.12.0.jar
* Kubernetes Client
- io.kubernetes-client-java-18.0.0.jar
- io.kubernetes-client-java-api-18.0.0.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -54,6 +55,9 @@ public class ZooKeeperUtil {
private String connectString;

public ZooKeeperUtil() {
String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress();
zkaddr = new InetSocketAddress(loopbackIPAddr, 0);
connectString = loopbackIPAddr + ":" + zooKeeperPort;
}

public ZooKeeper getZooKeeperClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1646,10 +1646,18 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
}

if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
if (EventsTopicNames.checkTopicIsEventsNames(topicName)
|| EventsTopicNames.isTransactionInternalName(topicName)) {
if (log.isDebugEnabled()) {
log.debug("{} Disable data retention policy for system topic.", topicName);
}
retentionPolicies = new RetentionPolicies(0, 0);
} else {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,7 @@ public void testReplicatorWithFailedAck() throws Exception {

MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
Awaitility.await().untilAsserted(() -> assertNotNull(topic.getReplicators().get("r2")));
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception {
Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get();

Assert.assertEquals(pendingAckLogIndex.size(), 0);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(pendingAckLogIndex.size(), 0);
});
maxIndexLag = (long) field4.get(pendingAckStore);
Assert.assertEquals(maxIndexLag, 5);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -208,6 +213,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc
);
}

@Test
public void testRetentionPolicesForSystemTopic() throws Exception {
String namespace = "my-tenant/my-ns";
String topicPrefix = "persistent://" + namespace + "/";
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));
// Check event topics and transaction internal topics.
for (String eventTopic : EventsTopicNames.EVENTS_TOPIC_NAMES) {
checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
}
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_ASSIGN);
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_LOG);
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.PENDING_ACK_STORE_SUFFIX);

// Check common topics.
checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime());
// Specify retention policies for system topic.
pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
pulsar.getConfiguration().setSystemTopicEnabled(true);
admin.topics().createNonPartitionedTopic(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
admin.topicPolicies().setRetention(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
Awaitility.await().untilAsserted(() -> {
checkTopicRetentionPolicy(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
});
}

private void checkSystemTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));

}

private void checkCommonTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
}

private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception {
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
}

private void testCompactionCursorRetention(String topic) throws Exception {
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicNa
.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(TopicName.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTopicPoliciesSystemTopic(String topic) {
if (topic == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public TopicName load(String name) throws Exception {
public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");

public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);
Expand Down
2 changes: 1 addition & 1 deletion pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ The Apache Software License, Version 2.0
- zookeeper-3.9.1.jar
- zookeeper-jute-3.9.1.jar
* Apache Yetus Audience Annotations
- audience-annotations-0.13.0.jar
- audience-annotations-0.12.0.jar
* Swagger
- swagger-annotations-1.6.10.jar
* Perfmark
Expand Down

0 comments on commit c1d8630

Please sign in to comment.