Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 4 #5

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/ly/bit/nsq/BasicConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void close() {
// whatever, we're not doing anything with this anymore
log.error("Exception closing connection: ", e);
}
this.reader.connections.remove(this.toString());
BasicConnection.this.reader.connectionClosed(BasicConnection.this);
}

}
30 changes: 24 additions & 6 deletions src/main/java/ly/bit/nsq/Connection.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package ly.bit.nsq;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not import * here? We're pretty strict here at Bitly about explicit imports

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That must be an IDE setting. I will find it and kill it.

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -61,7 +58,7 @@ public void messageReceivedCallback(Message message){
public abstract void readForever() throws NSQException;
public abstract void close();

public Message decodeMesage(byte[] data) throws NSQException {
public Message decodeMessage(byte[] data) throws NSQException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha! Nice catch

DataInputStream ds = new DataInputStream(new ByteArrayInputStream(data));
try {
long timestamp = ds.readLong(); // 8 bytes
Expand All @@ -76,6 +73,27 @@ public Message decodeMesage(byte[] data) throws NSQException {
}
}

/**
* Reverse of decodeMessage, helpful in testing so far.
* @param msg
* @return
* @throws NSQException
*/
public byte[] encodeMessage(Message msg) throws NSQException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this doesn't depend on any fields of the Connection instance, perhaps we might move it to ConnectionUtils and make it a static method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe a MessageCodec class

ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bytes);
try {
ds.writeLong(msg.getTimestamp());
ds.writeShort(msg.getAttempts());
ds.write(msg.getId());
ds.write(msg.getBody());
ds.close();
} catch (IOException e) {
throw new NSQException(e);
}
return bytes.toByteArray();
}

