diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtils.java index a1a77e42b..3a2164633 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtils.java @@ -12,6 +12,7 @@ import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric; import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde; import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType; +import com.linkedin.kafka.cruisecontrol.model.Broker; import com.linkedin.kafka.cruisecontrol.model.ModelUtils; import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef; import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad; @@ -354,4 +355,19 @@ public static String bootstrapServers(Map config) { public static Consumer createSampleStoreConsumer(Map configs, String clientIdPrefix) { return createConsumer(configs, clientIdPrefix, bootstrapServers(configs), ByteArrayDeserializer.class, ByteArrayDeserializer.class, false); } + + public static String convertMSKPrivateLinkHostToBrokerHost(Node node) { + return _convertMSKPrivateLinkHostToBrokerHost(node.host(), node.id()); + } + + public static String convertMSKPrivateLinkHostToBrokerHost(Broker broker) { + return _convertMSKPrivateLinkHostToBrokerHost(broker.host().name(), broker.id()); + } + + private static String _convertMSKPrivateLinkHostToBrokerHost(String privatelinkHost, int brokerId) { + final String expectedHostPrefix = "b-" + brokerId; + final String host = privatelinkHost.indexOf(expectedHostPrefix) != 0 ? + expectedHostPrefix + privatelinkHost.substring(privatelinkHost.indexOf(".")) : privatelinkHost; + return host; + } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java index 8a9a73391..693c45b91 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java @@ -70,14 +70,14 @@ public void recordMetric(CruiseControlMetric ccm) { break; case TOPIC: TopicMetric tm = (TopicMetric) ccm; - _dotHandledTopicMetrics.computeIfAbsent(tm.topic(), t -> new RawMetricsHolder()) + _dotHandledTopicMetrics.computeIfAbsent(replaceDotsWithUnderscores(tm.topic()), t -> new RawMetricsHolder()) .recordCruiseControlMetric(ccm); break; case PARTITION: PartitionMetric pm = (PartitionMetric) ccm; - _dotHandledPartitionMetrics.computeIfAbsent(new TopicPartition(pm.topic(), pm.partition()), tp -> new RawMetricsHolder()) + _dotHandledPartitionMetrics.computeIfAbsent(new TopicPartition(replaceDotsWithUnderscores(pm.topic()), pm.partition()), tp -> new RawMetricsHolder()) .recordCruiseControlMetric(ccm); - _dotHandledTopicsWithPartitionSizeReported.add(pm.topic()); + _dotHandledTopicsWithPartitionSizeReported.add(replaceDotsWithUnderscores(pm.topic())); break; default: throw new IllegalStateException(String.format("Should never be here. Unrecognized metric scope %s", diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/PrometheusMetricSampler.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/PrometheusMetricSampler.java index 34ed02b65..d83ef61ba 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/PrometheusMetricSampler.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/PrometheusMetricSampler.java @@ -32,6 +32,7 @@ import static com.linkedin.cruisecontrol.common.config.ConfigDef.Type.CLASS; import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.SEC_TO_MS; import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.replaceDotsWithUnderscores; +import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.convertMSKPrivateLinkHostToBrokerHost; /** * Metric sampler that fetches Kafka metrics from a Prometheus server and converts them to samples. @@ -152,7 +153,7 @@ private Integer getBrokerIdForHostName(String host, Cluster cluster) { private void mapNodesToClusterId(Cluster cluster) { for (Node node : cluster.nodes()) { - _hostToBrokerIdMap.put(node.host(), node.id()); + _hostToBrokerIdMap.put(convertMSKPrivateLinkHostToBrokerHost(node), node.id()); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/BrokerStats.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/BrokerStats.java index 35d9ca3b7..50da6dd7d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/BrokerStats.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/BrokerStats.java @@ -21,7 +21,7 @@ import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.JSON_VERSION; import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.VERSION; - +import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.convertMSKPrivateLinkHostToBrokerHost; /** * Get broker level stats in human readable format. @@ -61,7 +61,7 @@ public BrokerStats(KafkaCruiseControlConfig config) { * @param isEstimated {@code true} if the broker capacity is estimated, {@code false} otherwise. */ public void addSingleBrokerStats(Broker broker, double potentialBytesOutRate, boolean isEstimated) { - String host = broker.host().name(); + String host = convertMSKPrivateLinkHostToBrokerHost(broker); String rack = broker.rack().id(); SingleBrokerStats singleBrokerStats = new SingleBrokerStats(broker, potentialBytesOutRate, isEstimated); _brokerStats.add(singleBrokerStats); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/SingleBrokerStats.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/SingleBrokerStats.java index b5b109102..984f15a48 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/SingleBrokerStats.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/stats/SingleBrokerStats.java @@ -12,6 +12,8 @@ import java.util.HashMap; import java.util.Map; +import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.convertMSKPrivateLinkHostToBrokerHost; + @JsonResponseClass public class SingleBrokerStats extends BasicStats { @JsonResponseField @@ -33,7 +35,7 @@ public class SingleBrokerStats extends BasicStats { SingleBrokerStats(Broker broker, double potentialBytesOutRate, boolean isEstimated) { super(broker, potentialBytesOutRate); - _host = broker.host().name(); + _host = convertMSKPrivateLinkHostToBrokerHost(broker); _id = broker.id(); _state = broker.state(); _isEstimated = isEstimated;