From 4164db35bb44df24976350be4448a0383ec3bf11 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 15 Oct 2024 16:34:46 +0200 Subject: [PATCH] [control-plane] [sdk] Finalize features of sticky sessions --- common/include/praas/common/http.hpp | 2 + common/src/http.cpp | 5 + .../praas/control-plane/application.hpp | 6 +- .../include/praas/control-plane/worker.hpp | 3 +- control-plane/src/application.cpp | 19 ++- control-plane/src/http.cpp | 4 +- control-plane/src/process.cpp | 1 + control-plane/src/worker.cpp | 18 ++- sdk/include/praas/sdk/invocation.hpp | 2 + sdk/include/praas/sdk/praas.hpp | 20 ++- sdk/src/praas.cpp | 126 ++++++++++++++---- tests/integration/invocation.cpp | 26 +++- 12 files changed, 189 insertions(+), 43 deletions(-) diff --git a/common/include/praas/common/http.hpp b/common/include/praas/common/http.hpp index 1a8f843..22b6669 100644 --- a/common/include/praas/common/http.hpp +++ b/common/include/praas/common/http.hpp @@ -53,6 +53,8 @@ namespace praas::common::http { static response_ptr_t failed_response(const std::string& reason); + std::shared_ptr& handle(); + private: void request(request_ptr_t& req, parameters_t&& params, callback_t&& callback); diff --git a/common/src/http.cpp b/common/src/http.cpp index d6d8eb6..cae0772 100644 --- a/common/src/http.cpp +++ b/common/src/http.cpp @@ -114,6 +114,11 @@ namespace praas::common::http { return resp; } + std::shared_ptr& HTTPClient::handle() + { + return _http_client; + } + void HTTPClientFactory::initialize(int thread_num) { HTTPClientFactory::_pool = std::make_unique(thread_num); diff --git a/control-plane/include/praas/control-plane/application.hpp b/control-plane/include/praas/control-plane/application.hpp index 861f299..837320d 100644 --- a/control-plane/include/praas/control-plane/application.hpp +++ b/control-plane/include/praas/control-plane/application.hpp @@ -153,6 +153,9 @@ namespace praas::control_plane { backend::Backend& backend, tcpserver::TCPServer& poller, process::Resources&& resources ); + std::optional> + get_controlplane_process(const std::string& process_name); + //////////////////////////////////////////////////////////////////////////////// /// @brief Obtain a process that can be used for FaaS invocations. /// Finds the first process with a spare capacity. Otherwise, it allocates one. @@ -236,7 +239,8 @@ namespace praas::control_plane { std::unordered_map _swapped_processes; lock_t _controlplane_mutex; - std::vector _controlplane_processes; + std::unordered_map _controlplane_processes; + int _controlplane_counter = 0; }; } // namespace praas::control_plane diff --git a/control-plane/include/praas/control-plane/worker.hpp b/control-plane/include/praas/control-plane/worker.hpp index fa3444a..834bd57 100644 --- a/control-plane/include/praas/control-plane/worker.hpp +++ b/control-plane/include/praas/control-plane/worker.hpp @@ -55,7 +55,8 @@ namespace praas::control_plane::worker { void handle_invocation( HttpServer::request_t request, HttpServer::callback_t&& callback, const std::string& app_id, - std::string function_name, std::chrono::high_resolution_clock::time_point start + std::string function_name, std::chrono::high_resolution_clock::time_point start, + std::optional process_name ); //////////////////////////////////////////////////////////////////////////////// diff --git a/control-plane/src/application.cpp b/control-plane/src/application.cpp index b6f7a2d..25a41e6 100644 --- a/control-plane/src/application.cpp +++ b/control-plane/src/application.cpp @@ -118,6 +118,19 @@ namespace praas::control_plane { throw praas::common::ObjectDoesNotExist{name}; } } + + std::optional> + Application::get_controlplane_process(const std::string& process_name) + { + read_lock_t lock(_controlplane_mutex); + + auto iter = _controlplane_processes.find(process_name); + if(iter != _controlplane_processes.end()) { + return std::make_tuple(iter->second->write_lock(), iter->second.get()); + } + + return std::nullopt; + } void Application::get_controlplane_process( backend::Backend& backend, tcpserver::TCPServer& poller, process::Resources&& resources, @@ -130,7 +143,7 @@ namespace praas::control_plane { // FIXME: this needs to be a parameter - store it in the process int max_funcs_per_process = 1; - for (auto& proc : _controlplane_processes) { + for (auto& [name, proc] : _controlplane_processes) { int active_funcs = proc->active_invocations(); if (active_funcs < max_funcs_per_process) { @@ -142,7 +155,7 @@ namespace praas::control_plane { } // No process? create - std::string name = fmt::format("controlplane-{}", _controlplane_processes.size()); + std::string name = fmt::format("controlplane-{}", _controlplane_counter++); process::ProcessPtr process = std::make_shared(name, this, std::move(resources)); process->set_creation_callback(std::move(callback), true); @@ -168,7 +181,7 @@ namespace praas::control_plane { { write_lock_t lock(_controlplane_mutex); - _controlplane_processes.emplace_back(process); + _controlplane_processes[name] = process; } } else { diff --git a/control-plane/src/http.cpp b/control-plane/src/http.cpp index 2038633..91e0078 100644 --- a/control-plane/src/http.cpp +++ b/control-plane/src/http.cpp @@ -171,11 +171,13 @@ namespace praas::control_plane { const std::string& function_name ) { + std::string pid = request->getParameter("process_name"); + _logger->info("Push new invocation request of {}", function_name); auto start = std::chrono::high_resolution_clock::now(); _workers.add_task( &worker::Workers::handle_invocation, request, std::move(callback), app_name, function_name, - start + start, !pid.empty() ? std::make_optional(pid) : std::nullopt ); } diff --git a/control-plane/src/process.cpp b/control-plane/src/process.cpp index c2359c7..7bdee49 100644 --- a/control-plane/src/process.cpp +++ b/control-plane/src/process.cpp @@ -233,6 +233,7 @@ namespace praas::control_plane::process { spdlog::error("Responding to client of invocation {}", invocation_id); Json::Value json; json["function"] = (*iter).function_name; + json["process_name"] = _name; json["invocation_id"] = invocation_id; json["return_code"] = return_code; json["result"] = std::string{buf, len}; diff --git a/control-plane/src/worker.cpp b/control-plane/src/worker.cpp index 9397ac4..72b2cb2 100644 --- a/control-plane/src/worker.cpp +++ b/control-plane/src/worker.cpp @@ -21,7 +21,8 @@ namespace praas::control_plane::worker { void Workers::handle_invocation( HttpServer::request_t request, HttpServer::callback_t&& callback, const std::string& app_id, - std::string function_name, std::chrono::high_resolution_clock::time_point start + std::string function_name, std::chrono::high_resolution_clock::time_point start, + std::optional process_name ) { Resources::RWAccessor acc; @@ -34,7 +35,20 @@ namespace praas::control_plane::worker { common::util::assert_true(_server != nullptr); - { + // Choose sticky process + if(process_name.has_value()) { + auto val = acc.get()->get_controlplane_process(process_name.value()); + + if(val.has_value()) { + spdlog::debug("[Workers] Schedule new invocation on proc {}", process_name.value()); + std::get<1>(val.value())->add_invocation( + std::move(request), std::move(callback), function_name, start + ); + } else { + callback(HttpServer::failed_response("Process unknown!", drogon::k404NotFound)); + } + } else { + // Get a process or allocate one. // FIXME: make resources configurable acc.get()->get_controlplane_process( diff --git a/sdk/include/praas/sdk/invocation.hpp b/sdk/include/praas/sdk/invocation.hpp index 69a761d..d5a6944 100644 --- a/sdk/include/praas/sdk/invocation.hpp +++ b/sdk/include/praas/sdk/invocation.hpp @@ -20,6 +20,8 @@ namespace praas::sdk { struct ControlPlaneInvocationResult { + std::string process_name; + std::string invocation_id; int return_code; diff --git a/sdk/include/praas/sdk/praas.hpp b/sdk/include/praas/sdk/praas.hpp index 115dec3..b311eca 100644 --- a/sdk/include/praas/sdk/praas.hpp +++ b/sdk/include/praas/sdk/praas.hpp @@ -4,14 +4,16 @@ #include #include +#include #include #include +#include "praas/common/http.hpp" namespace praas::sdk { struct PraaS { - PraaS(const std::string& control_plane_addr); + PraaS(const std::string& control_plane_addr, int thread_num = 8); void disconnect(); @@ -28,7 +30,14 @@ namespace praas::sdk { ControlPlaneInvocationResult invoke( const std::string& app_name, const std::string& function_name, - const std::string& invocation_data + const std::string& invocation_data, + std::optional process_name = std::nullopt + ); + + std::future invoke_async( + const std::string& app_name, const std::string& function_name, + const std::string& invocation_data, + std::optional process_name = std::nullopt ); std::tuple swap_process(const Process& process); @@ -46,9 +55,12 @@ namespace praas::sdk { private: std::string _last_error; - trantor::EventLoopThread _loop; + std::mutex _clients_mutex; + std::condition_variable _cv; + std::queue _clients; - drogon::HttpClientPtr _http_client; + praas::common::http::HTTPClient _get_client(); + void _return_client(praas::common::http::HTTPClient& client); }; } // namespace praas::sdk diff --git a/sdk/src/praas.cpp b/sdk/src/praas.cpp index d4d126f..0e7d967 100644 --- a/sdk/src/praas.cpp +++ b/sdk/src/praas.cpp @@ -5,23 +5,48 @@ #include #include #include +#include + +#include namespace praas::sdk { - PraaS::PraaS(const std::string& control_plane_addr) + PraaS::PraaS(const std::string& control_plane_addr, int thread_num) { - _loop.run(); - _http_client = - drogon::HttpClient::newHttpClient(control_plane_addr, _loop.getLoop(), false, false); + praas::common::http::HTTPClientFactory::initialize(thread_num); + + + for(int i = 0; i < thread_num; ++i) { + _clients.emplace( + praas::common::http::HTTPClientFactory::create_client(control_plane_addr) + ); + } } - void PraaS::disconnect() + praas::common::http::HTTPClient PraaS::_get_client() { - _http_client.reset(); - _loop.getLoop()->quit(); - _loop.wait(); + std::unique_lock lock{_clients_mutex}; + + if(_clients.empty()) { + _cv.wait(lock); + } + + auto client = std::move(_clients.front()); + _clients.pop(); + + return client; } + void PraaS::_return_client(praas::common::http::HTTPClient& client) + { + std::unique_lock l{_clients_mutex}; + + _clients.push(std::move(client)); + } + + void PraaS::disconnect() + {} + bool PraaS::create_application(const std::string& application, const std::string& cloud_resource_name) { @@ -32,7 +57,8 @@ namespace praas::sdk { std::promise p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { if (result == drogon::ReqResult::Ok) { @@ -42,6 +68,8 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } @@ -54,7 +82,8 @@ namespace praas::sdk { std::promise p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { if (result == drogon::ReqResult::Ok) { @@ -64,6 +93,8 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } @@ -75,7 +106,8 @@ namespace praas::sdk { std::promise p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { if (result == drogon::ReqResult::Ok) { @@ -85,8 +117,9 @@ namespace praas::sdk { } } ); - return p.get_future().get(); + _return_client(http_client); + return p.get_future().get(); } std::optional PraaS::create_process( @@ -102,7 +135,8 @@ namespace praas::sdk { std::promise> p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { spdlog::info("Received callback Created process"); @@ -117,6 +151,8 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } @@ -128,7 +164,8 @@ namespace praas::sdk { std::promise p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { @@ -140,37 +177,61 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } ControlPlaneInvocationResult PraaS::invoke( const std::string& app_name, const std::string& function_name, - const std::string& invocation_data + const std::string& invocation_data, std::optional process_name ) { - std::promise p; - ControlPlaneInvocationResult res; + return invoke_async(app_name, function_name, invocation_data, process_name).get(); + } + + std::future PraaS::invoke_async( + const std::string& app_name, const std::string& function_name, + const std::string& invocation_data, std::optional process_name + ) + { + // We need a shared_ptr because we cannot move it to the lambda later + // Drogon request requires a std::function which must be CopyConstructible + auto p = std::make_shared>(); + auto fut = p->get_future(); auto req = drogon::HttpRequest::newHttpRequest(); req->setMethod(drogon::Post); req->setPath(fmt::format("/apps/{}/invoke/{}", app_name, function_name)); req->setBody(invocation_data); + if(process_name.has_value()) { + req->setParameter("process_name", process_name.value()); + } req->setContentTypeCode(drogon::ContentType::CT_APPLICATION_JSON); - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, - [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { + [&, p = std::move(p)](drogon::ReqResult result, const drogon::HttpResponsePtr& response) mutable { + + ControlPlaneInvocationResult res; if (result != drogon::ReqResult::Ok || response->getStatusCode() != drogon::k200OK) { res.return_code = 1; - auto json = response->getJsonObject(); - res.error_message = (*json)["reason"].asString(); - p.set_value(); + if(response) { + auto json = response->getJsonObject(); + res.error_message = (*json)["reason"].asString(); + } else { + res.error_message = "request failed"; + } + + p->set_value(res); return; } auto json = response->getJsonObject(); res.invocation_id = (*json)["invocation_id"].asString(); + res.process_name = (*json)["process_name"].asString(); res.return_code = (*json)["return_code"].asInt(); if (res.return_code < 0) { @@ -179,12 +240,12 @@ namespace praas::sdk { res.response = std::move((*json)["result"].asString()); } - p.set_value(); + p->set_value(res); } ); - p.get_future().wait(); + _return_client(http_client); - return res; + return fut; } std::tuple PraaS::swap_process(const Process& process) @@ -195,7 +256,8 @@ namespace praas::sdk { std::promise> p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { @@ -206,6 +268,8 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } @@ -219,7 +283,8 @@ namespace praas::sdk { std::promise> p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { @@ -235,6 +300,8 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } @@ -246,7 +313,8 @@ namespace praas::sdk { std::promise p; - _http_client->sendRequest( + auto http_client = _get_client(); + http_client.handle()->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { @@ -257,6 +325,8 @@ namespace praas::sdk { } } ); + _return_client(http_client); + return p.get_future().get(); } diff --git a/tests/integration/invocation.cpp b/tests/integration/invocation.cpp index 935d0d2..513ac53 100644 --- a/tests/integration/invocation.cpp +++ b/tests/integration/invocation.cpp @@ -50,9 +50,29 @@ TEST_F(IntegrationLocalInvocation, Invoke) ASSERT_TRUE(praas.create_application("test_invoc", "spcleth/praas-examples:hello-world-cpp")); } - auto invoc = praas.invoke("test_invoc", "hello-world", ""); + auto invoc = praas.invoke_async("test_invoc", "hello-world", ""); + auto invoc2 = praas.invoke_async("test_invoc", "hello-world", ""); - ASSERT_EQ(invoc.return_code, 0); - EXPECT_EQ("Hello, world!", get_output_binary(invoc.response)); + auto invoc_res = invoc.get(); + auto invoc2_res = invoc2.get(); + std::cerr << invoc_res.process_name << std::endl; + std::cerr << invoc2_res.process_name << std::endl; + + for(auto & res : {invoc_res, invoc2_res}) { + ASSERT_EQ(res.return_code, 0); + EXPECT_EQ("Hello, world!", get_output_binary(res.response)); + } + + invoc = praas.invoke_async("test_invoc", "hello-world", "", invoc_res.process_name); + invoc2 = praas.invoke_async("test_invoc", "hello-world", "", invoc2_res.process_name); + + auto new_invoc_res = invoc.get(); + auto new_invoc2_res = invoc2.get(); + for(auto & res : {new_invoc_res, new_invoc2_res}) { + ASSERT_EQ(res.return_code, 0); + EXPECT_EQ("Hello, world!", get_output_binary(res.response)); + } + ASSERT_EQ(invoc_res.process_name, new_invoc_res.process_name); + ASSERT_EQ(invoc2_res.process_name, new_invoc2_res.process_name); } }