Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Delay the resumption of the message queue after reconnect #283

Merged
merged 10 commits into from
Dec 8, 2023
36 changes: 31 additions & 5 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ template <typename M> class MessageQueue {
Everest::SteadyTimer in_flight_timeout_timer;
Everest::SteadyTimer notify_queue_timer;

// This timer schedules the resumption of the message queue
Everest::SteadyTimer resume_timer;
// Counts the number of pause()/resume() calls.
// Used by the resume timer callback to abort itself in case the timer triggered before it could be cancelled.
u_int64_t pause_resume_ctr = 0;


// key is the message id of the stop transaction and the value is the transaction id
// this map is used for StopTransaction.req that have been put on the message queue without having received a
// transactionId from the backend (e.g. when offline) it is used to replace the transactionId in the
Expand Down Expand Up @@ -154,6 +161,16 @@ template <typename M> class MessageQueue {
EVLOG_debug << "Notified message queue worker";
}

// Unlike the public resume(), this doesn't schedule anything - it just does the actual resumption
// Does not lock this->message_mutex, make sure to do that before calling
valentin-dimov marked this conversation as resolved.
Show resolved Hide resolved
void resume_now(u_int64_t expected_pause_resume_ctr) {
if (this->pause_resume_ctr == expected_pause_resume_ctr) {
this->paused = false;
this->cv.notify_one();
EVLOG_debug << "resume() notified message queue";
}
}

public:
/// \brief Creates a new MessageQueue object with the provided \p configuration and \p send_callback
MessageQueue(const std::function<bool(json message)>& send_callback, const int transaction_message_attempts,
Expand Down Expand Up @@ -579,18 +596,27 @@ template <typename M> class MessageQueue {
void pause() {
EVLOG_debug << "pause()";
std::lock_guard<std::mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
this->resume_timer.stop();
this->paused = true;
this->cv.notify_one();
EVLOG_debug << "pause() notified message queue";
}

/// \brief Resumes the message queue
void resume() {
EVLOG_debug << "resume()";
void resume(unsigned int delay_seconds = 0) {
valentin-dimov marked this conversation as resolved.
Show resolved Hide resolved
EVLOG_debug << "resume() called, delay: " << delay_seconds << " seconds";
std::lock_guard<std::mutex> lk(this->message_mutex);
this->paused = false;
this->cv.notify_one();
EVLOG_debug << "resume() notified message queue";
if (delay_seconds > 0) {
this->pause_resume_ctr++;
u_int64_t expected_pause_resume_ctr = this->pause_resume_ctr;
this->resume_timer.timeout([this, expected_pause_resume_ctr] {
std::lock_guard<std::mutex> lk(this->message_mutex);
this->resume_now(expected_pause_resume_ctr);
}, std::chrono::seconds(delay_seconds));
} else {
this->resume_now(this->pause_resume_ctr);
}
}

bool is_transaction_message_queue_empty() {
Expand Down
2 changes: 1 addition & 1 deletion lib/ocpp/v201/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ void ChargePoint::init_websocket() {

this->websocket = std::make_unique<Websocket>(connection_options, this->evse_security, this->logging);
this->websocket->register_connected_callback([this](const int security_profile) {
this->message_queue->resume();
this->message_queue->resume(1); // TODO: derive from configuration

const auto& security_profile_cv = ControllerComponentVariables::SecurityProfile;
if (security_profile_cv.variable.has_value()) {
Expand Down