diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java
index 95af76f8..d516b076 100644
--- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java
+++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java
@@ -65,22 +65,22 @@ public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
     _services = new ConcurrentHashMap<>();

     for (Map.Entry<String, Map> clusterProperty : allClusterProps.entrySet()) {
-      String name = clusterProperty.getKey();
+      String clusterName = clusterProperty.getKey();
       Map props = clusterProperty.getValue();
       if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG))
-        throw new IllegalArgumentException(name + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
+        throw new IllegalArgumentException(clusterName + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
       String className = (String) props.get(XinfraMonitorConstants.CLASS_NAME_CONFIG);

       Class aClass = Class.forName(className);
       if (App.class.isAssignableFrom(aClass)) {
-        App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
-        _apps.put(name, clusterApp);
+        App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, clusterName);
+        _apps.put(clusterName, clusterApp);
       } else if (Service.class.isAssignableFrom(aClass)) {
         ServiceFactory serviceFactory = (ServiceFactory) Class.forName(className + XinfraMonitorConstants.FACTORY)
             .getConstructor(Map.class, String.class)
-            .newInstance(props, name);
+            .newInstance(props, clusterName);
         Service service = serviceFactory.createService();
-        _services.put(name, service);
+        _services.put(clusterName, service);
       } else {
         throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
       }
diff --git a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java
index 258e9a16..a44b0827 100644
--- a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java
+++ b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java
@@ -55,13 +55,13 @@ public class SingleClusterMonitor implements App {

   private static final int SERVICES_INITIAL_CAPACITY = 4;
   private final TopicManagementService _topicManagementService;
-  private final String _name;
+  private final String _clusterName;
   private final List<Service> _allServices;
   private final boolean _isTopicManagementServiceEnabled;

-  public SingleClusterMonitor(Map props, String name) throws Exception {
+  public SingleClusterMonitor(Map props, String clusterName) throws Exception {
     ConsumerFactory consumerFactory = new ConsumerFactoryImpl(props);
-    _name = name;
+    _clusterName = clusterName;
     LOG.info("SingleClusterMonitor properties: {}", prettyPrint(props));
     TopicManagementServiceConfig config = new TopicManagementServiceConfig(props);
     _isTopicManagementServiceEnabled =
@@ -69,7 +69,8 @@ public SingleClusterMonitor(Map props, String name) throws Excep
     _allServices = new ArrayList<>(SERVICES_INITIAL_CAPACITY);
     CompletableFuture<Void> topicPartitionResult;
     if (_isTopicManagementServiceEnabled) {
-      _topicManagementService = new TopicManagementService(props, name);
+      String topicManagementServiceName = String.format("Topic-management-service-for-%s", clusterName);
+      _topicManagementService = new TopicManagementService(props, topicManagementServiceName);
       topicPartitionResult = _topicManagementService.topicPartitionResult();

       // block on the MultiClusterTopicManagementService to complete.
@@ -80,10 +81,9 @@ public SingleClusterMonitor(Map props, String name) throws Excep
       _topicManagementService = null;
       topicPartitionResult = new CompletableFuture<>();
       topicPartitionResult.complete(null);
-
     }
-    ProduceService produceService = new ProduceService(props, name);
-    ConsumeService consumeService = new ConsumeService(name, topicPartitionResult, consumerFactory);
+    ProduceService produceService = new ProduceService(props, clusterName);
+    ConsumeService consumeService = new ConsumeService(clusterName, topicPartitionResult, consumerFactory);
     _allServices.add(produceService);
     _allServices.add(consumeService);
   }
@@ -126,7 +126,7 @@ public void start() throws Exception {
       }
     }

-    LOG.info(_name + "/SingleClusterMonitor started!");
+    LOG.info(_clusterName + "/SingleClusterMonitor started!");
   }

   @Override
@@ -134,7 +134,7 @@ public void stop() {
     for (Service service : _allServices) {
       service.stop();
     }
-    LOG.info(_name + "/SingleClusterMonitor stopped.");
+    LOG.info(_clusterName + "/SingleClusterMonitor stopped.");
   }

   @Override
diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java
index 067cd46f..56c3ddc2 100644
--- a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java
+++ b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java
@@ -164,14 +164,14 @@ private void createDeleteClusterTopic() {
     try {
       int brokerCount = _adminClient.describeCluster().nodes().get().size();

-      Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
       Set<BrokerMetadata> brokers = new HashSet<>();
       for (Node broker : _adminClient.describeCluster().nodes().get()) {
         BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);
         brokers.add(brokerMetadata);
       }
-      if (!blackListedBrokers.isEmpty()) {
-        brokers.removeIf(broker -> blackListedBrokers.contains(broker.id()));
+      Set<Integer> excludedBrokers = _topicFactory.getExcludedBrokers(_adminClient);
+      if (!excludedBrokers.isEmpty()) {
+        brokers.removeIf(broker -> excludedBrokers.contains(broker.id()));
       }

       // map from partition id to replica ids (i.e. broker ids).
diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java
index e6869da3..e1ad2f4d 100644
--- a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java
+++ b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java
@@ -15,6 +15,7 @@
 import com.linkedin.xinfra.monitor.services.configs.MultiClusterTopicManagementServiceConfig;
 import com.linkedin.xinfra.monitor.services.configs.TopicManagementServiceConfig;
 import com.linkedin.xinfra.monitor.topicfactory.TopicFactory;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,6 +27,7 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -41,7 +43,6 @@
 import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.CreatePartitionsResult;
 import org.apache.kafka.clients.admin.ElectLeadersResult;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
@@ -248,7 +249,7 @@ static class TopicManagementHelper {
     private final int _minPartitionNum;
     private final Properties _topicProperties;
     private boolean _preferredLeaderElectionRequested;
-    private final int _requestTimeoutMs;
+    private final Duration _requestTimeout;
     private final List<String> _bootstrapServers;

     // package private for unit testing
@@ -261,6 +262,7 @@ static class TopicManagementHelper {

     @SuppressWarnings("unchecked")
     TopicManagementHelper(Map props) throws Exception {
+
       TopicManagementServiceConfig config = new TopicManagementServiceConfig(props);
       AdminClientConfig adminClientConfig = new AdminClientConfig(props);
       String topicFactoryClassName = config.getString(TopicManagementServiceConfig.TOPIC_FACTORY_CLASS_CONFIG);
@@ -273,7 +275,7 @@ static class TopicManagementHelper {
       _minPartitionsToBrokersRatio = config.getDouble(TopicManagementServiceConfig.PARTITIONS_TO_BROKERS_RATIO_CONFIG);
       _minPartitionNum = config.getInt(TopicManagementServiceConfig.MIN_PARTITION_NUM_CONFIG);
       _preferredLeaderElectionRequested = false;
-      _requestTimeoutMs = adminClientConfig.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
+      _requestTimeout = Duration.ofMillis(adminClientConfig.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG));
       _bootstrapServers = adminClientConfig.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
       _topicProperties = new Properties();
       if (props.containsKey(TopicManagementServiceConfig.TOPIC_PROPS_CONFIG)) {
@@ -288,9 +290,17 @@ static class TopicManagementHelper {
           TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap();
       _topicFactory =
           (TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig);
-
       _adminClient = constructAdminClient(props);
       LOGGER.info("{} configs: {}", _adminClient.getClass().getSimpleName(), props);
+      logConfigurationValues();
+    }
+
+    private void logConfigurationValues() {
+      LOGGER.info("TopicManagementHelper for cluster with Zookeeper connect {} is configured with "
+          + "[topic={}, topicCreationEnabled={}, topicAddPartitionEnabled={}, "
+          + "topicReassignPartitionAndElectLeaderEnabled={}, minPartitionsToBrokersRatio={}, "
+          + "minPartitionNum={}]", _zkConnect, _topic, _topicCreationEnabled, _topicAddPartitionEnabled,
+          _topicReassignPartitionAndElectLeaderEnabled, _minPartitionsToBrokersRatio, _minPartitionNum);
     }

     @SuppressWarnings("unchecked")
@@ -317,7 +327,8 @@ int minPartitionNum() throws InterruptedException, ExecutionException {
       return Math.max((int) Math.ceil(_minPartitionsToBrokersRatio * brokerCount), _minPartitionNum);
     }

-    void maybeAddPartitions(int minPartitionNum) throws ExecutionException, InterruptedException {
+    void maybeAddPartitions(final int requiredMinPartitionNum)
+        throws ExecutionException, InterruptedException, CancellationException, TimeoutException {
       if (!_topicAddPartitionEnabled) {
         LOGGER.info("Adding partition to {} topic is not enabled in a cluster with Zookeeper URL {}. "
             + "Refer to config: {}", _topic, _zkConnect, TopicManagementServiceConfig.TOPIC_ADD_PARTITION_ENABLED_CONFIG);
@@ -326,32 +337,36 @@ void maybeAddPartitions(int minPartitionNum) throws ExecutionException, Interrup
       Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap =
           _adminClient.describeTopics(Collections.singleton(_topic)).values();
       KafkaFuture<TopicDescription> topicDescriptions = kafkaFutureMap.get(_topic);
-      List<TopicPartitionInfo> partitions = topicDescriptions.get().partitions();
-
-      int partitionNum = partitions.size();
-      if (partitionNum < minPartitionNum) {
-        LOGGER.info("{} will increase partition of the topic {} in the cluster from {}" + " to {}.",
-            this.getClass().toString(), _topic, partitionNum, minPartitionNum);
-        Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
-        Set<BrokerMetadata> brokers = new HashSet<>();
-        for (Node broker : _adminClient.describeCluster().nodes().get()) {
-          BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);
-          brokers.add(brokerMetadata);
-        }
+      List<TopicPartitionInfo> partitions = topicDescriptions.get(_requestTimeout.toMillis(), TimeUnit.MILLISECONDS).partitions();

-        if (!blackListedBrokers.isEmpty()) {
-          brokers.removeIf(broker -> blackListedBrokers.contains(broker.id()));
-        }
+      final int currPartitionNum = partitions.size();
+      if (currPartitionNum >= requiredMinPartitionNum) {
+        LOGGER.debug("{} will not increase partition of the topic {} in the cluster. Current partition count is {} and "
+            + "the minimum required partition count is {}.", this.getClass().toString(), _topic, currPartitionNum, requiredMinPartitionNum);
+        return;
+      }
+      LOGGER.info("{} will increase partition of the topic {} in the cluster from {}" + " to {}.",
+          this.getClass().toString(), _topic, currPartitionNum, requiredMinPartitionNum);
+      Set<BrokerMetadata> brokers = new HashSet<>();
+      for (Node broker : _adminClient.describeCluster().nodes().get(_requestTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
+        BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null);
+        brokers.add(brokerMetadata);
+      }
+      Set<Integer> excludedBrokers = _topicFactory.getExcludedBrokers(_adminClient);
+      if (!excludedBrokers.isEmpty()) {
+        brokers.removeIf(broker -> excludedBrokers.contains(broker.id()));
+      }

-        List<List<Integer>> newPartitionAssignments =
-            newPartitionAssignments(minPartitionNum, partitionNum, brokers, _replicationFactor);
+      List<List<Integer>> newPartitionAssignments =
+          newPartitionAssignments(requiredMinPartitionNum, currPartitionNum, brokers, _replicationFactor);

-        NewPartitions newPartitions = NewPartitions.increaseTo(minPartitionNum, newPartitionAssignments);
+      NewPartitions newPartitions = NewPartitions.increaseTo(requiredMinPartitionNum, newPartitionAssignments);

-        Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
-        newPartitionsMap.put(_topic, newPartitions);
-        CreatePartitionsResult createPartitionsResult = _adminClient.createPartitions(newPartitionsMap);
-      }
+      Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
+      newPartitionsMap.put(_topic, newPartitions);
+      _adminClient.createPartitions(newPartitionsMap).all().get(_requestTimeout.toMillis(), TimeUnit.MILLISECONDS);
+      LOGGER.info("{} finished increasing partition of the topic {} in the cluster from {} to {}",
+          this.getClass().toString(), _topic, currPartitionNum, requiredMinPartitionNum);
     }

     static List<List<Integer>> newPartitionAssignments(int minPartitionNum, int partitionNum,
@@ -427,8 +442,8 @@ int numPartitions() throws InterruptedException, ExecutionException {

     private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedException {
       Set<Node> brokers = new HashSet<>(_adminClient.describeCluster().nodes().get());
-      Set<Integer> blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient);
-      brokers.removeIf(broker -> blackListedBrokers.contains(broker.id()));
+      Set<Integer> excludedBrokers = _topicFactory.getExcludedBrokers(_adminClient);
+      brokers.removeIf(broker -> excludedBrokers.contains(broker.id()));
       return brokers;
     }
diff --git a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java
index 26f29d86..8b2ddef7 100644
--- a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java
+++ b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java
@@ -32,7 +32,7 @@ public int createTopicIfNotExist(String topic, short replicationFactor, double p
   }

   @Override
-  public Set<Integer> getBlackListedBrokers(AdminClient adminClient) {
+  public Set<Integer> getExcludedBrokers(AdminClient adminClient) {
     return Collections.emptySet();
   }
 }
diff --git a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java
index 7473d62d..b3d0a706 100644
--- a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java
+++ b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java
@@ -42,6 +42,6 @@ int createTopicIfNotExist(String topic, short replicationFactor, double partitio
    * @param adminClient AdminClient object
    * @return A set of brokers that don't take new partitions or reassigned partitions for topics.
    */
-  Set<Integer> getBlackListedBrokers(AdminClient adminClient);
+  Set<Integer> getExcludedBrokers(AdminClient adminClient);
 }
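Not part of the patch: a minimal, hypothetical sketch of the caller-side pattern the diff converges on, namely filtering out the broker ids returned by the renamed `TopicFactory.getExcludedBrokers(AdminClient)` hook and bounding AdminClient futures with the request timeout via `KafkaFuture.get(timeout, unit)` instead of an unbounded `get()`. The class and method names (`ExcludedBrokerFilter`, `eligibleBrokers`) are illustrative assumptions, not code from the repository.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;

/** Illustrative helper mirroring getAvailableBrokers()/maybeAddPartitions() above; not part of the patch. */
final class ExcludedBrokerFilter {

  /**
   * Returns the cluster's brokers minus the ids a TopicFactory reports as excluded,
   * bounding the describeCluster() future with the configured request timeout
   * (the same get(timeout, unit) pattern used in maybeAddPartitions()).
   */
  static Set<Node> eligibleBrokers(AdminClient adminClient, Set<Integer> excludedBrokerIds, Duration requestTimeout)
      throws InterruptedException, ExecutionException, TimeoutException {
    // Copy into a mutable set before filtering; the collection returned by the future may be unmodifiable.
    Set<Node> brokers = new HashSet<>(
        adminClient.describeCluster().nodes().get(requestTimeout.toMillis(), TimeUnit.MILLISECONDS));
    brokers.removeIf(node -> excludedBrokerIds.contains(node.id()));
    return brokers;
  }

  private ExcludedBrokerFilter() { }
}
```

A `TopicFactory` implementation supplies the excluded ids; `DefaultTopicFactory` above returns an empty set, so no brokers are filtered by default.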