From e9da754d131e92334472d2e012f5ed1884348d9c Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Mon, 2 Dec 2024 12:35:25 -0800 Subject: [PATCH] Support set client.id with Zone ID for Kafka driver --- .../benchmark/driver/kafka/KafkaBenchmarkDriver.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java index a2789ac2..f474ea96 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkDriver.java @@ -45,6 +45,9 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver { + private static final String ZONE_ID_CONFIG = "zone.id"; + private static final String ZONE_ID_TEMPLATE = "{zone.id}"; + private static final String KAFKA_CLIENT_ID = "client.id"; private Config config; private List producers = Collections.synchronizedList(new ArrayList<>()); @@ -63,6 +66,11 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I Properties commonProperties = new Properties(); commonProperties.load(new StringReader(config.commonConfig)); + if (commonProperties.containsKey(KAFKA_CLIENT_ID)) { + commonProperties.put(KAFKA_CLIENT_ID, applyZoneId(commonProperties.getProperty(KAFKA_CLIENT_ID), + System.getProperty(ZONE_ID_CONFIG))); + } + producerProperties = new Properties(); commonProperties.forEach((key, value) -> producerProperties.put(key, value)); producerProperties.load(new StringReader(config.producerConfig)); @@ -151,6 +159,10 @@ public void close() throws Exception { admin.close(); } + private static String applyZoneId(String clientId, String zoneId) { + return clientId.replace(ZONE_ID_TEMPLATE, zoneId); + } + private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);