forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve][pip] PIP-378 Add ServiceUnitStateTableView abstraction (Ext…
…ensibleLoadMangerImpl only) (apache#23300)
- Loading branch information
1 parent
2e98736
commit 590e133
Showing
1 changed file
with
321 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,321 @@ | ||
# PIP-378: Add ServiceUnitStateTableView abstraction (ExtensibleLoadMangerImpl only) | ||
|
||
## Background | ||
|
||
### ExtensibleLoadMangerImpl uses system topics to event-source bundle ownerships | ||
|
||
PIP-192 introduces a new broker load balancer using a persistent system topic to event-source bundle ownerships among brokers. | ||
|
||
PIP-307 introduces graceful ownership change protocol over the system topic (from PIP-192). | ||
|
||
However, using system topics to manage bundle ownerships may not always be the best choice. Users might need an alternative approach to event-source bundle ownerships. | ||
|
||
|
||
## Motivation | ||
|
||
Add `ServiceUnitStateTableView` abstraction and make it pluggable, so users can customize `ServiceUnitStateTableView` implementations and event-source bundles ownerships using other stores. | ||
|
||
## Goals | ||
|
||
### In Scope | ||
|
||
- Add `ServiceUnitStateTableView` interface | ||
- Add `ServiceUnitStateTableViewImpl` implementation that uses Pulsar System topic (compatible with existing behavior) | ||
- Add `ServiceUnitStateMetadataStoreTableViewImpl` implementation that uses Pulsar Metadata Store (new behavior) | ||
- Refactor related code and test code | ||
|
||
## High-Level Design | ||
|
||
- Refactor `ServiceUnitStateChannelImpl` to accept `ServiceUnitStateTableView` interface and `ServiceUnitStateTableViewImpl` system topic implementation. | ||
- Introduce `MetadataStoreTableView` interface to support `ServiceUnitStateMetadataStoreTableViewImpl` implementation. | ||
- `MetadataStoreTableViewImpl` will use shadow hashmap to maintain the metadata tableview. It will initially fill the local tableview by scanning all existing items in the metadata store path. Also, new items will be updated to the tableview via metadata watch notifications. | ||
- Add `BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer` in MetadataCacheConfig to listen the automatic cache async reload. This can be useful to re-sync the the shadow hashmap in MetadataStoreTableViewImpl in case it is out-dated in the worst case(e.g. network or metadata issues). | ||
- Introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views to migrate to one from the other. This syncer can be enabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`. | ||
|
||
## Detailed Design | ||
|
||
### Design & Implementation Details | ||
```java | ||
/** | ||
* Given that the ServiceUnitStateChannel event-sources service unit (bundle) ownership states via a persistent store | ||
* and reacts to ownership changes, the ServiceUnitStateTableView provides an interface to the | ||
* ServiceUnitStateChannel's persistent store and its locally replicated ownership view (tableview) with listener | ||
* registration. It initially populates its local table view by scanning existing items in the remote store. The | ||
* ServiceUnitStateTableView receives notifications whenever ownership states are updated in the remote store, and | ||
* upon notification, it applies the updates to its local tableview with the listener logic. | ||
*/ | ||
public interface ServiceUnitStateTableView extends Closeable { | ||
|
||
/** | ||
* Starts the tableview. | ||
* It initially populates its local table view by scanning existing items in the remote store, and it starts | ||
* listening to service unit ownership changes from the remote store. | ||
* @param pulsar pulsar service reference | ||
* @param tailItemListener listener to listen tail(newly updated) items | ||
* @param existingItemListener listener to listen existing items | ||
* @throws IOException if it fails to init the tableview. | ||
*/ | ||
void start(PulsarService pulsar, | ||
BiConsumer<String, ServiceUnitStateData> tailItemListener, | ||
BiConsumer<String, ServiceUnitStateData> existingItemListener) throws IOException; | ||
|
||
|
||
/** | ||
* Closes the tableview. | ||
* @throws IOException if it fails to close the tableview. | ||
*/ | ||
void close() throws IOException; | ||
|
||
/** | ||
* Gets one item from the local tableview. | ||
* @param key the key to get | ||
* @return value if exists. Otherwise, null. | ||
*/ | ||
ServiceUnitStateData get(String key); | ||
|
||
/** | ||
* Tries to put the item in the persistent store. | ||
* If it completes, all peer tableviews (including the local one) will be notified and be eventually consistent | ||
* with this put value. | ||
* | ||
* It ignores put operation if the input value conflicts with the existing one in the persistent store. | ||
* | ||
* @param key the key to put | ||
* @param value the value to put | ||
* @return a future to track the completion of the operation | ||
*/ | ||
CompletableFuture<Void> put(String key, ServiceUnitStateData value); | ||
|
||
/** | ||
* Tries to delete the item from the persistent store. | ||
* All peer tableviews (including the local one) will be notified and be eventually consistent with this deletion. | ||
* | ||
* It ignores delete operation if the key is not present in the persistent store. | ||
* | ||
* @param key the key to delete | ||
* @return a future to track the completion of the operation | ||
*/ | ||
CompletableFuture<Void> delete(String key); | ||
|
||
/** | ||
* Returns the entry set of the items in the local tableview. | ||
* @return entry set | ||
*/ | ||
Set<Map.Entry<String, ServiceUnitStateData>> entrySet(); | ||
|
||
/** | ||
* Returns service units (namespace bundles) owned by this broker. | ||
* @return a set of owned service units (namespace bundles) | ||
*/ | ||
Set<NamespaceBundle> ownedServiceUnits(); | ||
|
||
/** | ||
* Tries to flush any batched or buffered updates. | ||
* @param waitDurationInMillis time to wait until complete. | ||
* @throws ExecutionException | ||
* @throws InterruptedException | ||
* @throws TimeoutException | ||
*/ | ||
void flush(long waitDurationInMillis) throws ExecutionException, InterruptedException, TimeoutException; | ||
} | ||
``` | ||
|
||
```java | ||
/** | ||
* Defines metadata store tableview. | ||
* MetadataStoreTableView initially fills existing items to its local tableview and eventually | ||
* synchronize remote updates to its local tableview from the remote metadata store. | ||
* This abstraction can help replicate metadata in memory from metadata store. | ||
*/ | ||
public interface MetadataStoreTableView<T> { | ||
|
||
class ConflictException extends RuntimeException { | ||
public ConflictException(String msg) { | ||
super(msg); | ||
} | ||
} | ||
|
||
/** | ||
* Starts the tableview by filling existing items to its local tableview from the remote metadata store. | ||
*/ | ||
void start() throws MetadataStoreException; | ||
|
||
/** | ||
* Reads whether a specific key exists in the local tableview. | ||
* | ||
* @param key the key to check | ||
* @return true if exists. Otherwise, false. | ||
*/ | ||
boolean exists(String key); | ||
|
||
/** | ||
* Gets one item from the local tableview. | ||
* <p> | ||
* If the key is not found, return null. | ||
* | ||
* @param key the key to check | ||
* @return value if exists. Otherwise, null. | ||
*/ | ||
T get(String key); | ||
|
||
/** | ||
* Tries to put the item in the persistent store. | ||
* All peer tableviews (including the local one) will be notified and be eventually consistent with this put value. | ||
* <p> | ||
* This operation can fail if the input value conflicts with the existing one. | ||
* | ||
* @param key the key to check on the tableview | ||
* @return a future to track the completion of the operation | ||
* @throws MetadataStoreTableView.ConflictException | ||
* if the input value conflicts with the existing one. | ||
*/ | ||
CompletableFuture<Void> put(String key, T value); | ||
|
||
/** | ||
* Tries to delete the item from the persistent store. | ||
* All peer tableviews (including the local one) will be notified and be eventually consistent with this deletion. | ||
* <p> | ||
* This can fail if the item is not present in the metadata store. | ||
* | ||
* @param key the key to check on the tableview | ||
* @return a future to track the completion of the operation | ||
* @throws MetadataStoreException.NotFoundException | ||
* if the key is not present in the metadata store. | ||
*/ | ||
CompletableFuture<Void> delete(String key); | ||
|
||
/** | ||
* Returns the size of the items in the local tableview. | ||
* @return size | ||
*/ | ||
int size(); | ||
|
||
/** | ||
* Reads whether the local tableview is empty or not. | ||
* @return true if empty. Otherwise, false | ||
*/ | ||
boolean isEmpty(); | ||
|
||
/** | ||
* Returns the entry set of the items in the local tableview. | ||
* @return entry set | ||
*/ | ||
Set<Map.Entry<String, T>> entrySet(); | ||
|
||
/** | ||
* Returns the key set of the items in the local tableview. | ||
* @return key set | ||
*/ | ||
Set<String> keySet(); | ||
|
||
/** | ||
* Returns the values of the items in the local tableview. | ||
* @return values | ||
*/ | ||
Collection<T> values(); | ||
|
||
/** | ||
* Runs the action for each item in the local tableview. | ||
*/ | ||
void forEach(BiConsumer<String, T> action); | ||
} | ||
``` | ||
|
||
```java | ||
public class MetadataCacheConfig<T> { | ||
private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); | ||
|
||
... | ||
|
||
/** | ||
* Specifies cache reload consumer behavior when the cache is refreshed automatically at refreshAfterWriteMillis | ||
* frequency. | ||
*/ | ||
@Builder.Default | ||
private final BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer = null; | ||
``` | ||
|
||
```java | ||
|
||
/** | ||
* ServiceUnitStateTableViewSyncer can be used to sync system topic and metadata store table views to migrate to one | ||
* from the other. | ||
*/ | ||
@Slf4j | ||
public class ServiceUnitStateTableViewSyncer implements Cloneable { | ||
private static final int SYNC_TIMEOUT_IN_SECS = 30; | ||
private volatile ServiceUnitStateTableView systemTopicTableView; | ||
private volatile ServiceUnitStateTableView metadataStoreTableView; | ||
|
||
|
||
public void start(PulsarService pulsar) throws IOException { | ||
if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) { | ||
return; | ||
} | ||
try { | ||
if (systemTopicTableView == null) { | ||
systemTopicTableView = new ServiceUnitStateTableViewImpl(); | ||
systemTopicTableView.start( | ||
pulsar, | ||
this::syncToMetadataStore, | ||
this::syncToMetadataStore); | ||
log.info("Successfully started ServiceUnitStateTableViewSyncer::systemTopicTableView"); | ||
} | ||
|
||
if (metadataStoreTableView == null) { | ||
metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl(); | ||
metadataStoreTableView.start( | ||
pulsar, | ||
this::syncToSystemTopic, | ||
this::syncToSystemTopic); | ||
log.info("Successfully started ServiceUnitStateTableViewSyncer::metadataStoreTableView"); | ||
} | ||
|
||
} catch (Throwable e) { | ||
log.error("Failed to start ServiceUnitStateTableViewSyncer", e); | ||
throw e; | ||
} | ||
} | ||
|
||
private void syncToSystemTopic(String key, ServiceUnitStateData data) { | ||
try { | ||
systemTopicTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS); | ||
} catch (Throwable e) { | ||
log.error("SystemTopicTableView failed to sync key:{}, data:{}", key, data, e); | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
|
||
private void syncToMetadataStore(String key, ServiceUnitStateData data) { | ||
try { | ||
metadataStoreTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS); | ||
} catch (Throwable e) { | ||
log.error("metadataStoreTableView failed to sync key:{}, data:{}", key, data, e); | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
... | ||
} | ||
|
||
|
||
``` | ||
|
||
### Configuration | ||
|
||
- Add a `loadManagerServiceUnitStateTableViewClassName` configuration to specify `ServiceUnitStateTableView` implementation class name. | ||
- Add a `loadBalancerServiceUnitTableViewSyncerEnabled` configuration to to enable ServiceUnitTableViewSyncer to sync metadata store and system topic ServiceUnitStateTableView during migration. | ||
|
||
## Backward & Forward Compatibility | ||
|
||
It will ba Backward & Forward compatible as `loadManagerServiceUnitStateTableViewClassName` will be `ServiceUnitStateTableViewImpl`(system topic implementation) by default. | ||
|
||
We will introduce `ServiceUnitStateTableViewSyncer` to sync system topic and metadata store table views when migrating to ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl and vice versa. This syncer can be enabled/disabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`. The admin could enable this syncer before migration and disable it after it is finished. | ||
|
||
## Alternatives | ||
|
||
## General Notes | ||
|
||
## Links | ||
|
||
* Mailing List discussion thread: https://lists.apache.org/thread/v7sod21r56hkt2cjxl9pp348r4jxo6o8 | ||
* Mailing List voting thread: https://lists.apache.org/thread/j453xp0vty8zy2y0ljssjgyvwb47royc |