diff --git a/docs/README.md b/docs/README.md
index b477134..a612746 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -2,11 +2,13 @@
Build Scalable Node.js WebSocket Applications
-
+
-
+
+
+
@@ -15,5 +17,5 @@
\ No newline at end of file
diff --git a/src/main/java/com/clusterws/Channel.java b/src/main/java/com/clusterws/Channel.java
index 0242d83..6ab8efe 100644
--- a/src/main/java/com/clusterws/Channel.java
+++ b/src/main/java/com/clusterws/Channel.java
@@ -3,7 +3,7 @@
import java.util.List;
public class Channel {
- public interface IChannelListener{
+ public interface IChannelListener {
void onDataReceived(String channelName, Object data);
}
@@ -16,13 +16,13 @@ public Channel(String channelName, ClusterWS clusterWS) {
mClusterWS = clusterWS;
}
- public Channel watch(IChannelListener listener){
+ public Channel watch(IChannelListener listener) {
mChannelListener = listener;
return this;
}
- public Channel publish(Object data){
- mClusterWS.send(mChannelName,data,"publish");
+ public Channel publish(Object data) {
+ mClusterWS.send(mChannelName, data, "publish");
return this;
}
@@ -42,7 +42,7 @@ void onMessage(Object data) {
}
}
- void subscribe(){
- mClusterWS.send("subscribe",mChannelName,"system");
+ void subscribe() {
+ mClusterWS.send("subscribe", mChannelName, "system");
}
}
diff --git a/src/main/java/com/clusterws/ClusterWS.java b/src/main/java/com/clusterws/ClusterWS.java
index 5d7c714..b77af17 100644
--- a/src/main/java/com/clusterws/ClusterWS.java
+++ b/src/main/java/com/clusterws/ClusterWS.java
@@ -7,6 +7,9 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ThreadLocalRandom;
public class ClusterWS {
private Socket mSocket;
@@ -17,20 +20,30 @@ public class ClusterWS {
private MessageHandler mMessageHandler;
private PingHandler mPingHandler;
private List mChannels;
- private ReconnectionHandler mReconnectionHandler;
+ private ReconnectionParams mReconnectionParams;
public ClusterWS(String url) {
- if (url == null){
+ if (url == null) {
throw new NullPointerException("Url must be provided");
}
mUrl = url;
mChannels = new ArrayList<>();
- mReconnectionHandler = new ReconnectionHandler(null, null, null, null, this);
+ mReconnectionParams = new ReconnectionParams(
+ false,
+ null,
+ null,
+ null);
createSocket();
}
- public ClusterWS setReconnection(Boolean autoReconnect, Integer reconnectionIntervalMin, Integer reconnectionIntervalMax, Integer reconnectionAttempts) {
- mReconnectionHandler = new ReconnectionHandler(autoReconnect, reconnectionIntervalMin, reconnectionIntervalMax, reconnectionAttempts, this);
+ public ClusterWS setReconnection(Boolean autoReconnect,
+ Integer reconnectionIntervalMin,
+ Integer reconnectionIntervalMax,
+ Integer reconnectionAttempts) {
+ mReconnectionParams = new ReconnectionParams(autoReconnect,
+ reconnectionIntervalMin,
+ reconnectionIntervalMax,
+ reconnectionAttempts);
return this;
}
@@ -53,7 +66,7 @@ public void on(String event, IEmitterListener listener) {
}
public void send(String event, Object data) {
- send(event,data,"emit");
+ send(event, data, "emit");
}
public WebSocket.READYSTATE getState() {
@@ -117,7 +130,7 @@ private void createSocket() {
mSocket = new Socket(URI.create(mUrl), new ISocketEvents() {
@Override
public void onOpen() {
- mReconnectionHandler.onOpen();
+ mClusterWSListener.onConnected();
}
@Override
@@ -132,17 +145,22 @@ public void onClose(int code, String reason) {
if (mPingHandler.getPingTimer() != null) {
mPingHandler.getPingTimer().cancel();
}
-
- if (mReconnectionHandler.isInReconnectionState()) {
- return;
- }
- if (mReconnectionHandler.isAutoReconnect() && code != 1000) {
- mReconnectionHandler.reconnect();
- }
-
- if (mClusterWSListener != null) {
- mClusterWSListener.onDisconnected(code, reason);
+ if (mReconnectionParams.isAutoReconnect() && code != 1000 && (mReconnectionParams.getReconnectionAttempts() == 0 || mReconnectionParams.getReconnectionsAttempted() < mReconnectionParams.getReconnectionAttempts())) {
+ if (mSocket.getReadyState() == WebSocket.READYSTATE.CLOSED || mSocket.getReadyState() == WebSocket.READYSTATE.NOT_YET_CONNECTED) {
+ mReconnectionParams.incrementReconnectionsAttempted();
+ int randomDelay = ThreadLocalRandom.current().nextInt(1,
+ mReconnectionParams.getReconnectionIntervalMax() -
+ mReconnectionParams.getReconnectionIntervalMin() +
+ 1);
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ connect();
+ }
+ }, randomDelay);
+ }
}
+ mClusterWSListener.onDisconnected(code, reason);
}
@Override
@@ -170,5 +188,4 @@ private void onMessageReceived(String message) {
mMessageHandler.messageDecode(ClusterWS.this, message);
}
}
-
}
diff --git a/src/main/java/com/clusterws/Emitter.java b/src/main/java/com/clusterws/Emitter.java
index 74e3767..90f3ad6 100644
--- a/src/main/java/com/clusterws/Emitter.java
+++ b/src/main/java/com/clusterws/Emitter.java
@@ -3,28 +3,28 @@
import java.util.concurrent.ConcurrentHashMap;
class Emitter {
- private ConcurrentHashMap mEvents;
+ private ConcurrentHashMap mEvents;
Emitter() {
mEvents = new ConcurrentHashMap<>();
}
- void addEventListener(String event, IEmitterListener listener){
- if (mEvents.containsKey(event)){
- mEvents.replace(event,listener);
+ void addEventListener(String event, IEmitterListener listener) {
+ if (mEvents.containsKey(event)) {
+ mEvents.replace(event, listener);
} else {
- mEvents.put(event,listener);
+ mEvents.put(event, listener);
}
}
- void emit(String event, Object object){
+ void emit(String event, Object object) {
IEmitterListener listener = mEvents.get(event);
- if (listener != null){
+ if (listener != null) {
listener.onDataReceived(object);
}
}
- void removeAllEvents(){
+ void removeAllEvents() {
mEvents = new ConcurrentHashMap<>();
}
}
diff --git a/src/main/java/com/clusterws/IClusterWSListener.java b/src/main/java/com/clusterws/IClusterWSListener.java
index dfade6c..988a639 100644
--- a/src/main/java/com/clusterws/IClusterWSListener.java
+++ b/src/main/java/com/clusterws/IClusterWSListener.java
@@ -2,6 +2,8 @@
public interface IClusterWSListener {
void onConnected();
+
void onError(Exception exception);
+
void onDisconnected(int code, String reason);
}
diff --git a/src/main/java/com/clusterws/ISocketEvents.java b/src/main/java/com/clusterws/ISocketEvents.java
index 7fbd416..4f7e27f 100644
--- a/src/main/java/com/clusterws/ISocketEvents.java
+++ b/src/main/java/com/clusterws/ISocketEvents.java
@@ -4,9 +4,13 @@
public interface ISocketEvents {
void onOpen();
+
void onError(Exception exception);
+
void onClose(int code, String reason);
+
void onBinaryMessage(ByteBuffer bytes);
+
void onMessage(String message);
}
diff --git a/src/main/java/com/clusterws/MessageHandler.java b/src/main/java/com/clusterws/MessageHandler.java
index 51b013c..44018d8 100644
--- a/src/main/java/com/clusterws/MessageHandler.java
+++ b/src/main/java/com/clusterws/MessageHandler.java
@@ -4,16 +4,15 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
class MessageHandler {
- String messageEncode(String event, Object data, String type){
+ String messageEncode(String event, Object data, String type) {
JSONObject jsonObject = new JSONObject();
JSONArray jsonArray = new JSONArray();
- switch (type){
+ switch (type) {
case "publish":
jsonArray.add("p");
jsonArray.add(event);
@@ -27,18 +26,18 @@ String messageEncode(String event, Object data, String type){
jsonObject.put("#", jsonArray);
return jsonObject.toJSONString();
case "system":
- switch (event){
+ switch (event) {
case "subscribe":
jsonArray.add("s");
jsonArray.add("s");
jsonArray.add(data);
- jsonObject.put("#",jsonArray);
+ jsonObject.put("#", jsonArray);
return jsonObject.toJSONString();
case "unsubscribe":
jsonArray.add("s");
jsonArray.add("u");
jsonArray.add(data);
- jsonObject.put("#",jsonArray);
+ jsonObject.put("#", jsonArray);
return jsonObject.toJSONString();
}
case "ping":
@@ -48,9 +47,9 @@ String messageEncode(String event, Object data, String type){
}
}
- void messageDecode(final ClusterWS socket, String message){
+ void messageDecode(final ClusterWS socket, String message) {
JSONArray jsonArray = JSON.parseObject(message).getJSONArray("#");
- switch (jsonArray.getString(0)){
+ switch (jsonArray.getString(0)) {
case "p":
//
List channelArrayList = socket.getChannels();
@@ -64,10 +63,10 @@ void messageDecode(final ClusterWS socket, String message){
}
break;
case "e":
- socket.getEmitter().emit(jsonArray.getString(1),jsonArray.get(2));
+ socket.getEmitter().emit(jsonArray.getString(1), jsonArray.get(2));
break;
case "s":
- if (jsonArray.getString(1).equals("c")){
+ if (jsonArray.getString(1).equals("c")) {
socket.getPingHandler().getPingTimer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
@@ -78,10 +77,10 @@ public void run() {
cancel();
}
}
- },0,jsonArray.getJSONObject(2).getInteger("ping"));
+ }, 0, jsonArray.getJSONObject(2).getInteger("ping"));
boolean useBinary = jsonArray.getJSONObject(2).getBoolean("binary");
socket.setUseBinary(useBinary);
- if (socket.getClusterWSListener() != null){
+ if (socket.getClusterWSListener() != null) {
socket.getClusterWSListener().onConnected();
}
}
diff --git a/src/main/java/com/clusterws/PingHandler.java b/src/main/java/com/clusterws/PingHandler.java
index 5d9b26d..5572ace 100644
--- a/src/main/java/com/clusterws/PingHandler.java
+++ b/src/main/java/com/clusterws/PingHandler.java
@@ -15,7 +15,7 @@ void incrementMissedPing() {
mMissedPing++;
}
- void setMissedPingToZero(){
+ void setMissedPingToZero() {
mMissedPing = 0;
}
diff --git a/src/main/java/com/clusterws/ReconnectionHandler.java b/src/main/java/com/clusterws/ReconnectionHandler.java
deleted file mode 100644
index ab50a76..0000000
--- a/src/main/java/com/clusterws/ReconnectionHandler.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.clusterws;
-
-import org.java_websocket.WebSocket;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ThreadLocalRandom;
-
-class ReconnectionHandler {
- private final boolean mAutoReconnect;
- private final int mReconnectionIntervalMin;
- private final int mReconnectionIntervalMax;
- private final int mReconnectionAttempts;
- private boolean mInReconnectionState;
- private int mReconnectionsAttempted;
- private Timer mReconnectionTimer;
- private Timer mTimerOff;
- private ClusterWS mSocket;
-
- ReconnectionHandler(Boolean autoReconnect, Integer reconnectionIntervalMin, Integer reconnectionIntervalMax, Integer reconnectionAttempts, ClusterWS socket) {
- mAutoReconnect = autoReconnect != null ? autoReconnect : false;
- mReconnectionIntervalMin = reconnectionIntervalMin != null ? reconnectionIntervalMin : 1000;
- mReconnectionIntervalMax = reconnectionIntervalMax != null ? reconnectionIntervalMax : 5000;
- mReconnectionAttempts = reconnectionAttempts != null ? reconnectionAttempts : 0;
- mSocket = socket;
- mInReconnectionState = false;
- mReconnectionsAttempted = 0;
- }
-
- void onOpen(){
- if (mReconnectionTimer != null) {
- mReconnectionTimer.cancel();
- mReconnectionTimer = null;
- }
-
- if (mTimerOff != null) {
- mTimerOff.cancel();
- mTimerOff = null;
- }
-
- mInReconnectionState = false;
- mReconnectionsAttempted = 0;
- List channels = mSocket.getChannels();
-
- for (Channel channel :
- channels) {
- channel.subscribe();
- }
- }
-
- void reconnect(){
- mInReconnectionState = true;
- mReconnectionTimer = new Timer();
- mReconnectionTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- if (mSocket.getState() == WebSocket.READYSTATE.CLOSED) {
- mReconnectionsAttempted++;
- if (mReconnectionAttempts != 0 && mReconnectionsAttempted >= mReconnectionAttempts) {
- cancel();
- mInReconnectionState = false;
- } else {
- if (mTimerOff != null) {
- mTimerOff.cancel();
- }
- mTimerOff = new Timer();
- int randomDelay = ThreadLocalRandom.current().nextInt(1, mReconnectionIntervalMax- mReconnectionIntervalMin + 1);
- mTimerOff.schedule(new TimerTask() {
- @Override
- public void run() {
- mSocket.connect();
- }
- }, randomDelay);
- }
- }
- }
- }, 0, mReconnectionIntervalMin);
- }
-
- boolean isInReconnectionState() {
- return mInReconnectionState;
- }
-
- boolean isAutoReconnect() {
- return mAutoReconnect;
- }
-}
diff --git a/src/main/java/com/clusterws/ReconnectionParams.java b/src/main/java/com/clusterws/ReconnectionParams.java
new file mode 100644
index 0000000..1246148
--- /dev/null
+++ b/src/main/java/com/clusterws/ReconnectionParams.java
@@ -0,0 +1,48 @@
+package com.clusterws;
+
+public class ReconnectionParams {
+ private Boolean mAutoReconnect;
+ private Integer mReconnectionIntervalMin;
+ private Integer mReconnectionIntervalMax;
+ private Integer mReconnectionAttempts;
+ private Integer mReconnectionsAttempted;
+
+ public ReconnectionParams(Boolean autoReconnect,
+ Integer reconnectionIntervalMin,
+ Integer reconnectionIntervalMax,
+ Integer reconnectionAttempts) {
+ mAutoReconnect = autoReconnect != null ? autoReconnect : false;
+ mReconnectionIntervalMin = reconnectionIntervalMin != null ? reconnectionIntervalMin : 1000;
+ mReconnectionIntervalMax = reconnectionIntervalMax != null ? reconnectionIntervalMax : 5000;
+ mReconnectionAttempts = reconnectionAttempts != null ? reconnectionAttempts : 0;
+ mReconnectionsAttempted = 0;
+ }
+
+ public boolean isAutoReconnect() {
+ return mAutoReconnect;
+ }
+
+ public Integer getReconnectionIntervalMin() {
+ return mReconnectionIntervalMin;
+ }
+
+ public Integer getReconnectionIntervalMax() {
+ return mReconnectionIntervalMax;
+ }
+
+ public Integer getReconnectionAttempts() {
+ return mReconnectionAttempts;
+ }
+
+ public void incrementReconnectionsAttempted() {
+ mReconnectionsAttempted++;
+ }
+
+ public void resetReconnectionsAttempted() {
+ mReconnectionsAttempted = 0;
+ }
+
+ public Integer getReconnectionsAttempted() {
+ return mReconnectionsAttempted;
+ }
+}
diff --git a/src/main/java/com/clusterws/Socket.java b/src/main/java/com/clusterws/Socket.java
index 4eaca7e..99d31ac 100644
--- a/src/main/java/com/clusterws/Socket.java
+++ b/src/main/java/com/clusterws/Socket.java
@@ -27,7 +27,7 @@ public void onMessage(String message) {
@Override
public void onClose(int code, String reason, boolean remote) {
- mSocketEvents.onClose(code,reason);
+ mSocketEvents.onClose(code, reason);
}
@Override