Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#26 Add tcp connect / read timeouts when interacting with Graphite #28

Merged
merged 3 commits into from
Apr 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.graphite.GraphiteUDP;
import com.google.common.base.Throwables;
import com.verisign.storm.metrics.reporters.AbstractReporter;
import com.verisign.storm.metrics.util.ConfigurableSocketFactory;
import com.verisign.storm.metrics.util.ConnectionFailureException;
import com.verisign.storm.metrics.util.GraphiteCodec;
import org.slf4j.Logger;
Expand All @@ -27,6 +28,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* This class is a wrapper for the Graphite class in the Metrics library. It encapsulates the handling of errors that
Expand All @@ -36,9 +38,14 @@ public class GraphiteReporter extends AbstractReporter {

private static final Logger LOG = LoggerFactory.getLogger(GraphiteReporter.class);
private static final int DEFAULT_MIN_CONNECT_ATTEMPT_INTERVAL_SECS = 5;
private static final int DEFAULT_READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
private static final int DEFAULT_CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);

public static final String GRAPHITE_HOST_OPTION = "metrics.graphite.host";
public static final String GRAPHITE_PORT_OPTION = "metrics.graphite.port";
public static final String GRAPHITE_PROTOCOL_OPTION = "metrics.graphite.protocol";
public static final String GRAPHITE_CONNECT_TIMEOUT = "metrics.graphite.connect.timeout";
public static final String GRAPHITE_READ_TIMEOUT = "metrics.graphite.read.timeout";
public static final String GRAPHITE_MIN_CONNECT_ATTEMPT_INTERVAL_SECS_OPTION
= "metrics.graphite.min-connect-attempt-interval-secs";

Expand Down Expand Up @@ -87,7 +94,21 @@ public GraphiteReporter() {
this.graphite = new GraphiteUDP(graphiteSocketAddr);
} else {
// Default TCP client
this.graphite = new Graphite(graphiteSocketAddr);
int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
if (conf.containsKey(GRAPHITE_CONNECT_TIMEOUT)) {
connectTimeout = Integer.parseInt(conf.get(GRAPHITE_CONNECT_TIMEOUT).toString());
}

int readTimeout = DEFAULT_READ_TIMEOUT;
if (conf.containsKey(GRAPHITE_READ_TIMEOUT)) {
readTimeout = Integer.parseInt(conf.get(GRAPHITE_READ_TIMEOUT).toString());
}

ConfigurableSocketFactory socketFactory = new ConfigurableSocketFactory();
socketFactory.setConnectTimeout(connectTimeout);
socketFactory.setReadTimeout(readTimeout);

this.graphite = new Graphite(graphiteSocketAddr, socketFactory);
}
lastConnectAttemptTimestampMs = 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.verisign.storm.metrics.util;

import javax.net.SocketFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;

public class ConfigurableSocketFactory extends SocketFactory {

private int connectTimeout = 0;
private int readTimeout = 0;

public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}

public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}

@Override
public Socket createSocket(InetAddress host, int port) throws IOException {
Socket socket = createConfiguredSocket();

socket.connect(new InetSocketAddress(host, port), connectTimeout);

return socket;
}

@Override
public Socket createSocket(String host, int port) throws IOException {
Socket socket = createConfiguredSocket();

socket.connect(new InetSocketAddress(host, port), connectTimeout);

return socket;
}

@Override
public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException {
Socket socket = createConfiguredSocket();

socket.bind(new InetSocketAddress(localHost, localPort));
socket.connect(new InetSocketAddress(host, port), connectTimeout);

return socket;
}

@Override
public Socket createSocket(InetAddress host, int port, InetAddress localHost, int localPort) throws IOException {
Socket socket = createConfiguredSocket();

socket.bind(new InetSocketAddress(localHost, localPort));
socket.connect(new InetSocketAddress(host, port), connectTimeout);

return socket;
}

private Socket createConfiguredSocket() throws SocketException {
Socket socket = new Socket();
socket.setSoTimeout(readTimeout);
return socket;
}
}