Merge pull request miyurud#247 from Sakeerthan/master
Fix metadb table updates for Native Store creation
miyurud authored Jun 23, 2024
2 parents 4eeb486 + 6963aa4 commit 70a0d57
Showing 3 changed files with 21 additions and 7 deletions.
20 changes: 16 additions & 4 deletions src/frontend/JasmineGraphFrontEnd.cpp
@@ -71,7 +71,8 @@ static void remove_graph_command(std::string masterIP, int connFd, SQLiteDBInter
static void add_model_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, cppkafka::Configuration &configs,
KafkaConnector *&kstream, thread &input_stream_handler_thread,
- vector<DataPublisher *> &workerClients, int numberOfPartitions, bool *loop_exit_p);
+ vector<DataPublisher *> &workerClients, int numberOfPartitions,
+ SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p);
static void process_dataset_command(int connFd, bool *loop_exit_p);
static void triangles_command(std::string masterIP, int connFd, SQLiteDBInterface *sqlite,
@@ -186,7 +187,7 @@ void *frontendservicesesion(void *dummyPt) {
workerClientsInitialized = true;
}
add_stream_kafka_command(connFd, kafka_server_IP, configs, kstream, input_stream_handler, workerClients,
- numberOfPartitions, &loop_exit);
+ numberOfPartitions, sqlite, &loop_exit);
} else if (line.compare(STOP_STREAM_KAFKA) == 0) {
stop_stream_kafka_command(connFd, kstream, &loop_exit);
} else if (line.compare(RMGR) == 0) {
@@ -1125,7 +1126,7 @@ static void add_model_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_
static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, cppkafka::Configuration &configs,
KafkaConnector *&kstream, thread &input_stream_handler_thread,
vector<DataPublisher *> &workerClients, int numberOfPartitions,
- bool *loop_exit_p) {
+ SQLiteDBInterface *sqlite, bool *loop_exit_p) {
string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?";
int result_wr = write(connFd, msg_1.c_str(), msg_1.length());
if (result_wr < 0) {
@@ -1150,9 +1151,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
c = tolower(c);
}
// use default kafka consumer details
+ string group_id = "knnect"; // TODO(sakeerthan): MOVE TO CONSTANT LATER
if (user_res_s == "y") {
kafka_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.kafka.host");
- configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", "knnect"}};
+ configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", group_id}};
} else {
// user need to start relevant kafka cluster using relevant IP address
// read relevant IP address from given file path
@@ -1235,6 +1237,16 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
// Create the StreamHandler object.
StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients);

+ string path = "kafka:\\" + topic_name_s + ":" + group_id;
+ std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
+ string uploadStartTime = ctime(&time);
+ string sqlStatement =
+ "INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status,"
+ "vertexcount,centralpartitioncount,edgecount) VALUES(\"" +
+ topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" +
+ to_string(Conts::GRAPH_STATUS::STREAMING) + "\", \"\", \"\", \"\")";
+ int newGraphID = sqlite->runInsert(sqlStatement);

frontend_logger.info("Start listening to " + topic_name_s);
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
}
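For reference, here is a minimal standalone sketch (not part of the commit) of the string composition added in the hunk above. It uses a hypothetical topic name, "test_stream", and the default "knnect" group id, and only prints the SQL that the frontend hands to sqlite->runInsert, rather than touching the metadb.

// Sketch only: reproduces the metadb INSERT string composed in add_stream_kafka_command.
#include <chrono>
#include <ctime>
#include <iostream>
#include <string>

int main() {
    const int STREAMING = 2;                    // mirrors Conts::GRAPH_STATUS::STREAMING after this change
    std::string topic_name_s = "test_stream";   // hypothetical topic name
    std::string group_id = "knnect";            // default consumer group id from the diff

    std::string path = "kafka:\\" + topic_name_s + ":" + group_id;
    std::time_t time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::string uploadStartTime = std::ctime(&time);  // note: ctime() output ends with '\n'

    std::string sqlStatement =
        "INSERT INTO graph (name,upload_path,upload_start_time,upload_end_time,graph_status_idgraph_status,"
        "vertexcount,centralpartitioncount,edgecount) VALUES(\"" +
        topic_name_s + "\", \"" + path + "\", \"" + uploadStartTime + "\", \"\",\"" +
        std::to_string(STREAMING) + "\", \"\", \"\", \"\")";

    // In JasmineGraphFrontEnd.cpp this string is passed to sqlite->runInsert(sqlStatement);
    // here it is just printed to show what ends up in the metadb graph table.
    std::cout << sqlStatement << std::endl;
    return 0;
}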
7 changes: 4 additions & 3 deletions src/util/Conts.cpp
@@ -61,9 +61,10 @@ int Conts::SCHEDULER_SLEEP_TIME = 2;
int Conts::STREAMING_STRAIN_GAP = 10;

const int Conts::GRAPH_STATUS::LOADING = 1;
- const int Conts::GRAPH_STATUS::OPERATIONAL = 2;
- const int Conts::GRAPH_STATUS::DELETING = 3;
- const int Conts::GRAPH_STATUS::NONOPERATIONAL = 4;
+ const int Conts::GRAPH_STATUS::STREAMING = 2;
+ const int Conts::GRAPH_STATUS::OPERATIONAL = 3;
+ const int Conts::GRAPH_STATUS::DELETING = 4;
+ const int Conts::GRAPH_STATUS::NONOPERATIONAL = 5;

const std::string Conts::TRAIN_STATUS::TRAINED = "trained";
const std::string Conts::TRAIN_STATUS::NOT_TRAINED = "not_trained";
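As a quick reference for the renumbering above, the helper below is hypothetical (not in the codebase) and only spells out the mapping that results from inserting STREAMING as 2 and shifting the later constants by one.

// Hypothetical helper: maps the renumbered GRAPH_STATUS values from this diff to labels.
#include <string>

std::string graphStatusToString(int status) {
    switch (status) {
        case 1: return "LOADING";         // unchanged
        case 2: return "STREAMING";       // newly added by this commit
        case 3: return "OPERATIONAL";     // previously 2
        case 4: return "DELETING";        // previously 3
        case 5: return "NONOPERATIONAL";  // previously 4
        default: return "UNKNOWN";
    }
}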
1 change: 1 addition & 0 deletions src/util/Conts.h
@@ -109,6 +109,7 @@ class Conts {

struct GRAPH_STATUS {
static const int LOADING; // Graph partitions are being uploaded
+ static const int STREAMING;
static const int
OPERATIONAL; // Graph is uploaded and all its partitions are accessible in the current hosts setting
static const int DELETING; // Graph partitions are being deleted
