From 38f24942cd1aa1823e67ea8a3c2e9162b2af12e3 Mon Sep 17 00:00:00 2001 From: Marcin Copik Date: Tue, 15 Oct 2024 03:00:56 +0200 Subject: [PATCH] [control-plane] [serving] [process] [sdk] Consolidate S3 implementation for swapping --- .../include/praas/control-plane/aws.hpp | 18 -- .../include/praas/control-plane/backend.hpp | 26 +- .../include/praas/control-plane/config.hpp | 16 +- .../praas/control-plane/deployment.hpp | 31 +- .../include/praas/control-plane/server.hpp | 4 +- control-plane/src/application.cpp | 5 + control-plane/src/backend.cpp | 23 +- control-plane/src/config.cpp | 19 ++ control-plane/src/deployment.cpp | 33 ++ control-plane/src/server.cpp | 12 +- control-plane/tests/mocks.hpp | 6 +- process/CMakeLists.txt | 7 + .../praas/process/controller/controller.hpp | 2 +- .../praas/process/controller/swapper.hpp | 32 +- process/controller/src/cli.cpp | 15 +- process/controller/src/controller.cpp | 73 ++++- process/controller/src/swapper.cpp | 297 +++++++++++++++++- sdk/include/praas/sdk/praas.hpp | 2 +- sdk/src/praas.cpp | 8 +- serving/src/server.cpp | 16 +- tests/integration/state_swap.cpp | 11 +- 21 files changed, 576 insertions(+), 80 deletions(-) delete mode 100644 control-plane/include/praas/control-plane/aws.hpp diff --git a/control-plane/include/praas/control-plane/aws.hpp b/control-plane/include/praas/control-plane/aws.hpp deleted file mode 100644 index 50d2714..0000000 --- a/control-plane/include/praas/control-plane/aws.hpp +++ /dev/null @@ -1,18 +0,0 @@ - -#ifndef PRAAS_CONTROLL_PLANE_AWS_HPP -#define PRAAS_CONTROLL_PLANE_AWS_HPP - -#if defined(WITH_AWS_BACKEND) - -namespace praas::control_plane::state { - - struct AWSS3SwapLocation { - - std::string location() const = 0; - }; - -}; // namespace praas::control_plane::state - -#endif - -#endif diff --git a/control-plane/include/praas/control-plane/backend.hpp b/control-plane/include/praas/control-plane/backend.hpp index b41ddce..8eda50e 100644 --- 
a/control-plane/include/praas/control-plane/backend.hpp +++ b/control-plane/include/praas/control-plane/backend.hpp @@ -2,6 +2,7 @@ #define PRAAS_CONTROLL_PLANE_BACKEND_HPP #include +#include #include #if defined(WITH_FARGATE_BACKEND) @@ -74,7 +75,10 @@ namespace praas::control_plane::backend { struct Backend { - Backend() = default; + Backend(deployment::Deployment& deployment): + _deployment(deployment) + {} + Backend(const Backend&) = default; Backend(Backend&&) = delete; Backend& operator=(const Backend&) = default; @@ -89,9 +93,9 @@ namespace praas::control_plane::backend { * @param resources [TODO:description] */ virtual void allocate_process( - process::ProcessPtr, const process::Resources& resources, - std::function&&, std::optional)>&& - callback + process::ProcessPtr, const process::Resources& resources, + std::function&&, std::optional)>&& + callback ) = 0; virtual void shutdown( @@ -122,13 +126,20 @@ namespace praas::control_plane::backend { * @param {name} initialized backend instance, where instance type is one of Backend's * childrens. 
*/ - static std::unique_ptr construct(const config::Config&); + static std::unique_ptr construct(const config::Config&, deployment::Deployment&); void configure_tcpserver(const std::string& ip, int port); + const deployment::Deployment& deployment() const + { + return _deployment; + } + protected: std::string _tcp_ip; int _tcp_port; + + deployment::Deployment& _deployment; }; struct DockerBackend : Backend { @@ -149,8 +160,7 @@ namespace praas::control_plane::backend { std::string container_id; }; - DockerBackend(const config::BackendDocker& cfg); - + DockerBackend(const config::BackendDocker& cfg, deployment::Deployment& deployment); ~DockerBackend() override; void allocate_process( @@ -217,7 +227,7 @@ namespace praas::control_plane::backend { std::shared_ptr _ec2_client; }; - FargateBackend(const config::BackendFargate& cfg); + FargateBackend(const config::BackendFargate& cfg, deployment::Deployment& deployment); ~FargateBackend() override; diff --git a/control-plane/include/praas/control-plane/config.hpp b/control-plane/include/praas/control-plane/config.hpp index 53e6b18..eac50c2 100644 --- a/control-plane/include/praas/control-plane/config.hpp +++ b/control-plane/include/praas/control-plane/config.hpp @@ -93,7 +93,20 @@ namespace praas::control_plane::config { void set_defaults(); }; - struct Deployment {}; + struct Deployment { + virtual ~Deployment() = default; + }; + + struct LocalDeployment : Deployment { + void load(cereal::JSONInputArchive& archive) {} + }; + + struct AWSDeployment : Deployment { + std::string s3_bucket; + + void load(cereal::JSONInputArchive& archive); + void set_defaults(); + }; struct Config { @@ -103,6 +116,7 @@ namespace praas::control_plane::config { TCPServer tcpserver; deployment::Type deployment_type; + std::unique_ptr deployment; backend::Type backend_type; std::unique_ptr backend; diff --git a/control-plane/include/praas/control-plane/deployment.hpp b/control-plane/include/praas/control-plane/deployment.hpp index 
5f6bdaf..77b876f 100644 --- a/control-plane/include/praas/control-plane/deployment.hpp +++ b/control-plane/include/praas/control-plane/deployment.hpp @@ -4,10 +4,6 @@ #include -#if defined(WITH_AWS_DEPLOYMENT) -#include -#endif - #include namespace praas::control_plane::config { @@ -31,7 +27,17 @@ namespace praas::control_plane::state { }; #if defined(WITH_AWS_DEPLOYMENT) - class AWSS3SwapLocation : SwapLocation {}; + struct AWSS3SwapLocation : SwapLocation { + std::string app_name; + + AWSS3SwapLocation(std::string app_name): + app_name(std::move(app_name)) + {} + + std::string_view root_path() const override; + std::string path(std::string process_name) const override; + }; + class RedisSwapLocation : SwapLocation {}; #endif @@ -72,6 +78,21 @@ namespace praas::control_plane::deployment { std::filesystem::path _path; }; +#if defined(WITH_AWS_DEPLOYMENT) + class AWS : public Deployment { + public: + std::string s3_bucket; + + AWS(std::string s3_bucket): + s3_bucket(s3_bucket) + {} + + std::unique_ptr get_location(std::string app_name) override; + + void delete_swap(const state::SwapLocation& /*unused*/) override; + }; +#endif + } // namespace praas::control_plane::deployment #endif diff --git a/control-plane/include/praas/control-plane/server.hpp b/control-plane/include/praas/control-plane/server.hpp index 0951ef4..e0aa278 100644 --- a/control-plane/include/praas/control-plane/server.hpp +++ b/control-plane/include/praas/control-plane/server.hpp @@ -42,10 +42,10 @@ namespace praas::control_plane { Resources _resources; - std::unique_ptr _backend; - std::unique_ptr _deployment; + std::unique_ptr _backend; + worker::Workers _workers; tcpserver::TCPServer _tcp_server; diff --git a/control-plane/src/application.cpp b/control-plane/src/application.cpp index 4ddb233..b6f7a2d 100644 --- a/control-plane/src/application.cpp +++ b/control-plane/src/application.cpp @@ -343,7 +343,12 @@ namespace praas::control_plane { auto iter = _active_processes.find(ptr->name()); if (iter 
!= _active_processes.end()) { + + // If process failed during swapping, then notify client. + (*iter).second->swapped_callback(0, 0, "Process closed unexpectedly during swapping"); + _active_processes.erase(iter); + } else { // FIXME: check for processes allocated by the control plane diff --git a/control-plane/src/backend.cpp b/control-plane/src/backend.cpp index 466709f..890fe5b 100644 --- a/control-plane/src/backend.cpp +++ b/control-plane/src/backend.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #if defined(WITH_FARGATE_BACKEND) #include @@ -43,23 +44,27 @@ namespace praas::control_plane::backend { } } - std::unique_ptr Backend::construct(const config::Config& cfg) + std::unique_ptr Backend::construct(const config::Config& cfg, deployment::Deployment& deployment) { if (cfg.backend_type == Type::DOCKER) { - return std::make_unique(*dynamic_cast(cfg.backend.get() - )); + return std::make_unique( + *dynamic_cast(cfg.backend.get()), + deployment + ); } #if defined(WITH_FARGATE_BACKEND) if (cfg.backend_type == Type::AWS_FARGATE) { return std::make_unique( - *dynamic_cast(cfg.backend.get()) + *dynamic_cast(cfg.backend.get()), + deployment ); } #endif return nullptr; } - DockerBackend::DockerBackend(const config::BackendDocker& cfg) + DockerBackend::DockerBackend(const config::BackendDocker& cfg, deployment::Deployment& deployment): + Backend(deployment) { _logger = common::util::create_logger("LocalBackend"); @@ -92,6 +97,10 @@ namespace praas::control_plane::backend { body["swap-location"] = process->state().swap->path(process->name()); } + if(auto * ptr = dynamic_cast(&_deployment)) { + body["s3-swapping-bucket"] = ptr->s3_bucket; + } + _http_client.post( "/create", { @@ -185,7 +194,8 @@ namespace praas::control_plane::backend { } #if defined(WITH_FARGATE_BACKEND) - FargateBackend::FargateBackend(const config::BackendFargate& cfg) + FargateBackend::FargateBackend(const config::BackendFargate& cfg, deployment::Deployment& deployment): + 
Backend(deployment) { _logger = common::util::create_logger("FargateBackend"); @@ -308,6 +318,7 @@ namespace praas::control_plane::backend { std::function&&, std::optional)>&& callback ) { + // FIXME: swap bucket std::string cluster_name = _fargate_config["cluster_name"].asString(); Aws::ECS::Model::RunTaskRequest req; diff --git a/control-plane/src/config.cpp b/control-plane/src/config.cpp index cbd1d11..ee16049 100644 --- a/control-plane/src/config.cpp +++ b/control-plane/src/config.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -101,6 +102,16 @@ namespace praas::control_plane::config { fargate_config = ""; } + void AWSDeployment::load(cereal::JSONInputArchive& archive) + { + archive(CEREAL_NVP(s3_bucket)); + } + + void AWSDeployment::set_defaults() + { + s3_bucket = ""; + } + Config Config::deserialize(std::istream& in_stream) { Config cfg; @@ -142,6 +153,9 @@ namespace praas::control_plane::config { public_ip_address = "127.0.0.1"; http_client_io_threads = 1; + deployment_type = deployment::Type::LOCAL; + deployment = std::make_unique(); + http.set_defaults(); workers.set_defaults(); down_scaler.set_defaults(); @@ -168,6 +182,11 @@ namespace praas::control_plane::config { std::string deployment_type; archive(cereal::make_nvp("deployment-type", deployment_type)); this->deployment_type = deployment::deserialize(deployment_type); + if(this->deployment_type == deployment::Type::AWS) { + auto ptr = std::make_unique(); + common::util::cereal_load_optional(archive, "deployment", *ptr); + this->deployment = std::move(ptr); + } archive(cereal::make_nvp("ip-address", public_ip_address)); archive(cereal::make_nvp("http-client-io-threads", http_client_io_threads)); diff --git a/control-plane/src/deployment.cpp b/control-plane/src/deployment.cpp index ab22f06..75ee5f9 100644 --- a/control-plane/src/deployment.cpp +++ b/control-plane/src/deployment.cpp @@ -16,6 +16,18 @@ namespace praas::control_plane::state { return fmt::format("local://{}", 
app_name); } +#if defined(WITH_AWS_DEPLOYMENT) + std::string_view AWSS3SwapLocation::root_path() const + { + return "/"; + } + + std::string AWSS3SwapLocation::path(std::string process_name) const + { + return fmt::format("s3://{}", app_name); + } +#endif + } // namespace praas::control_plane::state namespace praas::control_plane::deployment { @@ -36,6 +48,15 @@ namespace praas::control_plane::deployment { { if (cfg.deployment_type == Type::LOCAL) { return std::make_unique(); + } else if (cfg.deployment_type == Type::AWS) { + + auto cfg_ptr = dynamic_cast(cfg.deployment.get()); + if(!cfg_ptr) { + spdlog::error("Wrong deployment config type!"); + return nullptr; + } + + return std::make_unique(cfg_ptr->s3_bucket); } spdlog::error("Unknown deployment type! {}", static_cast(cfg.deployment_type)); return nullptr; @@ -51,4 +72,16 @@ namespace praas::control_plane::deployment { spdlog::error("Deleting swap is not supported for disk operations."); } +#if defined(WITH_AWS_DEPLOYMENT) + std::unique_ptr AWS::get_location(std::string app_name) + { + return std::make_unique(app_name); + } + + void AWS::delete_swap(const state::SwapLocation&) + { + spdlog::error("Deleting swap is not yet supported!"); + } +#endif + } // namespace praas::control_plane::deployment diff --git a/control-plane/src/server.cpp b/control-plane/src/server.cpp index fb373da..14a6afd 100644 --- a/control-plane/src/server.cpp +++ b/control-plane/src/server.cpp @@ -15,12 +15,12 @@ extern void signal_handler(int); namespace praas::control_plane { - Server::Server(config::Config& cfg) - : _backend(backend::Backend::construct(cfg)), - _deployment(deployment::Deployment::construct(cfg)), - _workers(worker::Workers(cfg.workers, *_backend, *_deployment, _resources)), - _tcp_server(tcpserver::TCPServer(cfg.tcpserver, _workers)), - _http_server(std::make_shared(cfg.http, _workers)) + Server::Server(config::Config& cfg): + _deployment(deployment::Deployment::construct(cfg)), + 
_backend(backend::Backend::construct(cfg, *_deployment)), + _workers(worker::Workers(cfg.workers, *_backend, *_deployment, _resources)), + _tcp_server(tcpserver::TCPServer(cfg.tcpserver, _workers)), + _http_server(std::make_shared(cfg.http, _workers)) { _logger = common::util::create_logger("Server"); diff --git a/control-plane/tests/mocks.hpp b/control-plane/tests/mocks.hpp index cefd53d..fa49955 100644 --- a/control-plane/tests/mocks.hpp +++ b/control-plane/tests/mocks.hpp @@ -28,6 +28,10 @@ class MockBackendInstance : public backend::ProcessInstance { class MockBackend : public backend::Backend { public: + MockDeployment deployment; + + MockBackend() : backend::Backend(deployment) {} + MOCK_METHOD( void, allocate_process, (process::ProcessPtr, const process::Resources&, @@ -40,7 +44,7 @@ class MockBackend : public backend::Backend { process::ProcessObserver instance, std::function)>&& callback), () -); + ); MOCK_METHOD(double, max_memory, (), (const)); MOCK_METHOD(double, max_vcpus, (), (const)); }; diff --git a/process/CMakeLists.txt b/process/CMakeLists.txt index 072db5a..d59fddc 100644 --- a/process/CMakeLists.txt +++ b/process/CMakeLists.txt @@ -86,6 +86,13 @@ add_executable(process_exe controller/src/cli.cpp) set_target_properties(process_exe PROPERTIES RUNTIME_OUTPUT_DIRECTORY bin) target_link_libraries(process_exe PRIVATE controller_lib) +if(PRAAS_WITH_DEPLOYMENT_AWS) + + target_include_directories(controller_lib PRIVATE ${AWSSDK_INCLUDE_DIRS}) + target_link_libraries(controller_lib PRIVATE ${AWSSDK_LIBRARIES}) + +endif() + ### # C++ invoker ### diff --git a/process/controller/include/praas/process/controller/controller.hpp b/process/controller/include/praas/process/controller/controller.hpp index 5765f91..a930cf1 100644 --- a/process/controller/include/praas/process/controller/controller.hpp +++ b/process/controller/include/praas/process/controller/controller.hpp @@ -47,7 +47,7 @@ namespace praas::process { void shutdown_channels(); - void swap_in(const 
std::string& location); + bool swap_in(const std::string& location); void swap_out(const std::string& location); diff --git a/process/controller/include/praas/process/controller/swapper.hpp b/process/controller/include/praas/process/controller/swapper.hpp index e22803b..c24c62b 100644 --- a/process/controller/include/praas/process/controller/swapper.hpp +++ b/process/controller/include/praas/process/controller/swapper.hpp @@ -6,6 +6,8 @@ #include #include +#include +#include #include namespace praas::process::swapper { @@ -24,12 +26,38 @@ namespace praas::process::swapper { std::string swap_location; DiskSwapper(); - virtual ~DiskSwapper() = default; + ~DiskSwapper() override = default; - bool swap_in(const std::string& location, message::MessageStore & mailbox); + bool swap_in(const std::string& location, message::MessageStore & mailbox) override; size_t swap_out(const std::string& location, std::vector>& msgs) override; }; +#if defined(WITH_AWS_DEPLOYMENT) + + struct S3Swapper : Swapper + { + struct S3API + { + Aws::SDKOptions _s3_options; + + S3API(); + ~S3API(); + }; + + std::optional _s3_client; + std::string swap_bucket; + + static constexpr int MAX_CONNECTIONS = 64; + static constexpr std::string_view SWAPS_DIRECTORY = "swaps"; + + S3Swapper(std::string swap_bucket); + + bool swap_in(const std::string& location, message::MessageStore & mailbox) override; + size_t swap_out(const std::string& location, std::vector>& msgs) override; + + static std::optional api; + }; +#endif } // namespace praas::process::swapper diff --git a/process/controller/src/cli.cpp b/process/controller/src/cli.cpp index d4d31b2..1959c6d 100644 --- a/process/controller/src/cli.cpp +++ b/process/controller/src/cli.cpp @@ -28,6 +28,15 @@ int main(int argc, char** argv) praas::process::Controller controller{config}; instance = &controller; + char* swapin_loc = std::getenv("SWAPIN_LOCATION"); + if(swapin_loc) { + // TODO: consider in future lazy loading + if(!controller.swap_in(swapin_loc)) 
{ + controller.shutdown(); + return 1; + } + } + praas::process::remote::TCPServer server{controller, config}; controller.set_remote(&server); if(config.control_plane_addr.has_value()) { @@ -36,12 +45,6 @@ int main(int argc, char** argv) server.poll(); } - char* swapin_loc = std::getenv("SWAPIN_LOCATION"); - if(swapin_loc) { - // TODO: consider in future lazy loading - controller.swap_in(swapin_loc); - } - controller.start(); spdlog::info("Process controller is closing down"); diff --git a/process/controller/src/controller.cpp b/process/controller/src/controller.cpp index 208fc82..c2a5612 100644 --- a/process/controller/src/controller.cpp +++ b/process/controller/src/controller.cpp @@ -259,7 +259,7 @@ namespace praas::process { } } - void Controller::swap_in(const std::string& location) + bool Controller::swap_in(const std::string& location) { spdlog::info("Request swapping in!"); @@ -280,12 +280,43 @@ namespace praas::process { "Swapped in finished in {} msec", std::chrono::duration_cast(end - begin).count() / 1000.0 ); + return true; } else { spdlog::error("Swapped in failed!"); + return false; + } + + } else if(location.starts_with("s3://")) { + + auto* ptr = std::getenv("S3_SWAPPING_BUCKET"); + if(!ptr) { + spdlog::error("Request S3 swap, but not configured!"); + return false; + } + + swapper = std::make_unique(ptr); + + auto loc = location.substr(std::string_view{"s3://"}.size()); + loc = std::filesystem::path{loc} / this->process_id(); + + auto begin = std::chrono::high_resolution_clock::now(); + auto success = swapper->swap_in(loc, _mailbox); + auto end = std::chrono::high_resolution_clock::now(); + + if(success) { + spdlog::info( + "Swapped in finished in {} msec", + std::chrono::duration_cast(end - begin).count() / 1000.0 + ); + return true; + } else { + spdlog::error("Swapped in failed!"); + return false; } } else { spdlog::error("unimplemented"); + return false; } } @@ -299,6 +330,9 @@ namespace praas::process { std::vector> msgs; 
_mailbox.all_state(msgs); + double duration = 0; + size_t size; + std::unique_ptr swapper; if(location.starts_with("local://")) { @@ -308,19 +342,46 @@ namespace praas::process { swapper = std::make_unique(); auto begin = std::chrono::high_resolution_clock::now(); - auto size = swapper->swap_out(loc, msgs); + size = swapper->swap_out(loc, msgs); auto end = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration_cast(end - begin).count() / 1000.0; - _server->swap_confirmation( - size, std::chrono::duration_cast(end - begin).count() / 1000.0 - ); + } else if(location.starts_with("s3://")) { - this->shutdown(); + auto* ptr = std::getenv("S3_SWAPPING_BUCKET"); + if(!ptr) { + spdlog::error("Request S3 swap, but not configured!"); + _server->swap_confirmation( + 0, 0 + ); + this->shutdown(); + return; + } + + swapper = std::make_unique(ptr); + + auto loc = location.substr(std::string_view{"s3://"}.size()); + loc = std::filesystem::path{loc} / this->process_id(); + + auto begin = std::chrono::high_resolution_clock::now(); + size = swapper->swap_out(loc, msgs); + auto end = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration_cast(end - begin).count() / 1000.0; + + spdlog::debug( + "Swapped to S3 finished in {} msec", duration + ); } else { spdlog::error("unimplemented"); abort(); } + + _server->swap_confirmation( + size, duration + ); + + this->shutdown(); } void Controller::_process_external_message(ExternalMessage& msg) diff --git a/process/controller/src/swapper.cpp b/process/controller/src/swapper.cpp index 79c90c4..4c8f78c 100644 --- a/process/controller/src/swapper.cpp +++ b/process/controller/src/swapper.cpp @@ -1,13 +1,21 @@ #include +#include #include #include #include +#include +#include +#include +#include +#include + namespace fs = std::filesystem; namespace praas::process::swapper { - using std::ifstream; + + std::optional S3Swapper::api; template size_t directory_recursive(const fs::path& source, const fs::path& 
destination, F && f) @@ -38,13 +46,36 @@ namespace praas::process::swapper { return total_size; } + template + size_t directory_recursive(const fs::path& source, F && f) + { + size_t total_size = 0; + + if (!fs::exists(source) || !fs::is_directory(source)) { + return 0; + } + + for (const auto& entry : fs::directory_iterator(source)) { + + const auto& path = entry.path(); + + if (fs::is_directory(path)) { + total_size += directory_recursive(path, std::forward(f)); + } else if (fs::is_regular_file(path)) { + f(path); + total_size += fs::file_size(path); + } + } + + return total_size; + } + DiskSwapper::DiskSwapper() { auto *ptr = std::getenv("SWAPS_LOCATION"); if(ptr == nullptr) { spdlog::error("Disk swapper created but no location specified!"); } - swap_location = std::string{ptr}; } template @@ -92,10 +123,10 @@ namespace praas::process::swapper { fs::path full_path = fs::path{swap_location} / location / "state"; if(fs::exists(full_path)) { - spdlog::info("Reading state swap data from {}", full_path.string()); + spdlog::debug("Reading state swap data from {}", full_path.string()); success = _process_swapin_dir(full_path, [&mailbox](const std::string& key, runtime::internal::Buffer && data) mutable { - spdlog::info("Put state {}, data size{}", key, data.len); + spdlog::debug("Put state {}, data size{}", key, data.len); mailbox.state(key, data); return true; } @@ -108,7 +139,7 @@ namespace praas::process::swapper { full_path = fs::path{swap_location} / location / "messages"; if(fs::exists(full_path)) { - spdlog::info("Reading messages swap data from {}", full_path.string()); + spdlog::debug("Reading messages swap data from {}", full_path.string()); success = _process_swapin_dir(full_path, [&mailbox](const std::string& key, runtime::internal::Buffer && data) mutable { @@ -118,7 +149,7 @@ namespace praas::process::swapper { return false; } - spdlog::info("Put message {}, data size{}", key, data.len); + spdlog::debug("Put message {}, data size{}", key, data.len); 
mailbox.put(key.substr(0, pos), key.substr(pos + 1), data); return true; @@ -135,7 +166,7 @@ namespace praas::process::swapper { [](auto & src, auto & dest) { fs::copy_file(src, dest, fs::copy_options::overwrite_existing); - spdlog::info("Copied state file from {} to {}", src.string(), dest.string()); + spdlog::debug("Copied state file from {} to {}", src.string(), dest.string()); } ); @@ -182,11 +213,261 @@ namespace praas::process::swapper { [](auto & src, auto & dest) { fs::copy_file(src, dest, fs::copy_options::overwrite_existing); - spdlog::info("Copied state file from {} to {}", src.string(), dest.string()); + spdlog::debug("Copied state file from {} to {}", src.string(), dest.string()); + + } + ); + + return total_size; + } + + S3Swapper::S3API::S3API() + { + Aws::InitAPI(_s3_options); + } + + S3Swapper::S3API::~S3API() + { + Aws::ShutdownAPI(_s3_options); + } + + S3Swapper::S3Swapper(std::string swap_bucket): + swap_bucket(std::move(swap_bucket)) + { + if(!api.has_value()) { + api.emplace(); + } + + // https://github.com/aws/aws-sdk-cpp/issues/1410 + putenv("AWS_EC2_METADATA_DISABLED=true"); + Aws::Client::ClientConfiguration client_cfg("default", true); + client_cfg.maxConnections = MAX_CONNECTIONS; + _s3_client.emplace(client_cfg); + } + + // https://stackoverflow.com/questions/21073655/c-how-do-i-ignore-the-first-directory-path-when-comparing-paths-in-boostfil + inline fs::path strip(fs::path p) + { + p = p.relative_path(); + if (p.empty()) return {}; + return p.lexically_relative(*p.begin()); + } + + struct CustomContext : public Aws::Client::AsyncCallerContext + { + CustomContext(size_t size): + data(new char[size], size), + streambuf(reinterpret_cast(data.data()), size) + { + data.len = size; + } + + runtime::internal::Buffer data; + Aws::Utils::Stream::PreallocatedStreamBuf streambuf; + }; + + template + size_t find_nth(const std::string& input, char pattern, int start_pos = 0) + { + if constexpr (N <= 0) { + return start_pos; + } + + size_t pos = 
input.find(pattern, start_pos); + if(pos == std::string::npos) return pos; + return find_nth(input, pattern, pos + 1); + } + + template<> + size_t find_nth<0>(const std::string& input, char pattern, int start_pos) + { + return start_pos; + } + + bool S3Swapper::swap_in(const std::string& location, message::MessageStore & mailbox) + { + size_t total_size = 0; + fs::path full_path = fs::path{SWAPS_DIRECTORY} / location; + + Aws::S3::Model::ListObjectsV2Request list_request; + list_request.SetBucket(swap_bucket); + list_request.SetPrefix(full_path.string()); + int requests = 0, requests_total = 0; + std::condition_variable cv; + std::mutex mutex; + + auto callback = [&requests, &cv, &mutex]( + const Aws::S3::S3Client* client, const Aws::S3::Model::GetObjectRequest& req, + const Aws::S3::Model::GetObjectOutcome& outcome, const std::shared_ptr& + ) { + + if (outcome.IsSuccess()) { + spdlog::debug("Successfully downloaded object {}", req.GetKey()); + + std::lock_guard lock(mutex); + requests++; + cv.notify_one(); + } else { + spdlog::error("Error downloading object {}, error {}", req.GetKey(), outcome.GetError().GetMessage()); + } + }; + + while(true) { + + auto result = _s3_client->ListObjectsV2(list_request); + + if(!result.IsSuccess()) { + break; + } + + const auto& objects = result.GetResult().GetContents(); + for (const auto& object : objects) { + + spdlog::debug(object.GetKey()); + size_t size = object.GetSize(); + std::string key_path{object.GetKey()}; + + // strip swaps/app/proc + auto pos = find_nth<3>(key_path, '/'); + auto next_pos = key_path.find('/', pos + 1); + std::string type = key_path.substr(pos, next_pos - pos); + auto actual_key = key_path.substr(next_pos + 1, key_path.size() - next_pos); + + if(type == "state" || type == "messages") { + + auto ctx = Aws::MakeShared("whydoineedtag", size); + + Aws::S3::Model::GetObjectRequest req_ptr; + req_ptr.SetBucket(swap_bucket); + req_ptr.SetKey(object.GetKey()); + req_ptr.SetResponseStreamFactory([ctx]() { 
return Aws::New("", &ctx->streambuf); }); + + if(type == "state") { + spdlog::debug("Put state {}, data size{}", actual_key, ctx->data.len); + mailbox.state(actual_key, ctx->data); + } else { + + size_t pos = actual_key.find('('); + if(pos == std::string::npos) { + spdlog::error("Couldn't parse the message key {}", actual_key); + continue; + } + + spdlog::debug("Put message {}, data size {}", actual_key, ctx->data.len); + mailbox.put(actual_key.substr(0, pos), actual_key.substr(pos + 1), ctx->data); + } + + _s3_client->GetObjectAsync(req_ptr, callback, std::move(ctx)); + requests_total++; + } else if (type == "files") { + + fs::path path = fs::path{FILES_DIRECTORY} / actual_key; + fs::create_directories(path.parent_path()); + + Aws::S3::Model::GetObjectRequest req_ptr; + req_ptr.SetBucket(swap_bucket); + req_ptr.SetKey(object.GetKey()); + req_ptr.SetResponseStreamFactory([=]() { + return Aws::New("anothertag", path.string().c_str(), std::ios_base::out | std::ios_base::binary); + }); + + _s3_client->GetObjectAsync(req_ptr, callback); + requests_total++; + } else { + spdlog::error("Unknown type of swap data: {}", type); + } + + } + + if (result.GetResult().GetIsTruncated()) { + list_request.SetContinuationToken(result.GetResult().GetNextContinuationToken()); + } else { + break; + } + } + + std::unique_lock lock(mutex); + cv.wait(lock, [&requests, requests_total]() { return requests == requests_total; }); + + spdlog::debug("Swap in finished!"); + + return true; + } + + size_t S3Swapper::swap_out(const std::string& location, std::vector>& msgs) + { + size_t total_size = 0; + fs::path full_path = fs::path{SWAPS_DIRECTORY} / location; + + int requests = 0, requests_min = msgs.size(), requests_total = msgs.size(); + std::condition_variable cv; + std::mutex mutex; + + auto callback = [&requests, requests_min, &cv, &mutex]( + const Aws::S3::S3Client* client, const Aws::S3::Model::PutObjectRequest& req, + const Aws::S3::Model::PutObjectOutcome& outcome, const 
std::shared_ptr& + ) { + + if (outcome.IsSuccess()) { + spdlog::debug("Successfully uploaded object {}", req.GetKey()); + } else { + spdlog::error("Error uploading object {}, error {}", req.GetKey(), outcome.GetError().GetMessage()); + } + + { + std::lock_guard lock(mutex); + requests++; + if(requests >= requests_min) { + cv.notify_one(); + } + } + }; + + auto make_request = [this, &callback](const fs::path& new_path, std::shared_ptr&& ptr) + { + Aws::S3::Model::PutObjectRequest req; + req.SetBucket(swap_bucket); + req.SetKey(new_path); + req.SetBody(ptr); + + _s3_client->PutObjectAsync(req, callback); + }; + + for(const auto& [key, message] : msgs) { + + fs::path new_path; + if(message.source.empty()) { + new_path = full_path / "state" / key; + } else { + new_path = full_path / "messages" / fmt::format("{}({}", key, message.source); + } + spdlog::debug("Swapping out to location {}", new_path.string()); + + std::shared_ptr input_data = std::make_shared(message.data.data(), message.data.len); + make_request(new_path, std::move(input_data)); + + total_size += message.data.len; + } + + full_path = fs::path{SWAPS_DIRECTORY} / location / "files"; + total_size += directory_recursive( + FILES_DIRECTORY, + [&full_path, &requests_total, &make_request](auto & src) { + + fs::path new_path = full_path / strip(src); + spdlog::debug("Swapping out to location {}", new_path.string()); + + std::shared_ptr input_data = Aws::MakeShared("SampleAllocationTag", src.c_str(), std::ios_base::in | std::ios_base::binary); + make_request(new_path, std::move(input_data)); + + requests_total++; } ); + std::unique_lock lock(mutex); + cv.wait(lock, [&requests, requests_total]() { return requests == requests_total; }); + return total_size; } diff --git a/sdk/include/praas/sdk/praas.hpp b/sdk/include/praas/sdk/praas.hpp index 12c7eee..3204135 100644 --- a/sdk/include/praas/sdk/praas.hpp +++ b/sdk/include/praas/sdk/praas.hpp @@ -29,7 +29,7 @@ namespace praas::sdk { const std::string& invocation_data 
); - std::optional swap_process(const Process& process); + std::tuple swap_process(const Process& process); std::optional swapin_process( const std::string& application, const std::string& process_name diff --git a/sdk/src/praas.cpp b/sdk/src/praas.cpp index 51f135e..b3ccfaa 100644 --- a/sdk/src/praas.cpp +++ b/sdk/src/praas.cpp @@ -165,22 +165,22 @@ namespace praas::sdk { return res; } - std::optional PraaS::swap_process(const Process& process) + std::tuple PraaS::swap_process(const Process& process) { auto req = drogon::HttpRequest::newHttpRequest(); req->setMethod(drogon::Post); req->setPath(fmt::format("/apps/{}/processes/{}/swap", process.app_name, process.process_id)); - std::promise> p; + std::promise> p; _http_client->sendRequest( req, [&](drogon::ReqResult result, const drogon::HttpResponsePtr& response) { if (result == drogon::ReqResult::Ok && response->getStatusCode() == drogon::k200OK) { - p.set_value(std::make_optional(response->getBody())); + p.set_value(std::make_tuple(true, std::string{response->getBody()})); } else { - p.set_value(std::nullopt); + p.set_value(std::make_tuple(false, std::string{response->getBody()})); } } ); diff --git a/serving/src/server.cpp b/serving/src/server.cpp index 1cef333..898f1a7 100644 --- a/serving/src/server.cpp +++ b/serving/src/server.cpp @@ -158,7 +158,8 @@ namespace praas::serving::docker { port_bindings[fmt::format("{}/tcp", opts.process_port)] = bind_ports; Json::Value host_config; host_config["PortBindings"] = port_bindings; - host_config["AutoRemove"] = true; + //host_config["AutoRemove"] = true; + host_config["AutoRemove"] = false; //host_config["NetworkMode"] = "bridge"; Json::Value extra_hosts; @@ -195,10 +196,12 @@ namespace praas::serving::docker { auto container_name_obj = (*req_body)["container-name"]; auto controlplane_addr_obj = (*req_body)["controlplane-address"]; auto swapinlocation = (*req_body)["swap-location"]; + auto s3_bucket = (*req_body)["s3-swapping-bucket"]; if (container_name_obj.isNull() 
|| controlplane_addr_obj.isNull()) { callback(common::http::HTTPClient::failed_response("Missing arguments in request body!")); return; } + std::string container_name = container_name_obj.asString(); std::string controlplane_addr = controlplane_addr_obj.asString(); @@ -220,6 +223,17 @@ namespace praas::serving::docker { if(!swapinlocation.isNull()) { env_data.append(fmt::format("SWAPIN_LOCATION={}", swapinlocation.asString())); } + if(!s3_bucket.isNull()) { + env_data.append(fmt::format("S3_SWAPPING_BUCKET={}", s3_bucket.asString())); + } + + char* access_key = std::getenv("AWS_ACCESS_KEY_ID"); + char* secret_key = std::getenv("AWS_SECRET_ACCESS_KEY"); + if((access_key != nullptr) && (secret_key != nullptr)) { + env_data.append(fmt::format("AWS_ACCESS_KEY_ID={}", access_key)); + env_data.append(fmt::format("AWS_SECRET_ACCESS_KEY={}", secret_key)); + } + body["Env"] = env_data; _http_client.post( diff --git a/tests/integration/state_swap.cpp b/tests/integration/state_swap.cpp index 9ff3983..1f93511 100644 --- a/tests/integration/state_swap.cpp +++ b/tests/integration/state_swap.cpp @@ -77,8 +77,8 @@ TEST_F(IntegrationLocalInvocation, AllocationInvoke) auto invoc = proc->invoke("state-put", "invocation-id", res.data(), res.size()); ASSERT_EQ(invoc.return_code, 0); - auto swap_res = praas.swap_process(proc.value()); - ASSERT_TRUE(swap_res.has_value()); + auto [swap_res, swap_msg] = praas.swap_process(proc.value()); + ASSERT_TRUE(swap_res); auto new_proc = praas.swapin_process(app_name, "alloc_invoc_process"); ASSERT_TRUE(new_proc.has_value()); @@ -90,11 +90,14 @@ TEST_F(IntegrationLocalInvocation, AllocationInvoke) auto res2 = get_output_binary(invoc.payload.get(), invoc.payload_len); ASSERT_EQ(msg.msg, res2); - swap_res = praas.swap_process(new_proc.value()); - ASSERT_TRUE(swap_res.has_value()); + std::tie(swap_res, swap_msg) = praas.swap_process(new_proc.value()); + ASSERT_TRUE(swap_res); ASSERT_TRUE(praas.delete_process(new_proc.value())); + auto failed_proc = 
praas.swapin_process(app_name, "alloc_invoc_process"); + ASSERT_FALSE(failed_proc.has_value()); + ASSERT_TRUE(praas.delete_application(app_name)); } }