Skip to content

Commit

Permalink
select from collection implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vinoja98 committed Sep 8, 2022
1 parent 62059f4 commit a8f1d51
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 58 deletions.
47 changes: 47 additions & 0 deletions component/.mvn/local-settings.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<settings>
<mirrors>
<mirror>
<id>wso2.releases</id>
<mirrorOf>wso2.releases</mirrorOf>
<name></name>
<url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
</mirror>
<mirror>
<id>wso2.snapshots</id>
<mirrorOf>wso2.snapshots</mirrorOf>
<name></name>
<url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
</mirror>
<mirror>
<id>wso2-nexus</id>
<mirrorOf>wso2-nexus</mirrorOf>
<name></name>
<url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
</mirror>
</mirrors>
</settings>

<!--<settings xmlns="http://maven.apache.org/POM/4.0.0"-->
<!-- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"-->
<!-- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">-->
<!-- <mirrors>-->
<!-- <mirror>-->
<!-- <id>wso2.releases</id>-->
<!-- <mirrorOf>wso2.releases</mirrorOf>-->
<!-- <name></name>-->
<!-- <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>-->
<!-- </mirror>-->
<!-- <mirror>-->
<!-- <id>wso2.snapshots</id>-->
<!-- <mirrorOf>wso2.snapshots</mirrorOf>-->
<!-- <name></name>-->
<!-- <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>-->
<!-- </mirror>-->
<!-- <mirror>-->
<!-- <id>wso2-nexus</id>-->
<!-- <mirrorOf>wso2-nexus</mirrorOf>-->
<!-- <name></name>-->
<!-- <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>-->
<!-- </mirror>-->
<!-- </mirrors>-->
<!--</settings>-->
1 change: 1 addition & 0 deletions component/.mvn/maven.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--settings ./.mvn/local-settings.xml
1 change: 1 addition & 0 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<parent>
<groupId>io.siddhi.extension.io.live</groupId>
<artifactId>siddhi-io-live-parent</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package io.siddhi.extension.io.live.source;

import com.arangodb.velocypack.VPackSlice;
import com.c8db.C8DB;
import com.c8db.http.HTTPEndPoint;
import com.c8db.http.HTTPMethod;
import com.c8db.http.HTTPRequest;
import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
Expand All @@ -19,14 +24,10 @@
import java.util.HashMap;
import java.util.Map;

import com.arangodb.ArangoDBException;
import com.arangodb.DbName;
import com.arangodb.internal.http.HttpConnection;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.mapping.ArangoJack;
import com.arangodb.velocystream.Request;
import com.arangodb.velocystream.RequestType;
import com.arangodb.velocystream.Response;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.security.auth.login.CredentialException;

