Skip to content

Commit

Permalink
chore: minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Feb 6, 2024
1 parent 6a4d8b1 commit 3701cdd
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
import java.io.File;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Slf4j
Expand Down Expand Up @@ -62,8 +60,8 @@ public void bindTo(final MeterRegistry registry) {
diskSpaceMetrics.bindTo(registry);

final ReadOnlyKeyValueStore<String, MetricAverage> store = null;
// kafkaStreams.store(
// StoreQueryParameters.fromNameAndType("latency-metrics", QueryableStoreTypes.keyValueStore()));
kafkaStreams.store(
StoreQueryParameters.fromNameAndType("latency-metrics", QueryableStoreTypes.keyValueStore()));
final LatencyMetricExporter latencyMetricExporter = new LatencyMetricExporter(store);
latencyMetricExporter.bindTo(registry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@
@Slf4j
public class KafkaTopicBootstrap extends Bootstrap implements Measurable {

private final KafkaClientMetrics kafkaClientMetrics;
private final AdminClient adminClient;

public KafkaTopicBootstrap(final CanaryConfig config) {
super(config);

final AdminClient adminClient =
KafkaAdminClient.create(config.toKafkaAdminConfig().toMap());
adminClient = KafkaAdminClient.create(config.toKafkaAdminConfig().toMap());
Shutdown.addShutdownHook("Topics Creator", adminClient);

kafkaClientMetrics = new KafkaClientMetrics(adminClient);
Shutdown.addShutdownHook("Topics Creator: Prometheus Exporter", kafkaClientMetrics);

try {
final NewTopic canaryTopic =
new NewTopic(config.getTopicName(), config.getTopicPartitions(), config.getTopicReplicas());
Expand All @@ -48,6 +44,8 @@ public KafkaTopicBootstrap(final CanaryConfig config) {

@Override
public void bindTo(final MeterRegistry registry) {
final KafkaClientMetrics kafkaClientMetrics = new KafkaClientMetrics(adminClient);
Shutdown.addShutdownHook("Topics Creator: Prometheus Exporter", kafkaClientMetrics);
kafkaClientMetrics.bindTo(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,12 @@ public class MetricsEmitter implements Measurable {

private final Producer<String, Bytes> producer;
private final String topicName;
private final KafkaClientMetrics kafkaClientMetrics;

public MetricsEmitter(final String topicName, final Map<String, Object> kafkaProducerConfigMap) {
this.topicName = topicName;

producer = new KafkaProducer<>(kafkaProducerConfigMap);
Shutdown.addShutdownHook("Metrics Emitter", producer);

kafkaClientMetrics = new KafkaClientMetrics(producer);
Shutdown.addShutdownHook("Metrics Emitter: Prometheus Exporter", kafkaClientMetrics);
}

/**
Expand Down Expand Up @@ -68,6 +64,8 @@ public RecordMetadata emit(final String key, final Metric metric) {

@Override
public void bindTo(final MeterRegistry registry) {
final KafkaClientMetrics kafkaClientMetrics = new KafkaClientMetrics(producer);
Shutdown.addShutdownHook("Metrics Emitter: Prometheus Exporter", kafkaClientMetrics);
kafkaClientMetrics.bindTo(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,13 @@ public MetronomeBootstrap(final CanaryConfig config) {
}

private static String getServerVersion(final LHConfig lhConfig) {
final ServerVersionResponse serverVersionResponse =
lhConfig.getBlockingStub().getServerVersion(Empty.getDefaultInstance());
final ServerVersionResponse response = lhConfig.getBlockingStub().getServerVersion(Empty.getDefaultInstance());
return "%s.%s.%s%s"
.formatted(
serverVersionResponse.getMajorVersion(),
serverVersionResponse.getMinorVersion(),
serverVersionResponse.getPatchVersion(),
serverVersionResponse.hasPreReleaseIdentifier()
? "-" + serverVersionResponse.getPreReleaseIdentifier()
: "");
response.getMajorVersion(),
response.getMinorVersion(),
response.getPatchVersion(),
response.hasPreReleaseIdentifier() ? "-" + response.getPreReleaseIdentifier() : "");
}

@Override
Expand Down

0 comments on commit 3701cdd

Please sign in to comment.