Skip to content

Commit

Permalink
Merge pull request #218 from singnet/issue-166-link-creation-main-fea…
Browse files Browse the repository at this point in the history
…tures

[#das166] Create Link Agent main features
  • Loading branch information
eddiebrissow authored Jan 28, 2025
2 parents b64e05a + 884d681 commit 4b7203b
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 46 deletions.
127 changes: 117 additions & 10 deletions src/cpp/link_creation_agent/agent.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#include "agent.h"
#include "RemoteIterator.h"

#include <fstream>
#include <sstream>

#include "RemoteIterator.h"

using namespace std;
using namespace link_creation_agent;
using namespace query_node;
Expand All @@ -14,38 +15,91 @@ LinkCreationAgent::LinkCreationAgent(string config_path) {
load_config();
link_creation_node_server = new LinkCreationNode(link_creation_server_id);
query_node_client = new DASNode(query_node_client_id, query_node_server_id);
service = new LinkCreationService(thread_count);
das_client = new ServerNode(das_client_id, query_node_server_id);

this->agent_thread = new thread(&LinkCreationAgent::run, this);
}

LinkCreationAgent::~LinkCreationAgent() {
stop();
delete link_creation_node_server;
delete query_node_client;
delete service;
delete das_client;
}

void LinkCreationAgent::stop() {
save_buffer();
link_creation_node_server->graceful_shutdown();
query_node_client->graceful_shutdown();
das_client->graceful_shutdown();
if (agent_thread != NULL && agent_thread->joinable()) {
agent_thread->join();
agent_thread = NULL;
}
}

void LinkCreationAgent::run() {
int current_buffer_position = 0;
while (true) {
this_thread::sleep_for(chrono::seconds(loop_interval));
LinkCreationAgentRequest* lca_request = NULL;
if (!link_creation_node_server->is_query_empty()) {
auto request = link_creation_node_server->pop_request();
cout << "Request: " << request << endl;
vector<string> request = link_creation_node_server->pop_request();
lca_request = handle_request(request);
if (lca_request != NULL && (lca_request->infinite || lca_request->repeat > 0)) {
request_buffer.push_back(*lca_request);
}
} else {
if (!request_buffer.empty()) {
current_buffer_position = current_buffer_position % request_buffer.size();
lca_request = &request_buffer[current_buffer_position];
current_buffer_position++;
}
}

if (lca_request == NULL) {
this_thread::sleep_for(chrono::seconds(loop_interval));
continue;
}

if (lca_request->last_execution == 0 ||
lca_request->last_execution + lca_request->current_interval < time(0)) {
shared_ptr<RemoteIterator> iterator =
query(lca_request->query, lca_request->context, lca_request->update_attention_broker);
// cout << "Request query: " << lca_request->query[0] << endl;
// cout << "Request link_template: " << lca_request->link_template[0] << endl;
// cout << "Request max_results: " << lca_request->max_results << endl;
// cout << "Request repeat: " << lca_request->repeat << endl;
// cout << "Request current_interval: " << lca_request->current_interval << endl;
// cout << "Request last_execution: " << lca_request->last_execution << endl;

service->process_request(iterator, das_client, lca_request->link_template);

if (lca_request->infinite || lca_request->repeat > 0) {
lca_request->last_execution = time(0);
lca_request->current_interval = (lca_request->current_interval * 2) % 86400; // Add exponential backoff, resets after 24 hours
if (lca_request->repeat > 1) {
lca_request->repeat--;
} else {
// TODO check for memory leaks here
request_buffer.erase(request_buffer.begin() + current_buffer_position - 1);
}
} else {
delete lca_request;
}
}

cout << "Running" << endl;
}
}

void LinkCreationAgent::clean_requests() {}

shared_ptr<RemoteIterator> LinkCreationAgent::query() { return NULL; }
shared_ptr<RemoteIterator> LinkCreationAgent::query(vector<string>& query_tokens,
string context,
bool update_attention_broker) {
return shared_ptr<RemoteIterator>(
query_node_client->pattern_matcher_query(query_tokens, context, update_attention_broker));
}

void LinkCreationAgent::load_config() {
ifstream file(config_path);
Expand All @@ -70,14 +124,67 @@ void LinkCreationAgent::load_config() {
this->link_creation_server_id = value;
else if (key == "das_client_id")
this->das_client_id = value;
else if (key == "requests_buffer_file")
this->requests_buffer_file = value;
else if (key == "context")
this->context = value;
}
}
}
file.close();
}

void LinkCreationAgent::save_buffer() {}
void LinkCreationAgent::save_buffer() {
ofstream file(requests_buffer_file, ios::binary);
for (LinkCreationAgentRequest request : request_buffer) {
file.write((char*) &request, sizeof(request));
}
file.close();
}

void LinkCreationAgent::load_buffer() {
ifstream file(requests_buffer_file, ios::binary);
while (file) {
LinkCreationAgentRequest request;
file.read((char*) &request, sizeof(request));
request_buffer.push_back(request);
}
file.close();
}

