diff --git a/src/main/java/com/cf/client/WSSClient.java b/src/main/java/com/cf/client/WSSClient.java index a99aef4..3ae7be1 100644 --- a/src/main/java/com/cf/client/WSSClient.java +++ b/src/main/java/com/cf/client/WSSClient.java @@ -1,12 +1,11 @@ package com.cf.client; -import com.cf.client.wss.subscription.Subscription; -import com.cf.client.wss.handler.SubscriptionMessageHandler; +import com.cf.client.poloniex.wss.model.PoloniexWSSSubscription; import java.io.IOException; import java.net.URI; import java.util.concurrent.TimeUnit; -import com.cf.client.wss.router.WebSocketClientRouter; +import com.cf.client.poloniex.PoloniexWSSClientRouter; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -29,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.stream.Collectors; +import com.cf.client.wss.handler.IMessageHandler; /** * @@ -43,7 +43,7 @@ public class WSSClient implements AutoCloseable { private final SslContext sslCtx; private final EventLoopGroup group; - private Map subscriptions; + private Map subscriptions; public WSSClient(String url) throws Exception { uri = new URI(url); @@ -64,7 +64,7 @@ public WSSClient(String url) throws Exception { * @param subscription * @param subscriptionMessageHandler */ - public void addSubscription(Subscription subscription, SubscriptionMessageHandler subscriptionMessageHandler) { + public void addSubscription(PoloniexWSSSubscription subscription, IMessageHandler subscriptionMessageHandler) { this.subscriptions.put(subscription, subscriptionMessageHandler); } @@ -79,8 +79,8 @@ public void addSubscription(Subscription subscription, SubscriptionMessageHandle */ public void run(long runTimeInMillis) throws InterruptedException, IOException, URISyntaxException { - final WebSocketClientRouter router = new WebSocketClientRouter(uri, subscriptions.entrySet().stream() - .collect(Collectors.toMap((Map.Entry e) -> Double.parseDouble(e.getKey().channel), Map.Entry::getValue))); + final PoloniexWSSClientRouter router = new PoloniexWSSClientRouter(uri, subscriptions.entrySet().stream() + .collect(Collectors.toMap((Map.Entry e) -> Double.parseDouble(e.getKey().channel), Map.Entry::getValue))); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @@ -96,7 +96,7 @@ protected void initChannel(SocketChannel ch) { Channel ch = b.connect(uri.getHost(), 443).sync().channel(); router.handshakeFuture().sync(); - for (Entry subscription : subscriptions.entrySet()) { + for (Entry subscription : subscriptions.entrySet()) { WebSocketFrame frame = new TextWebSocketFrame(subscription.getKey().toString()); ch.writeAndFlush(frame); } diff --git a/src/main/java/com/cf/client/wss/router/WebSocketClientRouter.java b/src/main/java/com/cf/client/poloniex/PoloniexWSSClientRouter.java similarity index 82% rename from src/main/java/com/cf/client/wss/router/WebSocketClientRouter.java rename to src/main/java/com/cf/client/poloniex/PoloniexWSSClientRouter.java index 78668a3..be97447 100644 --- a/src/main/java/com/cf/client/wss/router/WebSocketClientRouter.java +++ b/src/main/java/com/cf/client/poloniex/PoloniexWSSClientRouter.java @@ -1,7 +1,6 @@ -package com.cf.client.wss.router; +package com.cf.client.poloniex; -import com.cf.client.wss.handler.LoggingSubscriptionMessageHandler; -import com.cf.client.wss.handler.SubscriptionMessageHandler; +import com.cf.client.wss.handler.LoggerMessageHandler; import com.google.gson.Gson; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -25,8 +24,9 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; +import com.cf.client.wss.handler.IMessageHandler; -public class WebSocketClientRouter extends SimpleChannelInboundHandler { +public class PoloniexWSSClientRouter extends SimpleChannelInboundHandler { private final static Logger LOG = LogManager.getLogger(); private static final int MAX_FRAME_LENGTH = 1262144; @@ -35,18 +35,20 @@ public class WebSocketClientRouter extends SimpleChannelInboundHandler { private ChannelPromise handshakeFuture; private boolean running; - private Map subscriptions; - private final SubscriptionMessageHandler defaultSubscriptionMessageHandler; + private Map subscriptions; + private final IMessageHandler defaultSubscriptionMessageHandler; + private final Gson gson; - public WebSocketClientRouter(URI url, Map subscriptions) throws URISyntaxException { + public PoloniexWSSClientRouter(URI url, Map subscriptions) throws URISyntaxException { this(WebSocketClientHandshakerFactory .newHandshaker(url, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), MAX_FRAME_LENGTH), subscriptions); } - public WebSocketClientRouter(WebSocketClientHandshaker handshaker, Map subscriptions) { + public PoloniexWSSClientRouter(WebSocketClientHandshaker handshaker, Map subscriptions) { this.handshaker = handshaker; this.subscriptions = subscriptions; - this.defaultSubscriptionMessageHandler = new LoggingSubscriptionMessageHandler(); + this.defaultSubscriptionMessageHandler = new LoggerMessageHandler(); + this.gson = new Gson(); } public ChannelFuture handshakeFuture() { @@ -95,7 +97,7 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; LOG.trace("WebSocket Client received message: " + textFrame.text()); - List results = new Gson().fromJson(textFrame.text(), List.class); + List results = this.gson.fromJson(textFrame.text(), List.class); this.subscriptions.getOrDefault(results.get(0), this.defaultSubscriptionMessageHandler).handle(textFrame.text()); } else if (frame instanceof CloseWebSocketFrame) { diff --git a/src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSSubscription.java b/src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSSubscription.java new file mode 100644 index 0000000..9c7f63f --- /dev/null +++ b/src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSSubscription.java @@ -0,0 +1,29 @@ +package com.cf.client.poloniex.wss.model; + +import com.google.gson.Gson; + +/** + * + * @author David + */ +public class PoloniexWSSSubscription { + + public final static transient PoloniexWSSSubscription TICKER = new PoloniexWSSSubscription("1002"); + public final static transient PoloniexWSSSubscription HEARTBEAT = new PoloniexWSSSubscription("1010"); + public final static transient PoloniexWSSSubscription BASE_COIN_DAILY_VOLUME_STATS = new PoloniexWSSSubscription("1003"); + public final static transient PoloniexWSSSubscription USDT_BTC = new PoloniexWSSSubscription("121"); + public final static transient PoloniexWSSSubscription USDT_ETH = new PoloniexWSSSubscription("149"); + + public final String command; + public final String channel; + + public PoloniexWSSSubscription(String channel) { + this.command = "subscribe"; + this.channel = channel; + } + + @Override + public String toString() { + return new Gson().toJson(this); + } +} diff --git a/src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSTicker.java b/src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSTicker.java new file mode 100644 index 0000000..5e1c4f2 --- /dev/null +++ b/src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSTicker.java @@ -0,0 +1,111 @@ +package com.cf.client.poloniex.wss.model; + +import com.google.gson.Gson; +import java.math.BigDecimal; + +/** + * + * @author David + */ +public class PoloniexWSSTicker { + + public final Double currencyPair; + public final BigDecimal lastPrice; + public final BigDecimal lowestAsk; + public final BigDecimal highestBid; + public final BigDecimal percentChange; + public final BigDecimal baseVolume; + public final BigDecimal quoteVolume; + public final Boolean isFrozen; + public final BigDecimal twentyFourHourHigh; + public final BigDecimal twentyFourHourLow; + + public PoloniexWSSTicker(Double currencyPair, BigDecimal lastPrice, BigDecimal lowestAsk, BigDecimal highestBid, BigDecimal percentChange, BigDecimal baseVolume, BigDecimal quoteVolume, Boolean isFrozen, BigDecimal twentyFourHourHigh, BigDecimal twentyFourHourLow) { + this.currencyPair = currencyPair; + this.lastPrice = lastPrice; + this.lowestAsk = lowestAsk; + this.highestBid = highestBid; + this.percentChange = percentChange; + this.baseVolume = baseVolume; + this.quoteVolume = quoteVolume; + this.isFrozen = isFrozen; + this.twentyFourHourHigh = twentyFourHourHigh; + this.twentyFourHourLow = twentyFourHourLow; + } + + @Override + public String toString() { + return new Gson().toJson(this); + } + + public static class PoloniexWSSTickerBuilder { + + private Double currencyPair; + private BigDecimal lastPrice; + private BigDecimal lowestAsk; + private BigDecimal highestBid; + private BigDecimal percentChange; + private BigDecimal baseVolume; + private BigDecimal quoteVolume; + private Boolean isFrozen; + private BigDecimal twentyFourHourHigh; + private BigDecimal twentyFourHourLow; + + public PoloniexWSSTickerBuilder() { + } + + public PoloniexWSSTickerBuilder setCurrencyPair(Double currencyPair) { + this.currencyPair = currencyPair; + return this; + } + + public PoloniexWSSTickerBuilder setLastPrice(BigDecimal lastPrice) { + this.lastPrice = lastPrice; + return this; + } + + public PoloniexWSSTickerBuilder setLowestAsk(BigDecimal lowestAsk) { + this.lowestAsk = lowestAsk; + return this; + } + + public PoloniexWSSTickerBuilder setHighestBid(BigDecimal highestBid) { + this.highestBid = highestBid; + return this; + } + + public PoloniexWSSTickerBuilder setPercentChange(BigDecimal percentChange) { + this.percentChange = percentChange; + return this; + } + + public PoloniexWSSTickerBuilder setBaseVolume(BigDecimal baseVolume) { + this.baseVolume = baseVolume; + return this; + } + + public PoloniexWSSTickerBuilder setQuoteVolume(BigDecimal quoteVolume) { + this.quoteVolume = quoteVolume; + return this; + } + + public PoloniexWSSTickerBuilder setIsFrozen(Boolean isFrozen) { + this.isFrozen = isFrozen; + return this; + } + + public PoloniexWSSTickerBuilder setTwentyFourHourHigh(BigDecimal twentyFourHourHigh) { + this.twentyFourHourHigh = twentyFourHourHigh; + return this; + } + + public PoloniexWSSTickerBuilder setTwentyFourHourLow(BigDecimal twentyFourHourLow) { + this.twentyFourHourLow = twentyFourHourLow; + return this; + } + + public PoloniexWSSTicker buildPoloniexTicker() { + return new PoloniexWSSTicker(currencyPair, lastPrice, lowestAsk, highestBid, percentChange, baseVolume, quoteVolume, isFrozen, twentyFourHourHigh, twentyFourHourLow); + } + } +} diff --git a/src/main/java/com/cf/client/wss/handler/SubscriptionMessageHandler.java b/src/main/java/com/cf/client/wss/handler/IMessageHandler.java similarity index 69% rename from src/main/java/com/cf/client/wss/handler/SubscriptionMessageHandler.java rename to src/main/java/com/cf/client/wss/handler/IMessageHandler.java index 03f9bd5..d4907a9 100644 --- a/src/main/java/com/cf/client/wss/handler/SubscriptionMessageHandler.java +++ b/src/main/java/com/cf/client/wss/handler/IMessageHandler.java @@ -5,6 +5,6 @@ * * @author David */ -public interface SubscriptionMessageHandler { +public interface IMessageHandler { public void handle(String message); } diff --git a/src/main/java/com/cf/client/wss/handler/LoggingSubscriptionMessageHandler.java b/src/main/java/com/cf/client/wss/handler/LoggerMessageHandler.java similarity index 77% rename from src/main/java/com/cf/client/wss/handler/LoggingSubscriptionMessageHandler.java rename to src/main/java/com/cf/client/wss/handler/LoggerMessageHandler.java index c5244e5..e065018 100644 --- a/src/main/java/com/cf/client/wss/handler/LoggingSubscriptionMessageHandler.java +++ b/src/main/java/com/cf/client/wss/handler/LoggerMessageHandler.java @@ -7,7 +7,7 @@ * * @author David */ -public class LoggingSubscriptionMessageHandler implements SubscriptionMessageHandler { +public class LoggerMessageHandler implements IMessageHandler { private final static Logger LOG = LogManager.getLogger(); diff --git a/src/main/java/com/cf/client/wss/handler/OrderBookMessageHandler.java b/src/main/java/com/cf/client/wss/handler/OrderBookMessageHandler.java new file mode 100644 index 0000000..0ab01b5 --- /dev/null +++ b/src/main/java/com/cf/client/wss/handler/OrderBookMessageHandler.java @@ -0,0 +1,14 @@ +package com.cf.client.wss.handler; + +/** + * + * @author David + */ +public class OrderBookMessageHandler implements IMessageHandler { + + @Override + public void handle(String message) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + +} diff --git a/src/main/java/com/cf/client/wss/handler/TickerMessageHandler.java b/src/main/java/com/cf/client/wss/handler/TickerMessageHandler.java new file mode 100644 index 0000000..18f2ebc --- /dev/null +++ b/src/main/java/com/cf/client/wss/handler/TickerMessageHandler.java @@ -0,0 +1,50 @@ +package com.cf.client.wss.handler; + +import com.cf.client.poloniex.wss.model.PoloniexWSSTicker; +import com.google.gson.Gson; +import java.math.BigDecimal; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * + * @author David + */ +public class TickerMessageHandler implements IMessageHandler { + + private final static Logger LOG = LogManager.getLogger(); + private final Gson gson; + + public TickerMessageHandler() { + this.gson = new Gson(); + } + + @Override + public void handle(String message) { + LOG.trace(message); + PoloniexWSSTicker ticker = this.mapMessageToPoloniexTicker(message); + LOG.trace(ticker); + + } + + protected PoloniexWSSTicker mapMessageToPoloniexTicker(String message) { + List results = gson.fromJson(message, List.class); + if (results.size() < 3) return null; + + List olhc = (List) results.get(2); + return new PoloniexWSSTicker.PoloniexWSSTickerBuilder() + .setCurrencyPair((Double) olhc.get(0)) + .setLastPrice(new BigDecimal((String) olhc.get(1))) + .setLowestAsk(new BigDecimal((String) olhc.get(2))) + .setHighestBid(new BigDecimal((String) olhc.get(3))) + .setPercentChange(new BigDecimal((String) olhc.get(4))) + .setBaseVolume(new BigDecimal((String) olhc.get(5))) + .setQuoteVolume(new BigDecimal((String) olhc.get(6))) + .setIsFrozen(((double) olhc.get(7)) == 1) + .setTwentyFourHourHigh(new BigDecimal((String) olhc.get(8))) + .setTwentyFourHourLow(new BigDecimal((String) olhc.get(9))) + .buildPoloniexTicker(); + } + +} diff --git a/src/main/java/com/cf/client/wss/subscription/Subscription.java b/src/main/java/com/cf/client/wss/subscription/Subscription.java deleted file mode 100644 index 5d75bbf..0000000 --- a/src/main/java/com/cf/client/wss/subscription/Subscription.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.cf.client.wss.subscription; - -import com.google.gson.Gson; - -/** - * - * @author David - */ -public class Subscription { - - public final static transient Subscription TICKER = new Subscription("subscribe", "1002"); - public final static transient Subscription HEARTBEAT = new Subscription("subscribe", "1010"); - public final static transient Subscription BASE_COIN_DAILY_VOLUME_STATS = new Subscription("subscribe", "1003"); - public final static transient Subscription USDT_BTC = new Subscription("subscribe", "121"); - public final static transient Subscription USDT_ETH = new Subscription("subscribe", "149"); - - public final String command; - public final String channel; - - public Subscription(String command, String channel) { - this.command = command; - this.channel = channel; - } - - @Override - public String toString() { - return new Gson().toJson(this); - } -} diff --git a/src/main/java/com/cf/example/PoloniexWSSClientExample.java b/src/main/java/com/cf/example/PoloniexWSSClientExample.java index efe35f8..9f89214 100644 --- a/src/main/java/com/cf/example/PoloniexWSSClientExample.java +++ b/src/main/java/com/cf/example/PoloniexWSSClientExample.java @@ -1,8 +1,9 @@ package com.cf.example; import com.cf.client.WSSClient; -import com.cf.client.wss.subscription.Subscription; -import com.cf.client.wss.handler.LoggingSubscriptionMessageHandler; +import com.cf.client.poloniex.wss.model.PoloniexWSSSubscription; +import com.cf.client.wss.handler.LoggerMessageHandler; +import com.cf.client.wss.handler.TickerMessageHandler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -32,7 +33,8 @@ public void run() throws Exception { try (WSSClient wssClient = new WSSClient(ENDPOINT_URL)) { - wssClient.addSubscription(Subscription.USDT_ETH, new LoggingSubscriptionMessageHandler()); + wssClient.addSubscription(PoloniexWSSSubscription.USDT_ETH, new LoggerMessageHandler()); + wssClient.addSubscription(PoloniexWSSSubscription.TICKER, new TickerMessageHandler()); wssClient.run(60000); } } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 28db264..ef596bc 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -11,9 +11,6 @@ - - -