From 594ec35a6440d58a8c2595dce26da18f00187af3 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Thu, 26 Dec 2024 15:20:55 +0800 Subject: [PATCH 1/5] [improve][broker]Optimize the partition size of the system theme to reduce resource consumption --- conf/broker.conf | 5 ++++- conf/standalone.conf | 5 ++++- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++ .../org/apache/pulsar/broker/service/BrokerService.java | 4 +++- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index af335c141534f..3a108089d75af 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -204,6 +204,9 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 +# Default number of partitions for the system theme +systemTopicDefaultNumPartitions=1 + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true @@ -243,7 +246,7 @@ messageExpiryCheckIntervalInMinutes=5 # How long to delay rewinding cursor and dispatching messages when active consumer is changed activeConsumerFailoverDelayTimeMillis=1000 -# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. +# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. # For non-partitioned topics, consistent hashing is used by default. activeConsumerFailoverConsistentHashing=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 90cf3b57ff941..fa28a3734b878 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -152,7 +152,7 @@ maxMessageSizeCheckIntervalInSeconds=60 # How long to delay rewinding cursor and dispatching messages when active consumer is changed activeConsumerFailoverDelayTimeMillis=1000 -# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. +# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. # For non-partitioned topics, consistent hashing is used by default. activeConsumerFailoverConsistentHashing=false @@ -1187,6 +1187,9 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 +# Default number of partitions for the system theme +systemTopicDefaultNumPartitions=1 + ### --- Transaction config variables --- ### # Enable transaction coordinator in broker transactionCoordinatorEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0f7ae00713dce..e397a59cf62d7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2167,6 +2167,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " if allowAutoTopicCreationType is partitioned." ) private int defaultNumPartitions = 1; + @FieldContext( + category = CATEGORY_STORAGE_ML, + dynamic = true, + doc = "Default number of partitions for the system theme." + ) + private int systemTopicDefaultNumPartitions = 1; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "The class of the managed ledger storage" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c79d839097e68..10566ddeaa6a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3291,12 +3291,14 @@ private CompletableFuture createDefaultPartitionedTopi Optional policies) { final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies); final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); + final int systemTopicDefaultNumPartitions = pulsar.getConfiguration().getSystemTopicDefaultNumPartitions(); checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0"); checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, "Number of partitions should be less than or equal to " + maxPartitions); - PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); + PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata( + isSystemTopic(topicName) ? systemTopicDefaultNumPartitions : defaultNumPartitions); return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions) .thenCompose(__ -> { From d04fe96f064ed9b56e9d289bdea3c5548b77be20 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Thu, 26 Dec 2024 15:58:36 +0800 Subject: [PATCH 2/5] [improve][broker]Optimize the partition size of the system theme to reduce resource consumption --- conf/broker.conf | 2 +- conf/standalone.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 3a108089d75af..ddc8eb7d5cce4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -246,7 +246,7 @@ messageExpiryCheckIntervalInMinutes=5 # How long to delay rewinding cursor and dispatching messages when active consumer is changed activeConsumerFailoverDelayTimeMillis=1000 -# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. +# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. # For non-partitioned topics, consistent hashing is used by default. activeConsumerFailoverConsistentHashing=false diff --git a/conf/standalone.conf b/conf/standalone.conf index fa28a3734b878..f12fbce7e42f6 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -152,7 +152,7 @@ maxMessageSizeCheckIntervalInSeconds=60 # How long to delay rewinding cursor and dispatching messages when active consumer is changed activeConsumerFailoverDelayTimeMillis=1000 -# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. +# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. # For non-partitioned topics, consistent hashing is used by default. activeConsumerFailoverConsistentHashing=false From 586d1fc43475b525d19e67ab35088f1c5970d44c Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 31 Dec 2024 21:11:11 +0800 Subject: [PATCH 3/5] [improve][broker]Optimize the partition size of the system theme to reduce resource consumption --- .../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index e7bfa3278e36d..265d10300c48a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -80,6 +80,7 @@ protected void setup() throws Exception { conf.setAllowAutoTopicCreation(false); conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); conf.setDefaultNumPartitions(PARTITIONS); + conf.setSystemTopicDefaultNumPartitions(PARTITIONS); conf.setManagedLedgerMaxEntriesPerLedger(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); conf.setTransactionCoordinatorEnabled(true); From 5b5243898ed4598faaacd5b936edce9a74f9c5bd Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Mon, 6 Jan 2025 10:24:41 +0800 Subject: [PATCH 4/5] [improve][broker]Optimize the partition size of the system theme to reduce resource consumption --- conf/broker.conf | 2 +- conf/standalone.conf | 2 +- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index ddc8eb7d5cce4..6d72fb969784f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -204,7 +204,7 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 -# Default number of partitions for the system theme +# Default number of partitions for the system topics systemTopicDefaultNumPartitions=1 # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. diff --git a/conf/standalone.conf b/conf/standalone.conf index f12fbce7e42f6..c27bf12637ec0 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1187,7 +1187,7 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 -# Default number of partitions for the system theme +# Default number of partitions for the system topics systemTopicDefaultNumPartitions=1 ### --- Transaction config variables --- ### diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e397a59cf62d7..394e02196a5c1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2170,7 +2170,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_STORAGE_ML, dynamic = true, - doc = "Default number of partitions for the system theme." + doc = "Default number of partitions for the system topics." ) private int systemTopicDefaultNumPartitions = 1; @FieldContext( From a1d41faf4bc5cc2da6d95067f027b8c892d6dba6 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 7 Jan 2025 21:38:04 +0800 Subject: [PATCH 5/5] [improve][broker]Optimize the partition size of the system theme to reduce resource consumption --- .../pulsar/broker/service/BrokerService.java | 10 +++++----- .../data/AutoTopicCreationOverride.java | 4 ++++ .../impl/AutoTopicCreationOverrideImpl.java | 19 ++++++++++++++++++- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 10566ddeaa6a1..3f54b8add723f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3291,14 +3291,12 @@ private CompletableFuture createDefaultPartitionedTopi Optional policies) { final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies); final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - final int systemTopicDefaultNumPartitions = pulsar.getConfiguration().getSystemTopicDefaultNumPartitions(); checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0"); checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, "Number of partitions should be less than or equal to " + maxPartitions); - PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata( - isSystemTopic(topicName) ? systemTopicDefaultNumPartitions : defaultNumPartitions); + PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions) .thenCompose(__ -> { @@ -3573,9 +3571,11 @@ public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Op public int getDefaultNumPartitions(final TopicName topicName, final Optional policies) { AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies); if (autoTopicCreationOverride != null) { - return autoTopicCreationOverride.getDefaultNumPartitions(); + return isSystemTopic(topicName) ? autoTopicCreationOverride.getSystemTopicDefaultNumPartitions() + : autoTopicCreationOverride.getDefaultNumPartitions(); } else { - return pulsar.getConfiguration().getDefaultNumPartitions(); + return isSystemTopic(topicName) ? pulsar.getConfiguration().getSystemTopicDefaultNumPartitions() + : pulsar.getConfiguration().getDefaultNumPartitions(); } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java index 24e3a57d2a33c..ea3a96408871d 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java @@ -29,6 +29,8 @@ public interface AutoTopicCreationOverride { String getTopicType(); Integer getDefaultNumPartitions(); + + Integer getSystemTopicDefaultNumPartitions(); interface Builder { Builder allowAutoTopicCreation(boolean allowTopicCreation); @@ -37,6 +39,8 @@ interface Builder { Builder defaultNumPartitions(Integer defaultNumPartition); + Builder systemTopicDefaultNumPartitions(Integer systemTopicDefaultNumPartitions); + AutoTopicCreationOverride build(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java index 52cf1f1829b8d..ce6133dd05f25 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java @@ -35,6 +35,7 @@ public final class AutoTopicCreationOverrideImpl implements AutoTopicCreationOve private boolean allowAutoTopicCreation; private String topicType; private Integer defaultNumPartitions; + private Integer systemTopicDefaultNumPartitions; public static ValidateResult validateOverride(AutoTopicCreationOverride override) { if (override == null) { @@ -51,6 +52,14 @@ public static ValidateResult validateOverride(AutoTopicCreationOverride override if (override.getDefaultNumPartitions() <= 0) { return ValidateResult.fail("[defaultNumPartitions] cannot be less than 1 for partition type."); } + if (override.getSystemTopicDefaultNumPartitions() == null) { + return ValidateResult.fail( + "[systemTopicDefaultNumPartitions] cannot be null when the type is partitioned."); + } + if (override.getSystemTopicDefaultNumPartitions() <= 0) { + return ValidateResult.fail( + "[systemTopicDefaultNumPartitions] cannot be less than 1 for partition type."); + } } else if (TopicType.NON_PARTITIONED.toString().equals(override.getTopicType())) { if (override.getDefaultNumPartitions() != null) { return ValidateResult.fail("[defaultNumPartitions] is not allowed to be" @@ -69,6 +78,7 @@ public static class AutoTopicCreationOverrideImplBuilder implements AutoTopicCre private boolean allowAutoTopicCreation; private String topicType; private Integer defaultNumPartitions; + private Integer systemTopicDefaultNumPartitions; public AutoTopicCreationOverrideImplBuilder allowAutoTopicCreation(boolean allowAutoTopicCreation) { this.allowAutoTopicCreation = allowAutoTopicCreation; @@ -85,8 +95,15 @@ public AutoTopicCreationOverrideImplBuilder defaultNumPartitions(Integer default return this; } + public AutoTopicCreationOverrideImplBuilder systemTopicDefaultNumPartitions( + Integer systemTopicDefaultNumPartitions) { + this.systemTopicDefaultNumPartitions = systemTopicDefaultNumPartitions; + return this; + } + public AutoTopicCreationOverrideImpl build() { - return new AutoTopicCreationOverrideImpl(allowAutoTopicCreation, topicType, defaultNumPartitions); + return new AutoTopicCreationOverrideImpl(allowAutoTopicCreation, topicType, defaultNumPartitions, + systemTopicDefaultNumPartitions); } } }