Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/wso2/siddhi
Browse files Browse the repository at this point in the history
  • Loading branch information
tishan89 committed Mar 29, 2017
2 parents 7bb7363 + dd44e08 commit 525ed88
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void init(StreamDefinition streamDefinition, String type, OptionHolder tr
* Will be called for initialing the {@link OutputTransport}
*
* @param outputStreamDefinition
* @param optionHolder Option holder containing static and dynamic options related to the {@link OutputTransport}
* @param optionHolder Option holder containing static and dynamic options related to the {@link OutputTransport}
* @param executionPlanContext
*/
protected abstract void init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, ExecutionPlanContext executionPlanContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public void init() {
eventArrived = false;
}


@Test
public void testTcpInputTransport1() throws InterruptedException {
log.info("tcpInputTransport TestCase 1");
Expand Down Expand Up @@ -323,7 +322,7 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
executionPlanRuntime.shutdown();
}

@Test(expected = ExecutionPlanCreationException.class)
@Test
public void testTcpInputTransport4() throws InterruptedException {
ExecutionPlanRuntime executionPlanRuntime = null;
try {
Expand All @@ -341,9 +340,9 @@ public void testTcpInputTransport4() throws InterruptedException {
"select * " +
"insert into outputStream;");
executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query);


executionPlanRuntime.start();
} catch (ExecutionPlanCreationException e) {
Assert.assertNotNull(e);
} finally {
if (executionPlanRuntime != null) {
executionPlanRuntime.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.wso2.siddhi.tcp.transport.TCPNettyClient;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

@Extension(
name = "tcp",
Expand All @@ -47,6 +48,7 @@ public class TCPOutputTransport extends OutputTransport {
private String host;
private int port;
private Option streamIdOption;
private AtomicBoolean connected = new AtomicBoolean(false);

@Override
public String[] getSupportedDynamicOptions() {
Expand All @@ -63,20 +65,26 @@ protected void init(StreamDefinition outputStreamDefinition, OptionHolder option

@Override
public void connect() throws ConnectionUnavailableException {
log.info("TCPOutputTransport:connect()");
TCPNettyClient.connect(host, port);

}

@Override
public void publish(Object payload, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
if (connected.compareAndSet(false, true)) {
log.info("TCPOutputTransport:connect()");
TCPNettyClient.connect(host, port);
}
String streamId = streamIdOption.getValue(dynamicOptions);
TCPNettyClient.send(streamId, (Event[]) payload);
}

@Override
public void disconnect() {
if (TCPNettyClient != null) {
TCPNettyClient.disconnect();
if (connected.compareAndSet(true, false)) {
if (TCPNettyClient != null) {
log.info("TCPOutputTransport:disconnect()");
TCPNettyClient.disconnect();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
Expand Down Expand Up @@ -643,7 +642,7 @@ public void testTcpOutputTransport10() throws InterruptedException {

Assert.assertFalse(eventArrived);
executionPlanRuntime.shutdown();
}catch (Throwable t){
} catch (Throwable t) {
t.printStackTrace();
}

Expand Down Expand Up @@ -701,9 +700,7 @@ public void testTcpOutputTransport12() throws InterruptedException {
}
}

@Ignore
@Test
// till flow control is fixed
public void testTcpOutputTransport13() throws InterruptedException {
log.info("tcpInputTransport TestCase 13");
SiddhiManager siddhiManager = new SiddhiManager();
Expand Down Expand Up @@ -742,6 +739,8 @@ public void onEvent(Event event) {
System.out.println(event);
eventArrived = true;
count++;
/*
commenting this out since we cannot guarantee an event order here
switch (count) {
case 1:
Assert.assertEquals("test", event.getData(0));
Expand All @@ -763,7 +762,8 @@ public void onEvent(Event event) {
break;
default:
org.junit.Assert.fail();
}
}*/
}

@Override
Expand Down Expand Up @@ -925,7 +925,7 @@ public void testTcpOutputTransport15() throws InterruptedException {
"@plan:name('foo') " +
"define stream inputStream (a string, b int, c float, d long, e double, f bool); " +
"@sink(type='tcp', context='foo/inputStream1') " +
"define stream outputStream(a string, b int, c float, d long, e double, f bool);" ;
"define stream outputStream(a string, b int, c float, d long, e double, f bool);";
String query = ("@info(name = 'query1') " +
"" +
"from inputStream " +
Expand Down Expand Up @@ -995,5 +995,81 @@ public void onEvents(Event[] events) {

}

@Test
public void testTcpOutputTransport16() throws InterruptedException {
log.info("tcpInputTransport TestCase 16");
SiddhiManager siddhiManager = new SiddhiManager();

String inStreamDefinition = "" +
"define stream inputStream (a string, b int, c float, d long, e double, f bool); " +
"@sink(type='tcp', context='foo', @map(type='passThrough')) " +
"define stream outputStream (a string, b int, c float, d long, e double, f bool);";
String query = ("@info(name = 'query1') " +
"from inputStream " +
"select * " +
"insert into outputStream;");
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(inStreamDefinition + query);

StreamDefinition streamDefinition = StreamDefinition.id("foo").attribute("a", Attribute.Type.STRING)
.attribute("b", Attribute.Type.INT).attribute("c", Attribute.Type.FLOAT).attribute("d", Attribute.Type.LONG)
.attribute("e", Attribute.Type.DOUBLE).attribute("f", Attribute.Type.BOOL);

TCPNettyServer tcpNettyServer = new TCPNettyServer();
tcpNettyServer.addStreamListener(new StreamListener() {
@Override
public StreamDefinition getStreamDefinition() {
return streamDefinition;
}

@Override
public void onEvent(Event event) {
System.out.println(event);
eventArrived = true;
count++;
switch (count) {
case 1:
Assert.assertEquals("test", event.getData(0));
break;
case 2:
Assert.assertEquals("test1", event.getData(0));
break;
case 3:
Assert.assertEquals("test2", event.getData(0));
break;
default:
org.junit.Assert.fail();
}
}

@Override
public void onEvents(Event[] events) {
for (Event event : events) {
onEvent(event);
}
}
});


executionPlanRuntime.start();
Thread.sleep(2000);
tcpNettyServer.bootServer(new ServerConfig());

InputHandler inputHandler = executionPlanRuntime.getInputHandler("inputStream");

ArrayList<Event> arrayList = new ArrayList<Event>();
arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test", 36, 3.0f, 380l, 23.0, true}));
arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test1", 361, 31.0f, 3801l, 231.0, false}));
arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"test2", 362, 32.0f, 3802l, 232.0, true}));
inputHandler.send(arrayList.toArray(new Event[3]));

Thread.sleep(300);

Assert.assertEquals(3, count);
Assert.assertTrue(eventArrived);
executionPlanRuntime.shutdown();
tcpNettyServer.shutdownGracefully();

}

}

Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public void isPaused(boolean paused) {
* an internal {@link CircularFifoQueue} with a user defined size (default is
* {@link org.wso2.siddhi.tcp.transport.utils.Constant#DEFAULT_QUEUE_SIZE_OF_TCP_TRANSPORT}).
*/

class FlowController extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext channelHandlerContext;
private final CircularFifoQueue<Object> queue;
Expand Down Expand Up @@ -166,12 +165,12 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {

if (!paused) {
// deque the messages if the transport is not paused
queue.forEach(e -> {
Object e;
while ((e = queue.poll()) != null) {
if (!(e instanceof EmptyByteBuf)) {
ctx.fireChannelRead(e);
}
});
queue.clear();
}
}
}

Expand Down

0 comments on commit 525ed88

Please sign in to comment.