/**
* This is a sample class-level comment, explaining what the extension class does.
Expand Down Expand Up @@ -90,15 +91,28 @@
type = {DataType.INT, DataType.BOOL, DataType.STRING, DataType.DOUBLE, }),
type = {DataType.INT, DataType.BOOL, DataType.STRING, DataType.DOUBLE, }),*/
@Parameter(
name = "sql",
name = "sql.query",
description = "The SQL select query",
type = DataType.STRING,
dynamic = true
)
),
@Parameter(
name = "host.name",
description = "The Hostname",
type = DataType.STRING,
dynamic = true
),
@Parameter(
name = "api.key",
description = "The api Key",
type = DataType.STRING,
dynamic = true
)
},
examples = {
@Example(
syntax = "@source(type = 'live', sql='Select * from table', " +
syntax = "@source(type = 'live', sql.query='Select * from table', host.name='api-varden-example'," +
"\napi.key = 'apikey-xxxxxxxxx', " +
"\n@map(type='keyvalue'), @attributes(id = 'id', name = 'name'))" +
"\ndefine stream inputStream (id int, name string)",
description = "In this example, the Live source executes the select query. The" +
Expand All @@ -108,7 +122,13 @@
)
// for more information refer https://siddhi.io/en/v5.0/docs/query-guide/#source
public class LiveSource extends Source {
private static final Logger logger = LogManager.getLogger(LiveSource.class);
private String siddhiAppName;
private String selectQuery;
private String hostName;
private String apiKey;
protected SourceEventListener sourceEventListener;
protected String[] requestedTransportPropertyNames;
/**
* The initialization method for {@link Source}, will be called before other methods. It used to validate
* all configurations and to get initial values.
Expand All @@ -127,11 +147,13 @@ public StateFactory init(SourceEventListener sourceEventListener, OptionHolder o
String[] requestedTransportPropertyNames, ConfigReader configReader,
SiddhiAppContext siddhiAppContext) {
String streamName = sourceEventListener.getStreamDefinition().getId();
String selectQuery;
Map<String, String> deploymentConfigMap = new HashMap();
deploymentConfigMap.putAll(configReader.getAllConfigs());
siddhiAppName = siddhiAppContext.getName();
selectQuery = deploymentConfigMap.get(LiveSourceConstants.SQLQUERY);
this.selectQuery = optionHolder.validateAndGetOption(LiveSourceConstants.SQLQUERY).getValue();
this.hostName = optionHolder.validateAndGetOption(LiveSourceConstants.HOSTNAME).getValue();
this.apiKey = optionHolder.validateAndGetOption(LiveSourceConstants.APIKEY).getValue();
this.requestedTransportPropertyNames = requestedTransportPropertyNames.clone();
return null;
}

Expand Down Expand Up @@ -165,37 +187,41 @@ protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
* @throws ConnectionUnavailableException if it cannot connect to the source backend immediately.
*/
@Override
public void connect(ConnectionCallback connectionCallback, State state) throws ConnectionUnavailableException {
HttpConnection arangoHttpConnection = new HttpConnection.Builder()
.useSsl(true)
.host(new HostDescription("api-varden-4f0f3c4f.paas.macrometa.io", 443))
.serializationUtil(new ArangoJack())
.build();
arangoHttpConnection.setJwt("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEuNjYxOTI2NjY5MjU4OTYxNWUrNiwiZXhwIjoxNjYxOTY5ODY5LCJpc3MiOiJtYWNyb21ldGEiLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJyb290Iiwic3ViIjoibWFkdTE0MF9nbWFpbC5jb20iLCJ0ZW5hbnQiOiJtYWR1MTQwX2dtYWlsLmNvbSJ9.dVQccQomvpT2VktQJtvvuLKrdeART38Ek4Y6V5tzrB4=");
System.out.println(arangoHttpConnection.toString());
Request req = new Request(DbName.SYSTEM, RequestType.GET,"/_db/_system/_api/database");

while(true) {
Response res;
try {
res = arangoHttpConnection.execute(req);
System.out.println(res.getBody().toString());

} catch (ArangoDBException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

try {
// res.wait();
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
public void connect(ConnectionCallback connectionCallback,State state) throws ConnectionUnavailableException {
C8DB db;
try {
db = new C8DB.Builder()
.hostName(hostName)
.port(443)
.apiKey(apiKey)
.build();
} catch (CredentialException e) {
throw new RuntimeException(e);
}

HTTPEndPoint endPoint = new HTTPEndPoint("/_api/collection/network_traffic/count");

HTTPRequest request = new HTTPRequest.Builder()
.RequestType(HTTPMethod.GET)
.EndPoint(endPoint)
.build();

try {
VPackSlice responseBody = db.execute(request);
} catch (IOException e) {
throw new RuntimeException(e);
}

VPackSlice r;
try {
r = db.execute(request);
} catch (IOException e) {
throw new RuntimeException(e);
}
// System.out.println(r.toString());
logger.info("Event " + r.toString());
sourceEventListener.onEvent(r.toString(), requestedTransportPropertyNames);

}

/**
Expand Down Expand Up @@ -230,4 +256,5 @@ public void destroy() {

}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.siddhi.extension.io.live.utils;

public class LiveSourceConstants {
public static final String SQLQUERY = "sql";
public static final String SQLQUERY = "sql.query";
public static final String HOSTNAME = "host.name";
public static final String APIKEY = "api.key";
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,101 @@
package io.siddhi.extension.io.live.source;

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.query.output.callback.QueryCallback;
import io.siddhi.core.util.EventPrinter;
import io.siddhi.core.util.SiddhiTestHelper;
import io.siddhi.core.util.persistence.InMemoryPersistenceStore;
import io.siddhi.core.util.persistence.PersistenceStore;
import io.siddhi.extension.map.xml.sourcemapper.XmlSourceMapper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Testcase of LiveSource.
*/
public class TestCaseOfLiveSource {
// If you will know about this related testcase,
//refer https://github.com/siddhi-io/siddhi-io-file/blob/master/component/src/test
private static final Logger logObj = (Logger) LogManager.getLogger(TestCaseOfLiveSource.class);
private AtomicInteger eventCount = new AtomicInteger(0);
private int waitTime = 50;
private int timeout = 30000;

@BeforeMethod
public void init() {
eventCount.set(0);
}

/**
* Creating test.
*
* @throws Exception Interrupted exception
*/
@Test
public void liveSelect() throws Exception {
logObj.info(" Creating test for publishing events without URL.");
// URI baseURI = URI.create(String.format("http://%s:%d", "0.0.0.0", 8280));
List<String> receivedEventNameList = new ArrayList<>(2);
PersistenceStore persistenceStore = new InMemoryPersistenceStore();
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setPersistenceStore(persistenceStore);
siddhiManager.setExtension("xml-input-mapper", XmlSourceMapper.class);
String inStreamDefinition = "@App:name('TestSiddhiApp')" +
"@source(type='live', sql.query='select count from network_traffic', host.name='api-varden-4f0f3c4f.paas.macrometa.io'," +
"api.key = 'madu140_gmail.com.AccessPortal.2PL8EeyIAMn2sx7YHKWMM58tmJLES4NyIWq6Cnsj0BTMjygJyF3b14zb2sidcauXccccb8'," +
" @map(type='keyvalue'), @attributes(id = 'id', name = 'name'))," +
"define stream inputStream (count int)";
String query = ("@info(name = 'query') "
+ "from inputStream "
+ "select * "
+ "insert into outputStream;"
);
SiddhiAppRuntime siddhiAppRuntime = siddhiManager
.createSiddhiAppRuntime(inStreamDefinition + query);

siddhiAppRuntime.addCallback("query", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(timeStamp, inEvents, removeEvents);
for (Event event : inEvents) {
eventCount.incrementAndGet();
receivedEventNameList.add(event.getData(0).toString());
}
}
});
siddhiAppRuntime.start();
// publishing events
List<String> expected = new ArrayList<>(1);
expected.add("99");
// expected.add("Mike");
// String event1 = "<events>"
// + "<event>"
// + "<name>John</name>"
// + "<age>100</age>"
// + "<country>AUS</country>"
// + "</event>"
// + "</events>";
// String event2 = "<events>"
// + "<event>"
// + "<name>Mike</name>"
// + "<age>20</age>"
// + "<country>USA</country>"
// + "</event>"
// + "</events>";
// HttpTestUtil.httpPublishEventDefault(event1, baseURI);
// HttpTestUtil.httpPublishEventDefault(event2, baseURI);
SiddhiTestHelper.waitForEvents(waitTime, 1, eventCount, timeout);
Assert.assertEquals(receivedEventNameList.toString(), expected.toString());
siddhiAppRuntime.shutdown();
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
12 changes: 0 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,4 @@
<siddhi.map.xml.version>5.0.2</siddhi.map.xml.version>
<jacoco.plugin.version>0.7.9</jacoco.plugin.version>
</properties>


<dependencies>


<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>
<version>6.18.0</version>
</dependency>

</dependencies>
</project>

0 comments on commit a8f1d51

Please sign in to comment.