Skip to content
This repository has been archived by the owner on Oct 15, 2020. It is now read-only.

Commit

Permalink
Move message encode/decode to utility class
Browse files Browse the repository at this point in the history
- New MessageCodec class
- Also remove all import foo.* in non-test code.

Refs Issue nsqio#4
  • Loading branch information
Andy O'Neill committed Apr 16, 2013
1 parent 44dca4e commit 966faec
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 45 deletions.
49 changes: 8 additions & 41 deletions src/main/java/ly/bit/nsq/Connection.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package ly.bit.nsq;

import java.io.*;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import ly.bit.nsq.exceptions.NSQException;
import ly.bit.nsq.util.ConnectionUtils;
import ly.bit.nsq.util.FrameType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


/**
* @author dan
Expand Down Expand Up @@ -58,41 +60,6 @@ public void messageReceivedCallback(Message message){
public abstract void readForever() throws NSQException;
public abstract void close();

public Message decodeMessage(byte[] data) throws NSQException {
DataInputStream ds = new DataInputStream(new ByteArrayInputStream(data));
try {
long timestamp = ds.readLong(); // 8 bytes
short attempts = ds.readShort(); // 2 bytes
byte[] id = new byte[16];
ds.read(id);
byte[] body = new byte[data.length - 26];
ds.read(body);
return new Message(id, body, timestamp, attempts, this);
} catch (IOException e) {
throw new NSQException(e);
}
}

/**
* Reverse of decodeMessage, helpful in testing so far.
* @param msg
* @return
* @throws NSQException
*/
public byte[] encodeMessage(Message msg) throws NSQException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bytes);
try {
ds.writeLong(msg.getTimestamp());
ds.writeShort(msg.getAttempts());
ds.write(msg.getId());
ds.write(msg.getBody());
ds.close();
} catch (IOException e) {
throw new NSQException(e);
}
return bytes.toByteArray();
}

public void handleResponse(byte[] response) throws NSQException {
DataInputStream ds = new DataInputStream(new ByteArrayInputStream(response));
Expand All @@ -104,7 +71,7 @@ public void handleResponse(byte[] response) throws NSQException {
break;
case FRAMETYPEMESSAGE:
byte[] messageBytes = Arrays.copyOfRange(response, 4, response.length);
Message msg = this.decodeMessage(messageBytes);
Message msg = MessageCodec.decode(messageBytes, this);
this.messageReceivedCallback(msg);
break;
case FRAMETYPEERROR:
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/ly/bit/nsq/MessageCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ly.bit.nsq;


import ly.bit.nsq.exceptions.NSQException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MessageCodec {
public static Message decode(byte[] data, Connection conn) throws NSQException {
DataInputStream ds = new DataInputStream(new ByteArrayInputStream(data));
try {
long timestamp = ds.readLong(); // 8 bytes
short attempts = ds.readShort(); // 2 bytes
byte[] id = new byte[16];
ds.read(id);
byte[] body = new byte[data.length - 26];
ds.read(body);
return new Message(id, body, timestamp, attempts, conn);
} catch (IOException e) {
throw new NSQException(e);
}
}

/**
* Reverse of decodeMessage, helpful in testing so far.
* @param msg
* @return
* @throws NSQException
*/
public static byte[] encode(Message msg) throws NSQException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bytes);
try {
ds.writeLong(msg.getTimestamp());
ds.writeShort(msg.getAttempts());
ds.write(msg.getId());
ds.write(msg.getBody());
ds.close();
} catch (IOException e) {
throw new NSQException(e);
}
return bytes.toByteArray();
}

}
6 changes: 3 additions & 3 deletions src/test/java/ly/bit/nsq/ConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ public void testCodec() throws NSQException {
String body = "kjehfliuANDY.WAS.HEREe;flijwe,jfhwqliuehfj";
Message msg = new Message(id, body.getBytes(), System.currentTimeMillis()*1000,
new Integer(0).shortValue(), conn);
byte[] encoded = conn.encodeMessage(msg);
byte[] encoded = MessageCodec.encode(msg);
log.debug("Encoded message: {}", encoded);

Message decoded = conn.decodeMessage(encoded);
Message decoded = MessageCodec.decode(encoded, conn);
assertEquals(msg.getAttempts(), decoded.getAttempts());
for (int i=0; i<id.length; i++) {
assertEquals(id[i], decoded.getId()[i]);
}
assertEquals(new String(msg.getBody()), new String(decoded.getBody()));
assertEquals(msg.getTimestamp(), decoded.getTimestamp());

byte[] reenecoded = conn.encodeMessage(decoded);
byte[] reenecoded = MessageCodec.encode(decoded);
assertEquals(encoded.length, reenecoded.length);
for (int i=0; i<reenecoded.length; i++) {
assertEquals(encoded[i], reenecoded[i]);
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/ly/bit/nsq/MockConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void fakeMessage(byte[] idBytes, String body) throws NSQException {
Message msg = new Message(idBytes, body.getBytes(), System.currentTimeMillis()*1000,
new Integer(0).shortValue(), this);

byte[] framed = frameMessage(this.encodeMessage(msg));
byte[] framed = frameMessage(MessageCodec.encode(msg));
log.debug("Inserting fake message: '{}'", framed);

this.messages.add(framed);
Expand Down

0 comments on commit 966faec

Please sign in to comment.