From cb8533d00f2afa59d141630478f575710783bdef Mon Sep 17 00:00:00 2001 From: Edgar Brissow Date: Tue, 28 Jan 2025 12:12:07 -0300 Subject: [PATCH 1/2] Core funcionalities ok --- src/cpp/link_creation_agent/agent.cc | 127 ++++++++++++++++-- src/cpp/link_creation_agent/agent.h | 16 ++- .../das_link_creation_node.cc | 10 +- .../das_link_creation_node.h | 6 +- .../link_creation_agent/das_server_node.cc | 23 ++++ src/cpp/link_creation_agent/das_server_node.h | 3 +- src/cpp/link_creation_agent/link.cc | 93 +++++++++++++ src/cpp/link_creation_agent/link.h | 6 +- src/cpp/link_creation_agent/service.h | 10 +- src/cpp/link_creation_agent/servive.cc | 36 +++++ src/cpp/link_creation_agent/thread_pool.h | 77 +++++++++-- 11 files changed, 361 insertions(+), 46 deletions(-) create mode 100644 src/cpp/link_creation_agent/das_server_node.cc create mode 100644 src/cpp/link_creation_agent/link.cc create mode 100644 src/cpp/link_creation_agent/servive.cc diff --git a/src/cpp/link_creation_agent/agent.cc b/src/cpp/link_creation_agent/agent.cc index 0149c25..eea3c38 100644 --- a/src/cpp/link_creation_agent/agent.cc +++ b/src/cpp/link_creation_agent/agent.cc @@ -1,9 +1,10 @@ #include "agent.h" -#include "RemoteIterator.h" #include #include +#include "RemoteIterator.h" + using namespace std; using namespace link_creation_agent; using namespace query_node; @@ -14,6 +15,8 @@ LinkCreationAgent::LinkCreationAgent(string config_path) { load_config(); link_creation_node_server = new LinkCreationNode(link_creation_server_id); query_node_client = new DASNode(query_node_client_id, query_node_server_id); + service = new LinkCreationService(thread_count); + das_client = new ServerNode(das_client_id, query_node_server_id); this->agent_thread = new thread(&LinkCreationAgent::run, this); } @@ -21,10 +24,16 @@ LinkCreationAgent::LinkCreationAgent(string config_path) { LinkCreationAgent::~LinkCreationAgent() { stop(); delete link_creation_node_server; + delete query_node_client; + delete service; + delete das_client; } void LinkCreationAgent::stop() { + save_buffer(); link_creation_node_server->graceful_shutdown(); + query_node_client->graceful_shutdown(); + das_client->graceful_shutdown(); if (agent_thread != NULL && agent_thread->joinable()) { agent_thread->join(); agent_thread = NULL; @@ -32,20 +41,65 @@ void LinkCreationAgent::stop() { } void LinkCreationAgent::run() { + int current_buffer_position = 0; while (true) { - this_thread::sleep_for(chrono::seconds(loop_interval)); + LinkCreationAgentRequest* lca_request = NULL; if (!link_creation_node_server->is_query_empty()) { - auto request = link_creation_node_server->pop_request(); - cout << "Request: " << request << endl; + vector request = link_creation_node_server->pop_request(); + lca_request = handle_request(request); + if (lca_request != NULL && (lca_request->infinite || lca_request->repeat > 0)) { + request_buffer.push_back(*lca_request); + } + } else { + if (!request_buffer.empty()) { + current_buffer_position = current_buffer_position % request_buffer.size(); + lca_request = &request_buffer[current_buffer_position]; + current_buffer_position++; + } + } + + if (lca_request == NULL) { + this_thread::sleep_for(chrono::seconds(loop_interval)); + continue; + } + + if (lca_request->last_execution == 0 || + lca_request->last_execution + lca_request->current_interval < time(0)) { + shared_ptr iterator = + query(lca_request->query, lca_request->context, lca_request->update_attention_broker); + // cout << "Request query: " << lca_request->query[0] << endl; + // cout << "Request link_template: " << lca_request->link_template[0] << endl; + // cout << "Request max_results: " << lca_request->max_results << endl; + // cout << "Request repeat: " << lca_request->repeat << endl; + // cout << "Request current_interval: " << lca_request->current_interval << endl; + // cout << "Request last_execution: " << lca_request->last_execution << endl; + + service->process_request(iterator, das_client, lca_request->link_template); + + if (lca_request->infinite || lca_request->repeat > 0) { + lca_request->last_execution = time(0); + lca_request->current_interval = (lca_request->current_interval * 2) % 86400; // Add exponential backoff, resets after 24 hours + if (lca_request->repeat > 1) { + lca_request->repeat--; + } else { + // TODO check for memory leaks here + request_buffer.erase(request_buffer.begin() + current_buffer_position - 1); + } + } else { + delete lca_request; + } } - - cout << "Running" << endl; } } void LinkCreationAgent::clean_requests() {} -shared_ptr LinkCreationAgent::query() { return NULL; } +shared_ptr LinkCreationAgent::query(vector& query_tokens, + string context, + bool update_attention_broker) { + return shared_ptr( + query_node_client->pattern_matcher_query(query_tokens, context, update_attention_broker)); +} void LinkCreationAgent::load_config() { ifstream file(config_path); @@ -70,14 +124,67 @@ void LinkCreationAgent::load_config() { this->link_creation_server_id = value; else if (key == "das_client_id") this->das_client_id = value; + else if (key == "requests_buffer_file") + this->requests_buffer_file = value; + else if (key == "context") + this->context = value; } } } file.close(); } -void LinkCreationAgent::save_buffer() {} +void LinkCreationAgent::save_buffer() { + ofstream file(requests_buffer_file, ios::binary); + for (LinkCreationAgentRequest request : request_buffer) { + file.write((char*) &request, sizeof(request)); + } + file.close(); +} + +void LinkCreationAgent::load_buffer() { + ifstream file(requests_buffer_file, ios::binary); + while (file) { + LinkCreationAgentRequest request; + file.read((char*) &request, sizeof(request)); + request_buffer.push_back(request); + } + file.close(); +} -void LinkCreationAgent::load_buffer() {} +LinkCreationAgentRequest* LinkCreationAgent::handle_request(vector request) { + try { + LinkCreationAgentRequest* lca_request = new LinkCreationAgentRequest(); + int cursor = 0; + bool is_link_create = false; + for (string arg : request) { + cursor++; + is_link_create = (arg == "LINK_CREATE") ^ is_link_create; + if (!is_link_create) { + lca_request->query.push_back(arg); + } + if (is_link_create && cursor < request.size() - 3) { + lca_request->link_template.push_back(arg); + } + if (cursor == request.size() - 3) { + lca_request->max_results = stoi(arg); + } + if (cursor == request.size() - 2) { + lca_request->repeat = stoi(arg); + } + if (cursor == request.size() - 1) { + lca_request->context = arg; + } + if (cursor == request.size()) { + lca_request->update_attention_broker = (arg == "true"); + } + } + lca_request->infinite = (lca_request->repeat == -1); + lca_request->current_interval = default_interval; -void LinkCreationAgent::handleRequest(string request) {} \ No newline at end of file + return lca_request; + } catch (exception& e) { + cout << "Error parsing request: " << e.what() << endl; + return NULL; + } +} \ No newline at end of file diff --git a/src/cpp/link_creation_agent/agent.h b/src/cpp/link_creation_agent/agent.h index 59ba194..ea7838c 100644 --- a/src/cpp/link_creation_agent/agent.h +++ b/src/cpp/link_creation_agent/agent.h @@ -27,13 +27,15 @@ namespace link_creation_agent { struct LinkCreationAgentRequest { - string query; - string link_template; + vector query; + vector link_template; int max_results = 1000; int repeat = 1; - long last_execution; + long last_execution = 0; int current_interval; bool infinite = false; + string context = ""; + bool update_attention_broker = false; }; class LinkCreationAgent @@ -59,7 +61,7 @@ namespace link_creation_agent * @brief Sends a query to DAS Query Agent * @returns Returns a shared_ptr, to iterate through the requests */ - shared_ptr query(); + shared_ptr query(vector &query_tokens, string context, bool update_attention_broker); /** * @brief Load config file */ @@ -76,7 +78,7 @@ namespace link_creation_agent * @brief Handle the create link request. * @param request String received from a DAS Node server */ - void handleRequest(string request); + LinkCreationAgentRequest* handle_request(vector request); void stop(); @@ -88,11 +90,13 @@ namespace link_creation_agent string query_node_server_id; // ID of the query node server string link_creation_server_id; // ID of the link creation server string das_client_id; // ID of the DAS client + string requests_buffer_file; + string context; // Other attributes LinkCreationService *service; - vector *request_buffer; + vector request_buffer; query_engine::DASNode *query_node_client; LinkCreationNode *link_creation_node_server; das::ServerNode *das_client; diff --git a/src/cpp/link_creation_agent/das_link_creation_node.cc b/src/cpp/link_creation_agent/das_link_creation_node.cc index 9def1bd..3539d76 100644 --- a/src/cpp/link_creation_agent/das_link_creation_node.cc +++ b/src/cpp/link_creation_agent/das_link_creation_node.cc @@ -17,7 +17,7 @@ LinkCreationNode::~LinkCreationNode() } -string LinkCreationNode::pop_request() +vector LinkCreationNode::pop_request() { return request_queue.dequeue(); } @@ -33,7 +33,7 @@ bool LinkCreationNode::is_shutting_down() return shutting_down; } -void LinkCreationNode::add_request(string &request) +void LinkCreationNode::add_request(vector request) { request_queue.enqueue(request); } @@ -65,9 +65,5 @@ void LinkCreationRequest::act(shared_ptr node) { auto link_node = dynamic_pointer_cast(node); string request; - for (auto arg : this->args) - { - request += arg + " "; - } - link_node->add_request(request); + link_node->add_request(this->args); } \ No newline at end of file diff --git a/src/cpp/link_creation_agent/das_link_creation_node.h b/src/cpp/link_creation_agent/das_link_creation_node.h index 8756c6c..a3975d3 100644 --- a/src/cpp/link_creation_agent/das_link_creation_node.h +++ b/src/cpp/link_creation_agent/das_link_creation_node.h @@ -21,7 +21,7 @@ class LinkCreationNode : public StarNode { /** * @brief Retrieves the next request */ - string pop_request(); + vector pop_request(); /** * @brief Return true if the request's queue is empty */ @@ -31,14 +31,14 @@ class LinkCreationNode : public StarNode { */ bool is_shutting_down(); - void add_request(string& request); + void add_request(vector request); string to_string() { return "LinkCreationNode"; } virtual shared_ptr message_factory(string& command, vector& args); private: - Queue request_queue; + Queue> request_queue; const string CREATE_LINK = "create_link"; bool shutting_down = false; }; diff --git a/src/cpp/link_creation_agent/das_server_node.cc b/src/cpp/link_creation_agent/das_server_node.cc new file mode 100644 index 0000000..f57cdac --- /dev/null +++ b/src/cpp/link_creation_agent/das_server_node.cc @@ -0,0 +1,23 @@ +#include "das_server_node.h" + +using namespace das; +using namespace std; +using namespace distributed_algorithm_node; + +ServerNode::ServerNode(const string &node_id, const string &server_id) : StarNode(node_id) +{ +} + +ServerNode::~ServerNode() +{ + DistributedAlgorithmNode::graceful_shutdown(); +} + +void ServerNode::create_link(vector &request) +{ + cout << "Creating link" << endl; + for (string token : request) + { + cout << token << endl; + } +} \ No newline at end of file diff --git a/src/cpp/link_creation_agent/das_server_node.h b/src/cpp/link_creation_agent/das_server_node.h index 16e53f3..e9b0188 100644 --- a/src/cpp/link_creation_agent/das_server_node.h +++ b/src/cpp/link_creation_agent/das_server_node.h @@ -21,10 +21,9 @@ namespace das { * Destructor */ ~ServerNode(); - void create_link(string &request); + void create_link(vector &request); private: const string CREATE_ATOM = "create_atom"; - Queue requests_queue; }; diff --git a/src/cpp/link_creation_agent/link.cc b/src/cpp/link_creation_agent/link.cc new file mode 100644 index 0000000..c4a306d --- /dev/null +++ b/src/cpp/link_creation_agent/link.cc @@ -0,0 +1,93 @@ +#include "link.h" + + +using namespace link_creation_agent; +using namespace std; +using namespace query_engine; + + +Link::Link(string type, vector targets) +{ + this->type = type; + this->targets = targets; +} + +Link::Link(QueryAnswer query_answer, vector link_template) +{ + string query_tokens = query_answer.tokenize(); + string token = ""; + for(char token_char : query_tokens){ + if(token_char == ' '){ + this->targets.push_back(token); + token = ""; + }else{ + token += token_char; + } + } +} + + +Link::Link() +{ +} + +Link::~Link() +{ +} + +string Link::get_type() +{ + return this->type; +} + +vector Link::get_targets() +{ + return this->targets; +} + +void Link::set_type(string type) +{ + this->type = type; +} + +void Link::set_targets(vector targets) +{ + this->targets = targets; +} + +void Link::add_target(string target) +{ + this->targets.push_back(target); +} + +vector Link::tokenize() +{ + return targets; +} + + +Link Link::untokenize(string link) +{ + vector tokens; + string token; + for (char c : link) + { + if (c == ' ') + { + tokens.push_back(token); + token = ""; + } + else + { + token += c; + } + } + tokens.push_back(token); + Link answer; + answer.set_type(tokens[0]); + for (unsigned int i = 1; i < tokens.size(); i++) + { + answer.add_target(tokens[i]); + } + return answer; +} diff --git a/src/cpp/link_creation_agent/link.h b/src/cpp/link_creation_agent/link.h index e68e1e8..462c62a 100644 --- a/src/cpp/link_creation_agent/link.h +++ b/src/cpp/link_creation_agent/link.h @@ -4,8 +4,11 @@ #pragma once #include #include +#include "QueryAnswer.h" using namespace std; +using namespace query_engine; + namespace link_creation_agent { @@ -13,6 +16,7 @@ namespace link_creation_agent { public: Link(string type, vector targets); + Link(QueryAnswer query_answer, vector link_template); Link(); ~Link(); /** @@ -42,7 +46,7 @@ namespace link_creation_agent * @brief Tokenize the link * @returns Returns the tokenized link */ - string tokenize(); + vector tokenize(); Link untokenize(string link); private: diff --git a/src/cpp/link_creation_agent/service.h b/src/cpp/link_creation_agent/service.h index 5ba6249..c94819f 100644 --- a/src/cpp/link_creation_agent/service.h +++ b/src/cpp/link_creation_agent/service.h @@ -12,6 +12,8 @@ #include "link.h" #include #include +#include +#include using namespace das; using namespace query_node; @@ -28,7 +30,7 @@ namespace link_creation_agent * @param iterator RemoteIterator object * @param das_client DAS Node client */ - void process_request(shared_ptr iterator, ServerNode &das_client, string& link_template); + void process_request(shared_ptr iterator, ServerNode *das_client, vector& link_template); /** * @brief Destructor */ @@ -39,13 +41,15 @@ namespace link_creation_agent ThreadPool thread_pool; // this can be changed to a better data structure set processed_link_handles; + std::mutex m_mutex; + std::condition_variable m_cond; /** * @brief Create a link, blocking the client until the link is created - * @param link_representation Link object + * @param link Link object * @param das_client DAS Node client */ - void create_link(Link link_representation, ServerNode &das_client); + void create_link(Link &link, ServerNode &das_client); }; } \ No newline at end of file diff --git a/src/cpp/link_creation_agent/servive.cc b/src/cpp/link_creation_agent/servive.cc new file mode 100644 index 0000000..59f2da3 --- /dev/null +++ b/src/cpp/link_creation_agent/servive.cc @@ -0,0 +1,36 @@ +#include "service.h" + +using namespace link_creation_agent; +using namespace std; +using namespace query_node; + +LinkCreationService::LinkCreationService(int thread_count) : thread_pool(thread_count) +{ +} + +LinkCreationService::~LinkCreationService() +{ +} + +void LinkCreationService::process_request(shared_ptr iterator, ServerNode *das_client, vector &link_template) +{ + auto job = [this, iterator, das_client, link_template]() { + while (!iterator->finished()) + { + QueryAnswer* query_answer = iterator->pop(); + Link link(*query_answer, link_template); + this->create_link(link, *das_client); + delete query_answer; + } + }; + thread_pool.enqueue(job); +} + +void LinkCreationService::create_link(Link &link, ServerNode &das_client) +{ + std::unique_lock lock(m_mutex); + cout << "Creating link" << endl; + vector link_tokens = link.tokenize(); + das_client.create_link(link_tokens); + m_cond.notify_one(); +} diff --git a/src/cpp/link_creation_agent/thread_pool.h b/src/cpp/link_creation_agent/thread_pool.h index e002a6d..50d8676 100644 --- a/src/cpp/link_creation_agent/thread_pool.h +++ b/src/cpp/link_creation_agent/thread_pool.h @@ -2,29 +2,78 @@ * @brief Example of thread pool implementation */ #pragma once + +#include #include -#include #include +#include +#include +#include #include #include -#include -#include -#include - +#include +#include /** * @brief Thread pool class example - */ +*/ + class ThreadPool { public: - explicit ThreadPool(size_t thread_count) : stop_(false) {}; - ~ThreadPool(); - void enqueue(std::function task); + explicit ThreadPool(size_t threads) { + for (size_t i = 0; i < threads; ++i) { + workers.emplace_back([this] { + while (true) { + std::function task; + { + std::unique_lock lock(queueMutex); + condition.wait(lock, [this] { return stop || !tasks.empty(); }); + if (stop && tasks.empty()) + return; + task = std::move(tasks.front()); + tasks.pop(); + } + task(); + } + }); + } + } + + ~ThreadPool() { + { + std::unique_lock lock(queueMutex); + stop = true; + } + condition.notify_all(); + for (std::thread &worker : workers) + worker.join(); + } + + template + auto enqueue(F&& f, Args&&... args) + -> std::future::type> { + using returnType = typename std::invoke_result::type; + + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + std::unique_lock lock(queueMutex); + if (stop) + throw std::runtime_error("Enqueue on stopped ThreadPool"); + + tasks.emplace([task]() { (*task)(); }); + } + condition.notify_one(); + return res; + } private: - std::vector workers_; - std::queue> tasks_; - std::mutex mutex_; - std::condition_variable condition_; - std::atomic stop_; + std::vector workers; + std::queue> tasks; + std::mutex queueMutex; + std::condition_variable condition; + bool stop = false; }; From 884d6817a638687b5310de8ebe2420d88149fc01 Mon Sep 17 00:00:00 2001 From: Edgar Brissow Date: Tue, 28 Jan 2025 12:14:57 -0300 Subject: [PATCH 2/2] fixing link untokenize --- src/cpp/link_creation_agent/link.cc | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/src/cpp/link_creation_agent/link.cc b/src/cpp/link_creation_agent/link.cc index c4a306d..3be4e11 100644 --- a/src/cpp/link_creation_agent/link.cc +++ b/src/cpp/link_creation_agent/link.cc @@ -68,26 +68,5 @@ vector Link::tokenize() Link Link::untokenize(string link) { - vector tokens; - string token; - for (char c : link) - { - if (c == ' ') - { - tokens.push_back(token); - token = ""; - } - else - { - token += c; - } - } - tokens.push_back(token); - Link answer; - answer.set_type(tokens[0]); - for (unsigned int i = 1; i < tokens.size(); i++) - { - answer.add_target(tokens[i]); - } - return answer; + return Link(); }