Skip to content

Commit

Permalink
[improve][broker] Add ServiceUnitStateTableView (ExtensibleLoadManage…
Browse files Browse the repository at this point in the history
…rImpl only) (#23301)
  • Loading branch information
heesung-sn authored Sep 20, 2024
1 parent 105192d commit c2a0090
Show file tree
Hide file tree
Showing 37 changed files with 3,721 additions and 1,264 deletions.
9 changes: 9 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,15 @@ loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600

# Name of ServiceUnitStateTableView implementation class to use
loadManagerServiceUnitStateTableViewClassName=org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl

# Specify ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and
# system topic table views during migration from one to the other. One could enable this
# syncer before migration and disable it after the migration finishes.
# It accepts `MetadataStoreToSystemTopicSyncer` or `SystemTopicToMetadataStoreSyncer` to
# enable it. It accepts `None` to disable it."
loadBalancerServiceUnitTableViewSyncer=None

### --- Replication --- ###

Expand Down
57 changes: 8 additions & 49 deletions pip/pip-378.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Add `ServiceUnitStateTableView` abstraction and make it pluggable, so users can
- Introduce `MetadataStoreTableView` interface to support `ServiceUnitStateMetadataStoreTableViewImpl` implementation.
- `MetadataStoreTableViewImpl` will use shadow hashmap to maintain the metadata tableview. It will initially fill the local tableview by scanning all existing items in the metadata store path. Also, new items will be updated to the tableview via metadata watch notifications.
- Add `BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer` in MetadataCacheConfig to listen the automatic cache async reload. This can be useful to re-sync the the shadow hashmap in MetadataStoreTableViewImpl in case it is out-dated in the worst case(e.g. network or metadata issues).
- Introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views to migrate to one from the other. This syncer can be enabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`.
- Introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views to migrate to one from the other. This syncer can be enabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncer`.

## Detailed Design

Expand Down Expand Up @@ -243,56 +243,15 @@ public class MetadataCacheConfig<T> {
*/
@Slf4j
public class ServiceUnitStateTableViewSyncer implements Cloneable {
private static final int SYNC_TIMEOUT_IN_SECS = 30;
private volatile ServiceUnitStateTableView systemTopicTableView;
private volatile ServiceUnitStateTableView metadataStoreTableView;

...

public void start(PulsarService pulsar) throws IOException {
if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
return;
}
try {
if (systemTopicTableView == null) {
systemTopicTableView = new ServiceUnitStateTableViewImpl();
systemTopicTableView.start(
pulsar,
this::syncToMetadataStore,
this::syncToMetadataStore);
log.info("Successfully started ServiceUnitStateTableViewSyncer::systemTopicTableView");
}

if (metadataStoreTableView == null) {
metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
metadataStoreTableView.start(
pulsar,
this::syncToSystemTopic,
this::syncToSystemTopic);
log.info("Successfully started ServiceUnitStateTableViewSyncer::metadataStoreTableView");
}

} catch (Throwable e) {
log.error("Failed to start ServiceUnitStateTableViewSyncer", e);
throw e;
}
... // sync SystemTopicTableView and MetadataStoreTableView
}

private void syncToSystemTopic(String key, ServiceUnitStateData data) {
try {
systemTopicTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("SystemTopicTableView failed to sync key:{}, data:{}", key, data, e);
throw new IllegalStateException(e);
}
}

private void syncToMetadataStore(String key, ServiceUnitStateData data) {
try {
metadataStoreTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("metadataStoreTableView failed to sync key:{}, data:{}", key, data, e);
throw new IllegalStateException(e);
}
public void close() throws IOException {
... // stop syncer
}
...
}
Expand All @@ -302,14 +261,14 @@ public class ServiceUnitStateTableViewSyncer implements Cloneable {

### Configuration

- Add a `loadManagerServiceUnitStateTableViewClassName` configuration to specify `ServiceUnitStateTableView` implementation class name.
- Add a `loadBalancerServiceUnitTableViewSyncerEnabled` configuration to to enable ServiceUnitTableViewSyncer to sync metadata store and system topic ServiceUnitStateTableView during migration.
- Add a `loadManagerServiceUnitStateTableViewClassName` static configuration to specify `ServiceUnitStateTableView` implementation class name.
- Add a `loadBalancerServiceUnitTableViewSyncer` dynamic configuration to enable ServiceUnitTableViewSyncer to sync metadata store and system topic ServiceUnitStateTableView during migration.

## Backward & Forward Compatibility

It will ba Backward & Forward compatible as `loadManagerServiceUnitStateTableViewClassName` will be `ServiceUnitStateTableViewImpl`(system topic implementation) by default.

We will introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views when migrating to ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl and vice versa. This syncer can be enabled/disabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`. The admin could enable this syncer before migration and disable it after it is finished.
We will introduce `ServiceUnitStateTableViewSyncer` dynamic config to sync system topic and metadata store table views when migrating to ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl and vice versa. The admin could enable this syncer before migration and disable it after it is finished.

## Alternatives

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2912,6 +2912,25 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean loadBalancerMultiPhaseBundleUnload = true;

@FieldContext(
dynamic = false,
category = CATEGORY_LOAD_BALANCER,
doc = "Name of ServiceUnitStateTableView implementation class to use"
)
private String loadManagerServiceUnitStateTableViewClassName =
"org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl";

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Specify ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and "
+ "system topic table views during migration from one to the other. One could enable this"
+ " syncer before migration and disable it after the migration finishes. "
+ "It accepts `MetadataStoreToSystemTopicSyncer` or `SystemTopicToMetadataStoreSyncer` to "
+ "enable it. It accepts `None` to disable it."
)
private ServiceUnitTableViewSyncerType loadBalancerServiceUnitTableViewSyncer = ServiceUnitTableViewSyncerType.None;

/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down Expand Up @@ -3810,4 +3829,14 @@ public Map<String, String> lookupProperties() {
});
return map;
}

public boolean isLoadBalancerServiceUnitTableViewSyncerEnabled() {
return loadBalancerServiceUnitTableViewSyncer != ServiceUnitTableViewSyncerType.None;
}

public enum ServiceUnitTableViewSyncerType {
None,
MetadataStoreToSystemTopicSyncer,
SystemTopicToMetadataStoreSyncer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -41,20 +40,17 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewSyncer;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
Expand Down Expand Up @@ -172,6 +168,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private TopBundleLoadDataReporter topBundleLoadDataReporter;

@Getter
protected ServiceUnitStateTableViewSyncer serviceUnitStateTableViewSyncer;

private volatile ScheduledFuture brokerLoadDataReportTask;
private volatile ScheduledFuture topBundlesLoadDataReportTask;

Expand Down Expand Up @@ -209,46 +208,18 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
/**
* Get all the bundles that are owned by this broker.
*/
@Deprecated
public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
return CompletableFuture.completedFuture(getOwnedServiceUnits());
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (!started) {
log.warn("Failed to get owned service units, load manager is not started.");
return CompletableFuture.completedFuture(Collections.emptySet());
return Collections.emptySet();
}

String brokerId = brokerRegistry.getBrokerId();
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
&& StringUtils.isNotBlank(stateData.dstBroker())
&& stateData.dstBroker().equals(brokerId);
}).map(entry -> {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
// Add heartbeat and SLA monitor namespace bundle.
NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
NamespaceName heartbeatNamespaceV2 = NamespaceService
.getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
NamespaceName slaMonitorNamespace = NamespaceService
.getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
return pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(heartbeatNamespace)
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get heartbeat namespace bundle.", e);
return null;
}).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(heartbeatNamespaceV2))
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get heartbeat namespace V2 bundle.", e);
return null;
}).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(slaMonitorNamespace))
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get SLA Monitor namespace bundle.", e);
return null;
}).thenApply(__ -> ownedServiceUnits);
return serviceUnitStateChannel.getOwnedServiceUnits();
}

@Override
Expand Down Expand Up @@ -317,14 +288,14 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

private static boolean configureSystemTopics(PulsarService pulsar) {
public static boolean configureSystemTopics(PulsarService pulsar, long target) {
try {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& pulsar.getConfiguration().isTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
if (threshold == null || target != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, target);
log.info("Set compaction threshold: {} bytes for system topic {}.", target, TOPIC);
}
} else {
log.warn("System topic or topic level policies is disabled. "
Expand Down Expand Up @@ -432,6 +403,7 @@ public void start() throws PulsarServerException {
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
this.serviceUnitStateTableViewSyncer = new ServiceUnitStateTableViewSyncer();

pulsar.runWhenReadyForIncomingRequests(() -> {
try {
Expand Down Expand Up @@ -799,10 +771,11 @@ public void close() throws PulsarServerException {
monitorTask.cancel(true);
}

this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
this.brokerLoadDataStore.shutdown();
this.topBundlesLoadDataStore.shutdown();
this.unloadScheduler.close();
this.splitScheduler.close();
this.serviceUnitStateTableViewSyncer.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
Expand Down Expand Up @@ -857,6 +830,9 @@ synchronized void playLeader() {
topBundlesLoadDataStore.init();
unloadScheduler.start();
serviceUnitStateChannel.scheduleOwnershipMonitor();
if (pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
serviceUnitStateTableViewSyncer.start(pulsar);
}
break;
} catch (Throwable e) {
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
Expand Down Expand Up @@ -906,6 +882,7 @@ synchronized void playFollower() {
brokerLoadDataStore.init();
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
serviceUnitStateTableViewSyncer.close();
break;
} catch (Throwable e) {
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
Expand Down Expand Up @@ -977,19 +954,27 @@ protected void monitor() {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
if (!configuredSystemTopics) {
configuredSystemTopics = configureSystemTopics(pulsar);
configuredSystemTopics = configureSystemTopics(pulsar, COMPACTION_THRESHOLD);
}
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
playLeader();
}

if (pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
serviceUnitStateTableViewSyncer.start(pulsar);
} else {
serviceUnitStateTableViewSyncer.close();
}

} else {
if (role != Follower) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the follower role.", role, isChannelOwner);
playFollower();
}
serviceUnitStateTableViewSyncer.close();
}
} catch (Throwable e) {
log.error("Failed to get the channel ownership.", e);
Expand Down
Loading

0 comments on commit c2a0090

Please sign in to comment.