-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
15 changed files
with
492 additions
and
206 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,96 +1,117 @@ | ||
package com.cf.client; | ||
|
||
import com.cf.data.handler.poloniex.PoloniexSubscription; | ||
import com.cf.data.handler.poloniex.PoloniexSubscriptionExceptionHandler; | ||
import com.cf.client.poloniex.wss.model.PoloniexWSSSubscription; | ||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import com.cf.client.poloniex.PoloniexWSSClientRouter; | ||
|
||
import io.netty.bootstrap.Bootstrap; | ||
import io.netty.channel.Channel; | ||
import io.netty.channel.ChannelInitializer; | ||
import io.netty.channel.ChannelPipeline; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.nio.NioEventLoopGroup; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioSocketChannel; | ||
import io.netty.handler.codec.http.HttpClientCodec; | ||
import io.netty.handler.codec.http.HttpObjectAggregator; | ||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.WebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; | ||
import io.netty.handler.ssl.SslContext; | ||
import io.netty.handler.ssl.SslContextBuilder; | ||
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; | ||
import java.net.URISyntaxException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import rx.functions.Action1; | ||
import ws.wamp.jawampa.ApplicationError; | ||
import ws.wamp.jawampa.PubSubData; | ||
import ws.wamp.jawampa.WampClient; | ||
import ws.wamp.jawampa.WampClientBuilder; | ||
import ws.wamp.jawampa.transport.netty.NettyWampClientConnectorProvider; | ||
import java.util.stream.Collectors; | ||
import com.cf.client.wss.handler.IMessageHandler; | ||
|
||
/** | ||
* | ||
* @author David | ||
* @author thiko | ||
*/ | ||
public class WSSClient implements AutoCloseable | ||
{ | ||
private final static Logger LOG = LogManager.getLogger(); | ||
private final WampClient wampClient; | ||
private final Map<String, Action1<PubSubData>> subscriptions; | ||
|
||
public WSSClient(String uri, String realm) throws ApplicationError, Exception | ||
{ | ||
this.subscriptions = new HashMap<>(); | ||
WampClientBuilder builder = new WampClientBuilder(); | ||
builder.withConnectorProvider(new NettyWampClientConnectorProvider()) | ||
.withUri(uri) | ||
.withRealm(realm) | ||
.withInfiniteReconnects() | ||
.withReconnectInterval(5, TimeUnit.SECONDS); | ||
|
||
wampClient = builder.build(); | ||
public class WSSClient implements AutoCloseable { | ||
|
||
private static final int MAX_CONTENT_BYTES = 8192; | ||
private static final String SCHEME_WSS = "wss"; | ||
|
||
private final URI uri; | ||
private final SslContext sslCtx; | ||
private final EventLoopGroup group; | ||
|
||
private Map<PoloniexWSSSubscription, IMessageHandler> subscriptions; | ||
|
||
public WSSClient(String url) throws Exception { | ||
uri = new URI(url); | ||
|
||
if (!SCHEME_WSS.equalsIgnoreCase(uri.getScheme())) { | ||
throw new IllegalArgumentException("Only WSS is supported"); | ||
} | ||
|
||
// FIXME: use secure trust manager | ||
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); | ||
group = new NioEventLoopGroup(); | ||
subscriptions = new HashMap<>(); | ||
} | ||
|
||
public void subscribe(PoloniexSubscription feedEventHandler) | ||
{ | ||
this.subscriptions.put(feedEventHandler.feedName, feedEventHandler); | ||
/** | ||
* * | ||
* | ||
* @param subscription | ||
* @param subscriptionMessageHandler | ||
*/ | ||
public void addSubscription(PoloniexWSSSubscription subscription, IMessageHandler subscriptionMessageHandler) { | ||
this.subscriptions.put(subscription, subscriptionMessageHandler); | ||
} | ||
|
||
/*** | ||
* | ||
* @param runTimeInMillis The subscription time expressed in milliseconds. The minimum runtime is 1 minute. | ||
/** | ||
* * | ||
* | ||
* @param runTimeInMillis The subscription time expressed in milliseconds. | ||
* The minimum runtime is 1 minute. | ||
* @throws InterruptedException | ||
* @throws IOException | ||
* @throws java.net.URISyntaxException | ||
*/ | ||
public void run(long runTimeInMillis) | ||
{ | ||
try | ||
{ | ||
wampClient.statusChanged() | ||
.subscribe((WampClient.State newState) | ||
-> | ||
{ | ||
if (newState instanceof WampClient.ConnectedState) | ||
{ | ||
LOG.trace("Connected"); | ||
|
||
for (Entry<String, Action1<PubSubData>> subscription : this.subscriptions.entrySet()) | ||
{ | ||
wampClient.makeSubscription(subscription.getKey()).subscribe(subscription.getValue(), new PoloniexSubscriptionExceptionHandler(subscription.getKey())); | ||
} | ||
} | ||
else if (newState instanceof WampClient.DisconnectedState) | ||
{ | ||
LOG.trace("Disconnected"); | ||
} | ||
else if (newState instanceof WampClient.ConnectingState) | ||
{ | ||
LOG.trace("Connecting..."); | ||
} | ||
}); | ||
|
||
wampClient.open(); | ||
long startTime = System.currentTimeMillis(); | ||
|
||
while (wampClient.getTerminationFuture().isDone() == false && (startTime + runTimeInMillis > System.currentTimeMillis())) | ||
{ | ||
TimeUnit.MINUTES.sleep(1); | ||
public void run(long runTimeInMillis) throws InterruptedException, IOException, URISyntaxException { | ||
|
||
final PoloniexWSSClientRouter router = new PoloniexWSSClientRouter(uri, subscriptions.entrySet().stream() | ||
.collect(Collectors.toMap((Map.Entry<PoloniexWSSSubscription, IMessageHandler> e) -> Double.parseDouble(e.getKey().channel), Map.Entry::getValue))); | ||
|
||
Bootstrap b = new Bootstrap(); | ||
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { | ||
@Override | ||
protected void initChannel(SocketChannel ch) { | ||
ChannelPipeline p = ch.pipeline(); | ||
p.addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), 443)); | ||
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(MAX_CONTENT_BYTES), | ||
WebSocketClientCompressionHandler.INSTANCE, router); | ||
} | ||
}); | ||
|
||
Channel ch = b.connect(uri.getHost(), 443).sync().channel(); | ||
router.handshakeFuture().sync(); | ||
|
||
for (Entry<PoloniexWSSSubscription, IMessageHandler> subscription : subscriptions.entrySet()) { | ||
WebSocketFrame frame = new TextWebSocketFrame(subscription.getKey().toString()); | ||
ch.writeAndFlush(frame); | ||
} | ||
catch (Exception ex) | ||
{ | ||
LOG.error(("Caught exception - " + ex.getMessage()), ex); | ||
|
||
long startTime = System.currentTimeMillis(); | ||
|
||
while (router.isRunning() == true && (startTime + runTimeInMillis > System.currentTimeMillis())) { | ||
TimeUnit.MINUTES.sleep(1); | ||
} | ||
|
||
throw new InterruptedException("Runtime exceeded"); | ||
} | ||
|
||
@Override | ||
public void close() throws Exception | ||
{ | ||
wampClient.close().toBlocking().last(); | ||
public void close() throws Exception { | ||
group.shutdownGracefully(); | ||
} | ||
} |
124 changes: 124 additions & 0 deletions
124
src/main/java/com/cf/client/poloniex/PoloniexWSSClientRouter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
package com.cf.client.poloniex; | ||
|
||
import com.cf.client.wss.handler.LoggerMessageHandler; | ||
import com.google.gson.Gson; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
import io.netty.channel.Channel; | ||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelPromise; | ||
import io.netty.channel.SimpleChannelInboundHandler; | ||
import io.netty.handler.codec.http.DefaultHttpHeaders; | ||
import io.netty.handler.codec.http.FullHttpResponse; | ||
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; | ||
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; | ||
import io.netty.handler.codec.http.websocketx.WebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException; | ||
import io.netty.handler.codec.http.websocketx.WebSocketVersion; | ||
import io.netty.util.CharsetUtil; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.util.List; | ||
import java.util.Map; | ||
import com.cf.client.wss.handler.IMessageHandler; | ||
|
||
public class PoloniexWSSClientRouter extends SimpleChannelInboundHandler<Object> { | ||
|
||
private final static Logger LOG = LogManager.getLogger(); | ||
private static final int MAX_FRAME_LENGTH = 1262144; | ||
|
||
private final WebSocketClientHandshaker handshaker; | ||
private ChannelPromise handshakeFuture; | ||
private boolean running; | ||
|
||
private Map<Double, IMessageHandler> subscriptions; | ||
private final IMessageHandler defaultSubscriptionMessageHandler; | ||
private final Gson gson; | ||
|
||
public PoloniexWSSClientRouter(URI url, Map<Double, IMessageHandler> subscriptions) throws URISyntaxException { | ||
this(WebSocketClientHandshakerFactory | ||
.newHandshaker(url, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), MAX_FRAME_LENGTH), subscriptions); | ||
} | ||
|
||
public PoloniexWSSClientRouter(WebSocketClientHandshaker handshaker, Map<Double, IMessageHandler> subscriptions) { | ||
this.handshaker = handshaker; | ||
this.subscriptions = subscriptions; | ||
this.defaultSubscriptionMessageHandler = new LoggerMessageHandler(); | ||
this.gson = new Gson(); | ||
} | ||
|
||
public ChannelFuture handshakeFuture() { | ||
return handshakeFuture; | ||
} | ||
|
||
@Override | ||
public void handlerAdded(ChannelHandlerContext ctx) { | ||
handshakeFuture = ctx.newPromise(); | ||
} | ||
|
||
@Override | ||
public void channelActive(ChannelHandlerContext ctx) { | ||
handshaker.handshake(ctx.channel()); | ||
} | ||
|
||
@Override | ||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { | ||
LOG.trace("WebSocket Client disconnected!"); | ||
} | ||
|
||
@Override | ||
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
Channel ch = ctx.channel(); | ||
if (!handshaker.isHandshakeComplete()) { | ||
try { | ||
handshaker.finishHandshake(ch, (FullHttpResponse) msg); | ||
running = true; | ||
LOG.trace("WebSocket Client connected!"); | ||
handshakeFuture.setSuccess(); | ||
} catch (WebSocketHandshakeException e) { | ||
LOG.trace("WebSocket Client failed to connect"); | ||
running = false; | ||
handshakeFuture.setFailure(e); | ||
} | ||
return; | ||
} | ||
|
||
if (msg instanceof FullHttpResponse) { | ||
FullHttpResponse response = (FullHttpResponse) msg; | ||
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" | ||
+ response.content().toString(CharsetUtil.UTF_8) + ')'); | ||
} | ||
|
||
WebSocketFrame frame = (WebSocketFrame) msg; | ||
if (frame instanceof TextWebSocketFrame) { | ||
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; | ||
LOG.trace("WebSocket Client received message: " + textFrame.text()); | ||
List results = this.gson.fromJson(textFrame.text(), List.class); | ||
this.subscriptions.getOrDefault(results.get(0), this.defaultSubscriptionMessageHandler).handle(textFrame.text()); | ||
|
||
} else if (frame instanceof CloseWebSocketFrame) { | ||
LOG.trace("WebSocket Client received closing"); | ||
running = false; | ||
ch.close(); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | ||
LOG.error(cause); | ||
if (!handshakeFuture.isDone()) { | ||
handshakeFuture.setFailure(cause); | ||
} | ||
running = false; | ||
ctx.close(); | ||
} | ||
|
||
public boolean isRunning() { | ||
return running; | ||
} | ||
} |
Oops, something went wrong.