From cf12556cf09186a3360a3e1cb3b1a7bd9ef8394f Mon Sep 17 00:00:00 2001 From: andrewluotechnologies Date: Mon, 3 May 2021 23:11:38 +0200 Subject: [PATCH 1/2] Remove support for Kafka 0.8 --- README.md | 24 +-- build.gradle | 31 +--- .../kafka/kafka08/StatsdMetricsReporter.java | 152 ------------------ .../kafka08/StatsdMetricsReporterMBean.java | 27 ---- .../kafka08/StatsdMetricsReporterTest.java | 76 --------- 5 files changed, 10 insertions(+), 300 deletions(-) delete mode 100644 src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporter.java delete mode 100644 src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterMBean.java delete mode 100644 src/test/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterTest.java diff --git a/README.md b/README.md index a247dcc5f..a1236ccb6 100644 --- a/README.md +++ b/README.md @@ -19,12 +19,13 @@ Metrics can be filtered based on the metric name and the metric dimensions (min, ## Supported Kafka versions - For Kafka `0.9.0.0` or later use `kafka-statsd-metrics2-0.5.0` -- For Kafka `0.8.2.0` or later use `kafka-statsd-metrics2-0.4.0` -- For Kafka `0.8.1.1` or prior use `kafka-statsd-metrics2-0.3.0` ## Releases +### 0.5.5 +- - Remove support for Kafka v0.8. + ### 0.5.4 - Fix metrics with different tags not reported properly @@ -58,7 +59,7 @@ Metrics can be filtered based on the metric name and the metric dimensions (min, - Install the jar in Kafka classpath, typically `./kafka_2.11-0.9.0.1/libs/` - In the Kafka config file, `server.properties`, add the following properties. Default values are in parenthesis. -## How to use metrics in Kafka 0.9 / 0.8? +## How to use metrics in Kafka 0.9? ### New metrics in kafka 0.9 1. Add `metric.reporters` in producer.properties or consumer.properties @@ -74,23 +75,6 @@ Producer: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --zookeeper localhost:2181 --topic test -from-beginning --consumer.config config/consumer.properties ``` -### Old metrics in kafka 0.8 - -1. Add `kafka.metrics.reporters` in producer.properties or consumer.properties -```bash - # declare the reporter if old producer/consumer is used - kafka.metrics.reporters=com.airbnb.kafka.kafka08.StatsdMetricsReporter -``` -2. Run old-producer or old-consumer - -```bash - bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties --old-producer - bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper localhost:2181 --topic test -from-beginning --consumer.config config/consumer.properties -``` - -2. Run old-consumer - - ### Configurations ```bash # declare the reporter if new producer/consumer is used diff --git a/build.gradle b/build.gradle index 12d38d967..e0908947e 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { defaultTasks 'build' group = 'com.airbnb' -description = 'StatsD Support for Kafka Metrics (kafka version: 0.8 and 0.9)' +description = 'StatsD Support for Kafka Metrics (kafka version: 0.9)' repositories { mavenCentral() @@ -18,7 +18,7 @@ repositories { ext { defaultKafkaVersion = '0.9.0.1' kafkaVersion = project.getProperties().getOrDefault("kafkaVersion", defaultKafkaVersion) - kafkaVersionsToTest = ['0.8.2.2', '0.9.0.1', '0.10.2.2', '1.1.1', '2.3.1'] + kafkaVersionsToTest = ['0.9.0.1', '0.10.2.2', '1.1.1', '2.3.1'] testWithKafkaClient = project.getProperties().getOrDefault("testWithKafkaClient", false) kafkaClientVersionsToTest = ['0.9.0.1', '0.10.2.2', '1.1.1', '2.3.1'] @@ -27,18 +27,12 @@ ext { dependencies { compile 'com.indeed:java-dogstatsd-client:2.0.11' - if (testWithKafkaClient) { - compileOnly "org.apache.kafka:kafka-clients:${kafkaVersion}" - } else { - compileOnly "org.apache.kafka:kafka_2.11:${defaultKafkaVersion}" - } + compileOnly "org.apache.kafka:kafka-clients:${kafkaVersion}" + compileOnly 'com.yammer.metrics:metrics-core:2.2.0' compileOnly 'org.slf4j:slf4j-log4j12:+' - if (testWithKafkaClient) { - testCompile "org.apache.kafka:kafka-clients:${kafkaVersion}" - } else { - testCompile "org.apache.kafka:kafka_2.11:${kafkaVersion}" - } + testCompile "org.apache.kafka:kafka-clients:${kafkaVersion}" + testCompile 'com.yammer.metrics:metrics-core:2.2.0' testCompile 'org.slf4j:slf4j-log4j12:+' testCompile 'junit:junit:4.11' testCompile 'org.easymock:easymock:3.2' @@ -61,11 +55,9 @@ def excludeFromCompile(Task task, Map> excludeMap) { if (testWithKafkaClient) { def srcExcludes = [ - "com/airbnb/kafka/kafka08/": ["**/*"], "com/airbnb/metrics/": ["ExcludeMetricPredicate", "MetricNameFormatter", "Parser", "ParserForNoTag", "ParserForTagInMBeanName", "StatsDReporter"] ] def testExcludes = [ - "com/airbnb/kafka/kafka08/": ["**/*"], "com/airbnb/metrics/": ["DimensionTest", "ExcludeMetricPredicateTest", "MetricNameFormatterTest", "ParserTest", "StatsDReporterTest"] ] @@ -73,17 +65,6 @@ if (testWithKafkaClient) { excludeFromCompile(compileTestJava, testExcludes) } -configurations { - // manually excludes some unnecessary dependencies - compile.exclude module: 'zookeeper' - compile.exclude module: 'zkclient' - compile.exclude module: 'javax' - compile.exclude module: 'jline' - compile.exclude module: 'jms' - compile.exclude module: 'jmxri' - compile.exclude module: 'jmxtools' - compile.exclude module: 'mail' -} shadowJar { exclude 'META-INF/*.DSA' diff --git a/src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporter.java b/src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporter.java deleted file mode 100644 index 3750e610a..000000000 --- a/src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporter.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright (c) 2015. Airbnb.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.airbnb.kafka.kafka08; - -import com.airbnb.metrics.Dimension; -import com.airbnb.metrics.ExcludeMetricPredicate; -import com.airbnb.metrics.StatsDReporter; -import com.timgroup.statsd.NonBlockingStatsDClient; -import com.timgroup.statsd.StatsDClient; -import com.timgroup.statsd.StatsDClientException; -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.MetricPredicate; -import com.yammer.metrics.reporting.AbstractPollingReporter; -import kafka.metrics.KafkaMetricsReporter; -import kafka.utils.VerifiableProperties; -import org.slf4j.LoggerFactory; - -import java.util.EnumSet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * - */ -public class StatsdMetricsReporter implements StatsdMetricsReporterMBean, KafkaMetricsReporter { - - private static final org.slf4j.Logger log = LoggerFactory.getLogger(StatsDReporter.class); - - public static final String DEFAULT_EXCLUDE_REGEX = "(kafka\\.server\\.FetcherStats.*ConsumerFetcherThread.*)|(kafka\\.consumer\\.FetchRequestAndResponseMetrics.*)|(.*ReplicaFetcherThread.*)|(kafka\\.server\\.FetcherLagMetrics\\..*)|(kafka\\.log\\.Log\\..*)|(kafka\\.cluster\\.Partition\\..*)"; - - private boolean enabled; - private final AtomicBoolean running = new AtomicBoolean(false); - private String host; - private int port; - private String prefix; - private long pollingPeriodInSeconds; - private EnumSet metricDimensions; - private MetricPredicate metricPredicate; - private StatsDClient statsd; - private boolean isTagEnabled; - private AbstractPollingReporter underlying = null; - - @Override - public String getMBeanName() { - return "kafka:type=" + getClass().getName(); - } - - public boolean isRunning() { - return running.get(); - } - - //try to make it compatible with kafka-statsd-metrics2 - @Override - public synchronized void init(VerifiableProperties props) { - loadConfig(props); - if (enabled) { - log.info("Reporter is enabled and starting..."); - startReporter(pollingPeriodInSeconds); - } else { - log.warn("Reporter is disabled"); - } - } - - private void loadConfig(VerifiableProperties props) { - enabled = props.getBoolean("external.kafka.statsd.reporter.enabled", false); - host = props.getString("external.kafka.statsd.host", "localhost"); - port = props.getInt("external.kafka.statsd.port", 8125); - prefix = props.getString("external.kafka.statsd.metrics.prefix", ""); - pollingPeriodInSeconds = props.getInt("kafka.metrics.polling.interval.secs", 10); - metricDimensions = Dimension.fromProperties(props.props(), "external.kafka.statsd.dimension.enabled."); - - String excludeRegex = props.getString("external.kafka.statsd.metrics.exclude_regex", DEFAULT_EXCLUDE_REGEX); - if (excludeRegex != null && excludeRegex.length() != 0) { - metricPredicate = new ExcludeMetricPredicate(excludeRegex); - } else { - metricPredicate = MetricPredicate.ALL; - } - - this.isTagEnabled = props.getBoolean("external.kafka.statsd.tag.enabled", true); - } - - @Override - public void startReporter(long pollingPeriodInSeconds) { - if (pollingPeriodInSeconds <= 0) { - throw new IllegalArgumentException("Polling period must be greater than zero"); - } - - synchronized (running) { - if (running.get()) { - log.warn("Reporter is already running"); - } else { - statsd = createStatsd(); - underlying = new StatsDReporter( - Metrics.defaultRegistry(), - statsd, - metricPredicate, - metricDimensions, - isTagEnabled); - underlying.start(pollingPeriodInSeconds, TimeUnit.SECONDS); - log.info("Started Reporter with host={}, port={}, polling_period_secs={}, prefix={}", - host, port, pollingPeriodInSeconds, prefix); - running.set(true); - } - } - } - - private StatsDClient createStatsd() { - try { - return new NonBlockingStatsDClient( - prefix, /* prefix to any stats; may be null or empty string */ - host, /* common case: localhost */ - port /* port */ - ); - } catch (StatsDClientException ex) { - log.error("Reporter cannot be started"); - throw ex; - } - } - - @Override - public void stopReporter() { - if (!enabled) { - log.warn("Reporter is disabled"); - } else { - synchronized (running) { - if (running.get()) { - underlying.shutdown(); - statsd.stop(); - running.set(false); - log.info("Stopped Reporter with host={}, port={}", host, port); - } else { - log.warn("Reporter is not running"); - } - } - } - } - -} diff --git a/src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterMBean.java b/src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterMBean.java deleted file mode 100644 index c795903df..000000000 --- a/src/main/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterMBean.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2015. Airbnb.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.airbnb.kafka.kafka08; - -import kafka.metrics.KafkaMetricsReporterMBean; - -/** - * @see kafka.metrics.KafkaMetricsReporterMBean: the custom reporter needs to - * additionally implement an MBean trait that extends kafka.metrics.KafkaMetricsReporterMBean - * so that the registered MBean is compliant with the standard MBean convention. - */ -public interface StatsdMetricsReporterMBean extends KafkaMetricsReporterMBean { -} diff --git a/src/test/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterTest.java b/src/test/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterTest.java deleted file mode 100644 index 5a5cb744b..000000000 --- a/src/test/java/com/airbnb/kafka/kafka08/StatsdMetricsReporterTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2015. Airbnb.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.airbnb.kafka.kafka08; - -import kafka.utils.VerifiableProperties; -import org.junit.Before; -import org.junit.Test; - -import java.util.Properties; - -import static org.easymock.EasyMock.*; -import static org.junit.Assert.*; - -public class StatsdMetricsReporterTest { - - private VerifiableProperties properties; - - @Before - public void init() { - properties = createMock(VerifiableProperties.class); - expect(properties.props()).andReturn(new Properties()); - expect(properties.getInt("kafka.metrics.polling.interval.secs", 10)).andReturn(11); - expect(properties.getString("external.kafka.statsd.host", "localhost")).andReturn("127.0.0.1"); - expect(properties.getInt("external.kafka.statsd.port", 8125)).andReturn(1234); - expect(properties.getString("external.kafka.statsd.metrics.prefix", "")).andReturn("foo"); - expect(properties.getString("external.kafka.statsd.metrics.exclude_regex", - StatsdMetricsReporter.DEFAULT_EXCLUDE_REGEX)).andReturn("foo"); - expect(properties.getBoolean("external.kafka.statsd.tag.enabled", true)).andReturn(false); - } - - @Test - public void mbean_name_should_match() { - String name = new StatsdMetricsReporter().getMBeanName(); - assertEquals("kafka:type=com.airbnb.kafka.kafka08.StatsdMetricsReporter", name); - } - - @Test - public void init_should_start_reporter_when_enabled() { - expect(properties.getBoolean("external.kafka.statsd.reporter.enabled", false)).andReturn(true); - - replay(properties); - StatsdMetricsReporter reporter = new StatsdMetricsReporter(); - assertFalse("reporter should not be running", reporter.isRunning()); - reporter.init(properties); - assertTrue("reporter should be running once #init has been invoked", reporter.isRunning()); - - verify(properties); - } - - @Test - public void init_should_not_start_reporter_when_disabled() { - expect(properties.getBoolean("external.kafka.statsd.reporter.enabled", false)).andReturn(false); - - replay(properties); - StatsdMetricsReporter reporter = new StatsdMetricsReporter(); - assertFalse("reporter should not be running", reporter.isRunning()); - reporter.init(properties); - assertFalse("reporter should NOT be running once #init has been invoked", reporter.isRunning()); - - verify(properties); - } -} From 6ac6daecd54cb99529969cbf644403533c5a9388 Mon Sep 17 00:00:00 2001 From: andrewluotechnologies Date: Wed, 2 Jun 2021 21:33:52 -0700 Subject: [PATCH 2/2] Address comments --- build.gradle | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index e0908947e..fcb954b81 100644 --- a/build.gradle +++ b/build.gradle @@ -20,18 +20,26 @@ ext { kafkaVersion = project.getProperties().getOrDefault("kafkaVersion", defaultKafkaVersion) kafkaVersionsToTest = ['0.9.0.1', '0.10.2.2', '1.1.1', '2.3.1'] - testWithKafkaClient = project.getProperties().getOrDefault("testWithKafkaClient", false) + testWithKafkaBroker = project.getProperties().getOrDefault("testWithKafkaBroker", true) kafkaClientVersionsToTest = ['0.9.0.1', '0.10.2.2', '1.1.1', '2.3.1'] } dependencies { compile 'com.indeed:java-dogstatsd-client:2.0.11' - compileOnly "org.apache.kafka:kafka-clients:${kafkaVersion}" + if (testWithKafkaBroker) { + compileOnly "org.apache.kafka:kafka_2.11:${kafkaVersion}" + } else { + compileOnly "org.apache.kafka:kafka-clients:${kafkaVersion}" + } compileOnly 'com.yammer.metrics:metrics-core:2.2.0' compileOnly 'org.slf4j:slf4j-log4j12:+' - testCompile "org.apache.kafka:kafka-clients:${kafkaVersion}" + if (testWithKafkaBroker) { + testCompile "org.apache.kafka:kafka_2.11:${kafkaVersion}" + } else { + testCompile "org.apache.kafka:kafka-clients:${kafkaVersion}" + } testCompile 'com.yammer.metrics:metrics-core:2.2.0' testCompile 'org.slf4j:slf4j-log4j12:+' testCompile 'junit:junit:4.11' @@ -53,7 +61,7 @@ def excludeFromCompile(Task task, Map> excludeMap) { } } -if (testWithKafkaClient) { +if (testWithKafkaBroker) { def srcExcludes = [ "com/airbnb/metrics/": ["ExcludeMetricPredicate", "MetricNameFormatter", "Parser", "ParserForNoTag", "ParserForTagInMBeanName", "StatsDReporter"] ] @@ -99,7 +107,7 @@ for (kafkaVersion in kafkaVersionsToTest) { test.doFirst { println "Running tests with Kafka version ${kafkaVersion}" - println "Testing with Kafka clients: ${testWithKafkaClient}" + println "Testing with Kafka broker: ${testWithKafkaBroker}" } task testWithAllSupportedKafkaCoreVersions { @@ -112,7 +120,7 @@ def testKafkaClientVersionTasks = [] for (kafkaVersion in kafkaClientVersionsToTest) { String kafkaVersionUnderscored = kafkaVersion.replace('.', '_') - def task = tasks.create(name: "testWithKafkaClient_${kafkaVersionUnderscored}", type: GradleBuild) { + def task = tasks.create(name: "testWithKafkaBroker_${kafkaVersionUnderscored}", type: GradleBuild) { // Changing `buildName` is a workaround for a Gradle issue when trying to run // the aggregate task `testWithAllSupportedKafkaVersions`. // See the following issues: @@ -121,7 +129,7 @@ for (kafkaVersion in kafkaClientVersionsToTest) { buildName = project.getName() + "-kafka-clients_" + kafkaVersionUnderscored buildFile = './build.gradle' tasks = ['test'] - startParameter.projectProperties = [kafkaVersion: "${kafkaVersion}", testWithKafkaClient: true] + startParameter.projectProperties = [kafkaVersion: "${kafkaVersion}", testWithKafkaBroker: false] group = 'Verification' description = "Runs the unit tests with Kafka client version ${kafkaVersion}" }