Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(state-machine): Panic in the case of non-handled non-deferred e… #1464

Open
wants to merge 5 commits into
base: feature-c++-client
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/api/auth/auth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ error::Error Authenticator::RequestNewToken(optional<AuthenticatedAction> opt_ac
}
});
if (err != error::NoError) {
// A sync DBus error.
// Async DBus error.
mlog::Error("Failed to request new token fetching: " + err.String());
token_fetch_in_progress_ = false;
ExpectedAuthData ex_auth_data = expected::unexpected(err);
Expand Down
57 changes: 35 additions & 22 deletions src/common/dbus/platform/asio_libdbus/dbus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ namespace log = mender::common::log;

using namespace std;

void dbus_error_deleter(DBusError *e) {
if (e) {
dbus_error_free(e);
delete e;
}
}

// The code below integrates ASIO and libdbus. Or, more precisely, it uses
// asio::io_context as the main/event loop for libdbus.
//
Expand All @@ -67,31 +74,36 @@ void HandleReply(DBusPendingCall *pending, void *data);
DBusHandlerResult MsgFilter(DBusConnection *connection, DBusMessage *message, void *data);

error::Error DBusPeer::InitializeConnection() {
DBusError dbus_error;
dbus_error_init(&dbus_error);
dbus_conn_.reset(dbus_bus_get_private(DBUS_BUS_SYSTEM, &dbus_error));
std::unique_ptr<DBusError, void (*)(DBusError *)> dbus_error {
new DBusError, dbus_error_deleter};
dbus_error_init(dbus_error.get());
dbus_conn_.reset(dbus_bus_get_private(DBUS_BUS_SYSTEM, dbus_error.get()));
if (!dbus_conn_) {
auto err = MakeError(
ConnectionError,
string("Failed to get connection to system bus: ") + dbus_error.message + "["
+ dbus_error.name + "]");
dbus_error_free(&dbus_error);
string("Failed to get connection to system bus: ") + dbus_error.get()->message + "["
+ dbus_error.get()->name + "]");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.get() should only be used when it's required to, here it can be omitted.

return err;
}

dbus_connection_set_exit_on_disconnect(dbus_conn_.get(), FALSE);
if (!dbus_connection_set_watch_functions(
dbus_conn_.get(), AddDBusWatch, RemoveDBusWatch, ToggleDBusWatch, this, NULL)) {
dbus_conn_.get(), AddDBusWatch, RemoveDBusWatch, ToggleDBusWatch, this, nullptr)) {
dbus_conn_.reset();
return MakeError(ConnectionError, "Failed to set watch functions");
}
if (!dbus_connection_set_timeout_functions(
dbus_conn_.get(), AddDBusTimeout, RemoveDBusTimeout, ToggleDBusTimeout, this, NULL)) {
dbus_conn_.get(),
AddDBusTimeout,
RemoveDBusTimeout,
ToggleDBusTimeout,
this,
nullptr)) {
dbus_conn_.reset();
return MakeError(ConnectionError, "Failed to set timeout functions");
}

dbus_connection_set_dispatch_status_function(dbus_conn_.get(), HandleDispatch, this, NULL);
dbus_connection_set_dispatch_status_function(dbus_conn_.get(), HandleDispatch, this, nullptr);

return error::NoError;
}
Expand All @@ -102,7 +114,7 @@ error::Error DBusClient::InitializeConnection() {
return err;
}

