Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aiming to improve readability of PArSEC (EVM) transaction flow #239

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 97 additions & 94 deletions src/parsec/agent/impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ namespace cbdc::parsec::agent {
switch(m_state) {
// In these states we can start again from the beginning
case state::init:
case state::begin_sent:
case state::begin_failed:
case state::ticket_number_request_sent:
case state::ticket_number_request_failed:
break;

// We already have a ticket number but need to start again
case state::rollback_complete:
m_result = std::nullopt;
m_wounded = false;
m_restarted = true;
do_start();
do_start_function();
return true;

// Re-run commit
Expand Down Expand Up @@ -92,35 +92,38 @@ namespace cbdc::parsec::agent {
}

m_result = std::nullopt;
m_state = state::begin_sent;
auto success = m_broker->begin(
m_state = state::ticket_number_request_sent;
auto success = m_broker->get_new_ticket_number(
[&](broker::interface::ticketnum_or_errcode_type res) {
handle_begin(res);
handle_new_ticket_number(res);
});

if(!success) {
m_state = state::begin_failed;
m_log->error("Failed to contact broker to begin");
m_state = state::ticket_number_request_failed;
m_log->error(
"Failed to contact broker to get a new ticket number");
m_result = error_code::broker_unreachable;
do_result();
}

return true;
}

void impl::handle_begin(broker::interface::ticketnum_or_errcode_type res) {
void impl::handle_new_ticket_number(
broker::interface::ticketnum_or_errcode_type res) {
std::unique_lock l(m_mut);
if(m_state != state::begin_sent) {
m_log->warn("handle_begin while not in begin_sent state");
if(m_state != state::ticket_number_request_sent) {
m_log->warn("handle_new_ticket_number while not in "
"ticket_number_request_sent state");
return;
}
std::visit(
overloaded{[&](const ticket_machine::ticket_number_type& n) {
m_ticket_number = n;
do_start();
do_start_function();
},
[&](const broker::interface::error_code& /* e */) {
m_state = state::begin_failed;
m_state = state::ticket_number_request_failed;
m_log->error(
"Broker failed to assign a ticket number");
m_result = error_code::ticket_number_assignment;
Expand All @@ -129,10 +132,10 @@ namespace cbdc::parsec::agent {
res);
}

void impl::do_start() {
void impl::do_start_function() {
std::unique_lock l(m_mut);
assert(m_ticket_number.has_value());
assert(m_state == state::begin_sent
assert(m_state == state::ticket_number_request_sent
|| m_state == state::rollback_complete);
m_state = state::function_get_sent;

Expand All @@ -149,7 +152,7 @@ namespace cbdc::parsec::agent {
// transaction as m_param and let the runner figure it out.
handle_function(broker::value_type(get_function()));
} else {
m_log->trace("do_start ", get_function().to_hex());
m_log->trace("do_start_function ", get_function().to_hex());

auto tl_success = m_broker->try_lock(
m_ticket_number.value(),
Expand Down Expand Up @@ -237,88 +240,85 @@ namespace cbdc::parsec::agent {
});
}

void
impl::handle_function(const broker::interface::try_lock_return_type& res) {
void impl::handle_function(
const broker::interface::try_lock_return_type& res_variant) {
std::unique_lock l(m_mut);
if(m_state != state::function_get_sent) {
m_log->warn(
"handle_function while not in function_get_sent state");
return;
}
std::visit(
overloaded{
[&](const broker::value_type& v) {
m_state = state::function_started;
auto reacq_locks
= std::make_shared<broker::held_locks_set_type>();
(*reacq_locks).swap(m_requested_locks);

if(reacq_locks->empty()) {
do_runner(v);
return;
}

// Re-acquire previously held locks upon retries
// immediately
m_log->trace("Re-acquiring locks for",
m_ticket_number.value());
auto reacquired = std::make_shared<std::atomic<size_t>>();
for(auto& it : *reacq_locks) {
m_log->trace("Re-acquiring lock on",
it.first.to_hex(),
"type",
static_cast<int>(it.second),
"for",

if(std::holds_alternative<broker::value_type>(res_variant)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally find this easier to read with std::visit, as it makes clear that all possible variant types are covered

const auto& v = std::get<broker::value_type>(res_variant);
m_state = state::function_started;
auto reacq_locks = std::make_shared<broker::held_locks_set_type>();
(*reacq_locks).swap(m_requested_locks);

if(reacq_locks->empty()) {
do_runner(v);
return;
}

// Re-acquire previously held locks upon retries
// immediately
m_log->trace("Re-acquiring locks for", m_ticket_number.value());
auto reacquired = std::make_shared<std::atomic<size_t>>();
for(auto& it : *reacq_locks) {
m_log->trace("Re-acquiring lock on",
it.first.to_hex(),
"type",
static_cast<int>(it.second),
"for",
m_ticket_number.value());
auto success = do_try_lock_request(
it.first,
it.second,
[this, reacquired, v, reacq_locks](
const broker::interface::try_lock_return_type&) {
std::unique_lock ll(m_mut);
auto reacq = (*reacquired)++;
m_log->trace("Re-acquired",
reacq + 1,
"of",
reacq_locks->size(),
"locks for",
m_ticket_number.value());
auto success = do_try_lock_request(
it.first,
it.second,
[this, reacquired, v, reacq_locks](
const broker::interface::
try_lock_return_type&) {
std::unique_lock ll(m_mut);
auto reacq = (*reacquired)++;
m_log->trace("Re-acquired",
reacq + 1,
"of",
reacq_locks->size(),
"locks for",
m_ticket_number.value());

if(reacq + 1 == reacq_locks->size()) {
do_runner(v);
}
});
if(!success) {
m_log->error("Try lock request failed for",
m_ticket_number.value());
m_state = state::function_get_failed;
m_result = error_code::function_retrieval;
do_result();
return;

if(reacq + 1 == reacq_locks->size()) {
do_runner(v);
}
}
},
[&](broker::interface::error_code /* e */) {
});
if(!success) {
m_log->error("Try lock request failed for",
m_ticket_number.value());
m_state = state::function_get_failed;
m_log->error("Failed to retrieve function");
m_result = error_code::function_retrieval;
do_result();
},
[&](const runtime_locking_shard::shard_error& e) {
if(e.m_error_code
== runtime_locking_shard::error_code::wounded) {
m_state = state::function_get_failed;
m_log->trace("Shard wounded ticket while "
"retrieving function");
} else {
m_state = state::function_get_error;
m_log->error("Shard error retrieving function");
}
m_result = error_code::function_retrieval;
do_result();
}},
res);
return;
}
}
} else if(std::holds_alternative<broker::interface::error_code>(
res_variant)) {
m_state = state::function_get_failed;
m_log->error("Failed to retrieve function");
m_result = error_code::function_retrieval;
do_result();
} else if(std::holds_alternative<runtime_locking_shard::shard_error>(
res_variant)) {
const auto& e
= std::get<runtime_locking_shard::shard_error>(res_variant);
if(e.m_error_code == runtime_locking_shard::error_code::wounded) {
m_state = state::function_get_failed;
m_log->trace("Shard wounded ticket while "
"retrieving function");
} else {
m_state = state::function_get_error;
m_log->error("Shard error retrieving function");
}
m_result = error_code::function_retrieval;
do_result();
}
}

void impl::do_runner(broker::value_type v) {
Expand All @@ -329,7 +329,7 @@ namespace cbdc::parsec::agent {
get_param(),
m_is_readonly_run,
[this](const runner::interface::run_return_type& run_res) {
handle_run(run_res);
handle_run_result(run_res);
},
[this](broker::key_type key,
broker::lock_type locktype,
Expand Down Expand Up @@ -382,10 +382,12 @@ namespace cbdc::parsec::agent {
}
}

void impl::handle_run(const runner::interface::run_return_type& res) {
void
impl::handle_run_result(const runner::interface::run_return_type& res) {
std::unique_lock l(m_mut);
if(m_state != state::function_started) {
m_log->warn("handle_run while not in function_started state");
m_log->warn(
"handle_run_result while not in function_started state");
return;
}
std::visit(
Expand Down Expand Up @@ -418,7 +420,7 @@ namespace cbdc::parsec::agent {
}},
res);
m_log->trace(this,
"Agent handle_run complete for",
"Agent handle_run_result complete for",
m_ticket_number.value());
}

Expand Down Expand Up @@ -480,8 +482,9 @@ namespace cbdc::parsec::agent {
case state::init:
m_log->fatal("Result reported in initial state");
// System terminated by fatal()
case state::begin_sent:
m_log->fatal("Result reported in begin_sent state");
case state::ticket_number_request_sent:
m_log->fatal(
"Result reported in ticket_number_request_sent state");
// System terminated by fatal()
case state::function_get_sent:
m_log->fatal("Result reported in function_get_sent state");
Expand All @@ -508,7 +511,7 @@ namespace cbdc::parsec::agent {
break;

// Failure due to transient problems, should retry
case state::begin_failed:
case state::ticket_number_request_failed:
// Couldn't get a ticket number, no need to rollback
break;

Expand Down
15 changes: 8 additions & 7 deletions src/parsec/agent/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ namespace cbdc::parsec::agent {
enum class state {
/// Initial state.
init,
/// Begin request sent to broker.
begin_sent,
/// Begin request failed.
begin_failed,
/// Request for new ticket number sent to broker.
ticket_number_request_sent,
/// Request for new ticket number failed.
ticket_number_request_failed,
/// Function bytecode lock sent to broker.
function_get_sent,
/// Function bytecode lock request failed.
Expand Down Expand Up @@ -124,16 +124,17 @@ namespace cbdc::parsec::agent {
broker::held_locks_set_type m_requested_locks{};
bool m_restarted{false};

void handle_begin(broker::interface::ticketnum_or_errcode_type res);
void handle_new_ticket_number(
broker::interface::ticketnum_or_errcode_type res);

void
handle_function(const broker::interface::try_lock_return_type& res);

void handle_run(const runner::interface::run_return_type& res);
void handle_run_result(const runner::interface::run_return_type& res);

void handle_commit(broker::interface::commit_return_type res);

void do_start();
void do_start_function();

void do_result();

Expand Down
Loading