void LinkCreationAgent::load_buffer() {}
LinkCreationAgentRequest* LinkCreationAgent::handle_request(vector<string> request) {
try {
LinkCreationAgentRequest* lca_request = new LinkCreationAgentRequest();
int cursor = 0;
bool is_link_create = false;
for (string arg : request) {
cursor++;
is_link_create = (arg == "LINK_CREATE") ^ is_link_create;
if (!is_link_create) {
lca_request->query.push_back(arg);
}
if (is_link_create && cursor < request.size() - 3) {
lca_request->link_template.push_back(arg);
}
if (cursor == request.size() - 3) {
lca_request->max_results = stoi(arg);
}
if (cursor == request.size() - 2) {
lca_request->repeat = stoi(arg);
}
if (cursor == request.size() - 1) {
lca_request->context = arg;
}
if (cursor == request.size()) {
lca_request->update_attention_broker = (arg == "true");
}
}
lca_request->infinite = (lca_request->repeat == -1);
lca_request->current_interval = default_interval;

void LinkCreationAgent::handleRequest(string request) {}
return lca_request;
} catch (exception& e) {
cout << "Error parsing request: " << e.what() << endl;
return NULL;
}
}
16 changes: 10 additions & 6 deletions src/cpp/link_creation_agent/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ namespace link_creation_agent
{
struct LinkCreationAgentRequest
{
string query;
string link_template;
vector<string> query;
vector<string> link_template;
int max_results = 1000;
int repeat = 1;
long last_execution;
long last_execution = 0;
int current_interval;
bool infinite = false;
string context = "";
bool update_attention_broker = false;
};

class LinkCreationAgent
Expand All @@ -59,7 +61,7 @@ namespace link_creation_agent
* @brief Sends a query to DAS Query Agent
* @returns Returns a shared_ptr<RemoteIterator>, to iterate through the requests
*/
shared_ptr<RemoteIterator> query();
shared_ptr<RemoteIterator> query(vector<string> &query_tokens, string context, bool update_attention_broker);
/**
* @brief Load config file
*/
Expand All @@ -76,7 +78,7 @@ namespace link_creation_agent
* @brief Handle the create link request.
* @param request String received from a DAS Node server
*/
void handleRequest(string request);
LinkCreationAgentRequest* handle_request(vector<string> request);

void stop();

Expand All @@ -88,11 +90,13 @@ namespace link_creation_agent
string query_node_server_id; // ID of the query node server
string link_creation_server_id; // ID of the link creation server
string das_client_id; // ID of the DAS client
string requests_buffer_file;
string context;


// Other attributes
LinkCreationService *service;
vector<LinkCreationAgentRequest> *request_buffer;
vector<LinkCreationAgentRequest> request_buffer;
query_engine::DASNode *query_node_client;
LinkCreationNode *link_creation_node_server;
das::ServerNode *das_client;
Expand Down
10 changes: 3 additions & 7 deletions src/cpp/link_creation_agent/das_link_creation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ LinkCreationNode::~LinkCreationNode()

}

string LinkCreationNode::pop_request()
vector<string> LinkCreationNode::pop_request()
{
return request_queue.dequeue();
}
Expand All @@ -33,7 +33,7 @@ bool LinkCreationNode::is_shutting_down()
return shutting_down;
}

void LinkCreationNode::add_request(string &request)
void LinkCreationNode::add_request(vector<string> request)
{
request_queue.enqueue(request);
}
Expand Down Expand Up @@ -65,9 +65,5 @@ void LinkCreationRequest::act(shared_ptr<MessageFactory> node)
{
auto link_node = dynamic_pointer_cast<LinkCreationNode>(node);
string request;
for (auto arg : this->args)
{
request += arg + " ";
}
link_node->add_request(request);
link_node->add_request(this->args);
}
6 changes: 3 additions & 3 deletions src/cpp/link_creation_agent/das_link_creation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class LinkCreationNode : public StarNode {
/**
* @brief Retrieves the next request
*/
string pop_request();
vector<string> pop_request();
/**
* @brief Return true if the request's queue is empty
*/
Expand All @@ -31,14 +31,14 @@ class LinkCreationNode : public StarNode {
*/
bool is_shutting_down();

void add_request(string& request);
void add_request(vector<string> request);

string to_string() { return "LinkCreationNode"; }

virtual shared_ptr<Message> message_factory(string& command, vector<string>& args);

private:
Queue<string> request_queue;
Queue<vector<string>> request_queue;
const string CREATE_LINK = "create_link";
bool shutting_down = false;
};
Expand Down
23 changes: 23 additions & 0 deletions src/cpp/link_creation_agent/das_server_node.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "das_server_node.h"

using namespace das;
using namespace std;
using namespace distributed_algorithm_node;

ServerNode::ServerNode(const string &node_id, const string &server_id) : StarNode(node_id)
{
}

ServerNode::~ServerNode()
{
DistributedAlgorithmNode::graceful_shutdown();
}

void ServerNode::create_link(vector<string> &request)
{
cout << "Creating link" << endl;
for (string token : request)
{
cout << token << endl;
}
}
3 changes: 1 addition & 2 deletions src/cpp/link_creation_agent/das_server_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ namespace das {
* Destructor
*/
~ServerNode();
void create_link(string &request);
void create_link(vector<string> &request);
private:
const string CREATE_ATOM = "create_atom";
Queue<string> requests_queue;


};
Expand Down
Loading

0 comments on commit 4b7203b

Please sign in to comment.