Skip to content

Commit

Permalink
Allow HelixClusterManager to adopt different implementations of Clust…
Browse files Browse the repository at this point in the history
…erChangeHandler (#1344)

1. Introduce ClusterChangeHandler interface which allows different
implementations
2. Move current ClusterChangeHandler to a separate class
(SimpleClusterChangeHandler)
3. Rewrite some APIs in HelixClusterManager to aggregate cluster-wide
infos from all ClusterChangeHandlers
  • Loading branch information
jsjtzyy authored and cgtz committed Jan 4, 2020
1 parent 817325a commit b0e11e8
Show file tree
Hide file tree
Showing 5 changed files with 734 additions and 413 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.RoutingTableChangeListener;
import org.apache.helix.spectator.RoutingTableSnapshot;


/**
* General handler that handles any resource or state changes in cluster. It exposes API(s) for cluster manager to
* access up-to-date cluster info. Each data center has its own {@link ClusterChangeHandler}.
*/
interface ClusterChangeHandler
extends InstanceConfigChangeListener, LiveInstanceChangeListener, IdealStateChangeListener,
RoutingTableChangeListener {

/**
* Set the initial snapshot in this {@link ClusterChangeHandler}.
* @param routingTableSnapshot the snapshot to set
*/
void setRoutingTableSnapshot(RoutingTableSnapshot routingTableSnapshot);

/**
* @return current snapshot held by this {@link ClusterChangeHandler}.
*/
RoutingTableSnapshot getRoutingTableSnapshot();

/**
* @return a map from ambry data node to its disks.
*/
Map<AmbryDataNode, Set<AmbryDisk>> getDataNodeToDisksMap();

/**
* Get ambry data node associated with given instance name.
* @param instanceName associated with ambry node.
* @return requested {@link AmbryDataNode}
*/
AmbryDataNode getDataNode(String instanceName);

/**
* Get {@link AmbryReplica} on given node that belongs to specified partition.
* @param ambryDataNode the node on which the replica resides.
* @param partitionName name of partition which the replica belongs to.
* @return requested {@link AmbryReplica}
*/
AmbryReplica getReplicaId(AmbryDataNode ambryDataNode, String partitionName);

/**
* Get all replicas on given node.
* @param ambryDataNode the node on which replicas reside
* @return a list of {@link AmbryReplica} on given node.
*/
List<AmbryReplica> getReplicaIds(AmbryDataNode ambryDataNode);

/**
* @return all {@link AmbryDataNode} tracked by this {@link ClusterChangeHandler}
*/
List<AmbryDataNode> getAllDataNodes();

/**
* Get all disks belong to given data node.
* @param ambryDataNode the node which the disks belong to.
* @return a set of {@link AmbryDisk} that belongs to given node.
*/
Set<AmbryDisk> getDisks(AmbryDataNode ambryDataNode);

/**
* @return a map from partition name to its corresponding resource name in this {@link ClusterChangeHandler}.
*/
Map<String, String> getPartitionToResourceMap();

/**
* @return number of errors occurred during handling cluster changes.
*/
long getErrorCount();

/**
* Wait for initial notification during startup.
* @throws InterruptedException
*/
void waitForInitNotification() throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,8 @@ public MetricRegistry getMetricRegistry() {
public void onReplicaEvent(ReplicaId replicaId, ReplicaEventType event) {
staticClusterManager.onReplicaEvent(replicaId, event);
if (helixClusterManager != null) {
AmbryReplica ambryReplica =
helixClusterManager.getReplicaForPartitionOnNode(replicaId.getDataNodeId().getHostname(),
replicaId.getDataNodeId().getPort(), replicaId.getPartitionId().toString());
AmbryReplica ambryReplica = helixClusterManager.getReplicaForPartitionOnNode(replicaId.getDataNodeId(),
replicaId.getPartitionId().toString());
helixClusterManager.onReplicaEvent(ambryReplica, event);
}
}
Expand Down
Loading

0 comments on commit b0e11e8

Please sign in to comment.