Skip to content

Commit

Permalink
Merge pull request #11 from FYP-Live-Query/InitialData
Browse files Browse the repository at this point in the history
Initial data
  • Loading branch information
MaheshMadushan authored Apr 18, 2023
2 parents b33d449 + 6a1432c commit c8a41d0
Show file tree
Hide file tree
Showing 25 changed files with 147 additions and 905 deletions.
36 changes: 18 additions & 18 deletions component/.mvn/local-settings.xml
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
<settings>
<mirrors>
<!-- <mirror>-->
<!-- <id>wso2.releases</id>-->
<!-- <mirrorOf>wso2.releases</mirrorOf>-->
<!-- <name></name>-->
<!-- <url>http://maven.wso2.org/nexus/content/repositories/releases/</url>-->
<!-- </mirror>-->
<!-- <mirror>-->
<!-- <id>wso2.snapshots</id>-->
<!-- <mirrorOf>wso2.snapshots</mirrorOf>-->
<!-- <name></name>-->
<!-- <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>-->
<!-- </mirror>-->
<!-- <mirror>-->
<!-- <id>wso2-nexus</id>-->
<!-- <mirrorOf>wso2-nexus</mirrorOf>-->
<!-- <name></name>-->
<!-- <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>-->
<!-- </mirror>-->
<mirror>
<id>wso2.releases</id>
<mirrorOf>wso2.releases</mirrorOf>
<name></name>
<url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
</mirror>
<mirror>
<id>wso2.snapshots</id>
<mirrorOf>wso2.snapshots</mirrorOf>
<name></name>
<url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
</mirror>
<mirror>
<id>wso2-nexus</id>
<mirrorOf>wso2-nexus</mirrorOf>
<name></name>
<url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
</mirror>
<mirror>
<id>maven</id>
<mirrorOf>maven</mirrorOf>
Expand Down
10 changes: 10 additions & 0 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<artifactId>gson</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down Expand Up @@ -140,6 +145,11 @@
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
</dependencies>

<profiles>
Expand Down
133 changes: 106 additions & 27 deletions component/src/main/java/io/siddhi/extension/io/live/source/DBThread.java
Original file line number Diff line number Diff line change
@@ -1,45 +1,124 @@
//package io.siddhi.extension.io.live.source;
//
//import com.c8db.C8Cursor;
//import com.c8db.C8DB;
//import com.c8db.entity.BaseDocument;
//import com.google.gson.Gson;
//import io.siddhi.core.stream.input.source.SourceEventListener;
//import io.siddhi.extension.io.live.source.Thread.AbstractThread;
//import io.siddhi.extension.io.live.utils.Monitor;
////import net.minidev.json.JSONObject;
//import lombok.Builder;
//import org.apache.tapestry5.json.JSONObject;
//
//@Builder
//public class DBThread extends AbstractThread {
// private final SourceEventListener sourceEventListener;
// private String hostName;
// private int port = 443;
// private String apiKey;
// private String user = "root";
// private String selectSQL;
//
// @Override
// public void run() {
// final C8DB c8db = new C8DB.Builder().useSsl(true).host(hostName , port).apiKey(apiKey).user(user).build();
// final C8Cursor<BaseDocument> cursor = c8db.db(null , "_system")
// .query(selectSQL, null, null, BaseDocument.class);
//
// while (cursor.hasNext() && isThreadRunning) {
// if(isPaused) {
// System.out.println("paused - DB thread");
// doPause();
// }
// Gson gson = new Gson();
// String json = gson.toJson(cursor.next());
//
// JSONObject jsonObject = new JSONObject(json);
// JSONObject properties = jsonObject.getJSONObject("properties");
// properties.put("initial_data", "true");
// json = jsonObject.toString();
//
// sourceEventListener.onEvent(json , null);
// }
// }
//}
package io.siddhi.extension.io.live.source;

import com.c8db.C8Cursor;
import com.c8db.C8DB;
import com.c8db.entity.BaseDocument;
import com.google.gson.Gson;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.live.source.Thread.AbstractThread;
import io.siddhi.extension.io.live.utils.Monitor;
//import net.minidev.json.JSONObject;
import lombok.Builder;
import org.apache.tapestry5.json.JSONObject;
//import org.json.JSONObject;
import net.minidev.json.JSONObject;

