Skip to content

Commit

Permalink
Feat: Optionally delay the resumption of the message queue after reco…
Browse files Browse the repository at this point in the history
…nnect (EVerest#283)

Signed-off-by: Valentin Dimov <[email protected]>
  • Loading branch information
valentin-dimov authored and couryrr-afs committed Dec 18, 2023
1 parent 04cd825 commit 7cdf284
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 22 deletions.
62 changes: 44 additions & 18 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ template <typename M> class MessageQueue {
/// message queue for non-transaction related messages
std::queue<std::shared_ptr<ControlMessage<M>>> normal_message_queue;
std::shared_ptr<ControlMessage<M>> in_flight;
std::mutex message_mutex;
std::condition_variable cv;
std::recursive_mutex message_mutex;
std::condition_variable_any cv;
std::function<bool(json message)> send_callback;
std::vector<M> external_notify;
bool paused;
Expand All @@ -107,6 +107,12 @@ template <typename M> class MessageQueue {
Everest::SteadyTimer in_flight_timeout_timer;
Everest::SteadyTimer notify_queue_timer;

// This timer schedules the resumption of the message queue
Everest::SteadyTimer resume_timer;
// Counts the number of pause()/resume() calls.
// Used by the resume timer callback to abort itself in case the timer triggered before it could be cancelled.
u_int64_t pause_resume_ctr = 0;

// key is the message id of the stop transaction and the value is the transaction id
// this map is used for StopTransaction.req that have been put on the message queue without having received a
// transactionId from the backend (e.g. when offline) it is used to replace the transactionId in the
Expand Down Expand Up @@ -147,7 +153,7 @@ template <typename M> class MessageQueue {
void add_to_normal_message_queue(std::shared_ptr<ControlMessage<M>> message) {
EVLOG_debug << "Adding message to normal message queue";
{
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->normal_message_queue.push(message);
this->new_message = true;
this->check_queue_sizes();
Expand All @@ -158,7 +164,7 @@ template <typename M> class MessageQueue {
void add_to_transaction_message_queue(std::shared_ptr<ControlMessage<M>> message) {
EVLOG_debug << "Adding message to transaction message queue";
{
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->transaction_message_queue.push_back(message);
ocpp::common::DBTransactionMessage db_message{message->message, messagetype_to_string(message->messageType),
message->message_attempts, message->timestamp,
Expand Down Expand Up @@ -241,6 +247,16 @@ template <typename M> class MessageQueue {
}
}

// The public resume() delegates the actual resumption to this method
void resume_now(u_int64_t expected_pause_resume_ctr) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
if (this->pause_resume_ctr == expected_pause_resume_ctr) {
this->paused = false;
this->cv.notify_one();
EVLOG_debug << "resume() notified message queue";
}
}

public:
/// \brief Creates a new MessageQueue object with the provided \p configuration and \p send_callback
MessageQueue(const std::function<bool(json message)>& send_callback, const MessageQueueConfig& config,
Expand All @@ -260,8 +276,9 @@ template <typename M> class MessageQueue {
while (this->running) {
EVLOG_debug << "Waiting for a message from the message queue";

std::unique_lock<std::mutex> lk(this->message_mutex);
std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
using namespace std::chrono_literals;
// It's safe to wait on the cv here because we're guaranteed to only lock this->message_mutex once
this->cv.wait(lk, [this]() {
return !this->running || (!this->paused && this->new_message && this->in_flight == nullptr);
});
Expand Down Expand Up @@ -532,7 +549,7 @@ template <typename M> class MessageQueue {
// we need to remove Call messages from in_flight if we receive a CallResult OR a CallError

// TODO(kai): we need to do some error handling in the CallError case
std::unique_lock<std::mutex> lk(this->message_mutex);
std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
if (this->in_flight == nullptr) {
EVLOG_error
<< "Received a CALLRESULT OR CALLERROR without a message in flight, this should not happen";
Expand Down Expand Up @@ -594,7 +611,7 @@ template <typename M> class MessageQueue {

/// \brief Handles a message timeout or a CALLERROR. \p enhanced_message_opt is set only in case of CALLERROR
void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
EVLOG_warning << "Message timeout or CALLERROR for: " << this->in_flight->messageType << " ("
<< this->in_flight->uniqueId() << ")";
if (this->in_flight->isTransactionMessage()) {
Expand Down Expand Up @@ -664,28 +681,37 @@ template <typename M> class MessageQueue {
/// \brief Pauses the message queue
void pause() {
EVLOG_debug << "pause()";
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
this->resume_timer.stop();
this->paused = true;
this->cv.notify_one();
EVLOG_debug << "pause() notified message queue";
}

/// \brief Resumes the message queue
void resume() {
EVLOG_debug << "resume()";
std::lock_guard<std::mutex> lk(this->message_mutex);
this->paused = false;
this->cv.notify_one();
EVLOG_debug << "resume() notified message queue";
void resume(std::chrono::seconds delay_on_reconnect) {
EVLOG_debug << "resume() called";
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
// Do not delay if this is the first call to resume(), i.e. this is the initial connection
if (this->pause_resume_ctr > 1 && delay_on_reconnect > std::chrono::seconds(0)) {
EVLOG_debug << "Delaying message queue resume by " << delay_on_reconnect.count() << " seconds";
u_int64_t expected_pause_resume_ctr = this->pause_resume_ctr;
this->resume_timer.timeout(
[this, expected_pause_resume_ctr] { this->resume_now(expected_pause_resume_ctr); }, delay_on_reconnect);
} else {
this->resume_now(this->pause_resume_ctr);
}
}

bool is_transaction_message_queue_empty() {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
return this->transaction_message_queue.empty();
}

bool contains_transaction_messages(const CiString<36> transaction_id) {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v201::MessageType::TransactionEvent) {
v201::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
Expand All @@ -698,7 +724,7 @@ template <typename M> class MessageQueue {
}

bool contains_stop_transaction_message(const int32_t transaction_id) {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v16::MessageType::StopTransaction) {
v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
Expand Down Expand Up @@ -753,7 +779,7 @@ template <typename M> class MessageQueue {

// replace transaction id in meter values if start_transaction_message_id is present in map
// this is necessary when the chargepoint queued MeterValue.req for a transaction with unknown transaction_id
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
for (auto it = this->transaction_message_queue.begin(); it != transaction_message_queue.end(); ++it) {
for (const auto& meter_value_message_id :
Expand Down
4 changes: 4 additions & 0 deletions include/ocpp/v16/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ class ChargePoint {
/// \param callback
void register_is_token_reserved_for_connector_callback(
const std::function<bool(const int32_t connector, const std::string& id_token)>& callback);

/// \brief Delay draining the message queue after reconnecting, so the CSMS can perform post-reconnect checks first
/// \param delay The delay period (seconds)
void set_message_queue_resume_delay(std::chrono::seconds delay);
};

} // namespace v16
Expand Down
9 changes: 9 additions & 0 deletions include/ocpp/v16/charge_point_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ class ChargePointImpl : ocpp::ChargingStationBase {
FirmwareStatusEnumType signed_firmware_status;
int signed_firmware_status_request_id;

/// \brief optional delay to resumption of message queue after reconnecting to the CSMS
std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0);

// callbacks
std::function<bool(int32_t connector)> enable_evse_callback;
std::function<bool(int32_t connector)> disable_evse_callback;
Expand Down Expand Up @@ -729,6 +732,12 @@ class ChargePointImpl : ocpp::ChargingStationBase {
/// \param value
/// \return Indicates the result of the operation
ConfigurationStatus set_custom_configuration_key(CiString<50> key, CiString<500> value);

/// \brief Delay draining the message queue after reconnecting, so the CSMS can perform post-reconnect checks first
/// \param delay The delay period (seconds)
void set_message_queue_resume_delay(std::chrono::seconds delay) {
this->message_queue_resume_delay = delay;
}
};

} // namespace v16
Expand Down
8 changes: 8 additions & 0 deletions include/ocpp/v201/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ class ChargePoint : ocpp::ChargingStationBase {

/// \brief Handler for automatic or explicit OCSP cache updates
OcspUpdater ocsp_updater;
/// \brief optional delay to resumption of message queue after reconnecting to the CSMS
std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0);

bool send(CallError call_error);

Expand Down Expand Up @@ -645,6 +647,12 @@ class ChargePoint : ocpp::ChargingStationBase {
/// \return DataTransferResponse contaning the result from CSMS
DataTransferResponse data_transfer_req(const CiString<255>& vendorId, const std::optional<CiString<50>>& messageId,
const std::optional<std::string>& data);

/// \brief Delay draining the message queue after reconnecting, so the CSMS can perform post-reconnect checks first
/// \param delay The delay period (seconds)
void set_message_queue_resume_delay(std::chrono::seconds delay) {
this->message_queue_resume_delay = delay;
}
};

} // namespace v201
Expand Down
4 changes: 4 additions & 0 deletions lib/ocpp/v16/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,9 @@ void ChargePoint::register_is_token_reserved_for_connector_callback(
this->charge_point->register_is_token_reserved_for_connector_callback(callback);
}

void ChargePoint::set_message_queue_resume_delay(std::chrono::seconds delay) {
this->charge_point->set_message_queue_resume_delay(delay);
}

} // namespace v16
} // namespace ocpp
6 changes: 3 additions & 3 deletions lib/ocpp/v16/charge_point_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ void ChargePointImpl::init_websocket() {
if (this->connection_state_changed_callback != nullptr) {
this->connection_state_changed_callback(true);
}
this->message_queue->resume(); //
this->connected_callback(); //
this->message_queue->resume(this->message_queue_resume_delay);
this->connected_callback();
});
this->websocket->register_disconnected_callback([this]() {
if (this->connection_state_changed_callback != nullptr) {
this->connection_state_changed_callback(false);
}
this->message_queue->pause(); //
this->message_queue->pause();
if (this->ocsp_request_timer != nullptr) {
this->ocsp_request_timer->stop();
}
Expand Down
2 changes: 1 addition & 1 deletion lib/ocpp/v201/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ void ChargePoint::init_websocket() {

this->websocket = std::make_unique<Websocket>(connection_options, this->evse_security, this->logging);
this->websocket->register_connected_callback([this](const int security_profile) {
this->message_queue->resume();
this->message_queue->resume(this->message_queue_resume_delay);

const auto& security_profile_cv = ControllerComponentVariables::SecurityProfile;
if (security_profile_cv.variable.has_value()) {
Expand Down

0 comments on commit 7cdf284

Please sign in to comment.