Skip to content

Commit

Permalink
Merge pull request verisign#21 from Crim/master
Browse files Browse the repository at this point in the history
add support for UDP graphite client
  • Loading branch information
KevinJMao committed Aug 19, 2015
2 parents 412fedb + 1b91d7a commit 1ec0270
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 4 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# 0.2.4 (August 18, 2015)

IMPROVEMENTS

* Added support for UDP graphite reporter.
* Added configuration option `metrics.graphite.protocol`, which configures which Graphite reporter will be used. This
configuration option defaults to use the TCP graphite reporter for backwards compatibility. Set this option to 'udp'
to use a UDP reporter instead.

# 0.2.3 (June 24, 2015)

BUG FIXES
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ topology.metrics.consumer.register:
metrics.graphite.port: "2003"
metrics.graphite.prefix: "storm.test"
metrics.graphite.min-connect-attempt-interval-secs: "5"
# Optional arguments can also be supplied to enable UDP
metrics.graphite.protocol: "udp"
```
##### Reporting Metrics to Kafka
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ buildscript {
}

group = 'com.verisign.storm.metrics'
version = '0.2.3-SNAPSHOT'
version = '0.2.4-SNAPSHOT'
description = "An IMetricsConsumer that forwards Storm's built-in metrics to a Graphite server for real-time graphing."

apply plugin: 'java'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.verisign.storm.metrics.reporters.graphite;

import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteSender;
import com.codahale.metrics.graphite.GraphiteUDP;
import com.google.common.base.Throwables;
import com.verisign.storm.metrics.reporters.AbstractReporter;
import com.verisign.storm.metrics.util.ConnectionFailureException;
Expand All @@ -36,17 +38,17 @@ public class GraphiteReporter extends AbstractReporter {
private static final int DEFAULT_MIN_CONNECT_ATTEMPT_INTERVAL_SECS = 5;
public static final String GRAPHITE_HOST_OPTION = "metrics.graphite.host";
public static final String GRAPHITE_PORT_OPTION = "metrics.graphite.port";
public static final String GRAPHITE_PROTOCOL_OPTION = "metrics.graphite.protocol";
public static final String GRAPHITE_MIN_CONNECT_ATTEMPT_INTERVAL_SECS_OPTION
= "metrics.graphite.min-connect-attempt-interval-secs";


private String graphiteHost;
private int graphitePort;
private InetSocketAddress graphiteSocketAddr;

private String serverFingerprint;
private int minConnectAttemptIntervalSecs;
private Graphite graphite;
private GraphiteSender graphite;
private long lastConnectAttemptTimestampMs;

public GraphiteReporter() {
Expand Down Expand Up @@ -79,7 +81,14 @@ public GraphiteReporter() {
graphiteSocketAddr = new InetSocketAddress(graphiteHost, graphitePort);

serverFingerprint = graphiteSocketAddr.getAddress() + ":" + graphiteSocketAddr.getPort();
this.graphite = new Graphite(graphiteSocketAddr);

if (conf.containsKey(GRAPHITE_PROTOCOL_OPTION) && ((String)conf.get(GRAPHITE_PROTOCOL_OPTION)).equalsIgnoreCase("udp")) {
// Use UDP client
this.graphite = new GraphiteUDP(graphiteSocketAddr);
} else {
// Default TCP client
this.graphite = new Graphite(graphiteSocketAddr);
}
lastConnectAttemptTimestampMs = 0;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2014 VeriSign, Inc.
*
* VeriSign licenses this file to you under the Apache License, version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
*/
package com.verisign.storm.metrics.reporters;

import com.google.common.base.Charsets;
import com.verisign.storm.metrics.reporters.graphite.GraphiteReporter;
import com.verisign.storm.metrics.util.ConnectionFailureException;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashMap;

import static org.fest.assertions.api.Assertions.assertThat;

public class UdpGraphiteReporterTest {

private DatagramSocket graphiteServer;
private String graphiteHost;
private Integer graphitePort;
private GraphiteReporter graphiteReporter;

private final Charset DEFAULT_CHARSET = Charsets.UTF_8;

@BeforeTest
public void connectClientToGraphiteServer() throws IOException {
launchGraphiteServer();
launchGraphiteClient();
}

private void launchGraphiteServer() throws IOException {
graphiteHost = "127.0.0.1";
graphitePort = 2003;
graphiteServer = new DatagramSocket(graphitePort);
}

private void launchGraphiteClient() throws ConnectionFailureException {
HashMap<String, Object> reporterConfig = new HashMap<String, Object>();

reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteHost);
reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, graphitePort.toString());
reporterConfig.put(GraphiteReporter.GRAPHITE_PROTOCOL_OPTION, "udp");

graphiteReporter = new GraphiteReporter();
graphiteReporter.prepare(reporterConfig);
graphiteReporter.connect();
}

@AfterTest
public void exit() throws IOException {
if (graphiteServer != null && graphiteServer.isClosed()) {
graphiteServer.close();
}
graphiteReporter.disconnect();
}

@DataProvider(name = "metrics")
public Object[][] metricsProvider() {
return new Object[][] { new Object[] { "test.storm", "metric1", 1.00, new Long("1408393534971"),
"test.storm.metric1 1.00 1408393534971\n" },
new Object[] { "test.storm", "metric2", 0.00, new Long("1408393534971"),
"test.storm.metric2 0.00 1408393534971\n" },
new Object[] { "test.storm", "metric3", 3.14, new Long("1408393534971"),
"test.storm.metric3 3.14 1408393534971\n" },
new Object[] { "test.storm", "metric3", 99.0, new Long("1408393534971"),
"test.storm.metric3 99.00 1408393534971\n" },
new Object[] { "test.storm", "metric3", 1e3, new Long("1408393534971"),
"test.storm.metric3 1000.00 1408393534971\n" } };
}

@Test(dataProvider = "metrics")
public void sendMetricTupleAsFormattedStringToGraphiteServer(String metricPrefix, String metricKey, Double value,
long timestamp,
String expectedMessageReceived) throws IOException {
// Given a tuple representing a (metricPath, value, timestamp) metric (injected via data provider)

HashMap<String, Double> values = new HashMap<String, Double>();
values.put(metricKey, value);
// When the reporter sends the metric
graphiteReporter.appendToBuffer(metricPrefix, values, timestamp);
graphiteReporter.sendBufferContents();

// Then the server should receive a properly formatted string representing the metric
byte[] buf = new byte[256];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
graphiteServer.receive(packet);

String actualMessageReceived = new String(packet.getData(), 0, packet.getLength(), DEFAULT_CHARSET);
assertThat(actualMessageReceived).isEqualTo(expectedMessageReceived);
}
}

0 comments on commit 1ec0270

Please sign in to comment.