From bb5eb00da126c13a84b716fff8b24e48be7c90be Mon Sep 17 00:00:00 2001 From: Kevin Conaway Date: Mon, 18 Apr 2016 20:03:22 -0400 Subject: [PATCH 1/3] #26 Add tcp connect / read timeouts when interacting with Graphite --- .../reporters/graphite/GraphiteReporter.java | 23 +++++- .../util/ConfigurableSocketFactory.java | 66 +++++++++++++++++ .../reporters/GraphiteSocketSettingsTest.java | 70 +++++++++++++++++++ 3 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java create mode 100644 src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java diff --git a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java index cb7887c..a51cb39 100644 --- a/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java +++ b/src/main/java/com/verisign/storm/metrics/reporters/graphite/GraphiteReporter.java @@ -19,6 +19,7 @@ import com.codahale.metrics.graphite.GraphiteUDP; import com.google.common.base.Throwables; import com.verisign.storm.metrics.reporters.AbstractReporter; +import com.verisign.storm.metrics.util.ConfigurableSocketFactory; import com.verisign.storm.metrics.util.ConnectionFailureException; import com.verisign.storm.metrics.util.GraphiteCodec; import org.slf4j.Logger; @@ -27,6 +28,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * This class is a wrapper for the Graphite class in the Metrics library. It encapsulates the handling of errors that @@ -36,9 +38,14 @@ public class GraphiteReporter extends AbstractReporter { private static final Logger LOG = LoggerFactory.getLogger(GraphiteReporter.class); private static final int DEFAULT_MIN_CONNECT_ATTEMPT_INTERVAL_SECS = 5; + private static final int DEFAULT_READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + private static final int DEFAULT_CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + public static final String GRAPHITE_HOST_OPTION = "metrics.graphite.host"; public static final String GRAPHITE_PORT_OPTION = "metrics.graphite.port"; public static final String GRAPHITE_PROTOCOL_OPTION = "metrics.graphite.protocol"; + public static final String GRAPHITE_CONNECT_TIMEOUT = "metrics.graphite.connect.timeout"; + public static final String GRAPHITE_READ_TIMEOUT = "metrics.graphite.read.timeout"; public static final String GRAPHITE_MIN_CONNECT_ATTEMPT_INTERVAL_SECS_OPTION = "metrics.graphite.min-connect-attempt-interval-secs"; @@ -87,7 +94,21 @@ public GraphiteReporter() { this.graphite = new GraphiteUDP(graphiteSocketAddr); } else { // Default TCP client - this.graphite = new Graphite(graphiteSocketAddr); + int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + if (conf.containsKey(GRAPHITE_CONNECT_TIMEOUT)) { + connectTimeout = Integer.parseInt(conf.get(GRAPHITE_CONNECT_TIMEOUT).toString()); + } + + int readTimeout = DEFAULT_READ_TIMEOUT; + if (conf.containsKey(GRAPHITE_READ_TIMEOUT)) { + readTimeout = Integer.parseInt(conf.get(GRAPHITE_READ_TIMEOUT).toString()); + } + + ConfigurableSocketFactory socketFactory = new ConfigurableSocketFactory(); + socketFactory.setConnectTimeout(connectTimeout); + socketFactory.setReadTimeout(readTimeout); + + this.graphite = new Graphite(graphiteSocketAddr, socketFactory); } lastConnectAttemptTimestampMs = 0; } diff --git a/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java b/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java new file mode 100644 index 0000000..0b4c070 --- /dev/null +++ b/src/main/java/com/verisign/storm/metrics/util/ConfigurableSocketFactory.java @@ -0,0 +1,66 @@ +package com.verisign.storm.metrics.util; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +public class ConfigurableSocketFactory extends SocketFactory { + + private int connectTimeout = 0; + private int readTimeout = 0; + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public void setReadTimeout(int readTimeout) { + this.readTimeout = readTimeout; + } + + @Override + public Socket createSocket(InetAddress host, int port) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(String host, int port) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.bind(new InetSocketAddress(localHost, localPort)); + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + @Override + public Socket createSocket(InetAddress host, int port, InetAddress localHost, int localPort) throws IOException { + Socket socket = createConfiguredSocket(); + + socket.bind(new InetSocketAddress(localHost, localPort)); + socket.connect(new InetSocketAddress(host, port), connectTimeout); + + return socket; + } + + private Socket createConfiguredSocket() throws SocketException { + Socket socket = new Socket(); + socket.setSoTimeout(readTimeout); + return socket; + } +} diff --git a/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java new file mode 100644 index 0000000..4537ae9 --- /dev/null +++ b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java @@ -0,0 +1,70 @@ +package com.verisign.storm.metrics.reporters; + +import com.verisign.storm.metrics.reporters.graphite.GraphiteReporter; +import com.verisign.storm.metrics.util.ConnectionFailureException; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.channels.ServerSocketChannel; +import java.util.HashMap; + +public class GraphiteSocketSettingsTest { + + private InetSocketAddress graphiteSocketAddress; + private ServerSocketChannel graphiteServer; + + @BeforeTest + public void setUp() throws IOException { + String graphiteHost = "127.0.0.1"; + int graphitePort = 2003; + + graphiteSocketAddress = new InetSocketAddress(graphiteHost, graphitePort); + + graphiteServer = ServerSocketChannel.open(); + } + + @AfterTest + public void tearDown() throws Exception { + if (graphiteServer != null && graphiteServer.isOpen()) { + graphiteServer.close(); + } + } + + @Test + public void testConnectTimeout() throws IOException { + // Backlog of one socket + graphiteServer.socket().bind(graphiteSocketAddress, 1); + graphiteServer.configureBlocking(false); + + graphiteServer.accept(); + + // Take the only available socket + new Socket().connect(graphiteServer.socket().getLocalSocketAddress()); + + HashMap reporterConfig = new HashMap(); + + reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteSocketAddress.getHostName()); + reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, String.valueOf(graphiteSocketAddress.getPort())); + reporterConfig.put(GraphiteReporter.GRAPHITE_CONNECT_TIMEOUT, 1); + + GraphiteReporter graphiteReporter = new GraphiteReporter(); + graphiteReporter.prepare(reporterConfig); + + try { + graphiteReporter.connect(); + Assert.fail(); + } catch (ConnectionFailureException expected) { + + } finally { + graphiteReporter.disconnect(); + } + + + } +} From 705465890b9d444d12f6d568e1d0bdf38699f878 Mon Sep 17 00:00:00 2001 From: Kevin Conaway Date: Mon, 18 Apr 2016 20:08:09 -0400 Subject: [PATCH 2/3] #26 Update test to use random bind port --- .../reporters/GraphiteSocketSettingsTest.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java index 4537ae9..1e2378c 100644 --- a/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java +++ b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java @@ -16,15 +16,10 @@ public class GraphiteSocketSettingsTest { - private InetSocketAddress graphiteSocketAddress; private ServerSocketChannel graphiteServer; @BeforeTest public void setUp() throws IOException { - String graphiteHost = "127.0.0.1"; - int graphitePort = 2003; - - graphiteSocketAddress = new InetSocketAddress(graphiteHost, graphitePort); graphiteServer = ServerSocketChannel.open(); } @@ -38,8 +33,9 @@ public void tearDown() throws Exception { @Test public void testConnectTimeout() throws IOException { + String graphiteHost = "127.0.0.1"; // Backlog of one socket - graphiteServer.socket().bind(graphiteSocketAddress, 1); + graphiteServer.socket().bind(new InetSocketAddress(graphiteHost, 0), 1); graphiteServer.configureBlocking(false); graphiteServer.accept(); @@ -49,8 +45,8 @@ public void testConnectTimeout() throws IOException { HashMap reporterConfig = new HashMap(); - reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteSocketAddress.getHostName()); - reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, String.valueOf(graphiteSocketAddress.getPort())); + reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteHost); + reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, String.valueOf(graphiteServer.socket().getLocalPort())); reporterConfig.put(GraphiteReporter.GRAPHITE_CONNECT_TIMEOUT, 1); GraphiteReporter graphiteReporter = new GraphiteReporter(); From 32e0f30b7c6ead26eae31ffc8481cb992061bb5c Mon Sep 17 00:00:00 2001 From: Kevin Conaway Date: Tue, 19 Apr 2016 09:46:19 -0400 Subject: [PATCH 3/3] #26 Remove incorrect test --- .../reporters/GraphiteSocketSettingsTest.java | 66 ------------------- 1 file changed, 66 deletions(-) delete mode 100644 src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java diff --git a/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java b/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java deleted file mode 100644 index 1e2378c..0000000 --- a/src/test/java/com/verisign/storm/metrics/reporters/GraphiteSocketSettingsTest.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.verisign.storm.metrics.reporters; - -import com.verisign.storm.metrics.reporters.graphite.GraphiteReporter; -import com.verisign.storm.metrics.util.ConnectionFailureException; -import org.testng.Assert; -import org.testng.annotations.AfterTest; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.nio.channels.ServerSocketChannel; -import java.util.HashMap; - -public class GraphiteSocketSettingsTest { - - private ServerSocketChannel graphiteServer; - - @BeforeTest - public void setUp() throws IOException { - - graphiteServer = ServerSocketChannel.open(); - } - - @AfterTest - public void tearDown() throws Exception { - if (graphiteServer != null && graphiteServer.isOpen()) { - graphiteServer.close(); - } - } - - @Test - public void testConnectTimeout() throws IOException { - String graphiteHost = "127.0.0.1"; - // Backlog of one socket - graphiteServer.socket().bind(new InetSocketAddress(graphiteHost, 0), 1); - graphiteServer.configureBlocking(false); - - graphiteServer.accept(); - - // Take the only available socket - new Socket().connect(graphiteServer.socket().getLocalSocketAddress()); - - HashMap reporterConfig = new HashMap(); - - reporterConfig.put(GraphiteReporter.GRAPHITE_HOST_OPTION, graphiteHost); - reporterConfig.put(GraphiteReporter.GRAPHITE_PORT_OPTION, String.valueOf(graphiteServer.socket().getLocalPort())); - reporterConfig.put(GraphiteReporter.GRAPHITE_CONNECT_TIMEOUT, 1); - - GraphiteReporter graphiteReporter = new GraphiteReporter(); - graphiteReporter.prepare(reporterConfig); - - try { - graphiteReporter.connect(); - Assert.fail(); - } catch (ConnectionFailureException expected) { - - } finally { - graphiteReporter.disconnect(); - } - - - } -}