Skip to content

Commit

Permalink
Support set client.id with Zone ID for Kafka driver
Browse files Browse the repository at this point in the history
  • Loading branch information
codelipenghui committed Dec 2, 2024
1 parent fe3c5a0 commit e9da754
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@

public class KafkaBenchmarkDriver implements BenchmarkDriver {

private static final String ZONE_ID_CONFIG = "zone.id";
private static final String ZONE_ID_TEMPLATE = "{zone.id}";
private static final String KAFKA_CLIENT_ID = "client.id";
private Config config;

private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -63,6 +66,11 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
Properties commonProperties = new Properties();
commonProperties.load(new StringReader(config.commonConfig));

if (commonProperties.containsKey(KAFKA_CLIENT_ID)) {
commonProperties.put(KAFKA_CLIENT_ID, applyZoneId(commonProperties.getProperty(KAFKA_CLIENT_ID),
System.getProperty(ZONE_ID_CONFIG)));
}

producerProperties = new Properties();
commonProperties.forEach((key, value) -> producerProperties.put(key, value));
producerProperties.load(new StringReader(config.producerConfig));
Expand Down Expand Up @@ -151,6 +159,10 @@ public void close() throws Exception {
admin.close();
}

private static String applyZoneId(String clientId, String zoneId) {
return clientId.replace(ZONE_ID_TEMPLATE, zoneId);
}

private static final ObjectMapper mapper =
new ObjectMapper(new YAMLFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand Down

0 comments on commit e9da754

Please sign in to comment.