import java.sql.*;

@Builder
public class DBThread extends AbstractThread {
private final SourceEventListener sourceEventListener;
private String hostName;
private int port = 443;
private String apiKey;
private String user = "root";
private int port;
private String username;
private String password;
private String dbName;
private String selectSQL;

@Override
public void run() {
final C8DB c8db = new C8DB.Builder().useSsl(true).host(hostName , port).apiKey(apiKey).user(user).build();
final C8Cursor<BaseDocument> cursor = c8db.db(null , "_system")
.query(selectSQL, null, null, BaseDocument.class);

while (cursor.hasNext() && isThreadRunning) {
if(isPaused) {
System.out.println("paused - DB thread");
doPause();
}
Gson gson = new Gson();
String json = gson.toJson(cursor.next());
Connection connection = null;
Statement statement = null;
ResultSet resultSet = null;

JSONObject jsonObject = new JSONObject(json);
JSONObject properties = jsonObject.getJSONObject("properties");
properties.put("initial_data", "true");
json = jsonObject.toString();
try {
// Create a connection to the MySQL database
String jdbcUrl = "jdbc:mysql://" + hostName + ":" + port + "/" + dbName;

sourceEventListener.onEvent(json , null);
connection = DriverManager.getConnection(jdbcUrl, username, password);
statement = connection.createStatement();
// Execute the selectSQL query and process the results
String select = selectSQL.replaceAll("@\\w+", "");
System.out.println("query: "+select);
resultSet = statement.executeQuery(select);
while (resultSet.next() && isThreadRunning) {
if (isPaused) {
System.out.println("paused - DB thread");
doPause();
}
// Convert the row to a JSON object and add the "initial_data" property
JSONObject jsonObject = new JSONObject();
int numColumns = resultSet.getMetaData().getColumnCount();
for (int i = 1; i <= numColumns; i++) {
String columnName = resultSet.getMetaData().getColumnName(i);
Object columnValue = resultSet.getObject(i);
System.out.println("col"+columnValue);
jsonObject.put(columnName, columnValue);
}
jsonObject.put("initial_data", "true");
JSONObject jsonObject2 = new JSONObject(jsonObject);
JSONObject properties = new JSONObject();
properties.put("properties",jsonObject2);
String json = properties.toString();
//
// Send the event to the Siddhi source listener
sourceEventListener.onEvent(json, null);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// Close the database resources
try {
if (resultSet != null) {
resultSet.close();
}
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,16 @@ public void connect(ConnectionCallback connectionCallback , State state) throws
.activeConsumerRecordHandler(new ActiveConsumerRecordHandler<>())
.build();

// dbThread = DBThread.builder()
// .sourceEventListener(sourceEventListener)
// .apiKey(apiKey)
// .port(443)
// .selectSQL(selectQuery)
// .hostName(hostName)
// .user("root")
// .build();
// Thread threadDB = new Thread(dbThread, "Initial database thread");
dbThread = DBThread.builder()
.sourceEventListener(sourceEventListener)
.port(3306)
.selectSQL(selectQuery)
.hostName("10.8.100.246")
.username("root")
.password("debezium")
.dbName("inventory")
.build();
Thread threadDB = new Thread(dbThread, "Initial database thread");

consumerThread = StreamThread.builder()
.sourceEventListener(sourceEventListener)
Expand All @@ -235,10 +236,10 @@ public void connect(ConnectionCallback connectionCallback , State state) throws


threadCon.start();
// threadDB.start();
threadDB.start();
try {
threadCon.join();
// threadDB.join();
threadDB.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void SQLtoSiddhiQLCompilerWithDebeziumMySQLTest() throws InterruptedExcep
System.out.println(siddhiAppString);

persistenceStore.save("SiddhiApp-dev-test","table.name",siddhiApp.getTableName().getBytes());
persistenceStore.save("SiddhiApp-dev-test","database.name","database".getBytes());
persistenceStore.save("SiddhiApp-dev-test","database.name","inventory".getBytes());
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiAppString);

siddhiAppRuntime.addCallback("SQL-SiddhiQL-dev-test", new QueryCallback() {
Expand Down
Loading

0 comments on commit c8a41d0

Please sign in to comment.