From b92a465bc8d0ee045d5611c02dd630c5e0bc4e5a Mon Sep 17 00:00:00 2001 From: Robert Niedziela <175605712+robsunday@users.noreply.github.com> Date: Fri, 14 Feb 2025 18:01:21 +0100 Subject: [PATCH] JMX Scraper: Kafka server, producer and consumer YAMLs and integration tests added (#1670) --- .../target_systems/KafkaIntegrationTest.java | 59 ++--- .../target-systems/kafka-consumer.groovy | 16 +- .../target-systems/kafka-producer.groovy | 16 +- .../resources/target-systems/kafka.groovy | 44 ++-- .../TargetSystemIntegrationTest.java | 41 +++- .../kafka/KafkaConsumerIntegrationTest.java | 142 ++++++++++++ .../kafka/KafkaContainerFactory.java | 61 +++++ .../kafka/KafkaIntegrationTest.java | 215 ++++++++++++++++++ .../kafka/KafkaProducerIntegrationTest.java | 155 +++++++++++++ .../contrib/jmxscraper/JmxScraper.java | 2 +- .../src/main/resources/kafka-consumer.yaml | 45 ++++ .../src/main/resources/kafka-producer.yaml | 48 ++++ jmx-scraper/src/main/resources/kafka.yaml | 213 +++++++++++++++++ 13 files changed, 986 insertions(+), 71 deletions(-) create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java create mode 100644 jmx-scraper/src/main/resources/kafka-consumer.yaml create mode 100644 jmx-scraper/src/main/resources/kafka-producer.yaml create mode 100644 jmx-scraper/src/main/resources/kafka.yaml diff --git a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java index a54909b67..f206ff1f8 100644 --- a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java +++ b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java @@ -140,51 +140,51 @@ List> kafkaBrokerAssertions() { metric, "kafka.partition.count", "The number of partitions on the broker", - "{partitions}"), + "{partition}"), metric -> assertGauge( metric, "kafka.partition.offline", "The number of partitions offline", - "{partitions}"), + "{partition}"), metric -> assertGauge( metric, "kafka.partition.under_replicated", "The number of under replicated partitions", - "{partitions}"), + "{partition}"), metric -> assertSumWithAttributes( metric, "kafka.isr.operation.count", "The number of in-sync replica shrink and expand operations", - "{operations}", + "{operation}", attrs -> attrs.containsOnly(entry("operation", "shrink")), attrs -> attrs.containsOnly(entry("operation", "expand"))), metric -> assertGauge( metric, "kafka.controller.active.count", - "controller is active on broker", - "{controllers}"), + "For KRaft mode, the number of active controllers in the cluster. 
For ZooKeeper, indicates whether the broker is the controller broker.", + "{controller}"), metric -> assertSum( metric, "kafka.leader.election.rate", - "leader election rate - increasing indicates broker failures", - "{elections}"), + "Leader election rate - increasing indicates broker failures", + "{election}"), metric -> assertGauge( metric, "kafka.max.lag", - "max lag in messages between follower and leader replicas", - "{messages}"), + "Max lag in messages between follower and leader replicas", + "{message}"), metric -> assertSum( metric, "kafka.unclean.election.rate", - "unclean leader election rate - increasing indicates broker failures", - "{elections}")); + "Unclean leader election rate - increasing indicates broker failures", + "{election}")); } static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest { @@ -235,52 +235,52 @@ void endToEnd() { metric, "kafka.consumer.bytes-consumed-rate", "The average number of bytes consumed per second", - "by", + "By", topics), metric -> assertKafkaGauge( metric, "kafka.consumer.fetch-rate", "The number of fetch requests for all topics per second", - "1"), + "{request}"), metric -> assertKafkaGauge( metric, "kafka.consumer.fetch-size-avg", "The average number of bytes fetched per request", - "by", + "By", topics), metric -> assertKafkaGauge( metric, "kafka.consumer.records-consumed-rate", "The average number of records consumed per second", - "1", + "{record}", topics), metric -> assertKafkaGauge( metric, "kafka.consumer.records-lag-max", "Number of messages the consumer lags behind the producer", - "1"), + "{record}"), metric -> assertKafkaGauge( metric, "kafka.consumer.total.bytes-consumed-rate", "The average number of bytes consumed for all topics per second", - "by"), + "By"), metric -> assertKafkaGauge( metric, "kafka.consumer.total.fetch-size-avg", "The average number of bytes fetched per request for all topics", - "by"), + "By"), metric -> assertKafkaGauge( metric, "kafka.consumer.total.records-consumed-rate", "The average number of records consumed for all topics per second", - "1")); + "{record}")); } } @@ -300,14 +300,14 @@ void endToEnd() { metric, "kafka.producer.byte-rate", "The average number of bytes sent per second for a topic", - "by", + "By", topics), metric -> assertKafkaGauge( metric, "kafka.producer.compression-rate", "The average compression rate of record batches for a topic", - "1", + "{ratio}", topics), metric -> assertKafkaGauge( @@ -320,27 +320,27 @@ void endToEnd() { metric, "kafka.producer.outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers", - "by"), + "By"), metric -> assertKafkaGauge( metric, "kafka.producer.record-error-rate", "The average per-second number of record sends that resulted in errors for a topic", - "1", + "{record}", topics), metric -> assertKafkaGauge( metric, "kafka.producer.record-retry-rate", "The average per-second number of retried record sends for a topic", - "1", + "{record}", topics), metric -> assertKafkaGauge( metric, "kafka.producer.record-send-rate", "The average number of records sent per second for a topic", - "1", + "{record}", topics), metric -> assertKafkaGauge( @@ -353,10 +353,13 @@ void endToEnd() { metric, "kafka.producer.request-rate", "The average number of requests sent per second", - "1"), + "{request}"), metric -> assertKafkaGauge( - metric, "kafka.producer.response-rate", "Responses received per second", "1")); + metric, + "kafka.producer.response-rate", + "Responses received per second", + "{response}")); } } diff 
--git a/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy b/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy index a2b8e3a74..dcd05a78e 100644 --- a/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy +++ b/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy @@ -16,45 +16,45 @@ def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics") otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate", - "The number of fetch requests for all topics per second", "1", + "The number of fetch requests for all topics per second", "{request}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "fetch-rate", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max", - "Number of messages the consumer lags behind the producer", "1", + "Number of messages the consumer lags behind the producer", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "records-lag-max", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate", - "The average number of bytes consumed for all topics per second", "by", + "The average number of bytes consumed for all topics per second", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "bytes-consumed-rate", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg", - "The average number of bytes fetched per request for all topics", "by", + "The average number of bytes fetched per request for all topics", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "fetch-size-avg", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate", - "The average number of records consumed for all topics per second", "1", + "The average number of records consumed for all topics per second", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "records-consumed-rate", otel.&doubleValueCallback) def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics") otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate", - "The average number of bytes consumed per second", "by", + "The average number of bytes consumed per second", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "bytes-consumed-rate", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg", - "The average number of bytes fetched per request", "by", + "The average number of bytes fetched per request", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "fetch-size-avg", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.records-consumed-rate", - "The average number of records consumed per second", "1", + "The average number of records consumed per second", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "records-consumed-rate", otel.&doubleValueCallback) diff --git 
a/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy b/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy index 1d9583f32..2b3d99154 100644 --- a/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy +++ b/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy @@ -20,7 +20,7 @@ otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "io-wait-time-ns-avg", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate", - "The average number of outgoing bytes sent per second to all servers", "by", + "The average number of outgoing bytes sent per second to all servers", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "outgoing-byte-rate", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.request-latency-avg", @@ -28,37 +28,37 @@ otel.instrument(producerMetrics, "kafka.producer.request-latency-avg", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "request-latency-avg", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.request-rate", - "The average number of requests sent per second", "1", + "The average number of requests sent per second", "{request}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "request-rate", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.response-rate", - "Responses received per second", "1", + "Responses received per second", "{response}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "response-rate", otel.&doubleValueCallback) def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics") otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate", - "The average number of bytes sent per second for a topic", "by", + "The average number of bytes sent per second for a topic", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "byte-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate", - "The average compression rate of record batches for a topic", "1", + "The average compression rate of record batches for a topic", "{ratio}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "compression-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate", - "The average per-second number of record sends that resulted in errors for a topic", "1", + "The average per-second number of record sends that resulted in errors for a topic", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "record-error-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate", - "The average per-second number of retried record sends for a topic", "1", + "The average per-second number of retried record sends for a topic", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "record-retry-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate", - "The average 
number of records sent per second for a topic", "1", + "The average number of records sent per second for a topic", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "record-send-rate", otel.&doubleValueCallback) diff --git a/jmx-metrics/src/main/resources/target-systems/kafka.groovy b/jmx-metrics/src/main/resources/target-systems/kafka.groovy index 456f28573..c759e688d 100644 --- a/jmx-metrics/src/main/resources/target-systems/kafka.groovy +++ b/jmx-metrics/src/main/resources/target-systems/kafka.groovy @@ -18,7 +18,7 @@ def messagesInPerSec = otel.mbean("kafka.server:type=BrokerTopicMetrics,name=Mes otel.instrument(messagesInPerSec, "kafka.message.count", "The number of messages received by the broker", - "{messages}", + "{message}", "Count", otel.&longCounterCallback) def requests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec", @@ -26,7 +26,7 @@ def requests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=TotalProd otel.instrument(requests, "kafka.request.count", "The number of requests received by the broker", - "{requests}", + "{request}", [ "type" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "TotalProduceRequestsPerSec": @@ -45,7 +45,7 @@ def failedRequests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=Fai otel.instrument(failedRequests, "kafka.request.failed", "The number of requests to the broker resulting in a failure", - "{requests}", + "{request}", [ "type" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "FailedProduceRequestsPerSec": @@ -100,7 +100,7 @@ def network = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=BytesInPer otel.instrument(network, "kafka.network.io", "The bytes received or sent by the broker", - "by", + "By", [ "state" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "BytesInPerSec": @@ -119,7 +119,7 @@ def purgatorySize = otel.mbeans(["kafka.server:type=DelayedOperationPurgatory,na otel.instrument(purgatorySize, "kafka.purgatory.size", "The number of requests waiting in purgatory", - "{requests}", + "{request}", [ "type" : { mbean -> mbean.name().getKeyProperty("delayedOperation").toLowerCase() }, ], @@ -129,21 +129,21 @@ def partitionCount = otel.mbean("kafka.server:type=ReplicaManager,name=Partition otel.instrument(partitionCount, "kafka.partition.count", "The number of partitions on the broker", - "{partitions}", + "{partition}", "Value", otel.&longValueCallback) def partitionOffline = otel.mbean("kafka.controller:type=KafkaController,name=OfflinePartitionsCount") otel.instrument(partitionOffline, "kafka.partition.offline", "The number of partitions offline", - "{partitions}", + "{partition}", "Value", otel.&longValueCallback) def partitionUnderReplicated = otel.mbean("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions") otel.instrument(partitionUnderReplicated, "kafka.partition.under_replicated", "The number of under replicated partitions", - "{partitions}", + "{partition}", "Value", otel.&longValueCallback) def isrOperations = otel.mbeans(["kafka.server:type=ReplicaManager,name=IsrShrinksPerSec", @@ -151,7 +151,7 @@ def isrOperations = otel.mbeans(["kafka.server:type=ReplicaManager,name=IsrShrin otel.instrument(isrOperations, "kafka.isr.operation.count", "The number of in-sync replica shrink and expand operations", - "{operations}", + "{operation}", [ "operation" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case 
"IsrShrinksPerSec": @@ -167,29 +167,29 @@ otel.instrument(isrOperations, def maxLag = otel.mbean("kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica") -otel.instrument(maxLag, "kafka.max.lag", "max lag in messages between follower and leader replicas", - "{messages}", "Value", otel.&longValueCallback) +otel.instrument(maxLag, "kafka.max.lag", "Max lag in messages between follower and leader replicas", + "{message}", "Value", otel.&longValueCallback) def activeControllerCount = otel.mbean("kafka.controller:type=KafkaController,name=ActiveControllerCount") -otel.instrument(activeControllerCount, "kafka.controller.active.count", "controller is active on broker", - "{controllers}", "Value", otel.&longValueCallback) +otel.instrument(activeControllerCount, "kafka.controller.active.count", "For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.", + "{controller}", "Value", otel.&longValueCallback) def leaderElectionRate = otel.mbean("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs") -otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "leader election rate - increasing indicates broker failures", - "{elections}", "Count", otel.&longCounterCallback) +otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "Leader election rate - increasing indicates broker failures", + "{election}", "Count", otel.&longCounterCallback) def uncleanLeaderElections = otel.mbean("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec") -otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "unclean leader election rate - increasing indicates broker failures", - "{elections}", "Count", otel.&longCounterCallback) +otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "Unclean leader election rate - increasing indicates broker failures", + "{election}", "Count", otel.&longCounterCallback) def requestQueueSize = otel.mbean("kafka.network:type=RequestChannel,name=RequestQueueSize") -otel.instrument(requestQueueSize, "kafka.request.queue", "size of the request queue", - "{requests}", "Value", otel.&longValueCallback) +otel.instrument(requestQueueSize, "kafka.request.queue", "Size of the request queue", + "{request}", "Value", otel.&longValueCallback) def logFlushRate = otel.mbean("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs") -otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "log flush count", +otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "Log flush count", "ms", "Count", otel.&longCounterCallback) -otel.instrument(logFlushRate, "kafka.logs.flush.time.median", "log flush time - 50th percentile", +otel.instrument(logFlushRate, "kafka.logs.flush.time.median", "Log flush time - 50th percentile", "ms", "50thPercentile", otel.&doubleValueCallback) -otel.instrument(logFlushRate, "kafka.logs.flush.time.99p", "log flush time - 99th percentile", +otel.instrument(logFlushRate, "kafka.logs.flush.time.99p", "Log flush time - 99th percentile", "ms", "99thPercentile", otel.&doubleValueCallback) diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java index ddfc0567f..9c58c07fc 100644 --- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java 
+++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -56,6 +57,7 @@ public abstract class TargetSystemIntegrationTest { private static Network network; private static OtlpGrpcServer otlpServer; + private Collection> prerequisiteContainers; private GenericContainer target; private JmxScraperContainer scraper; @@ -86,12 +88,23 @@ static void afterAll() { @AfterEach void afterEach() { + if (scraper != null && scraper.isRunning()) { + scraper.stop(); + } + if (target != null && target.isRunning()) { target.stop(); } - if (scraper != null && scraper.isRunning()) { - scraper.stop(); + + if (prerequisiteContainers != null) { + prerequisiteContainers.forEach( + container -> { + if (container.isRunning()) { + container.stop(); + } + }); } + if (otlpServer != null) { otlpServer.reset(); } @@ -103,14 +116,31 @@ protected String scraperBaseImage() { @Test void endToEndTest(@TempDir Path tmpDir) { + startContainers(tmpDir); + verifyMetrics(); + } + + protected void startContainers(Path tmpDir) { + prerequisiteContainers = createPrerequisiteContainers(); target = createTargetContainer(JMX_PORT) .withLogConsumer(new Slf4jLogConsumer(targetSystemLogger)) .withNetwork(network) .withNetworkAliases(TARGET_SYSTEM_NETWORK_ALIAS); + + // If there are any containers that must be started before target then initialize them. + // Then make target depending on them, so it is started after dependencies + for (GenericContainer container : prerequisiteContainers) { + container.withNetwork(network); + target.dependsOn(container); + } + + // Target container must be running before scraper container is customized. + // It is necessary to allow interactions with the container, like file copying etc. 
target.start(); + // Create and initialize scraper container scraper = new JmxScraperContainer(otlpEndpoint, scraperBaseImage()) .withLogConsumer(new Slf4jLogConsumer(jmxScraperLogger)) @@ -119,14 +149,13 @@ void endToEndTest(@TempDir Path tmpDir) { scraper = customizeScraperContainer(scraper, target, tmpDir); scraper.start(); - - verifyMetrics(); } protected void verifyMetrics() { MetricsVerifier metricsVerifier = createMetricsVerifier(); await() .atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted( () -> { List receivedMetrics = otlpServer.getMetrics(); @@ -158,6 +187,10 @@ protected JmxScraperContainer customizeScraperContainer( return scraper; } + protected Collection> createPrerequisiteContainers() { + return Collections.emptyList(); + } + private static class OtlpGrpcServer extends ServerExtension { private final BlockingQueue metricRequests = diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java new file mode 100644 index 000000000..ed61aabbb --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java @@ -0,0 +1,142 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaConsumerContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; + +import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher; +import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; +import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class KafkaConsumerIntegrationTest extends TargetSystemIntegrationTest { + + @Override + protected Collection> createPrerequisiteContainers() { + GenericContainer zookeeper = + createZookeeperContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))) + .withNetworkAliases("zookeeper"); + + GenericContainer kafka = + createKafkaContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka"))) + .withNetworkAliases("kafka") + .dependsOn(zookeeper); + + GenericContainer kafkaProducer = + createKafkaProducerContainer() + .withLogConsumer(new 
Slf4jLogConsumer(LoggerFactory.getLogger("kafka-producer"))) + .withNetworkAliases("kafka-producer") + .dependsOn(kafka); + + return Arrays.asList(zookeeper, kafka, kafkaProducer); + } + + @Override + protected GenericContainer createTargetContainer(int jmxPort) { + return createKafkaConsumerContainer() + .withEnv("JMX_PORT", Integer.toString(jmxPort)) + .withExposedPorts(jmxPort) + .waitingFor(Wait.forListeningPorts(jmxPort)); + } + + @Override + protected JmxScraperContainer customizeScraperContainer( + JmxScraperContainer scraper, GenericContainer target, Path tempDir) { + return scraper.withTargetSystem("kafka-consumer"); + } + + @Override + protected MetricsVerifier createMetricsVerifier() { + // TODO: change to follow semconv + AttributeMatcher clientIdAttribute = attributeWithAnyValue("client-id"); + AttributeMatcher topicAttribute = attribute("topic", "test-topic-1"); + + return MetricsVerifier.create() + .add( + "kafka.consumer.fetch-rate", + metric -> + metric + .hasDescription("The number of fetch requests for all topics per second") + .hasUnit("{request}") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.consumer.total.bytes-consumed-rate", + metric -> + metric + .hasDescription( + "The average number of bytes consumed for all topics per second") + .hasUnit("By") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.consumer.total.fetch-size-avg", + metric -> + metric + .hasDescription( + "The average number of bytes fetched per request for all topics") + .hasUnit("By") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.consumer.total.records-consumed-rate", + metric -> + metric + .hasDescription( + "The average number of records consumed for all topics per second") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.consumer.records-lag-max", + metric -> + metric + .hasDescription("Number of messages the consumer lags behind the producer") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.consumer.bytes-consumed-rate", + metric -> + metric + .hasDescription("The average number of bytes consumed per second") + .hasUnit("By") + .isGauge() + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) + .add( + "kafka.consumer.fetch-size-avg", + metric -> + metric + .hasDescription("The average number of bytes fetched per request") + .hasUnit("By") + .isGauge() + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) + .add( + "kafka.consumer.records-consumed-rate", + metric -> + metric + .hasDescription("The average number of records consumed per second") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup(clientIdAttribute, topicAttribute))); + } +} diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java new file mode 100644 index 000000000..8eb9432a5 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import java.time.Duration; +import 
org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class KafkaContainerFactory { + private static final int KAFKA_PORT = 9092; + private static final String KAFKA_BROKER = "kafka:" + KAFKA_PORT; + private static final String KAFKA_DOCKER_IMAGE = "bitnami/kafka:2.8.1"; + + private KafkaContainerFactory() {} + + public static GenericContainer createZookeeperContainer() { + return new GenericContainer<>("zookeeper:3.5") + .withStartupTimeout(Duration.ofMinutes(2)) + .waitingFor(Wait.forListeningPort()); + } + + public static GenericContainer createKafkaContainer() { + return new GenericContainer<>(KAFKA_DOCKER_IMAGE) + .withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181") + .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") // Removed in 3.5.1 + .withStartupTimeout(Duration.ofMinutes(2)) + .withExposedPorts(KAFKA_PORT) + .waitingFor( + Wait.forLogMessage(".*KafkaServer.*started \\(kafka.server.KafkaServer\\).*", 1)); + } + + public static GenericContainer createKafkaProducerContainer() { + return new GenericContainer<>(KAFKA_DOCKER_IMAGE) + .withCommand( + "sh", + "-c", + "echo 'Sending messages to test-topic-1'; " + + "i=1; while true; do echo \"Message $i\"; sleep .25; i=$((i+1)); done | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server " + + KAFKA_BROKER + + " --topic test-topic-1;") + .withStartupTimeout(Duration.ofMinutes(2)) + .waitingFor(Wait.forLogMessage(".*Welcome to the Bitnami kafka container.*", 1)); + } + + public static GenericContainer createKafkaConsumerContainer() { + return new GenericContainer<>(KAFKA_DOCKER_IMAGE) + .withCommand( + "kafka-console-consumer.sh", + "--bootstrap-server", + KAFKA_BROKER, + "--whitelist", + "test-topic-.*", + "--max-messages", + "100") + .withStartupTimeout(Duration.ofMinutes(2)) + .waitingFor(Wait.forListeningPort()); + } +} diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java new file mode 100644 index 000000000..f59040509 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java @@ -0,0 +1,215 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; + +import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcherGroup; +import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; +import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +public class KafkaIntegrationTest extends TargetSystemIntegrationTest { 
+ @Override + protected Collection> createPrerequisiteContainers() { + GenericContainer zookeeper = + createZookeeperContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))) + .withNetworkAliases("zookeeper"); + + return Collections.singletonList(zookeeper); + } + + @Override + protected GenericContainer createTargetContainer(int jmxPort) { + return createKafkaContainer().withEnv("JMX_PORT", Integer.toString(jmxPort)); + } + + @Override + protected JmxScraperContainer customizeScraperContainer( + JmxScraperContainer scraper, GenericContainer target, Path tempDir) { + return scraper.withTargetSystem("kafka"); + } + + @Override + protected MetricsVerifier createMetricsVerifier() { + AttributeMatcherGroup[] requestTypes = { + attributeGroup(attribute("type", "Produce")), + attributeGroup(attribute("type", "FetchFollower")), + attributeGroup(attribute("type", "FetchConsumer")) + }; + + return MetricsVerifier.create() + .add( + "kafka.message.count", + metric -> + metric + .hasDescription("The number of messages received by the broker") + .hasUnit("{message}") + .isCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.request.count", + metric -> + metric + .hasDescription("The number of requests received by the broker") + .hasUnit("{request}") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("type", "produce")), + attributeGroup(attribute("type", "fetch")))) + .add( + "kafka.request.failed", + metric -> + metric + .hasDescription("The number of requests to the broker resulting in a failure") + .hasUnit("{request}") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("type", "produce")), + attributeGroup(attribute("type", "fetch")))) + .add( + "kafka.network.io", + metric -> + metric + .hasDescription("The bytes received or sent by the broker") + .hasUnit("By") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("direction", "in")), + attributeGroup(attribute("direction", "out")))) + .add( + "kafka.purgatory.size", + metric -> + metric + .hasDescription("The number of requests waiting in purgatory") + .hasUnit("{request}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup(attribute("type", "Produce")), + attributeGroup(attribute("type", "Fetch")))) + .add( + "kafka.request.time.total", + metric -> + metric + .hasDescription("The total time the broker has taken to service requests") + .hasUnit("ms") + .isCounter() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.time.50p", + metric -> + metric + .hasDescription( + "The 50th percentile time the broker has taken to service requests") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.time.99p", + metric -> + metric + .hasDescription( + "The 99th percentile time the broker has taken to service requests") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.time.avg", + metric -> + metric + .hasDescription("The average time the broker has taken to service requests") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.queue", + metric -> + metric + .hasDescription("Size of the request queue") + .hasUnit("{request}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.partition.count", + metric -> + metric + .hasDescription("The number of partitions on the broker") + .hasUnit("{partition}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + 
"kafka.partition.offline", + metric -> + metric + .hasDescription("The number of partitions offline") + .hasUnit("{partition}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.partition.under_replicated", + metric -> + metric + .hasDescription("The number of under replicated partitions") + .hasUnit("{partition}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.isr.operation.count", + metric -> + metric + .hasDescription("The number of in-sync replica shrink and expand operations") + .hasUnit("{operation}") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("operation", "shrink")), + attributeGroup(attribute("operation", "expand")))) + .add( + "kafka.controller.active.count", + metric -> + metric + .hasDescription( + "For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.") + .hasUnit("{controller}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.leader.election.rate", + metric -> + metric + .hasDescription("The leader election count") + .hasUnit("{election}") + .isCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.unclean.election.rate", + metric -> + metric + .hasDescription( + "Unclean leader election count - increasing indicates broker failures") // CHANGED + .hasUnit("{election}") + .isCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.max.lag", + metric -> + metric + .hasDescription( + "The max lag in messages between follower and leader replicas") // CHANGED + .hasUnit("{message}") + .isGauge() + .hasDataPointsWithoutAttributes()); + } +} diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java new file mode 100644 index 000000000..155cb9fc5 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java @@ -0,0 +1,155 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; + +import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher; +import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; +import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; 
+ +public class KafkaProducerIntegrationTest extends TargetSystemIntegrationTest { + + @Override + protected Collection> createPrerequisiteContainers() { + GenericContainer zookeeper = + createZookeeperContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))) + .withNetworkAliases("zookeeper"); + + GenericContainer kafka = + createKafkaContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka"))) + .withNetworkAliases("kafka") + .dependsOn(zookeeper); + + return Arrays.asList(zookeeper, kafka); + } + + @Override + protected GenericContainer createTargetContainer(int jmxPort) { + return createKafkaProducerContainer() + .withEnv("JMX_PORT", Integer.toString(jmxPort)) + .withExposedPorts(jmxPort) + .waitingFor(Wait.forListeningPorts(jmxPort)); + } + + @Override + protected JmxScraperContainer customizeScraperContainer( + JmxScraperContainer scraper, GenericContainer target, Path tempDir) { + return scraper.withTargetSystem("kafka-producer"); + } + + @Override + protected MetricsVerifier createMetricsVerifier() { + // TODO: change to follow semconv + AttributeMatcher clientIdAttribute = attributeWithAnyValue("client-id"); + AttributeMatcher topicAttribute = attribute("topic", "test-topic-1"); + + return MetricsVerifier.create() + .add( + "kafka.producer.io-wait-time-ns-avg", + metric -> + metric + .hasDescription( + "The average length of time the I/O thread spent waiting for a socket ready for reads or writes") + .hasUnit("ns") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.producer.outgoing-byte-rate", + metric -> + metric + .hasDescription( + "The average number of outgoing bytes sent per second to all servers") + .hasUnit("By") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.producer.request-latency-avg", + metric -> + metric + .hasDescription("The average request latency") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.producer.request-rate", + metric -> + metric + .hasDescription("The average number of requests sent per second") + .hasUnit("{request}") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.producer.response-rate", + metric -> + metric + .hasDescription("Responses received per second") + .hasUnit("{response}") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) + + // Per topic metrics + .add( + "kafka.producer.byte-rate", + metric -> + metric + .hasDescription("The average number of bytes sent per second for a topic") + .hasUnit("By") + .isGauge() + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) + .add( + "kafka.producer.compression-rate", + metric -> + metric + .hasDescription( + "The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size") + .hasUnit("{ratio}") + .isGauge() + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) + .add( + "kafka.producer.record-error-rate", + metric -> + metric + .hasDescription( + "The average per-second number of record sends that resulted in errors for a topic") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) + .add( + "kafka.producer.record-retry-rate", + metric -> + metric + .hasDescription( + "The average per-second number of retried record sends for a topic") + .hasUnit("{record}") + 
.isGauge() + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) + .add( + "kafka.producer.record-send-rate", + metric -> + metric + .hasDescription("The average number of records sent per second for a topic") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup(clientIdAttribute, topicAttribute))); + } +} diff --git a/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java b/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java index ad8c8b7f4..656b05eb9 100644 --- a/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java +++ b/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java @@ -237,7 +237,7 @@ private static void addRulesForSystem(String system, MetricConfiguration conf) { RuleParser parserInstance = RuleParser.get(); parserInstance.addMetricDefsTo(conf, inputStream, system); } else { - throw new IllegalArgumentException("No support for system" + system); + throw new IllegalArgumentException("No support for system " + system); } } catch (Exception e) { throw new IllegalStateException("Error while loading rules for system " + system, e); diff --git a/jmx-scraper/src/main/resources/kafka-consumer.yaml b/jmx-scraper/src/main/resources/kafka-consumer.yaml new file mode 100644 index 000000000..0324c4b19 --- /dev/null +++ b/jmx-scraper/src/main/resources/kafka-consumer.yaml @@ -0,0 +1,45 @@ +--- +# Kafka Consumer metrics +rules: + + - bean: kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics + metricAttribute: + client-id: param(client-id) + prefix: kafka.consumer. + type: gauge + mapping: + fetch-rate: + desc: The number of fetch requests for all topics per second + unit: '{request}' + bytes-consumed-rate: + metric: total.bytes-consumed-rate + desc: The average number of bytes consumed for all topics per second + unit: By + fetch-size-avg: + metric: total.fetch-size-avg + desc: The average number of bytes fetched per request for all topics + unit: By + records-consumed-rate: + metric: total.records-consumed-rate + desc: The average number of records consumed for all topics per second + unit: '{record}' + records-lag-max: + desc: Number of messages the consumer lags behind the producer + unit: '{record}' + + - bean: kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics + metricAttribute: + client-id: param(client-id) + topic: param(topic) + prefix: kafka.consumer. + type: gauge + mapping: + bytes-consumed-rate: + desc: The average number of bytes consumed per second + unit: By + fetch-size-avg: + desc: The average number of bytes fetched per request + unit: By + records-consumed-rate: + desc: The average number of records consumed per second + unit: '{record}' diff --git a/jmx-scraper/src/main/resources/kafka-producer.yaml b/jmx-scraper/src/main/resources/kafka-producer.yaml new file mode 100644 index 000000000..f3d5ff69c --- /dev/null +++ b/jmx-scraper/src/main/resources/kafka-producer.yaml @@ -0,0 +1,48 @@ +--- +# Kafka Producer metrics +rules: + - bean: kafka.producer:client-id=*,type=producer-metrics + metricAttribute: + client-id: param(client-id) + prefix: kafka.producer. 
+ type: gauge + mapping: + io-wait-time-ns-avg: + desc: The average length of time the I/O thread spent waiting for a socket ready for reads or writes + unit: ns + outgoing-byte-rate: + desc: The average number of outgoing bytes sent per second to all servers + unit: By + request-latency-avg: + desc: The average request latency + unit: ms + request-rate: + desc: The average number of requests sent per second + unit: '{request}' + response-rate: + desc: Responses received per second + unit: '{response}' + + # per topic metrics + - bean: kafka.producer:client-id=*,topic=*,type=producer-topic-metrics + metricAttribute: + client-id: param(client-id) + topic: param(topic) + prefix: kafka.producer. + type: gauge + mapping: + byte-rate: + desc: The average number of bytes sent per second for a topic + unit: By + compression-rate: + desc: The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size + unit: '{ratio}' + record-error-rate: + desc: The average per-second number of record sends that resulted in errors for a topic + unit: '{record}' + record-retry-rate: + desc: The average per-second number of retried record sends for a topic + unit: '{record}' + record-send-rate: + desc: The average number of records sent per second for a topic + unit: '{record}' diff --git a/jmx-scraper/src/main/resources/kafka.yaml b/jmx-scraper/src/main/resources/kafka.yaml new file mode 100644 index 000000000..cf737aa56 --- /dev/null +++ b/jmx-scraper/src/main/resources/kafka.yaml @@ -0,0 +1,213 @@ +--- +rules: + # Broker metrics + + - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec + mapping: + Count: + metric: kafka.message.count + type: counter + desc: The number of messages received by the broker + unit: "{message}" + + - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec + metricAttribute: + type: const(fetch) + mapping: + Count: + metric: &metric kafka.request.count + type: &type counter + desc: &desc The number of requests received by the broker + unit: &unit "{request}" + + - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec + metricAttribute: + type: const(produce) + mapping: + Count: + metric: *metric + type: *type + desc: *desc + unit: *unit + + - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec + metricAttribute: + type: const(fetch) + mapping: + Count: + metric: &metric kafka.request.failed + type: &type counter + desc: &desc The number of requests to the broker resulting in a failure + unit: &unit "{request}" + + - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec + metricAttribute: + type: const(produce) + mapping: + Count: + metric: *metric + type: *type + desc: *desc + unit: *unit + + - beans: + - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce + - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer + - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower + metricAttribute: + type: param(request) + unit: ms + mapping: + Count: + metric: kafka.request.time.total + type: counter + desc: The total time the broker has taken to service requests + 50thPercentile: + metric: kafka.request.time.50p + type: gauge + desc: The 50th percentile time the broker has taken to service requests + 99thPercentile: + metric: kafka.request.time.99p + type: gauge + desc: The 99th percentile time the broker has taken to service requests + # Added + Mean: + 
metric: kafka.request.time.avg + type: gauge + desc: The average time the broker has taken to service requests + + - bean: kafka.network:type=RequestChannel,name=RequestQueueSize + mapping: + Value: + metric: kafka.request.queue + type: gauge + desc: Size of the request queue + unit: "{request}" + + - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec + metricAttribute: + direction: const(in) + mapping: + Count: + metric: &metric kafka.network.io + type: &type counter + desc: &desc The bytes received or sent by the broker + unit: &unit By + + - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec + metricAttribute: + direction: const(out) + mapping: + Count: + metric: *metric + type: *type + desc: *desc + unit: *unit + + - beans: + - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce + - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch + metricAttribute: + type: param(delayedOperation) + mapping: + Value: + metric: kafka.purgatory.size + type: gauge + desc: The number of requests waiting in purgatory + unit: "{request}" + + - bean: kafka.server:type=ReplicaManager,name=PartitionCount + mapping: + Value: + metric: kafka.partition.count + type: gauge + desc: The number of partitions on the broker + unit: "{partition}" + + - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount + mapping: + Value: + metric: kafka.partition.offline + type: gauge + desc: The number of partitions offline + unit: "{partition}" + + - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions + mapping: + Value: + metric: kafka.partition.under_replicated + type: gauge + desc: The number of under replicated partitions + unit: "{partition}" + + - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec + metricAttribute: + operation: const(shrink) + mapping: + Count: + metric: kafka.isr.operation.count + type: counter + desc: The number of in-sync replica shrink and expand operations + unit: "{operation}" + + - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec + metricAttribute: + operation: const(expand) + mapping: + Count: + metric: kafka.isr.operation.count + type: counter + desc: The number of in-sync replica shrink and expand operations + unit: "{operation}" + + - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica + mapping: + Value: + metric: kafka.max.lag + type: gauge + desc: The max lag in messages between follower and leader replicas + unit: "{message}" + + + - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount + mapping: + Value: + metric: kafka.controller.active.count + type: gauge + desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker. + unit: "{controller}" + + - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs + mapping: + Count: + metric: kafka.leader.election.rate + type: counter + desc: The leader election count + unit: "{election}" + + - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec + mapping: + Count: + metric: kafka.unclean.election.rate + type: counter + desc: Unclean leader election count - increasing indicates broker failures + unit: "{election}" + + # Log metrics + + - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs + unit: ms + type: gauge + prefix: kafka.logs.flush. 
+ mapping: + Count: + metric: count + unit: '{flush}' + type: counter + desc: Log flush count + 50thPercentile: + metric: time.50p + desc: Log flush time - 50th percentile + 99thPercentile: + metric: time.99p + desc: Log flush time - 99th percentile
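
Note (not part of the patch above): the KafkaContainerFactory added in this change can also be exercised on its own. The sketch below is illustrative only — it assumes the factory class from this patch and Testcontainers are on the classpath, and the class name KafkaStackSketch is made up — but it mirrors how KafkaIntegrationTest wires ZooKeeper and the Kafka broker onto a shared network before the scraper container is started.

import io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

public class KafkaStackSketch {
  public static void main(String[] args) {
    // Shared network so the broker can reach ZooKeeper via the "zookeeper" alias.
    Network network = Network.newNetwork();

    GenericContainer<?> zookeeper =
        KafkaContainerFactory.createZookeeperContainer()
            .withNetwork(network)
            .withNetworkAliases("zookeeper");

    GenericContainer<?> kafka =
        KafkaContainerFactory.createKafkaContainer()
            .withNetwork(network)
            .withNetworkAliases("kafka")
            .dependsOn(zookeeper); // Testcontainers starts ZooKeeper first

    kafka.start();
    try {
      System.out.println("Kafka broker running: " + kafka.isRunning());
    } finally {
      kafka.stop();
      zookeeper.stop();
    }
  }
}

A JmxScraperContainer pointed at the broker's JMX port and configured with withTargetSystem("kafka") would then collect the metrics defined in the new kafka.yaml, the same way the endToEndTest in TargetSystemIntegrationTest does.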