Skip to content

Commit

Permalink
[process] [control-plane] [serving] Import disk swapping code from pr…
Browse files Browse the repository at this point in the history
…evious prototype and adjust to new APIs
  • Loading branch information
mcopik committed Oct 14, 2024
1 parent 4c35662 commit 7892974
Show file tree
Hide file tree
Showing 43 changed files with 501 additions and 118 deletions.
25 changes: 19 additions & 6 deletions common/include/praas/common/messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ namespace praas::common::message {
// 34 bytes

// Swap request message
// 2 bytes of identiifer
// 4 bytes of length of swap path
// 2 bytes of identifier
// 40 bytes of length of swap path - prefix + app name
// Message is followed by the swap path

// Swap confirmation
Expand Down Expand Up @@ -277,7 +277,7 @@ namespace praas::common::message {
SwapRequest(Data&& data = Data())
// NOLINTNEXTLINE
: Parent(std::forward<Data>(data), MessageType::SWAP_REQUEST),
path_len(strnlen(reinterpret_cast<const char*>(this->data()), MessageConfig::NAME_LENGTH))
path_len(strnlen(reinterpret_cast<const char*>(this->data()), MessageConfig::NAME_LENGTH + 8))
{
}

Expand All @@ -294,14 +294,15 @@ namespace praas::common::message {

void path(std::string_view path)
{
if (path.length() > MessageConfig::ID_LENGTH) {
constexpr int max_size = MessageConfig::NAME_LENGTH + 8;
if (path.length() > max_size) {
throw common::InvalidArgument{fmt::format(
"Swap location ID too long: {} > {}", path.length(), MessageConfig::ID_LENGTH
"Swap location ID too long: {} > {}", path.length(), max_size
)};
}
std::strncpy(
// NOLINTNEXTLINE
reinterpret_cast<char*>(data()), path.data(), MessageConfig::ID_LENGTH
reinterpret_cast<char*>(data()), path.data(), max_size
);
path_len = path.length();
}
Expand All @@ -325,6 +326,18 @@ namespace praas::common::message {
{
}

double swap_time() const
{
// NOLINTNEXTLINE
return *reinterpret_cast<const double*>(data() + 4);
}

void swap_time(double time)
{
// NOLINTNEXTLINE
*reinterpret_cast<double*>(data() + 4) = time;
}

int32_t swap_size() const
{
// NOLINTNEXTLINE
Expand Down
2 changes: 1 addition & 1 deletion common/tests/unit/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ TEST(Messages, SwapRequestMsg)

TEST(Messages, SwapRequestMsgIncorrect)
{
std::string swap_loc(MessageConfig::ID_LENGTH + 1, 't');
std::string swap_loc(MessageConfig::NAME_LENGTH + 10, 't');

SwapRequest<MessageData> req;
EXPECT_THROW(req.path(swap_loc), praas::common::InvalidArgument);
Expand Down
7 changes: 5 additions & 2 deletions control-plane/include/praas/control-plane/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,12 @@ namespace praas::control_plane {
////////////////////////////////////////////////////////////////////////////////
void update_controlplane_process(const std::string& name);

void swap_process(std::string process_name, deployment::Deployment& deployment);
void swap_process(
std::string process_name, deployment::Deployment& deployment,
std::function<void(size_t, double, const std::optional<std::string>&)>&& callback
);

void swapped_process(std::string process_name);
void swapped_process(std::string process_name, size_t size, double time);

void closed_process(const process::ProcessPtr& ptr);

Expand Down
23 changes: 15 additions & 8 deletions control-plane/include/praas/control-plane/deployment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ namespace praas::control_plane::config {
namespace praas::control_plane::state {

struct DiskSwapLocation : public SwapLocation {
std::filesystem::path fs_path;

DiskSwapLocation(std::filesystem::path path) : SwapLocation(), fs_path(path) {}
std::string app_name;

std::string_view root_path() const;
std::string path(std::string process_name) const;
DiskSwapLocation(std::string app_name):
app_name(std::move(app_name))
{}

std::string_view root_path() const override;
std::string path(std::string process_name) const override;
};

#if defined(WITH_AWS_DEPLOYMENT)
class AWSS3SwapLocation : SwapLocation {};
class RedisSwapLocation : SwapLocation {};
#endif

} // namespace praas::control_plane::state
Expand All @@ -40,11 +44,14 @@ namespace praas::control_plane::deployment {
#if defined(WITH_AWS_DEPLOYMENT)
AWS,
#endif
REDIS
};

Type deserialize(std::string mode);

class Deployment {
public:
virtual std::unique_ptr<state::SwapLocation> get_location(std::string process_name) = 0;
virtual std::unique_ptr<state::SwapLocation> get_location(std::string app_name) = 0;
virtual void delete_swap(const state::SwapLocation&) = 0;

static std::unique_ptr<Deployment> construct(const config::Config& cfg);
Expand All @@ -55,11 +62,11 @@ namespace praas::control_plane::deployment {
Local() = default;

// FIXME remove the fs path; we no longer need
Local(std::string fs_path) : _path(fs_path) {}
Local(std::string fs_path) : _path(std::move(fs_path)) {}

std::unique_ptr<state::SwapLocation> get_location(std::string process_name) override;
std::unique_ptr<state::SwapLocation> get_location(std::string app_name) override;

void delete_swap(const state::SwapLocation&) override;
void delete_swap(const state::SwapLocation& /*unused*/) override;

private:
std::filesystem::path _path;
Expand Down
6 changes: 5 additions & 1 deletion control-plane/include/praas/control-plane/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ namespace praas::control_plane::process {
*/
write_lock_t write_lock() const;

void swap();
void swap(std::function<void(size_t, double, const std::optional<std::string>&)>&& );

// Modify the map of invocations.
void add_invocation(
Expand All @@ -202,6 +202,8 @@ namespace praas::control_plane::process {

void created_callback(const std::optional<std::string>& error_msg);

void swapped_callback(size_t size, double time, const std::optional<std::string>& error_msg);

private:
void _send_invocation(Invocation&);

Expand All @@ -223,6 +225,8 @@ namespace praas::control_plane::process {

std::function<void(std::shared_ptr<Process>, std::optional<std::string>)> _creation_callback{};

std::function<void(size_t, double, const std::optional<std::string>&)> _swapping_callback{};

// Application reference does not change throughout process lifetime.

Application* _application;
Expand Down
5 changes: 4 additions & 1 deletion control-plane/include/praas/control-plane/tcpserver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ namespace praas::control_plane::tcpserver {

// Calls to process to finish and swap.
// Needs to call the application to handle the change of process state.
void handle_swap(const process::ProcessPtr& ptr);
void handle_swap(
const process::ProcessPtr& process_ptr,
const common::message::SwapConfirmationPtr& msg
);

// Update data plane metrics of a process
// Requires write access to this process component.
Expand Down
6 changes: 4 additions & 2 deletions control-plane/include/praas/control-plane/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ namespace praas::control_plane::worker {
/// @param[in] proc_id new process name
/// @return error message if operation failed; empty optional otherwise
////////////////////////////////////////////////////////////////////////////////
std::optional<std::string>
swap_process(const std::string& app_name, const std::string& proc_id);
void swap_process(
const std::string& app_name, const std::string& proc_id,
std::function<void(size_t, double, const std::optional<std::string>&)>&& callback
);
// const process::ProcessPtr& ptr, state::SwapLocation& swap_loc);

////////////////////////////////////////////////////////////////////////////////
Expand Down
15 changes: 11 additions & 4 deletions control-plane/src/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ namespace praas::control_plane {
}
}

void Application::swap_process(std::string process_name, deployment::Deployment& deployment)
void Application::swap_process(
std::string process_name,
deployment::Deployment& deployment,
std::function<void(size_t, double, const std::optional<std::string>&)>&& callback
)
{
if (process_name.length() == 0) {
throw praas::common::InvalidConfigurationError("Application name cannot be empty");
Expand Down Expand Up @@ -225,12 +229,12 @@ namespace praas::control_plane {
proc.set_status(process::Status::SWAPPING_OUT);

// Swap the process
proc.state().swap = deployment.get_location(process_name);
proc.state().swap = deployment.get_location(_name);

proc.swap();
proc.swap(std::move(callback));
}

void Application::swapped_process(std::string process_name)
void Application::swapped_process(std::string process_name, size_t size, double time)
{
// Modify internal collections
write_lock_t application_lock(_active_mutex);
Expand All @@ -244,6 +248,7 @@ namespace praas::control_plane {
auto proc_lock = proc.write_lock();

if (proc.status() != process::Status::SWAPPING_OUT) {
proc.swapped_callback(0, 0, "Cannot confirm a swap of non-swapping process");
throw praas::common::InvalidProcessState("Cannot confirm a swap of non-swapping process");
}

Expand All @@ -253,6 +258,8 @@ namespace praas::control_plane {

proc.set_status(process::Status::SWAPPED_OUT);

proc.swapped_callback(size, time, std::nullopt);

// Insert into swapped
write_lock_t swapped_lock(_swapped_mutex);
_swapped_processes.insert(std::move(nh));
Expand Down
4 changes: 4 additions & 0 deletions control-plane/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ namespace praas::control_plane::config {
this->backend = std::move(ptr);
}

std::string deployment_type;
archive(cereal::make_nvp("deployment-type", deployment_type));
this->deployment_type = deployment::deserialize(deployment_type);

archive(cereal::make_nvp("ip-address", public_ip_address));
archive(cereal::make_nvp("http-client-io-threads", http_client_io_threads));

Expand Down
25 changes: 19 additions & 6 deletions control-plane/src/deployment.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <praas/common/exceptions.hpp>
#include <praas/control-plane/config.hpp>
#include <praas/control-plane/deployment.hpp>

Expand All @@ -7,35 +8,47 @@ namespace praas::control_plane::state {

std::string_view DiskSwapLocation::root_path() const
{
return fs_path.c_str();
return "/";
}

std::string DiskSwapLocation::path(std::string process_name) const
{
return (fs_path / "swaps" / process_name).c_str();
return fmt::format("local://{}", app_name);
}

} // namespace praas::control_plane::state

namespace praas::control_plane::deployment {

Type deserialize(std::string mode)
{
if (mode == "local") {
return Type::LOCAL;
} else if (mode == "aws") {
return Type::AWS;
} else if (mode == "redis") {
return Type::REDIS;
}
throw common::PraaSException{"Unknown"};
}

std::unique_ptr<Deployment> Deployment::construct(const config::Config& cfg)
{
if (cfg.deployment_type == Type::LOCAL) {
return std::make_unique<Local>();
}
spdlog::error("Unknown deployment type! {}", static_cast<int>(cfg.deployment_type));
return nullptr;
}

std::unique_ptr<state::SwapLocation> Local::get_location(std::string process_name)
std::unique_ptr<state::SwapLocation> Local::get_location(std::string app_name)
{
return std::make_unique<state::DiskSwapLocation>(_path);
return std::make_unique<state::DiskSwapLocation>(app_name);
}

void Local::delete_swap(const state::SwapLocation&)
{
// FIXME: warning log
// We cannot remove swaps on other machines.
spdlog::error("Deleting swap is not supported for disk operations.");
}

} // namespace praas::control_plane::deployment
30 changes: 22 additions & 8 deletions control-plane/src/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ namespace praas::control_plane {
const std::string& process_name
)
{
_logger->info("Stop process {}", process_name);
_logger->info("Stop and swap process {}", process_name);
_workers.add_task(
&worker::Workers::stop_process, app_name, process_name,
[process_name, callback = std::move(callback)](const std::optional<std::string>& response) {
Expand Down Expand Up @@ -201,14 +201,24 @@ namespace praas::control_plane {
)
{
_logger->info("Swap process {}", process_name);
_workers.add_task([=, this, callback = std::move(callback)]() {
auto response = _workers.swap_process(app_name, process_name);
if (response) {
callback(failed_response(response.value(), drogon::HttpStatusCode::k400BadRequest));
} else {
callback(correct_response(fmt::format("Request swapping of process {}.", process_name)));
auto begin = std::chrono::high_resolution_clock::now();
_workers.add_task(
&worker::Workers::swap_process, app_name, process_name,
[begin, callback = std::move(callback)](
size_t size, double time, const std::optional<std::string>& response
) {

auto end = std::chrono::high_resolution_clock::now();
auto dur = std::chrono::duration_cast<std::chrono::microseconds>(end-begin).count() / 1000.0;
if (response) {
callback(failed_response(response.value(), drogon::HttpStatusCode::k400BadRequest));
} else {
callback(correct_response(
fmt::format("Swapped {} bytes, took: {} ms, total time: {} ms.", size, time, dur)
));
}
}
});
);
}

void HttpServer::list_processes(
Expand All @@ -234,6 +244,8 @@ namespace praas::control_plane {
active.append(proc_name);
}
json["active"] = active;
} else {
json["active"] = Json::Value{};
}

if (!swapped_processes.empty()) {
Expand All @@ -242,6 +254,8 @@ namespace praas::control_plane {
swapped.append(proc_name);
}
json["swapped"] = swapped;
} else {
json["swapped"] = Json::Value{};
}

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
Expand Down
Loading

0 comments on commit 7892974

Please sign in to comment.