if (!dbus_connection_add_filter(dbus_conn_.get(), MsgFilter, this, NULL)) {
if (!dbus_connection_add_filter(dbus_conn_.get(), MsgFilter, this, nullptr)) {
dbus_conn_.reset();
return MakeError(ConnectionError, "Failed to set message filter");
}
Expand Down Expand Up @@ -213,13 +225,14 @@ error::Error DBusClient::RegisterSignalHandler(
// function below takes care of actually invoking the right handler.
const string match_rule = GetSignalMatchRule(iface, signal);

DBusError dbus_error;
dbus_error_init(&dbus_error);
dbus_bus_add_match(dbus_conn_.get(), match_rule.c_str(), &dbus_error);
if (dbus_error_is_set(&dbus_error)) {
std::unique_ptr<DBusError, void (*)(DBusError *)> dbus_error {
new DBusError, dbus_error_deleter};
dbus_error_init(dbus_error.get());
dbus_bus_add_match(dbus_conn_.get(), match_rule.c_str(), dbus_error.get());
if (dbus_error_is_set(dbus_error.get())) {
auto err = MakeError(
ConnectionError, string("Failed to register signal reception: ") + dbus_error.message);
dbus_error_free(&dbus_error);
ConnectionError,
string("Failed to register signal reception: ") + dbus_error.get()->message);
return err;
}
AddSignalHandler<SignalValueType>(match_rule, handler);
Expand Down Expand Up @@ -326,14 +339,14 @@ dbus_bool_t AddDBusWatch(DBusWatch *w, void *data) {

// Assign the stream_descriptor so that we have access to it in
// RemoveDBusWatch() and we can delete it.
dbus_watch_set_data(w, sd.release(), NULL);
dbus_watch_set_data(w, sd.release(), nullptr);
return TRUE;
}

static void RemoveDBusWatch(DBusWatch *w, void *data) {
asio::posix::stream_descriptor *sd =
static_cast<asio::posix::stream_descriptor *>(dbus_watch_get_data(w));
dbus_watch_set_data(w, NULL, NULL);
dbus_watch_set_data(w, nullptr, nullptr);
if (sd != nullptr) {
sd->cancel();
delete sd;
Expand Down Expand Up @@ -367,14 +380,14 @@ dbus_bool_t AddDBusTimeout(DBusTimeout *t, void *data) {
}
});

dbus_timeout_set_data(t, timer, NULL);
dbus_timeout_set_data(t, timer, nullptr);

return TRUE;
}

static void RemoveDBusTimeout(DBusTimeout *t, void *data) {
asio::steady_timer *timer = static_cast<asio::steady_timer *>(dbus_timeout_get_data(t));
dbus_timeout_set_data(t, NULL, NULL);
dbus_timeout_set_data(t, nullptr, nullptr);
if (timer != nullptr) {
timer->cancel();
delete timer;
Expand Down Expand Up @@ -797,7 +810,7 @@ DBusHandlerResult HandleMethodCall(DBusConnection *connection, DBusMessage *mess
}
}

if (!dbus_connection_send(connection, reply_msg.get(), NULL)) {
if (!dbus_connection_send(connection, reply_msg.get(), nullptr)) {
// can only happen in case of no memory
log::Error("Failed to send reply DBus message when handling method " + spec);
return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
Expand Down Expand Up @@ -854,7 +867,7 @@ error::Error DBusServer::EmitSignal(
return MakeError(MessageError, "Failed to add data to the signal message");
}

if (!dbus_connection_send(dbus_conn_.get(), signal_msg.get(), NULL)) {
if (!dbus_connection_send(dbus_conn_.get(), signal_msg.get(), nullptr)) {
// can only happen in case of no memory
return MakeError(ConnectionError, "Failed to send signal message");
}
Expand Down
50 changes: 37 additions & 13 deletions src/common/state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,24 @@ class StateMachineRunner : virtual public EventPoster<EventType> {
}

private:
void RunOne() {
vector<State<ContextType, EventType> *> to_run;
vector<State<ContextType, EventType> *> FillRunQueueFrom(queue<EventType> &event_queue) {
vector<State<ContextType, EventType> *> run_queue;

for (auto machine : machines_) {
if (!machine->state_entered_) {
to_run.push_back(machine->current_state_);
run_queue.push_back(machine->current_state_);
machine->state_entered_ = true;
}
}

const size_t size = event_queue_.size();
const size_t size = event_queue.size();

for (size_t count = 0; to_run.empty() && count < size; count++) {
for (size_t count = 0; run_queue.empty() && count < size; count++) {
bool deferred = false;
auto event = event_queue_.front();
event_queue_.pop();
auto event = event_queue.front();
event_queue.pop();

for (auto machine : machines_) {
for (const auto machine : machines_) {
typename StateMachine<ContextType, EventType>::TransitionCondition cond {
machine->current_state_, event};
if (machine->deferred_events_.find(event) != machine->deferred_events_.end()) {
Expand All @@ -187,16 +187,16 @@ class StateMachineRunner : virtual public EventPoster<EventType> {
}

auto &target = match->second;
to_run.push_back(target);
run_queue.push_back(target);
machine->current_state_ = target;
}

if (to_run.empty()) {
if (run_queue.empty()) {
if (deferred) {
// Put back in the queue to try later. This won't be tried
// again during this run, due to only making `size`
// attempts in the for loop.
event_queue_.push(event);
event_queue.push(event);
} else {
string states = common::BestAvailableTypeName(*machines_[0]->current_state_);
for (size_t i = 1; i < machines_.size(); i++) {
Expand All @@ -212,8 +212,32 @@ class StateMachineRunner : virtual public EventPoster<EventType> {
}
}

if (!to_run.empty()) {
for (auto &state : to_run) {
return run_queue;
}


void FailIfNonDeferredEventsLeftInEventQueue(queue<EventType> queue_copy) {
// Check if there are any non-deferred events in the queue - then fail if
while (not queue_copy.empty()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's expected to have non-deferred events here, because this is after the handlers have run. I think the check needs to be between the if (!to_run.empty()) { block and the one above it.

There is also a much easier way to check this. When we are doing event_queue_.push(event) above, also increment a deferred_count counter. If deferred_count < event_queue_.size(), then the check is positive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Very good idea!

I had a suspicion you would come up with something smarter than me here 😆

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, new try. I am still using my original approach, but moved it to before we run the machine loop.

Ur approach, although clever, did not work when initializing (since we add the start events to the to_run queue, and I didn't feel like special casing it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be easily fixed too. Just keep a counter for the run_queue as well, but don't increment it during the initialization step, only when adding events in the "normal" flow. Then the check becomes deferred_count < added_to_run_queue. I think it's better than copying the whole queue on every state transition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I did dabble with the idea. But decided it's not really better. This implementation reads pretty straight forward, and does not have to touch the other pieces of code, and handle special cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying the queue should just be a couple of elements

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok then, I'll let it go! 🙂

EventType event = queue_copy.front();
queue_copy.pop();
for (const auto machine : machines_) {
if (machine->deferred_events_.find(event) == machine->deferred_events_.end()) {
log::Fatal(
"The state machine has an unprocessed non-deferred event in the queue. This is a programming error!");
}
}
}
}


void RunOne() {
vector<State<ContextType, EventType> *> run_queue = FillRunQueueFrom(event_queue_);

FailIfNonDeferredEventsLeftInEventQueue(event_queue_);

if (!run_queue.empty()) {
for (auto &state : run_queue) {
log::Trace("Entering state " + common::BestAvailableTypeName(*state));
state->OnEnter(ctx_, *this);
}
Expand Down