diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ff8f50..1753d9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# 0.2.4 (August 18, 2015) + +IMPROVEMENTS + +* Added support for UDP graphite reporter. +* Added configuration option `metrics.graphite.protocol`, which configures which Graphite reporter will be used. This + configuration option defaults to use the TCP graphite reporter for backwards compatibility. Set this option to 'udp' + to use a UDP reporter instead. + # 0.2.3 (June 24, 2015) BUG FIXES diff --git a/README.md b/README.md index ea7ee49..74e0d36 100644 --- a/README.md +++ b/README.md @@ -267,6 +267,8 @@ topology.metrics.consumer.register: metrics.graphite.port: "2003" metrics.graphite.prefix: "storm.test" metrics.graphite.min-connect-attempt-interval-secs: "5" + # Optional arguments can also be supplied to enable UDP + metrics.graphite.protocol: "udp" ``` ##### Reporting Metrics to Kafka diff --git a/build.gradle b/build.gradle index 975ac65..4203ed4 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ buildscript { } group = 'com.verisign.storm.metrics' -version = '0.2.3-SNAPSHOT' +version = '0.2.4-SNAPSHOT' description = "An IMetricsConsumer that forwards Storm's built-in metrics to a Graphite server for real-time graphing." apply plugin: 'java' diff --git a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java index 40159cd..cb7887c 100644 --- a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java +++ b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java @@ -15,6 +15,8 @@ package com.verisign.storm.metrics.reporters.graphite; import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteSender; +import com.codahale.metrics.graphite.GraphiteUDP; import com.google.common.base.Throwables; import com.verisign.storm.metrics.reporters.AbstractReporter; import com.verisign.storm.metrics.util.ConnectionFailureException; @@ -36,17 +38,17 @@ public class GraphiteReporter extends AbstractReporter { private static final int DEFAULT_MIN_CONNECT_ATTEMPT_INTERVAL_SECS = 5; public static final String GRAPHITE_HOST_OPTION = "metrics.graphite.host"; public static final String GRAPHITE_PORT_OPTION = "metrics.graphite.port"; + public static final String GRAPHITE_PROTOCOL_OPTION = "metrics.graphite.protocol"; public static final String GRAPHITE_MIN_CONNECT_ATTEMPT_INTERVAL_SECS_OPTION = "metrics.graphite.min-connect-attempt-interval-secs"; - private String graphiteHost; private int graphitePort; private InetSocketAddress graphiteSocketAddr; private String serverFingerprint; private int minConnectAttemptIntervalSecs; - private Graphite graphite; + private GraphiteSender graphite; private long lastConnectAttemptTimestampMs; public GraphiteReporter() { @@ -79,7 +81,14 @@ public GraphiteReporter() { graphiteSocketAddr = new InetSocketAddress(graphiteHost, graphitePort); serverFingerprint = graphiteSocketAddr.getAddress() + ":" + graphiteSocketAddr.getPort(); - this.graphite = new Graphite(graphiteSocketAddr); + + if (conf.containsKey(GRAPHITE_PROTOCOL_OPTION) && ((String)conf.get(GRAPHITE_PROTOCOL_OPTION)).equalsIgnoreCase("udp")) { + // Use UDP client + this.graphite = new GraphiteUDP(graphiteSocketAddr); + } else { + // Default TCP client + this.graphite = new Graphite(graphiteSocketAddr); + } lastConnectAttemptTimestampMs = 0; } diff --git a/src/test/java/com/verisign/storm/metrics/reporters/UdpGraphiteReporterTest.java b/src/test/java/com/verisign/storm/metrics/reporters/UdpGraphiteReporterTest.java new file mode 100644 index 0000000..632965a --- /dev/null +++ b/src/test/java/com/verisign/storm/metrics/reporters/UdpGraphiteReporterTest.java @@ -0,0 +1,112 @@ +/* + * Copyright 2014 VeriSign, Inc. + * + * VeriSign licenses this file to you under the Apache License, version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + */ +package com.verisign.storm.metrics.reporters; + +import com.google.common.base.Charsets; +import com.verisign.storm.metrics.reporters.graphite.GraphiteReporter; +import com.verisign.storm.metrics.util.ConnectionFailureException; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.HashMap; + +import static org.fest.assertions.api.Assertions.assertThat; + +public class UdpGraphiteReporterTest { + + private DatagramSocket graphiteServer; + private String graphiteHost; + private Integer graphitePort; + private GraphiteReporter graphiteReporter; + + private final Charset DEFAULT_CHARSET = Charsets.UTF_8; + + @BeforeTest + public void connectClientToGraphiteServer() throws IOException { + launchGraphiteServer(); + launchGraphiteClient(); + } + + private void launchGraphiteServer() throws IOException { + graphiteHost = "127.0.0.1"; + graphitePort = 2003; + graphiteServer = new DatagramSocket(graphitePort); + } + + private void launchGraphiteClient() throws ConnectionFailureException { + HashMap reporterConfig = new HashMap(); + + reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteHost); + reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, graphitePort.toString()); + reporterConfig.put(GraphiteReporter.GRAPHITE_PROTOCOL_OPTION, "udp"); + + graphiteReporter = new GraphiteReporter(); + graphiteReporter.prepare(reporterConfig); + graphiteReporter.connect(); + } + + @AfterTest + public void exit() throws IOException { + if (graphiteServer != null && graphiteServer.isClosed()) { + graphiteServer.close(); + } + graphiteReporter.disconnect(); + } + + @DataProvider(name = "metrics") + public Object[][] metricsProvider() { + return new Object[][] { new Object[] { "test.storm", "metric1", 1.00, new Long("1408393534971"), + "test.storm.metric1 1.00 1408393534971\n" }, + new Object[] { "test.storm", "metric2", 0.00, new Long("1408393534971"), + "test.storm.metric2 0.00 1408393534971\n" }, + new Object[] { "test.storm", "metric3", 3.14, new Long("1408393534971"), + "test.storm.metric3 3.14 1408393534971\n" }, + new Object[] { "test.storm", "metric3", 99.0, new Long("1408393534971"), + "test.storm.metric3 99.00 1408393534971\n" }, + new Object[] { "test.storm", "metric3", 1e3, new Long("1408393534971"), + "test.storm.metric3 1000.00 1408393534971\n" } }; + } + + @Test(dataProvider = "metrics") + public void sendMetricTupleAsFormattedStringToGraphiteServer(String metricPrefix, String metricKey, Double value, + long timestamp, + String expectedMessageReceived) throws IOException { + // Given a tuple representing a (metricPath, value, timestamp) metric (injected via data provider) + + HashMap values = new HashMap(); + values.put(metricKey, value); + // When the reporter sends the metric + graphiteReporter.appendToBuffer(metricPrefix, values, timestamp); + graphiteReporter.sendBufferContents(); + + // Then the server should receive a properly formatted string representing the metric + byte[] buf = new byte[256]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + graphiteServer.receive(packet); + + String actualMessageReceived = new String(packet.getData(), 0, packet.getLength(), DEFAULT_CHARSET); + assertThat(actualMessageReceived).isEqualTo(expectedMessageReceived); + } +}