public void handleResponse(byte[] response) throws NSQException {
DataInputStream ds = new DataInputStream(new ByteArrayInputStream(response));
try {
Expand All @@ -86,7 +104,7 @@ public void handleResponse(byte[] response) throws NSQException {
break;
case FRAMETYPEMESSAGE:
byte[] messageBytes = Arrays.copyOfRange(response, 4, response.length);
Message msg = this.decodeMesage(messageBytes);
Message msg = this.decodeMessage(messageBytes);
this.messageReceivedCallback(msg);
break;
case FRAMETYPEERROR:
Expand Down
1 change: 0 additions & 1 deletion src/main/java/ly/bit/nsq/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,4 @@ public Connection getConn() {
return conn;
}


}
91 changes: 80 additions & 11 deletions src/main/java/ly/bit/nsq/NSQReader.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package ly.bit.nsq;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Base implementation of an NSQ consusumer client which manages lookupd requests
* and responses, nsqd connections etc.
* Typically you would use a specific implementation like SyncResponseReader, which
* additionally implements synchronous responses for messages.
* See the PrintReader example.
*/

import ly.bit.nsq.exceptions.NSQException;
import ly.bit.nsq.lookupd.AbstractLookupd;
Expand All @@ -15,9 +15,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;


public abstract class NSQReader {
private static final Logger log = LoggerFactory.getLogger(NSQReader.class);

private static final int LOOKUPD_INITIAL_DELAY = 0;
private static final int LOOKUPD_POLL_INTERVAL = 30;

protected int requeueDelay;
protected int maxRetries;
Expand All @@ -30,7 +44,7 @@ public abstract class NSQReader {

protected ExecutorService executor;

protected Class<? extends Connection> connClass;
protected Class<? extends Connection> connClass = BasicConnection.class;

protected ConcurrentHashMap<String, Connection> connections;
protected ConcurrentHashMap<String, AbstractLookupd> lookupdConnections;
Expand All @@ -55,7 +69,6 @@ public void init(String topic, String channel){
String[] hostParts = this.hostname.split("\\.");
this.shortHostname = hostParts[0];

this.connClass = BasicConnection.class; // TODO can be passed by caller
this.lookupdConnections = new ConcurrentHashMap<String, AbstractLookupd>();
this.lookupdScheduler = Executors.newScheduledThreadPool(1);

Expand Down Expand Up @@ -132,7 +145,55 @@ public void connectToNsqd(String address, int port) throws NSQException{
conn.send(ConnectionUtils.ready(conn.maxInFlight));
conn.readForever();
}


/**
* Set the currently configured nsqd addresses that we should be connected to. This triggers disconnects (from
* disused servers) and connection attempts (to new servers).
* This function is probably only called from the Lookupd job.
* @param connectionAddresses
*/
public void handleLookupdResponse(Set<String> connectionAddresses) {
Set<String> toClose = new HashSet<String>(connections.keySet());
Set<String> toOpen = new HashSet<String>(connectionAddresses);

// We need to open any connections in set that are not already open
toOpen.removeAll(connections.keySet());
// We need to close any connections that are open but not in the set
toClose.removeAll(connectionAddresses);

// Open new connections
for(String address : toOpen) {
log.info("Opening new producer connection: {}", address);
String[] components = address.split(":");
String nsqdAddress = components[0];
int nsqdPort = Integer.parseInt(components[1]);
try {
connectToNsqd(nsqdAddress, nsqdPort);
} catch (NSQException e) {
log.error("Erroring response from lookupd", e);
}
}
// close old connections
for(String address : toClose) {
if (connections.containsKey(address)) {
log.info("Producer not in lookupd response, closing connection: {}", address);
connections.get(address).close();
this.connections.remove(address);
}

}
}

/**
* Remove this nsqd connection from our active pool if it is present.
* Typically called after problems are detected with the connection, or
* after a lookupd request shows that the server is no longer active.
* @param conn
*/
public void connectionClosed(Connection conn) {
this.connections.remove(conn.toString());

}

// lookupd stuff

Expand All @@ -142,7 +203,7 @@ public void addLookupd(AbstractLookupd lookupd) {
if (stored != null){
return;
}
lookupdScheduler.scheduleAtFixedRate(new BasicLookupdJob(addr, this), 30, 30, SECONDS);
lookupdScheduler.scheduleAtFixedRate(new BasicLookupdJob(addr, this), LOOKUPD_INITIAL_DELAY, LOOKUPD_POLL_INTERVAL, SECONDS);
}

public String toString(){
Expand All @@ -157,4 +218,12 @@ public ConcurrentHashMap<String, AbstractLookupd> getLookupdConnections() {
return lookupdConnections;
}

public void setConnClass(Class<? extends Connection> clazz) {
this.connClass = clazz;
}

public ConcurrentHashMap<String, Connection> getConnections() {
return this.connections;
}

}
17 changes: 5 additions & 12 deletions src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package ly.bit.nsq.lookupd;

import java.util.HashSet;
import java.util.List;
import java.util.Map;

import ly.bit.nsq.NSQReader;
import ly.bit.nsq.exceptions.NSQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,16 +22,9 @@ public BasicLookupdJob(String lookupdAddress, NSQReader reader) {
public void run() {
Map<String, AbstractLookupd> lookupdConnections = reader.getLookupdConnections();
AbstractLookupd lookupd = lookupdConnections.get(lookupdAddress);
List<String> producers = lookupd.query(reader.getTopic());
for(String producer : producers) {
String[] components = producer.split(":");
String nsqdAddress = components[0];
int nsqdPort = Integer.parseInt(components[1]);
try {
reader.connectToNsqd(nsqdAddress, nsqdPort);
} catch (NSQException e) {
log.error("Error reading response from lookupd", e);
}
}
List<String> producers = lookupd.query(reader.getTopic());

reader.handleLookupdResponse(new HashSet<String>(producers));

}
}
50 changes: 50 additions & 0 deletions src/test/java/ly/bit/nsq/ConnectionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ly.bit.nsq;

import ly.bit.nsq.exceptions.NSQException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;

/**
* User: oneill
* Date: 4/8/13
*/
public class ConnectionTest {
Logger log = LoggerFactory.getLogger(ConnectionTest.class);


/**
* Test that we can encode and decode a message properly.
* @throws NSQException
*/
@Test
public void testCodec() throws NSQException {

byte[] id = MockConnection.randomId();


Connection conn = new MockConnection();
String body = "kjehfliuANDY.WAS.HEREe;flijwe,jfhwqliuehfj";
Message msg = new Message(id, body.getBytes(), System.currentTimeMillis()*1000,
new Integer(0).shortValue(), conn);
byte[] encoded = conn.encodeMessage(msg);
log.debug("Encoded message: {}", encoded);

Message decoded = conn.decodeMessage(encoded);
assertEquals(msg.getAttempts(), decoded.getAttempts());
for (int i=0; i<id.length; i++) {
assertEquals(id[i], decoded.getId()[i]);
}
assertEquals(new String(msg.getBody()), new String(decoded.getBody()));
assertEquals(msg.getTimestamp(), decoded.getTimestamp());

byte[] reenecoded = conn.encodeMessage(decoded);
assertEquals(encoded.length, reenecoded.length);
for (int i=0; i<reenecoded.length; i++) {
assertEquals(encoded[i], reenecoded[i]);
}

}
}
Loading