diff --git a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java index cb7887c..a51cb39 100644 --- a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java +++ b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java @@ -19,6 +19,7 @@ import com.codahale.metrics.graphite.GraphiteUDP; import com.google.common.base.Throwables; import com.verisign.storm.metrics.reporters.AbstractReporter; +import com.verisign.storm.metrics.util.ConfigurableSocketFactory; import com.verisign.storm.metrics.util.ConnectionFailureException; import com.verisign.storm.metrics.util.GraphiteCodec; import org.slf4j.Logger; @@ -27,6 +28,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * This class is a wrapper for the Graphite class in the Metrics library. It encapsulates the handling of errors that @@ -36,9 +38,14 @@ public class GraphiteReporter extends AbstractReporter { private static final Logger LOG = LoggerFactory.getLogger(GraphiteReporter.class); private static final int DEFAULT_MIN_CONNECT_ATTEMPT_INTERVAL_SECS = 5; + private static final int DEFAULT_READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + private static final int DEFAULT_CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + public static final String GRAPHITE_HOST_OPTION = "metrics.graphite.host"; public static final String GRAPHITE_PORT_OPTION = "metrics.graphite.port"; public static final String GRAPHITE_PROTOCOL_OPTION = "metrics.graphite.protocol"; + public static final String GRAPHITE_CONNECT_TIMEOUT = "metrics.graphite.connect.timeout"; + public static final String GRAPHITE_READ_TIMEOUT = "metrics.graphite.read.timeout"; public static final String GRAPHITE_MIN_CONNECT_ATTEMPT_INTERVAL_SECS_OPTION = "metrics.graphite.min-connect-attempt-interval-secs"; @@ -87,7 +94,21 @@ public GraphiteReporter() { this.graphite = new GraphiteUDP(graphiteSocketAddr); } else { // Default TCP client - this.graphite = new Graphite(graphiteSocketAddr); + int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + if (conf.containsKey(GRAPHITE_CONNECT_TIMEOUT)) { + connectTimeout = Integer.parseInt(conf.get(GRAPHITE_CONNECT_TIMEOUT).toString()); + } + + int readTimeout = DEFAULT_READ_TIMEOUT; + if (conf.containsKey(GRAPHITE_READ_TIMEOUT)) { + readTimeout = Integer.parseInt(conf.get(GRAPHITE_READ_TIMEOUT).toString()); + } + + ConfigurableSocketFactory socketFactory = new ConfigurableSocketFactory(); + socketFactory.setConnectTimeout(connectTimeout); + socketFactory.setReadTimeout(readTimeout); + + this.graphite = new Graphite(graphiteSocketAddr, socketFactory); } lastConnectAttemptTimestampMs = 0; } diff --git a/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java b/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java new file mode 100644 index 0000000..0b4c070 --- /dev/null +++ b/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java @@ -0,0 +1,66 @@ +package com.verisign.storm.metrics.util; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +public class ConfigurableSocketFactory extends SocketFactory { + + private int connectTimeout = 0; + private int readTimeout = 0; + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public void setReadTimeout(int readTimeout) { + this.readTimeout = readTimeout; + } + + @Override + public Socket createSocket(InetAddress host, int port) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(String host, int port) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.bind(new InetSocketAddress(localHost, localPort)); + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(InetAddress host, int port, InetAddress localHost, int localPort) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.bind(new InetSocketAddress(localHost, localPort)); + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + private Socket createConfiguredSocket() throws SocketException { + Socket socket = new Socket(); + socket.setSoTimeout(readTimeout); + return socket; + } +} diff --git a/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java new file mode 100644 index 0000000..4537ae9 --- /dev/null +++ b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java @@ -0,0 +1,70 @@ +package com.verisign.storm.metrics.reporters; + +import com.verisign.storm.metrics.reporters.graphite.GraphiteReporter; +import com.verisign.storm.metrics.util.ConnectionFailureException; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.channels.ServerSocketChannel; +import java.util.HashMap; + +public class GraphiteSocketSettingsTest { + + private InetSocketAddress graphiteSocketAddress; + private ServerSocketChannel graphiteServer; + + @BeforeTest + public void setUp() throws IOException { + String graphiteHost = "127.0.0.1"; + int graphitePort = 2003; + + graphiteSocketAddress = new InetSocketAddress(graphiteHost, graphitePort); + + graphiteServer = ServerSocketChannel.open(); + } + + @AfterTest + public void tearDown() throws Exception { + if (graphiteServer != null && graphiteServer.isOpen()) { + graphiteServer.close(); + } + } + + @Test + public void testConnectTimeout() throws IOException { + // Backlog of one socket + graphiteServer.socket().bind(graphiteSocketAddress, 1); + graphiteServer.configureBlocking(false); + + graphiteServer.accept(); + + // Take the only available socket + new Socket().connect(graphiteServer.socket().getLocalSocketAddress()); + + HashMap reporterConfig = new HashMap(); + + reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteSocketAddress.getHostName()); + reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, String.valueOf(graphiteSocketAddress.getPort())); + reporterConfig.put(GraphiteReporter.GRAPHITE_CONNECT_TIMEOUT, 1); + + GraphiteReporter graphiteReporter = new GraphiteReporter(); + graphiteReporter.prepare(reporterConfig); + + try { + graphiteReporter.connect(); + Assert.fail(); + } catch (ConnectionFailureException expected) { + + } finally { + graphiteReporter.disconnect(); + } + + + } +}