Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Delay the resumption of the message queue after reconnect #283

Merged
merged 10 commits into from
Dec 8, 2023
Merged
62 changes: 44 additions & 18 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ template <typename M> class MessageQueue {
/// message queue for non-transaction related messages
std::queue<std::shared_ptr<ControlMessage<M>>> normal_message_queue;
std::shared_ptr<ControlMessage<M>> in_flight;
std::mutex message_mutex;
std::condition_variable cv;
std::recursive_mutex message_mutex;
std::condition_variable_any cv;
std::function<bool(json message)> send_callback;
std::vector<M> external_notify;
bool paused;
Expand All @@ -107,6 +107,12 @@ template <typename M> class MessageQueue {
Everest::SteadyTimer in_flight_timeout_timer;
Everest::SteadyTimer notify_queue_timer;

// This timer schedules the resumption of the message queue
Everest::SteadyTimer resume_timer;
// Counts the number of pause()/resume() calls.
// Used by the resume timer callback to abort itself in case the timer triggered before it could be cancelled.
u_int64_t pause_resume_ctr = 0;

// key is the message id of the stop transaction and the value is the transaction id
// this map is used for StopTransaction.req that have been put on the message queue without having received a
// transactionId from the backend (e.g. when offline) it is used to replace the transactionId in the
Expand Down Expand Up @@ -147,7 +153,7 @@ template <typename M> class MessageQueue {
void add_to_normal_message_queue(std::shared_ptr<ControlMessage<M>> message) {
EVLOG_debug << "Adding message to normal message queue";
{
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->normal_message_queue.push(message);
this->new_message = true;
this->check_queue_sizes();
Expand All @@ -158,7 +164,7 @@ template <typename M> class MessageQueue {
void add_to_transaction_message_queue(std::shared_ptr<ControlMessage<M>> message) {
EVLOG_debug << "Adding message to transaction message queue";
{
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->transaction_message_queue.push_back(message);
ocpp::common::DBTransactionMessage db_message{message->message, messagetype_to_string(message->messageType),
message->message_attempts, message->timestamp,
Expand Down Expand Up @@ -241,6 +247,16 @@ template <typename M> class MessageQueue {
}
}

// The public resume() delegates the actual resumption to this method
void resume_now(u_int64_t expected_pause_resume_ctr) {
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
if (this->pause_resume_ctr == expected_pause_resume_ctr) {
this->paused = false;
this->cv.notify_one();
EVLOG_debug << "resume() notified message queue";
}
}

public:
/// \brief Creates a new MessageQueue object with the provided \p configuration and \p send_callback
MessageQueue(const std::function<bool(json message)>& send_callback, const MessageQueueConfig& config,
Expand All @@ -260,8 +276,9 @@ template <typename M> class MessageQueue {
while (this->running) {
EVLOG_debug << "Waiting for a message from the message queue";

std::unique_lock<std::mutex> lk(this->message_mutex);
std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
using namespace std::chrono_literals;
// It's safe to wait on the cv here because we're guaranteed to only lock this->message_mutex once
this->cv.wait(lk, [this]() {
return !this->running || (!this->paused && this->new_message && this->in_flight == nullptr);
});
Expand Down Expand Up @@ -532,7 +549,7 @@ template <typename M> class MessageQueue {
// we need to remove Call messages from in_flight if we receive a CallResult OR a CallError

// TODO(kai): we need to do some error handling in the CallError case
std::unique_lock<std::mutex> lk(this->message_mutex);
std::unique_lock<std::recursive_mutex> lk(this->message_mutex);
if (this->in_flight == nullptr) {
EVLOG_error
<< "Received a CALLRESULT OR CALLERROR without a message in flight, this should not happen";
Expand Down Expand Up @@ -594,7 +611,7 @@ template <typename M> class MessageQueue {

/// \brief Handles a message timeout or a CALLERROR. \p enhanced_message_opt is set only in case of CALLERROR
void handle_timeout_or_callerror(const std::optional<EnhancedMessage<M>>& enhanced_message_opt) {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
EVLOG_warning << "Message timeout or CALLERROR for: " << this->in_flight->messageType << " ("
<< this->in_flight->uniqueId() << ")";
if (this->in_flight->isTransactionMessage()) {
Expand Down Expand Up @@ -664,28 +681,37 @@ template <typename M> class MessageQueue {
/// \brief Pauses the message queue
void pause() {
EVLOG_debug << "pause()";
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
this->resume_timer.stop();
this->paused = true;
this->cv.notify_one();
EVLOG_debug << "pause() notified message queue";
}

/// \brief Resumes the message queue
void resume() {
EVLOG_debug << "resume()";
std::lock_guard<std::mutex> lk(this->message_mutex);
this->paused = false;
this->cv.notify_one();
EVLOG_debug << "resume() notified message queue";
void resume(std::chrono::seconds delay_on_reconnect) {
EVLOG_debug << "resume() called";
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
this->pause_resume_ctr++;
// Do not delay if this is the first call to resume(), i.e. this is the initial connection
if (this->pause_resume_ctr > 1 && delay_on_reconnect > std::chrono::seconds(0)) {
EVLOG_debug << "Delaying message queue resume by " << delay_on_reconnect.count() << " seconds";
u_int64_t expected_pause_resume_ctr = this->pause_resume_ctr;
this->resume_timer.timeout(
[this, expected_pause_resume_ctr] { this->resume_now(expected_pause_resume_ctr); }, delay_on_reconnect);
} else {
this->resume_now(this->pause_resume_ctr);
}
}

bool is_transaction_message_queue_empty() {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
return this->transaction_message_queue.empty();
}

bool contains_transaction_messages(const CiString<36> transaction_id) {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v201::MessageType::TransactionEvent) {
v201::TransactionEventRequest req = control_message->message.at(CALL_PAYLOAD);
Expand All @@ -698,7 +724,7 @@ template <typename M> class MessageQueue {
}

bool contains_stop_transaction_message(const int32_t transaction_id) {
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
for (const auto control_message : this->transaction_message_queue) {
if (control_message->messageType == v16::MessageType::StopTransaction) {
v16::StopTransactionRequest req = control_message->message.at(CALL_PAYLOAD);
Expand Down Expand Up @@ -753,7 +779,7 @@ template <typename M> class MessageQueue {

// replace transaction id in meter values if start_transaction_message_id is present in map
// this is necessary when the chargepoint queued MeterValue.req for a transaction with unknown transaction_id
std::lock_guard<std::mutex> lk(this->message_mutex);
std::lock_guard<std::recursive_mutex> lk(this->message_mutex);
if (this->start_transaction_mid_meter_values_mid_map.count(start_transaction_message_id)) {
for (auto it = this->transaction_message_queue.begin(); it != transaction_message_queue.end(); ++it) {
for (const auto& meter_value_message_id :
Expand Down
4 changes: 4 additions & 0 deletions include/ocpp/v16/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ class ChargePoint {
/// \param callback
void register_is_token_reserved_for_connector_callback(
const std::function<bool(const int32_t connector, const std::string& id_token)>& callback);

/// \brief Delay draining the message queue after reconnecting, so the CSMS can perform post-reconnect checks first
/// \param delay The delay period (seconds)
void set_message_queue_resume_delay(std::chrono::seconds delay);
};

} // namespace v16
Expand Down
9 changes: 9 additions & 0 deletions include/ocpp/v16/charge_point_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ class ChargePointImpl : ocpp::ChargingStationBase {
FirmwareStatusEnumType signed_firmware_status;
int signed_firmware_status_request_id;

/// \brief optional delay to resumption of message queue after reconnecting to the CSMS
std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0);

// callbacks
std::function<bool(int32_t connector)> enable_evse_callback;
std::function<bool(int32_t connector)> disable_evse_callback;
Expand Down Expand Up @@ -729,6 +732,12 @@ class ChargePointImpl : ocpp::ChargingStationBase {
/// \param value
/// \return Indicates the result of the operation
ConfigurationStatus set_custom_configuration_key(CiString<50> key, CiString<500> value);

/// \brief Delay draining the message queue after reconnecting, so the CSMS can perform post-reconnect checks first
/// \param delay The delay period (seconds)
void set_message_queue_resume_delay(std::chrono::seconds delay) {
this->message_queue_resume_delay = delay;
}
};

} // namespace v16
Expand Down
8 changes: 8 additions & 0 deletions include/ocpp/v201/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ class ChargePoint : ocpp::ChargingStationBase {

/// \brief Handler for automatic or explicit OCSP cache updates
OcspUpdater ocsp_updater;
/// \brief optional delay to resumption of message queue after reconnecting to the CSMS
std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0);

bool send(CallError call_error);

Expand Down Expand Up @@ -643,6 +645,12 @@ class ChargePoint : ocpp::ChargingStationBase {
/// \return DataTransferResponse contaning the result from CSMS
DataTransferResponse data_transfer_req(const CiString<255>& vendorId, const std::optional<CiString<50>>& messageId,
const std::optional<std::string>& data);

/// \brief Delay draining the message queue after reconnecting, so the CSMS can perform post-reconnect checks first
/// \param delay The delay period (seconds)
void set_message_queue_resume_delay(std::chrono::seconds delay) {
this->message_queue_resume_delay = delay;
}
};

} // namespace v201
Expand Down
4 changes: 4 additions & 0 deletions lib/ocpp/v16/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,9 @@ void ChargePoint::register_is_token_reserved_for_connector_callback(
this->charge_point->register_is_token_reserved_for_connector_callback(callback);
}

void ChargePoint::set_message_queue_resume_delay(std::chrono::seconds delay) {
this->charge_point->set_message_queue_resume_delay(delay);
}

} // namespace v16
} // namespace ocpp
6 changes: 3 additions & 3 deletions lib/ocpp/v16/charge_point_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ void ChargePointImpl::init_websocket() {
if (this->connection_state_changed_callback != nullptr) {
this->connection_state_changed_callback(true);
}
this->message_queue->resume(); //
this->connected_callback(); //
this->message_queue->resume(this->message_queue_resume_delay);
this->connected_callback();
});
this->websocket->register_disconnected_callback([this]() {
if (this->connection_state_changed_callback != nullptr) {
this->connection_state_changed_callback(false);
}
this->message_queue->pause(); //
this->message_queue->pause();
if (this->ocsp_request_timer != nullptr) {
this->ocsp_request_timer->stop();
}
Expand Down
2 changes: 1 addition & 1 deletion lib/ocpp/v201/charge_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ void ChargePoint::init_websocket() {

this->websocket = std::make_unique<Websocket>(connection_options, this->evse_security, this->logging);
this->websocket->register_connected_callback([this](const int security_profile) {
this->message_queue->resume();
this->message_queue->resume(this->message_queue_resume_delay);

const auto& security_profile_cv = ControllerComponentVariables::SecurityProfile;
if (security_profile_cv.variable.has_value()) {
Expand Down