Skip to content

Commit

Permalink
[sdk] Detect closure
Browse files Browse the repository at this point in the history
  • Loading branch information
mcopik committed Oct 16, 2024
1 parent 6ca8e9e commit aae47fe
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
7 changes: 7 additions & 0 deletions sdk/include/praas/sdk/process.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef PRAAS_SDK_PROCESS_HPP
#define PRAAS_SDK_PROCESS_HPP

#include <praas/common/messages.hpp>
#include <praas/sdk/invocation.hpp>

#include <sockpp/stream_socket.h>
Expand All @@ -25,6 +26,8 @@ namespace praas::sdk {

void disconnect();

bool is_alive();

InvocationResult invoke(std::string_view function_name, std::string invocation_id, char* ptr, size_t len);

sockpp::tcp_connector& connection()
Expand All @@ -38,8 +41,12 @@ namespace praas::sdk {

private:

praas::common::message::MessageData _response;

bool _disable_nagle = true;

bool _disconnected = true;

sockpp::inet_address _addr;

sockpp::tcp_connector _dataplane;
Expand Down
56 changes: 48 additions & 8 deletions sdk/src/process.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#include <praas/sdk/process.hpp>

#include <variant>

#include <praas/common/messages.hpp>
#include <praas/common/sockets.hpp>

#include <variant>
#include "praas/common/exceptions.hpp"
#include <spdlog/spdlog.h>

namespace praas::sdk {

Expand All @@ -23,6 +24,7 @@ namespace praas::sdk {
void Process::disconnect()
{
_dataplane.close();
_disconnected = true;
}

bool Process::connect()
Expand All @@ -37,7 +39,7 @@ namespace praas::sdk {
}

praas::common::message::ProcessConnectionData req;
req.process_name("DATAPLANE");
req.process_name("DATA_PLANE");
_dataplane.write_n(req.bytes(), req.BUF_SIZE);

// Now wait for the confirmation;
Expand All @@ -54,9 +56,41 @@ namespace praas::sdk {
}
auto& result = std::get<common::message::ProcessConnectionPtr>(parsed_msg);

_disconnected = false;

return result.process_name() == "CORRECT";
}

bool Process::is_alive()
{
// Not sure this is fully robust but seems to work for our case
if(_disconnected) {
return false;
}

ssize_t read_bytes = recv(_dataplane.handle(), _response.data(), praas::common::message::MessageConfig::BUF_SIZE, MSG_DONTWAIT);

if(read_bytes == -1 && errno == EAGAIN) {
return true;
}

if(read_bytes == 0) {
return false;
}

if(read_bytes != praas::common::message::MessageConfig::BUF_SIZE) {
auto parsed_msg = praas::common::message::MessageParser::parse(_response);
if (!std::holds_alternative<common::message::ProcessClosurePtr>(parsed_msg)) {
spdlog::error("Unknown message received from the process!");
} else {
_disconnected = true;
}

return false;
}

}

InvocationResult
Process::invoke(std::string_view function_name, std::string invocation_id, char* ptr, size_t len)
{
Expand All @@ -68,18 +102,24 @@ namespace praas::sdk {
msg.invocation_id(invocation_id);
msg.payload_size(len);

_dataplane.write_n(msg.bytes(), msg.BUF_SIZE);
auto written = _dataplane.write_n(msg.bytes(), msg.BUF_SIZE);
if(written != msg.BUF_SIZE) {
return {1, nullptr, 0};
}

if (len > 0) {
_dataplane.write_n(ptr, len);
auto written = _dataplane.write_n(ptr, len);
if(written != len) {
return {1, nullptr, 0};
}
}

praas::common::message::MessageData response;
auto read_bytes = _dataplane.read_n(response.data(), praas::common::message::MessageConfig::BUF_SIZE);
auto read_bytes = _dataplane.read_n(_response.data(), praas::common::message::MessageConfig::BUF_SIZE);
if(read_bytes < praas::common::message::MessageConfig::BUF_SIZE) {
return {1, nullptr, 0};
}

auto parsed_msg = praas::common::message::MessageParser::parse(response);
auto parsed_msg = praas::common::message::MessageParser::parse(_response);
if (!std::holds_alternative<common::message::InvocationResultPtr>(parsed_msg)) {
return {1, nullptr, 0};
}
Expand Down

0 comments on commit aae47fe

Please sign in to comment.