Skip to content

Commit

Permalink
make CC work form MSK fronted by privatelink
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyuliu-cb committed Mar 16, 2023
1 parent 96c3d6b commit 70a7457
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ModelUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
Expand Down Expand Up @@ -354,4 +355,19 @@ public static String bootstrapServers(Map<String, ?> config) {
public static Consumer<byte[], byte[]> createSampleStoreConsumer(Map<String, ?> configs, String clientIdPrefix) {
return createConsumer(configs, clientIdPrefix, bootstrapServers(configs), ByteArrayDeserializer.class, ByteArrayDeserializer.class, false);
}

public static String convertMSKPrivateLinkHostToBrokerHost(Node node) {
return _convertMSKPrivateLinkHostToBrokerHost(node.host(), node.id());
}

public static String convertMSKPrivateLinkHostToBrokerHost(Broker broker) {
return _convertMSKPrivateLinkHostToBrokerHost(broker.host().name(), broker.id());
}

private static String _convertMSKPrivateLinkHostToBrokerHost(String privatelinkHost, int brokerId) {
final String expectedHostPrefix = "b-" + brokerId;
final String host = privatelinkHost.indexOf(expectedHostPrefix) != 0 ?
expectedHostPrefix + privatelinkHost.substring(privatelinkHost.indexOf(".")) : privatelinkHost;
return host;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public void recordMetric(CruiseControlMetric ccm) {
break;
case TOPIC:
TopicMetric tm = (TopicMetric) ccm;
_dotHandledTopicMetrics.computeIfAbsent(tm.topic(), t -> new RawMetricsHolder())
_dotHandledTopicMetrics.computeIfAbsent(replaceDotsWithUnderscores(tm.topic()), t -> new RawMetricsHolder())
.recordCruiseControlMetric(ccm);
break;
case PARTITION:
PartitionMetric pm = (PartitionMetric) ccm;
_dotHandledPartitionMetrics.computeIfAbsent(new TopicPartition(pm.topic(), pm.partition()), tp -> new RawMetricsHolder())
_dotHandledPartitionMetrics.computeIfAbsent(new TopicPartition(replaceDotsWithUnderscores(pm.topic()), pm.partition()), tp -> new RawMetricsHolder())
.recordCruiseControlMetric(ccm);
_dotHandledTopicsWithPartitionSizeReported.add(pm.topic());
_dotHandledTopicsWithPartitionSizeReported.add(replaceDotsWithUnderscores(pm.topic()));
break;
default:
throw new IllegalStateException(String.format("Should never be here. Unrecognized metric scope %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static com.linkedin.cruisecontrol.common.config.ConfigDef.Type.CLASS;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.SEC_TO_MS;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.replaceDotsWithUnderscores;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.convertMSKPrivateLinkHostToBrokerHost;

/**
* Metric sampler that fetches Kafka metrics from a Prometheus server and converts them to samples.
Expand Down Expand Up @@ -152,7 +153,7 @@ private Integer getBrokerIdForHostName(String host, Cluster cluster) {

private void mapNodesToClusterId(Cluster cluster) {
for (Node node : cluster.nodes()) {
_hostToBrokerIdMap.put(node.host(), node.id());
_hostToBrokerIdMap.put(convertMSKPrivateLinkHostToBrokerHost(node), node.id());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.JSON_VERSION;
import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.VERSION;

import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.convertMSKPrivateLinkHostToBrokerHost;

/**
* Get broker level stats in human readable format.
Expand Down Expand Up @@ -61,7 +61,7 @@ public BrokerStats(KafkaCruiseControlConfig config) {
* @param isEstimated {@code true} if the broker capacity is estimated, {@code false} otherwise.
*/
public void addSingleBrokerStats(Broker broker, double potentialBytesOutRate, boolean isEstimated) {
String host = broker.host().name();
String host = convertMSKPrivateLinkHostToBrokerHost(broker);
String rack = broker.rack().id();
SingleBrokerStats singleBrokerStats = new SingleBrokerStats(broker, potentialBytesOutRate, isEstimated);
_brokerStats.add(singleBrokerStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.HashMap;
import java.util.Map;

import static com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingUtils.convertMSKPrivateLinkHostToBrokerHost;

@JsonResponseClass
public class SingleBrokerStats extends BasicStats {
@JsonResponseField
Expand All @@ -33,7 +35,7 @@ public class SingleBrokerStats extends BasicStats {

SingleBrokerStats(Broker broker, double potentialBytesOutRate, boolean isEstimated) {
super(broker, potentialBytesOutRate);
_host = broker.host().name();
_host = convertMSKPrivateLinkHostToBrokerHost(broker);
_id = broker.id();
_state = broker.state();
_isEstimated = isEstimated;
Expand Down

0 comments on commit 70a7457

Please sign in to comment.