diff --git a/src/main/java/ly/bit/nsq/BasicConnection.java b/src/main/java/ly/bit/nsq/BasicConnection.java index 8350a34..10aab40 100644 --- a/src/main/java/ly/bit/nsq/BasicConnection.java +++ b/src/main/java/ly/bit/nsq/BasicConnection.java @@ -106,7 +106,7 @@ public void close() { // whatever, we're not doing anything with this anymore log.error("Exception closing connection: ", e); } - this.reader.connections.remove(this.toString()); + BasicConnection.this.reader.connectionClosed(BasicConnection.this); } } diff --git a/src/main/java/ly/bit/nsq/Connection.java b/src/main/java/ly/bit/nsq/Connection.java index 121c907..af58f65 100644 --- a/src/main/java/ly/bit/nsq/Connection.java +++ b/src/main/java/ly/bit/nsq/Connection.java @@ -1,19 +1,18 @@ package ly.bit.nsq; +import ly.bit.nsq.exceptions.NSQException; +import ly.bit.nsq.util.ConnectionUtils; +import ly.bit.nsq.util.FrameType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import ly.bit.nsq.exceptions.NSQException; -import ly.bit.nsq.util.ConnectionUtils; -import ly.bit.nsq.util.FrameType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * @author dan @@ -61,20 +60,6 @@ public void messageReceivedCallback(Message message){ public abstract void readForever() throws NSQException; public abstract void close(); - public Message decodeMesage(byte[] data) throws NSQException { - DataInputStream ds = new DataInputStream(new ByteArrayInputStream(data)); - try { - long timestamp = ds.readLong(); // 8 bytes - short attempts = ds.readShort(); // 2 bytes - byte[] id = new byte[16]; - ds.read(id); - byte[] body = new byte[data.length - 26]; - ds.read(body); - return new Message(id, body, timestamp, attempts, this); - } catch (IOException e) { - throw new NSQException(e); - } - } public void handleResponse(byte[] response) throws NSQException { DataInputStream ds = new DataInputStream(new ByteArrayInputStream(response)); @@ -86,7 +71,7 @@ public void handleResponse(byte[] response) throws NSQException { break; case FRAMETYPEMESSAGE: byte[] messageBytes = Arrays.copyOfRange(response, 4, response.length); - Message msg = this.decodeMesage(messageBytes); + Message msg = MessageCodec.decode(messageBytes, this); this.messageReceivedCallback(msg); break; case FRAMETYPEERROR: diff --git a/src/main/java/ly/bit/nsq/Message.java b/src/main/java/ly/bit/nsq/Message.java index 0415934..6bb7efa 100644 --- a/src/main/java/ly/bit/nsq/Message.java +++ b/src/main/java/ly/bit/nsq/Message.java @@ -54,5 +54,4 @@ public Connection getConn() { return conn; } - } diff --git a/src/main/java/ly/bit/nsq/MessageCodec.java b/src/main/java/ly/bit/nsq/MessageCodec.java new file mode 100644 index 0000000..4d01dd7 --- /dev/null +++ b/src/main/java/ly/bit/nsq/MessageCodec.java @@ -0,0 +1,49 @@ +package ly.bit.nsq; + + +import ly.bit.nsq.exceptions.NSQException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class MessageCodec { + public static Message decode(byte[] data, Connection conn) throws NSQException { + DataInputStream ds = new DataInputStream(new ByteArrayInputStream(data)); + try { + long timestamp = ds.readLong(); // 8 bytes + short attempts = ds.readShort(); // 2 bytes + byte[] id = new byte[16]; + ds.read(id); + byte[] body = new byte[data.length - 26]; + ds.read(body); + return new Message(id, body, timestamp, attempts, conn); + } catch (IOException e) { + throw new NSQException(e); + } + } + + /** + * Reverse of decodeMessage, helpful in testing so far. + * @param msg + * @return + * @throws NSQException + */ + public static byte[] encode(Message msg) throws NSQException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream ds = new DataOutputStream(bytes); + try { + ds.writeLong(msg.getTimestamp()); + ds.writeShort(msg.getAttempts()); + ds.write(msg.getId()); + ds.write(msg.getBody()); + ds.close(); + } catch (IOException e) { + throw new NSQException(e); + } + return bytes.toByteArray(); + } + +} diff --git a/src/main/java/ly/bit/nsq/NSQReader.java b/src/main/java/ly/bit/nsq/NSQReader.java index 1ad21d8..c831f02 100644 --- a/src/main/java/ly/bit/nsq/NSQReader.java +++ b/src/main/java/ly/bit/nsq/NSQReader.java @@ -1,12 +1,12 @@ package ly.bit.nsq; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import static java.util.concurrent.TimeUnit.SECONDS; +/** + * Base implementation of an NSQ consusumer client which manages lookupd requests + * and responses, nsqd connections etc. + * Typically you would use a specific implementation like SyncResponseReader, which + * additionally implements synchronous responses for messages. + * See the PrintReader example. + */ import ly.bit.nsq.exceptions.NSQException; import ly.bit.nsq.lookupd.AbstractLookupd; @@ -15,9 +15,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static java.util.concurrent.TimeUnit.SECONDS; + public abstract class NSQReader { private static final Logger log = LoggerFactory.getLogger(NSQReader.class); + + private static final int LOOKUPD_INITIAL_DELAY = 0; + private static final int LOOKUPD_POLL_INTERVAL = 30; protected int requeueDelay; protected int maxRetries; @@ -30,7 +44,7 @@ public abstract class NSQReader { protected ExecutorService executor; - protected Class connClass; + protected Class connClass = BasicConnection.class; protected ConcurrentHashMap connections; protected ConcurrentHashMap lookupdConnections; @@ -55,7 +69,6 @@ public void init(String topic, String channel){ String[] hostParts = this.hostname.split("\\."); this.shortHostname = hostParts[0]; - this.connClass = BasicConnection.class; // TODO can be passed by caller this.lookupdConnections = new ConcurrentHashMap(); this.lookupdScheduler = Executors.newScheduledThreadPool(1); @@ -132,7 +145,55 @@ public void connectToNsqd(String address, int port) throws NSQException{ conn.send(ConnectionUtils.ready(conn.maxInFlight)); conn.readForever(); } - + + /** + * Set the currently configured nsqd addresses that we should be connected to. This triggers disconnects (from + * disused servers) and connection attempts (to new servers). + * This function is probably only called from the Lookupd job. + * @param connectionAddresses + */ + public void handleLookupdResponse(Set connectionAddresses) { + Set toClose = new HashSet(connections.keySet()); + Set toOpen = new HashSet(connectionAddresses); + + // We need to open any connections in set that are not already open + toOpen.removeAll(connections.keySet()); + // We need to close any connections that are open but not in the set + toClose.removeAll(connectionAddresses); + + // Open new connections + for(String address : toOpen) { + log.info("Opening new producer connection: {}", address); + String[] components = address.split(":"); + String nsqdAddress = components[0]; + int nsqdPort = Integer.parseInt(components[1]); + try { + connectToNsqd(nsqdAddress, nsqdPort); + } catch (NSQException e) { + log.error("Erroring response from lookupd", e); + } + } + // close old connections + for(String address : toClose) { + if (connections.containsKey(address)) { + log.info("Producer not in lookupd response, closing connection: {}", address); + connections.get(address).close(); + this.connections.remove(address); + } + + } + } + + /** + * Remove this nsqd connection from our active pool if it is present. + * Typically called after problems are detected with the connection, or + * after a lookupd request shows that the server is no longer active. + * @param conn + */ + public void connectionClosed(Connection conn) { + this.connections.remove(conn.toString()); + + } // lookupd stuff @@ -142,7 +203,7 @@ public void addLookupd(AbstractLookupd lookupd) { if (stored != null){ return; } - lookupdScheduler.scheduleAtFixedRate(new BasicLookupdJob(addr, this), 30, 30, SECONDS); + lookupdScheduler.scheduleAtFixedRate(new BasicLookupdJob(addr, this), LOOKUPD_INITIAL_DELAY, LOOKUPD_POLL_INTERVAL, SECONDS); } public String toString(){ @@ -157,4 +218,12 @@ public ConcurrentHashMap getLookupdConnections() { return lookupdConnections; } + public void setConnClass(Class clazz) { + this.connClass = clazz; + } + + public ConcurrentHashMap getConnections() { + return this.connections; + } + } diff --git a/src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java b/src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java index 1bb4eba..f803035 100644 --- a/src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java +++ b/src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java @@ -1,10 +1,10 @@ package ly.bit.nsq.lookupd; +import java.util.HashSet; import java.util.List; import java.util.Map; import ly.bit.nsq.NSQReader; -import ly.bit.nsq.exceptions.NSQException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,16 +22,9 @@ public BasicLookupdJob(String lookupdAddress, NSQReader reader) { public void run() { Map lookupdConnections = reader.getLookupdConnections(); AbstractLookupd lookupd = lookupdConnections.get(lookupdAddress); - List producers = lookupd.query(reader.getTopic()); - for(String producer : producers) { - String[] components = producer.split(":"); - String nsqdAddress = components[0]; - int nsqdPort = Integer.parseInt(components[1]); - try { - reader.connectToNsqd(nsqdAddress, nsqdPort); - } catch (NSQException e) { - log.error("Error reading response from lookupd", e); - } - } + List producers = lookupd.query(reader.getTopic()); + + reader.handleLookupdResponse(new HashSet(producers)); + } } diff --git a/src/test/java/ly/bit/nsq/ConnectionTest.java b/src/test/java/ly/bit/nsq/ConnectionTest.java new file mode 100644 index 0000000..06f9e4e --- /dev/null +++ b/src/test/java/ly/bit/nsq/ConnectionTest.java @@ -0,0 +1,50 @@ +package ly.bit.nsq; + +import ly.bit.nsq.exceptions.NSQException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +/** + * User: oneill + * Date: 4/8/13 + */ +public class ConnectionTest { + Logger log = LoggerFactory.getLogger(ConnectionTest.class); + + + /** + * Test that we can encode and decode a message properly. + * @throws NSQException + */ + @Test + public void testCodec() throws NSQException { + + byte[] id = MockConnection.randomId(); + + + Connection conn = new MockConnection(); + String body = "kjehfliuANDY.WAS.HEREe;flijwe,jfhwqliuehfj"; + Message msg = new Message(id, body.getBytes(), System.currentTimeMillis()*1000, + new Integer(0).shortValue(), conn); + byte[] encoded = MessageCodec.encode(msg); + log.debug("Encoded message: {}", encoded); + + Message decoded = MessageCodec.decode(encoded, conn); + assertEquals(msg.getAttempts(), decoded.getAttempts()); + for (int i=0; i messages = new ArrayBlockingQueue(10); + + @Override + public void init(String host, int port, NSQReader reader) { + this.host = host; + this.port = port; + this.reader = reader; + this.closed.getAndSet(false); + log.debug("Init mock connection {}:{}", host, port); + + } + + @Override + public void send(String command) throws NSQException { + log.debug("Sending '{}'", command); + lastSent = command; + } + + @Override + public void connect() throws NSQException { + log.debug("{} is connecting", this.toString()); + } + + private byte[] readResponse() throws NSQException { + try { + byte[] msg = messages.take(); + return msg; + } catch (InterruptedException e) { + throw new NSQException(e); + } + } + + @Override + public void readForever() throws NSQException { + class ReadThis implements Runnable { + public void run() { + while(closed.get() != true){ + byte[] response; + try { + response = readResponse(); + } catch (NSQException e) { + // Assume this meant that we couldn't read somehow, should close the connection + close(); + break; + } + try { + handleResponse(response); + } catch (NSQException e) { + // malformed message or something... + log.error("Message error: ", e); + } + } + } + } + Thread t = new Thread(new ReadThis(), this.toString()); + t.start(); + } + + @Override + public void close() { + boolean prev = this.closed.getAndSet(true); + if(prev == true){ + return; + } + log.info("Closing connection {}", this.toString()); + MockConnection.this.reader.connectionClosed(MockConnection.this); + } + + public void fakeMessage(byte[] idBytes, String body) throws NSQException { + + Message msg = new Message(idBytes, body.getBytes(), System.currentTimeMillis()*1000, + new Integer(0).shortValue(), this); + + byte[] framed = frameMessage(MessageCodec.encode(msg)); + log.debug("Inserting fake message: '{}'", framed); + + this.messages.add(framed); + } + + /** + * Helper for creating 16-byte ids + * @return + */ + public static byte[] randomId() { + return UUID.randomUUID().toString().replace("-", "").substring(0, 16).getBytes(); + } + + public static byte[] frameMessage(byte[] msg) throws NSQException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try { + DataOutputStream ds = new DataOutputStream(bytes); + ds.writeInt(2); + ds.write(msg); + ds.flush(); + return bytes.toByteArray(); + } catch(IOException e) { + throw new NSQException(e); + } + } + + public String getLastSent() { + return this.lastSent; + } + +} diff --git a/src/test/java/ly/bit/nsq/NSQProducerTest.java b/src/test/java/ly/bit/nsq/NSQProducerTest.java index f120ded..afed416 100644 --- a/src/test/java/ly/bit/nsq/NSQProducerTest.java +++ b/src/test/java/ly/bit/nsq/NSQProducerTest.java @@ -1,15 +1,12 @@ package ly.bit.nsq; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; - - import ly.bit.nsq.exceptions.NSQException; -import org.apache.http.*; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.message.BasicStatusLine; import org.junit.After; import org.junit.Before; @@ -19,6 +16,12 @@ import java.util.concurrent.FutureTask; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * User: oneill * Date: 4/4/13 diff --git a/src/test/java/ly/bit/nsq/NSQReaderTest.java b/src/test/java/ly/bit/nsq/NSQReaderTest.java new file mode 100644 index 0000000..1a40ee4 --- /dev/null +++ b/src/test/java/ly/bit/nsq/NSQReaderTest.java @@ -0,0 +1,68 @@ +package ly.bit.nsq; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * User: oneill + * Date: 4/5/13 + */ +public class NSQReaderTest { + Logger log = LoggerFactory.getLogger(NSQReaderTest.class); + + + @Test + public void testSetConnections() throws Exception { + + Set firstResponse = new HashSet(); + firstResponse.add("producerA:4151"); + firstResponse.add("producerB:4151"); + firstResponse.add("producerC:4151"); + + Set secondResponse = new HashSet(); + secondResponse.add("producerB:4151"); + secondResponse.add("producerC:4151"); + secondResponse.add("producerD:4151"); + + NSQReader reader = new NSQReader() { + @Override + protected Runnable makeRunnableFromMessage(Message msg) { + return new Runnable(){ + public void run() { + + } + }; + + } + }; + + reader.connClass = MockConnection.class; + reader.init("testTopic", "testChannel"); + + log.debug("Setting first lookupd response (A,B,C)"); + reader.handleLookupdResponse(firstResponse); + + assertEquals(firstResponse.size(), reader.connections.size()); + assertTrue(reader.connections.containsKey("producerA:4151")); + assertTrue(reader.connections.containsKey("producerB:4151")); + assertTrue(reader.connections.containsKey("producerC:4151")); + + + log.debug("Setting second lookupd response (B,C,D)"); + reader.handleLookupdResponse(secondResponse); + assertEquals(secondResponse.size(), reader.connections.size()); + assertTrue(reader.connections.containsKey("producerB:4151")); + assertTrue(reader.connections.containsKey("producerC:4151")); + assertTrue(reader.connections.containsKey("producerD:4151")); + + + } + +} diff --git a/src/test/java/ly/bit/nsq/syncresponse/MockHandler.java b/src/test/java/ly/bit/nsq/syncresponse/MockHandler.java new file mode 100644 index 0000000..0bc68c0 --- /dev/null +++ b/src/test/java/ly/bit/nsq/syncresponse/MockHandler.java @@ -0,0 +1,47 @@ +package ly.bit.nsq.syncresponse; + +import ly.bit.nsq.Message; +import ly.bit.nsq.exceptions.NSQException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A mock implementation of SyncResponseHandler that tracks incoming + * messages for unit tests + */ +public class MockHandler implements SyncResponseHandler { + private List received; + private boolean failNextMessage = false; + + public MockHandler() { + this.received = Collections.synchronizedList(new ArrayList()); + } + + @Override + public boolean handleMessage(Message msg) throws NSQException { + received.add(msg); + + boolean ret = true; + synchronized(this) { + if (failNextMessage) { + failNextMessage = false; + ret = false; + } + } + return ret; + } + + /** + * For testing, will cause the next message handling to fail. + */ + public void failNext() { + synchronized (this) { + this.failNextMessage = true; + } + } + public List getReceived() { + return this.received; + } +} diff --git a/src/test/java/ly/bit/nsq/syncresponse/SyncResponseReaderTest.java b/src/test/java/ly/bit/nsq/syncresponse/SyncResponseReaderTest.java new file mode 100644 index 0000000..24c8291 --- /dev/null +++ b/src/test/java/ly/bit/nsq/syncresponse/SyncResponseReaderTest.java @@ -0,0 +1,85 @@ +package ly.bit.nsq.syncresponse; + +import ly.bit.nsq.Message; +import ly.bit.nsq.MockConnection; +import ly.bit.nsq.exceptions.NSQException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * User: oneill + * Date: 4/8/13 + */ +public class SyncResponseReaderTest { + Logger log = LoggerFactory.getLogger(SyncResponseReaderTest.class); + + @Test + public void testIncomingMessage_success() throws NSQException, InterruptedException { + // Setup a dummy handler which records the incoming messages + MockHandler handler = new MockHandler(); + SyncResponseReader reader = new SyncResponseReader("testTopic", "testChannel", handler); + reader.setConnClass(MockConnection.class); + reader.connectToNsqd("127.0.0.1", 4151); + + // Get the mock connection and inject a message into it + MockConnection conn = (MockConnection) reader.getConnections().get("127.0.0.1:4151"); + assertNotNull(conn); + + String body = "{\"foo\":\"bar\"}"; + byte[] idBytes = MockConnection.randomId(); + String idString = new String(idBytes); + conn.fakeMessage(idBytes, body); + + Thread.sleep(3000); + + // Compare the injected and received messages + List received = handler.getReceived(); + assertEquals(1, received.size()); // received one message + + // handler was successful, we ack the message with "FIN $id" + String lastSent = conn.getLastSent(); + assertEquals("FIN " + idString + "\n", lastSent); + } + + /** + * Test the case where the handler rejects the message (e.g. database down). + * @throws NSQException + * @throws InterruptedException + */ + @Test + public void testIncomingMessage_failure() throws NSQException, InterruptedException { + // Setup a dummy handler which records the incoming messages + MockHandler handler = new MockHandler(); + SyncResponseReader reader = new SyncResponseReader("testTopic", "testChannel", handler); + reader.setConnClass(MockConnection.class); + reader.connectToNsqd("127.0.0.1", 4151); + handler.failNext(); + + // Get the mock connection and inject a message into it + MockConnection conn = (MockConnection) reader.getConnections().get("127.0.0.1:4151"); + assertNotNull(conn); + + String body = "{\"foo\":\"bar\"}"; + byte[] idBytes = MockConnection.randomId(); + String idString = new String(idBytes); + conn.fakeMessage(idBytes, body); + + Thread.sleep(3000); + + // Compare the injected and received messages + List received = handler.getReceived(); + assertEquals(1, received.size()); + + // handler had an error, we will try to requeue the message for the first time + String lastSent = conn.getLastSent(); + assertEquals("REQ " + idString + " 0\n", lastSent); + + } +}