diff --git a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java index cb7887c..a51cb39 100644 --- a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java +++ b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java @@ -19,6 +19,7 @@ import com.codahale.metrics.graphite.GraphiteUDP; import com.google.common.base.Throwables; import com.verisign.storm.metrics.reporters.AbstractReporter; +import com.verisign.storm.metrics.util.ConfigurableSocketFactory; import com.verisign.storm.metrics.util.ConnectionFailureException; import com.verisign.storm.metrics.util.GraphiteCodec; import org.slf4j.Logger; @@ -27,6 +28,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * This class is a wrapper for the Graphite class in the Metrics library. It encapsulates the handling of errors that @@ -36,9 +38,14 @@ public class GraphiteReporter extends AbstractReporter { private static final Logger LOG = LoggerFactory.getLogger(GraphiteReporter.class); private static final int DEFAULT_MIN_CONNECT_ATTEMPT_INTERVAL_SECS = 5; + private static final int DEFAULT_READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + private static final int DEFAULT_CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + public static final String GRAPHITE_HOST_OPTION = "metrics.graphite.host"; public static final String GRAPHITE_PORT_OPTION = "metrics.graphite.port"; public static final String GRAPHITE_PROTOCOL_OPTION = "metrics.graphite.protocol"; + public static final String GRAPHITE_CONNECT_TIMEOUT = "metrics.graphite.connect.timeout"; + public static final String GRAPHITE_READ_TIMEOUT = "metrics.graphite.read.timeout"; public static final String GRAPHITE_MIN_CONNECT_ATTEMPT_INTERVAL_SECS_OPTION = "metrics.graphite.min-connect-attempt-interval-secs"; @@ -87,7 +94,21 @@ public GraphiteReporter() { this.graphite = new GraphiteUDP(graphiteSocketAddr); } else { // Default TCP client - this.graphite = new Graphite(graphiteSocketAddr); + int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + if (conf.containsKey(GRAPHITE_CONNECT_TIMEOUT)) { + connectTimeout = Integer.parseInt(conf.get(GRAPHITE_CONNECT_TIMEOUT).toString()); + } + + int readTimeout = DEFAULT_READ_TIMEOUT; + if (conf.containsKey(GRAPHITE_READ_TIMEOUT)) { + readTimeout = Integer.parseInt(conf.get(GRAPHITE_READ_TIMEOUT).toString()); + } + + ConfigurableSocketFactory socketFactory = new ConfigurableSocketFactory(); + socketFactory.setConnectTimeout(connectTimeout); + socketFactory.setReadTimeout(readTimeout); + + this.graphite = new Graphite(graphiteSocketAddr, socketFactory); } lastConnectAttemptTimestampMs = 0; } diff --git a/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java b/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java new file mode 100644 index 0000000..0b4c070 --- /dev/null +++ b/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java @@ -0,0 +1,66 @@ +package com.verisign.storm.metrics.util; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +public class ConfigurableSocketFactory extends SocketFactory { + + private int connectTimeout = 0; + private int readTimeout = 0; + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public void setReadTimeout(int readTimeout) { + this.readTimeout = readTimeout; + } + + @Override + public Socket createSocket(InetAddress host, int port) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(String host, int port) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.bind(new InetSocketAddress(localHost, localPort)); + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(InetAddress host, int port, InetAddress localHost, int localPort) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.bind(new InetSocketAddress(localHost, localPort)); + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + private Socket createConfiguredSocket() throws SocketException { + Socket socket = new Socket(); + socket.setSoTimeout(readTimeout); + return socket; + } +}