Skip to content

Commit

Permalink
finished ticker handler, reorganized packages
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCookieLab committed Mar 12, 2018
1 parent a3d6bf0 commit 29170e4
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 55 deletions.
16 changes: 8 additions & 8 deletions src/main/java/com/cf/client/WSSClient.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.cf.client;

import com.cf.client.wss.subscription.Subscription;
import com.cf.client.wss.handler.SubscriptionMessageHandler;
import com.cf.client.poloniex.wss.model.PoloniexWSSSubscription;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;

import com.cf.client.wss.router.WebSocketClientRouter;
import com.cf.client.poloniex.PoloniexWSSClientRouter;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand All @@ -29,6 +28,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import com.cf.client.wss.handler.IMessageHandler;

/**
*
Expand All @@ -43,7 +43,7 @@ public class WSSClient implements AutoCloseable {
private final SslContext sslCtx;
private final EventLoopGroup group;

private Map<Subscription, SubscriptionMessageHandler> subscriptions;
private Map<PoloniexWSSSubscription, IMessageHandler> subscriptions;

public WSSClient(String url) throws Exception {
uri = new URI(url);
Expand All @@ -64,7 +64,7 @@ public WSSClient(String url) throws Exception {
* @param subscription
* @param subscriptionMessageHandler
*/
public void addSubscription(Subscription subscription, SubscriptionMessageHandler subscriptionMessageHandler) {
public void addSubscription(PoloniexWSSSubscription subscription, IMessageHandler subscriptionMessageHandler) {
this.subscriptions.put(subscription, subscriptionMessageHandler);
}

Expand All @@ -79,8 +79,8 @@ public void addSubscription(Subscription subscription, SubscriptionMessageHandle
*/
public void run(long runTimeInMillis) throws InterruptedException, IOException, URISyntaxException {

final WebSocketClientRouter router = new WebSocketClientRouter(uri, subscriptions.entrySet().stream()
.collect(Collectors.toMap((Map.Entry<Subscription, SubscriptionMessageHandler> e) -> Double.parseDouble(e.getKey().channel), Map.Entry::getValue)));
final PoloniexWSSClientRouter router = new PoloniexWSSClientRouter(uri, subscriptions.entrySet().stream()
.collect(Collectors.toMap((Map.Entry<PoloniexWSSSubscription, IMessageHandler> e) -> Double.parseDouble(e.getKey().channel), Map.Entry::getValue)));

Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
Expand All @@ -96,7 +96,7 @@ protected void initChannel(SocketChannel ch) {
Channel ch = b.connect(uri.getHost(), 443).sync().channel();
router.handshakeFuture().sync();

for (Entry<Subscription, SubscriptionMessageHandler> subscription : subscriptions.entrySet()) {
for (Entry<PoloniexWSSSubscription, IMessageHandler> subscription : subscriptions.entrySet()) {
WebSocketFrame frame = new TextWebSocketFrame(subscription.getKey().toString());
ch.writeAndFlush(frame);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.cf.client.wss.router;
package com.cf.client.poloniex;

import com.cf.client.wss.handler.LoggingSubscriptionMessageHandler;
import com.cf.client.wss.handler.SubscriptionMessageHandler;
import com.cf.client.wss.handler.LoggerMessageHandler;
import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -25,8 +24,9 @@
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import com.cf.client.wss.handler.IMessageHandler;

public class WebSocketClientRouter extends SimpleChannelInboundHandler<Object> {
public class PoloniexWSSClientRouter extends SimpleChannelInboundHandler<Object> {

private final static Logger LOG = LogManager.getLogger();
private static final int MAX_FRAME_LENGTH = 1262144;
Expand All @@ -35,18 +35,20 @@ public class WebSocketClientRouter extends SimpleChannelInboundHandler<Object> {
private ChannelPromise handshakeFuture;
private boolean running;

private Map<Double, SubscriptionMessageHandler> subscriptions;
private final SubscriptionMessageHandler defaultSubscriptionMessageHandler;
private Map<Double, IMessageHandler> subscriptions;
private final IMessageHandler defaultSubscriptionMessageHandler;
private final Gson gson;

public WebSocketClientRouter(URI url, Map<Double, SubscriptionMessageHandler> subscriptions) throws URISyntaxException {
public PoloniexWSSClientRouter(URI url, Map<Double, IMessageHandler> subscriptions) throws URISyntaxException {
this(WebSocketClientHandshakerFactory
.newHandshaker(url, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), MAX_FRAME_LENGTH), subscriptions);
}

public WebSocketClientRouter(WebSocketClientHandshaker handshaker, Map<Double, SubscriptionMessageHandler> subscriptions) {
public PoloniexWSSClientRouter(WebSocketClientHandshaker handshaker, Map<Double, IMessageHandler> subscriptions) {
this.handshaker = handshaker;
this.subscriptions = subscriptions;
this.defaultSubscriptionMessageHandler = new LoggingSubscriptionMessageHandler();
this.defaultSubscriptionMessageHandler = new LoggerMessageHandler();
this.gson = new Gson();
}

public ChannelFuture handshakeFuture() {
Expand Down Expand Up @@ -95,7 +97,7 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
LOG.trace("WebSocket Client received message: " + textFrame.text());
List<Map> results = new Gson().fromJson(textFrame.text(), List.class);
List<Map> results = this.gson.fromJson(textFrame.text(), List.class);
this.subscriptions.getOrDefault(results.get(0), this.defaultSubscriptionMessageHandler).handle(textFrame.text());

} else if (frame instanceof CloseWebSocketFrame) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.cf.client.poloniex.wss.model;

import com.google.gson.Gson;

/**
*
* @author David
*/
public class PoloniexWSSSubscription {

public final static transient PoloniexWSSSubscription TICKER = new PoloniexWSSSubscription("1002");
public final static transient PoloniexWSSSubscription HEARTBEAT = new PoloniexWSSSubscription("1010");
public final static transient PoloniexWSSSubscription BASE_COIN_DAILY_VOLUME_STATS = new PoloniexWSSSubscription("1003");
public final static transient PoloniexWSSSubscription USDT_BTC = new PoloniexWSSSubscription("121");
public final static transient PoloniexWSSSubscription USDT_ETH = new PoloniexWSSSubscription("149");

public final String command;
public final String channel;

public PoloniexWSSSubscription(String channel) {
this.command = "subscribe";
this.channel = channel;
}

@Override
public String toString() {
return new Gson().toJson(this);
}
}
111 changes: 111 additions & 0 deletions src/main/java/com/cf/client/poloniex/wss/model/PoloniexWSSTicker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.cf.client.poloniex.wss.model;

import com.google.gson.Gson;
import java.math.BigDecimal;

/**
*
* @author David
*/
public class PoloniexWSSTicker {

public final Double currencyPair;
public final BigDecimal lastPrice;
public final BigDecimal lowestAsk;
public final BigDecimal highestBid;
public final BigDecimal percentChange;
public final BigDecimal baseVolume;
public final BigDecimal quoteVolume;
public final Boolean isFrozen;
public final BigDecimal twentyFourHourHigh;
public final BigDecimal twentyFourHourLow;

public PoloniexWSSTicker(Double currencyPair, BigDecimal lastPrice, BigDecimal lowestAsk, BigDecimal highestBid, BigDecimal percentChange, BigDecimal baseVolume, BigDecimal quoteVolume, Boolean isFrozen, BigDecimal twentyFourHourHigh, BigDecimal twentyFourHourLow) {
this.currencyPair = currencyPair;
this.lastPrice = lastPrice;
this.lowestAsk = lowestAsk;
this.highestBid = highestBid;
this.percentChange = percentChange;
this.baseVolume = baseVolume;
this.quoteVolume = quoteVolume;
this.isFrozen = isFrozen;
this.twentyFourHourHigh = twentyFourHourHigh;
this.twentyFourHourLow = twentyFourHourLow;
}

@Override
public String toString() {
return new Gson().toJson(this);
}

public static class PoloniexWSSTickerBuilder {

private Double currencyPair;
private BigDecimal lastPrice;
private BigDecimal lowestAsk;
private BigDecimal highestBid;
private BigDecimal percentChange;
private BigDecimal baseVolume;
private BigDecimal quoteVolume;
private Boolean isFrozen;
private BigDecimal twentyFourHourHigh;
private BigDecimal twentyFourHourLow;

public PoloniexWSSTickerBuilder() {
}

public PoloniexWSSTickerBuilder setCurrencyPair(Double currencyPair) {
this.currencyPair = currencyPair;
return this;
}

public PoloniexWSSTickerBuilder setLastPrice(BigDecimal lastPrice) {
this.lastPrice = lastPrice;
return this;
}

public PoloniexWSSTickerBuilder setLowestAsk(BigDecimal lowestAsk) {
this.lowestAsk = lowestAsk;
return this;
}

public PoloniexWSSTickerBuilder setHighestBid(BigDecimal highestBid) {
this.highestBid = highestBid;
return this;
}

public PoloniexWSSTickerBuilder setPercentChange(BigDecimal percentChange) {
this.percentChange = percentChange;
return this;
}

public PoloniexWSSTickerBuilder setBaseVolume(BigDecimal baseVolume) {
this.baseVolume = baseVolume;
return this;
}

public PoloniexWSSTickerBuilder setQuoteVolume(BigDecimal quoteVolume) {
this.quoteVolume = quoteVolume;
return this;
}

public PoloniexWSSTickerBuilder setIsFrozen(Boolean isFrozen) {
this.isFrozen = isFrozen;
return this;
}

public PoloniexWSSTickerBuilder setTwentyFourHourHigh(BigDecimal twentyFourHourHigh) {
this.twentyFourHourHigh = twentyFourHourHigh;
return this;
}

public PoloniexWSSTickerBuilder setTwentyFourHourLow(BigDecimal twentyFourHourLow) {
this.twentyFourHourLow = twentyFourHourLow;
return this;
}

public PoloniexWSSTicker buildPoloniexTicker() {
return new PoloniexWSSTicker(currencyPair, lastPrice, lowestAsk, highestBid, percentChange, baseVolume, quoteVolume, isFrozen, twentyFourHourHigh, twentyFourHourLow);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
*
* @author David
*/
public interface SubscriptionMessageHandler {
public interface IMessageHandler {
public void handle(String message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*
* @author David
*/
public class LoggingSubscriptionMessageHandler implements SubscriptionMessageHandler {
public class LoggerMessageHandler implements IMessageHandler {

private final static Logger LOG = LogManager.getLogger();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.cf.client.wss.handler;

/**
*
* @author David
*/
public class OrderBookMessageHandler implements IMessageHandler {

@Override
public void handle(String message) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}

}
50 changes: 50 additions & 0 deletions src/main/java/com/cf/client/wss/handler/TickerMessageHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.cf.client.wss.handler;

import com.cf.client.poloniex.wss.model.PoloniexWSSTicker;
import com.google.gson.Gson;
import java.math.BigDecimal;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
*
* @author David
*/
public class TickerMessageHandler implements IMessageHandler {

private final static Logger LOG = LogManager.getLogger();
private final Gson gson;

public TickerMessageHandler() {
this.gson = new Gson();
}

@Override
public void handle(String message) {
LOG.trace(message);
PoloniexWSSTicker ticker = this.mapMessageToPoloniexTicker(message);
LOG.trace(ticker);

}

protected PoloniexWSSTicker mapMessageToPoloniexTicker(String message) {
List results = gson.fromJson(message, List.class);
if (results.size() < 3) return null;

List olhc = (List) results.get(2);
return new PoloniexWSSTicker.PoloniexWSSTickerBuilder()
.setCurrencyPair((Double) olhc.get(0))
.setLastPrice(new BigDecimal((String) olhc.get(1)))
.setLowestAsk(new BigDecimal((String) olhc.get(2)))
.setHighestBid(new BigDecimal((String) olhc.get(3)))
.setPercentChange(new BigDecimal((String) olhc.get(4)))
.setBaseVolume(new BigDecimal((String) olhc.get(5)))
.setQuoteVolume(new BigDecimal((String) olhc.get(6)))
.setIsFrozen(((double) olhc.get(7)) == 1)
.setTwentyFourHourHigh(new BigDecimal((String) olhc.get(8)))
.setTwentyFourHourLow(new BigDecimal((String) olhc.get(9)))
.buildPoloniexTicker();
}

}
29 changes: 0 additions & 29 deletions src/main/java/com/cf/client/wss/subscription/Subscription.java

This file was deleted.

8 changes: 5 additions & 3 deletions src/main/java/com/cf/example/PoloniexWSSClientExample.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.cf.example;

import com.cf.client.WSSClient;
import com.cf.client.wss.subscription.Subscription;
import com.cf.client.wss.handler.LoggingSubscriptionMessageHandler;
import com.cf.client.poloniex.wss.model.PoloniexWSSSubscription;
import com.cf.client.wss.handler.LoggerMessageHandler;
import com.cf.client.wss.handler.TickerMessageHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -32,7 +33,8 @@ public void run() throws Exception
{
try (WSSClient wssClient = new WSSClient(ENDPOINT_URL))
{
wssClient.addSubscription(Subscription.USDT_ETH, new LoggingSubscriptionMessageHandler());
wssClient.addSubscription(PoloniexWSSSubscription.USDT_ETH, new LoggerMessageHandler());
wssClient.addSubscription(PoloniexWSSSubscription.TICKER, new TickerMessageHandler());
wssClient.run(60000);
}
}
Expand Down
Loading

0 comments on commit 29170e4

Please sign in to comment.