diff --git a/sdk/include/praas/sdk/process.hpp b/sdk/include/praas/sdk/process.hpp index 9a2249d..a1e8da4 100644 --- a/sdk/include/praas/sdk/process.hpp +++ b/sdk/include/praas/sdk/process.hpp @@ -1,6 +1,7 @@ #ifndef PRAAS_SDK_PROCESS_HPP #define PRAAS_SDK_PROCESS_HPP +#include #include #include @@ -25,6 +26,8 @@ namespace praas::sdk { void disconnect(); + bool is_alive(); + InvocationResult invoke(std::string_view function_name, std::string invocation_id, char* ptr, size_t len); sockpp::tcp_connector& connection() @@ -38,8 +41,12 @@ namespace praas::sdk { private: + praas::common::message::MessageData _response; + bool _disable_nagle = true; + bool _disconnected = true; + sockpp::inet_address _addr; sockpp::tcp_connector _dataplane; diff --git a/sdk/src/process.cpp b/sdk/src/process.cpp index 8c17585..b5ea23c 100644 --- a/sdk/src/process.cpp +++ b/sdk/src/process.cpp @@ -1,10 +1,11 @@ #include +#include + #include #include -#include -#include "praas/common/exceptions.hpp" +#include namespace praas::sdk { @@ -23,6 +24,7 @@ namespace praas::sdk { void Process::disconnect() { _dataplane.close(); + _disconnected = true; } bool Process::connect() @@ -37,7 +39,7 @@ namespace praas::sdk { } praas::common::message::ProcessConnectionData req; - req.process_name("DATAPLANE"); + req.process_name("DATA_PLANE"); _dataplane.write_n(req.bytes(), req.BUF_SIZE); // Now wait for the confirmation; @@ -54,9 +56,41 @@ namespace praas::sdk { } auto& result = std::get(parsed_msg); + _disconnected = false; + return result.process_name() == "CORRECT"; } + bool Process::is_alive() + { + // Not sure this is fully robust but seems to work for our case + if(_disconnected) { + return false; + } + + ssize_t read_bytes = recv(_dataplane.handle(), _response.data(), praas::common::message::MessageConfig::BUF_SIZE, MSG_DONTWAIT); + + if(read_bytes == -1 && errno == EAGAIN) { + return true; + } + + if(read_bytes == 0) { + return false; + } + + if(read_bytes != praas::common::message::MessageConfig::BUF_SIZE) { + auto parsed_msg = praas::common::message::MessageParser::parse(_response); + if (!std::holds_alternative(parsed_msg)) { + spdlog::error("Unknown message received from the process!"); + } else { + _disconnected = true; + } + + return false; + } + + } + InvocationResult Process::invoke(std::string_view function_name, std::string invocation_id, char* ptr, size_t len) { @@ -68,18 +102,24 @@ namespace praas::sdk { msg.invocation_id(invocation_id); msg.payload_size(len); - _dataplane.write_n(msg.bytes(), msg.BUF_SIZE); + auto written = _dataplane.write_n(msg.bytes(), msg.BUF_SIZE); + if(written != msg.BUF_SIZE) { + return {1, nullptr, 0}; + } + if (len > 0) { - _dataplane.write_n(ptr, len); + auto written = _dataplane.write_n(ptr, len); + if(written != len) { + return {1, nullptr, 0}; + } } - praas::common::message::MessageData response; - auto read_bytes = _dataplane.read_n(response.data(), praas::common::message::MessageConfig::BUF_SIZE); + auto read_bytes = _dataplane.read_n(_response.data(), praas::common::message::MessageConfig::BUF_SIZE); if(read_bytes < praas::common::message::MessageConfig::BUF_SIZE) { return {1, nullptr, 0}; } - auto parsed_msg = praas::common::message::MessageParser::parse(response); + auto parsed_msg = praas::common::message::MessageParser::parse(_response); if (!std::holds_alternative(parsed_msg)) { return {1, nullptr, 0}; }