diff --git a/doc/message_dispatching.png b/doc/message_dispatching.png new file mode 100644 index 000000000..c42187867 Binary files /dev/null and b/doc/message_dispatching.png differ diff --git a/doc/message_dispatching.puml b/doc/message_dispatching.puml new file mode 100644 index 000000000..8e6406654 --- /dev/null +++ b/doc/message_dispatching.puml @@ -0,0 +1,51 @@ + +@startuml + +interface MessageDispatcherInterface { + +dispatch_call(const json& call, bool triggered = false) + +dispatch_call_async(const json& call, bool triggered = false): std::future> + +dispatch_call_result(const json& call_result) + +dispatch_call_error(const json& call_error) +} + +class v16::MessageDispatcher { + - MessageQueue& message_queue + - ChargePointConfiguration& configuration + - RegistrationStatus& registration_status +} + +class v201::MessageDispatcher { + - MessageQueue& message_queue + - DeviceModel& device_model + - ConnectivityManager& connectivity_manager + - RegistrationStatusEnum& registration_status +} + +interface v201::DataTransferInterface { + +data_transfer_req(request: DataTransferRequest): std::optional + +handle_data_transfer_req(call: Call) +} + +class v201::DataTransfer { + -MessageDispatcherInterface &message_dispatcher + -std::optional> data_transfer_callback +} + +class v201::ChargePoint { + std::unique_ptr message_dispatcher; + std::unique_ptr data_transfer; +} + +class v16::ChargePoint { + std::unique_ptr message_dispatcher; +} + +MessageDispatcherInterface <|-- v16::MessageDispatcher +MessageDispatcherInterface <|-- v201::MessageDispatcher +v201::DataTransferInterface <|-- v201::DataTransfer +MessageDispatcherInterface *-- v201::DataTransfer +MessageDispatcherInterface *-- v201::ChargePoint +v201::DataTransferInterface *-- v201::ChargePoint +MessageDispatcherInterface *-- v16::ChargePoint + +@enduml \ No newline at end of file diff --git a/include/ocpp/common/constants.hpp b/include/ocpp/common/constants.hpp index 93a0eca9e..59fb9db21 100644 --- a/include/ocpp/common/constants.hpp +++ b/include/ocpp/common/constants.hpp @@ -3,6 +3,7 @@ #pragma once +#include #include namespace ocpp { @@ -22,4 +23,6 @@ constexpr float NO_LIMIT_SPECIFIED = -1.0; constexpr std::int32_t NO_START_PERIOD = -1; constexpr std::int32_t EVSEID_NOT_SET = -1; +constexpr std::chrono::seconds DEFAULT_WAIT_FOR_FUTURE_TIMEOUT = std::chrono::seconds(60); + } // namespace ocpp \ No newline at end of file diff --git a/include/ocpp/common/message_dispatcher.hpp b/include/ocpp/common/message_dispatcher.hpp new file mode 100644 index 000000000..ab479d19c --- /dev/null +++ b/include/ocpp/common/message_dispatcher.hpp @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Pionix GmbH and Contributors to EVerest + +#pragma once + +#include + +namespace ocpp { + +/// \brief Interface for dispatching OCPP messages that shall be send over the websocket. This interface defines +/// dispatching of Call, CallResult and CallError messages. +/// \tparam T Type specifies the OCPP protocol version +template class MessageDispatcherInterface { + +public: + virtual ~MessageDispatcherInterface(){}; + + /// \brief Dispatches a Call message. + /// \param call the OCPP Call message. + /// \param triggered indicates if the call was triggered by a TriggerMessage. Default is false. + virtual void dispatch_call(const json& call, bool triggered = false) = 0; + + /// \brief Dispatches a Call message asynchronously. + /// \param call the OCPP Call message. + /// \param triggered indicates if the call was triggered by a TriggerMessage. Default is false. + /// \return std::future> Future object containing the enhanced message + /// result of type T. + virtual std::future> dispatch_call_async(const json& call, bool triggered = false) = 0; + + /// \brief Dispatches a CallResult message. + /// \param call_result the OCPP CallResult message. + virtual void dispatch_call_result(const json& call_result) = 0; + + /// \brief Dispatches a CallError message. + /// \param call_result the OCPP CallError message. + virtual void dispatch_call_error(const json& call_error) = 0; +}; + +} // namespace ocpp \ No newline at end of file diff --git a/include/ocpp/common/message_queue.hpp b/include/ocpp/common/message_queue.hpp index 1d56ee47c..63dc0e59c 100644 --- a/include/ocpp/common/message_queue.hpp +++ b/include/ocpp/common/message_queue.hpp @@ -630,16 +630,7 @@ template class MessageQueue { } } - /// \brief pushes a new \p call message onto the message queue - template void push(Call call, const bool stall_until_accepted = false) { - if (!running) { - return; - } - json call_json = call; - push(call_json, stall_until_accepted); - } - - void push(const json& message, const bool stall_until_accepted = false) { + void push_call(const json& message, const bool stall_until_accepted = false) { if (!running) { return; } @@ -664,16 +655,15 @@ template class MessageQueue { } /// \brief Sends a new \p call_result message over the websocket - template void push(CallResult call_result) { + void push_call_result(const json& call_result) { if (!running) { return; } - this->send_callback(call_result); { std::lock_guard lk(this->next_message_mutex); if (next_message_to_send.has_value()) { - if (next_message_to_send.value() == call_result.uniqueId) { + if (next_message_to_send.value() == call_result.at(MESSAGE_ID)) { next_message_to_send.reset(); } } @@ -683,7 +673,7 @@ template class MessageQueue { } /// \brief Sends a new \p call_error message over the websocket - void push(CallError call_error) { + void push_call_error(CallError call_error) { if (!running) { return; } @@ -703,7 +693,7 @@ template class MessageQueue { /// \brief pushes a new \p call message onto the message queue /// \returns a future from which the CallResult can be extracted - template std::future> push_async(Call call) { + std::future> push_call_async(const json& call) { auto message = std::make_shared>(call); if (!running) { diff --git a/include/ocpp/v16/charge_point_impl.hpp b/include/ocpp/v16/charge_point_impl.hpp index bcb31bdd2..0f79a59d9 100644 --- a/include/ocpp/v16/charge_point_impl.hpp +++ b/include/ocpp/v16/charge_point_impl.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -87,7 +89,7 @@ class ChargePointImpl : ocpp::ChargingStationBase { BootReasonEnum bootreason; ChargePointConnectionState connection_state; bool boot_notification_callerror; - RegistrationStatus registration_status; + std::atomic registration_status; DiagnosticsStatus diagnostics_status; FirmwareStatus firmware_status; bool firmware_update_is_pending = false; @@ -95,6 +97,7 @@ class ChargePointImpl : ocpp::ChargingStationBase { std::string message_log_path; std::unique_ptr websocket; + std::unique_ptr> message_dispatcher; Everest::SteadyTimer websocket_timer; std::unique_ptr> message_queue; std::map> connectors; @@ -205,11 +208,6 @@ class ChargePointImpl : ocpp::ChargingStationBase { std::unique_ptr> create_message_queue(); void message_callback(const std::string& message); void handle_message(const EnhancedMessage& message); - template bool send(Call call, bool initiated_by_trigger_message = false); - template - std::future> send_async(Call call, bool initiated_by_trigger_message = false); - template bool send(CallResult call_result); - bool send(CallError call_error); void heartbeat(bool initiated_by_trigger_message = false); void boot_notification(bool initiated_by_trigger_message = false); void clock_aligned_meter_values_sample(); diff --git a/include/ocpp/v16/message_dispatcher.hpp b/include/ocpp/v16/message_dispatcher.hpp new file mode 100644 index 000000000..5c82e2234 --- /dev/null +++ b/include/ocpp/v16/message_dispatcher.hpp @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Pionix GmbH and Contributors to EVerest + +#pragma once + +#include +#include + +namespace ocpp { +namespace v16 { + +class MessageDispatcher : public MessageDispatcherInterface { + +public: + MessageDispatcher(ocpp::MessageQueue& message_queue, ChargePointConfiguration& configuration, + std::atomic& registration_status) : + message_queue(message_queue), configuration(configuration), registration_status(registration_status){}; + void dispatch_call(const json& call, bool triggered = false) override; + std::future> dispatch_call_async(const json& call, bool triggered) override; + void dispatch_call_result(const json& call_result) override; + void dispatch_call_error(const json& call_error) override; + +private: + ocpp::MessageQueue& message_queue; + ChargePointConfiguration& configuration; + std::atomic& registration_status; +}; + +} // namespace v16 +} // namespace ocpp \ No newline at end of file diff --git a/include/ocpp/v201/charge_point.hpp b/include/ocpp/v201/charge_point.hpp index 35a963e3a..2ed41a608 100644 --- a/include/ocpp/v201/charge_point.hpp +++ b/include/ocpp/v201/charge_point.hpp @@ -7,6 +7,8 @@ #include #include +#include + #include #include @@ -373,6 +375,8 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa std::unique_ptr evse_manager; std::unique_ptr connectivity_manager; + std::unique_ptr> message_dispatcher; + // utility std::shared_ptr> message_queue; std::shared_ptr database_handler; @@ -401,7 +405,7 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa std::atomic_bool stop_auth_cache_cleanup_handler; // states - RegistrationStatusEnum registration_status; + std::atomic registration_status; FirmwareStatusEnum firmware_status; // The request ID in the last firmware update status received std::optional firmware_status_id; @@ -453,8 +457,6 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa /// \brief optional delay to resumption of message queue after reconnecting to the CSMS std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0); - bool send(CallError call_error); - // internal helper functions void initialize(const std::map& evse_connector_structure, const std::string& message_log_path); void init_certificate_expiration_check_timers(); @@ -748,20 +750,13 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa // Functional Block P: DataTransfer void handle_data_transfer_req(Call call); - // general message handling - template bool send(ocpp::Call call, const bool initiated_by_trigger_message = false); - - template std::future> send_async(ocpp::Call call); - - template bool send(ocpp::CallResult call_result); - // Generates async sending callbacks template std::function send_callback(MessageType expected_response_message_type) { return [this, expected_response_message_type](auto request) { MessageId message_id = MessageId(to_string(this->uuid_generator())); const auto enhanced_response = - this->send_async(ocpp::Call(request, message_id)).get(); + this->message_dispatcher->dispatch_call_async(ocpp::Call(request, message_id)).get(); if (enhanced_response.messageType != expected_response_message_type) { throw UnexpectedMessageTypeFromCSMS( std::string("Got unexpected message type from CSMS, expected: ") + diff --git a/include/ocpp/v201/message_dispatcher.hpp b/include/ocpp/v201/message_dispatcher.hpp new file mode 100644 index 000000000..a0ff42a5e --- /dev/null +++ b/include/ocpp/v201/message_dispatcher.hpp @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Pionix GmbH and Contributors to EVerest + +#pragma once + +#include +#include +#include + +namespace ocpp { +namespace v201 { + +class MessageDispatcher : public MessageDispatcherInterface { + +public: + MessageDispatcher(ocpp::MessageQueue& message_queue, DeviceModel& device_model, + std::atomic& registration_status) : + message_queue(message_queue), device_model(device_model), registration_status(registration_status){}; + void dispatch_call(const json& call, bool triggered = false) override; + std::future> dispatch_call_async(const json& call, bool triggered) override; + void dispatch_call_result(const json& call_result) override; + void dispatch_call_error(const json& call_error) override; + +private: + ocpp::MessageQueue& message_queue; + DeviceModel& device_model; + std::atomic& registration_status; +}; + +} // namespace v201 +} // namespace ocpp \ No newline at end of file diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 246a53f4f..9653406bc 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -38,6 +38,7 @@ if(LIBOCPP_ENABLE_V16) ocpp/v16/charge_point.cpp ocpp/v16/database_handler.cpp ocpp/v16/charge_point_impl.cpp + ocpp/v16/message_dispatcher.cpp ocpp/v16/smart_charging.cpp ocpp/v16/charge_point_configuration.cpp ocpp/v16/charge_point_state_machine.cpp @@ -80,6 +81,7 @@ if(LIBOCPP_ENABLE_V201) ocpp/v201/utils.cpp ocpp/v201/component_state_manager.cpp ocpp/v201/connectivity_manager.cpp + ocpp/v201/message_dispatcher.cpp ) add_subdirectory(ocpp/v201/messages) endif() diff --git a/lib/ocpp/v16/charge_point_impl.cpp b/lib/ocpp/v16/charge_point_impl.cpp index 639d3c33a..0c9501f69 100644 --- a/lib/ocpp/v16/charge_point_impl.cpp +++ b/lib/ocpp/v16/charge_point_impl.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -32,7 +33,6 @@ const auto INITIAL_CERTIFICATE_REQUESTS_DELAY = std::chrono::seconds(60); const auto WEBSOCKET_INIT_DELAY = std::chrono::seconds(2); const auto DEFAULT_MESSAGE_QUEUE_SIZE_THRESHOLD = 2E5; const auto DEFAULT_BOOT_NOTIFICATION_INTERVAL_S = 60; // fallback interval if BootNotification returns interval of 0. -const auto DEFAULT_WAIT_FOR_FUTURE_TIMEOUT = std::chrono::seconds(60); const auto DEFAULT_PRICE_NUMBER_OF_DECIMALS = 3; ChargePointImpl::ChargePointImpl(const std::string& config, const fs::path& share_path, @@ -60,6 +60,8 @@ ChargePointImpl::ChargePointImpl(const std::string& config, const fs::path& shar this->transaction_handler = std::make_unique(this->configuration->getNumberOfConnectors()); this->external_notify = {v16::MessageType::StartTransactionResponse}; this->message_queue = this->create_message_queue(); + this->message_dispatcher = + std::make_unique(*this->message_queue, *this->configuration, this->registration_status); auto log_formats = this->configuration->getLogMessagesFormat(); bool log_to_console = std::find(log_formats.begin(), log_formats.end(), "console") != log_formats.end(); bool detailed_log_to_console = @@ -417,7 +419,7 @@ void ChargePointImpl::heartbeat(bool initiated_by_trigger_message) { HeartbeatRequest req; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePointImpl::boot_notification(bool initiated_by_trigger_message) { @@ -434,7 +436,7 @@ void ChargePointImpl::boot_notification(bool initiated_by_trigger_message) { req.meterType = this->configuration->getMeterType(); ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePointImpl::clock_aligned_meter_values_sample() { @@ -975,7 +977,7 @@ void ChargePointImpl::send_meter_value(int32_t connector, MeterValue meter_value req.meterValue.push_back(meter_value); ocpp::Call call(req, message_id); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePointImpl::send_meter_value_on_pricing_trigger(const int32_t connector_number, @@ -1116,6 +1118,8 @@ bool ChargePointImpl::restart(const std::map& connector_ this->database_handler->open_connection(); // instantiating new message queue on restart this->message_queue = this->create_message_queue(); + this->message_dispatcher = + std::make_unique(*this->message_queue, *this->configuration, this->registration_status); return this->start(connector_status_map, bootreason, {}); } else { EVLOG_warning << "Attempting to restart Chargepoint while it has not been stopped before"; @@ -1257,10 +1261,11 @@ void ChargePointImpl::message_callback(const std::string& message) { enhanced_message = this->message_queue->receive(message); } catch (const TimePointParseException& e) { EVLOG_error << "Exception during handling of message: " << e.what(); - this->send(CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({}))); + this->message_dispatcher->dispatch_call_error( + CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({}))); } catch (const json::exception& e) { EVLOG_error << "JSON exception during reception of message: " << e.what(); - this->send(CallError(MessageId("-1"), "GenericError", e.what(), json({}))); + this->message_dispatcher->dispatch_call_error(CallError(MessageId("-1"), "GenericError", e.what(), json({}))); return; } @@ -1273,7 +1278,7 @@ void ChargePointImpl::message_callback(const std::string& message) { // FIXME(kai): however, only send a CALLERROR when it is a CALL message we just received if (enhanced_message.messageTypeId == MessageTypeId::CALL) { auto call_error = CallError(enhanced_message.uniqueId, "NotSupported", "", json({}, true)); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } else if (enhanced_message.messageTypeId == MessageTypeId::CALLERROR) { EVLOG_error << "Received a CALLERROR in response to a " << conversions::messagetype_to_string(enhanced_message.messageType) << ": " << message; @@ -1316,13 +1321,13 @@ void ChargePointImpl::message_callback(const std::string& message) { response.status = RemoteStartStopStatus::Rejected; const ocpp::CallResult call_result(response, enhanced_message.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } else if (enhanced_message.messageType == MessageType::RemoteStopTransaction) { RemoteStopTransactionResponse response; response.status = RemoteStartStopStatus::Rejected; const ocpp::CallResult call_result(response, enhanced_message.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } else { this->handle_message(enhanced_message); } @@ -1342,14 +1347,14 @@ void ChargePointImpl::message_callback(const std::string& message) { EVLOG_error << "JSON exception during handling of message: " << e.what(); if (json_message.is_array() && json_message.size() > MESSAGE_ID) { auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({}, true)); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); this->securityEventNotification(ocpp::security_events::INVALIDMESSAGES, std::optional>(message), true); } } catch (const EnumConversionException& e) { EVLOG_error << "EnumConversionException during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({}, true)); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); this->securityEventNotification(ocpp::security_events::INVALIDMESSAGES, std::optional>(message), true); } @@ -1663,7 +1668,7 @@ void ChargePointImpl::handleChangeAvailabilityRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); // if scheduled: execute status transition for connector 0 // if accepted: execute status transition for connector 0 and other connectors @@ -1704,7 +1709,7 @@ void ChargePointImpl::handleChangeConfigurationRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); responded = true; this->websocket->reconnect(1000); } else { @@ -1739,7 +1744,7 @@ void ChargePointImpl::handleChangeConfigurationRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); int32_t security_profile = std::stoi(call.msg.value); responded = true; this->switch_security_profile_callback = [this, security_profile]() { @@ -1794,7 +1799,7 @@ void ChargePointImpl::handleChangeConfigurationRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } if (this->configuration_key_changed_callbacks.count(call.msg.key) and @@ -1845,7 +1850,7 @@ void ChargePointImpl::handleClearCacheRequest(ocpp::Call call } catch (QueryExecutionException& e) { auto call_error = CallError(call.uniqueId, "InternalError", "Database error while clearing authorization cache", json({}, true)); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } } else { @@ -1853,7 +1858,7 @@ void ChargePointImpl::handleClearCacheRequest(ocpp::Call call } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleDataTransferRequest(ocpp::Call call) { @@ -1901,7 +1906,7 @@ void ChargePointImpl::handleDataTransferRequest(ocpp::Call } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleGetConfigurationRequest(ocpp::Call call) { @@ -1909,7 +1914,7 @@ void ChargePointImpl::handleGetConfigurationRequest(ocpp::Callget_configuration_key(call.msg); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleRemoteStartTransactionRequest(ocpp::Call call) { @@ -1925,7 +1930,7 @@ void ChargePointImpl::handleRemoteStartTransactionRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } referenced_connectors.push_back(call.msg.connectorId.value()); @@ -1969,7 +1974,7 @@ void ChargePointImpl::handleRemoteStartTransactionRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -1988,7 +1993,7 @@ void ChargePointImpl::handleRemoteStartTransactionRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } } @@ -2006,7 +2011,7 @@ void ChargePointImpl::handleRemoteStartTransactionRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (this->configuration->getAuthorizeRemoteTxRequests()) { this->provide_token_callback(call.msg.idTag.get(), referenced_connectors, false); @@ -2059,7 +2064,7 @@ void ChargePointImpl::handleRemoteStopTransactionRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (connector > 0) { this->stop_transaction_callback(connector, Reason::Remote); @@ -2082,7 +2087,7 @@ void ChargePointImpl::handleResetRequest(ocpp::Call call) { // send response ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == ResetStatus::Accepted) { // gracefully stop all transactions and send StopTransaction. Restart software afterwards @@ -2250,7 +2255,7 @@ void ChargePointImpl::handleUnlockConnectorRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleHeartbeatResponse(CallResult call_result) { @@ -2300,7 +2305,7 @@ void ChargePointImpl::handleSetChargingProfileRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == ChargingProfileStatus::Accepted) { if (this->signal_set_charging_profiles_callback != nullptr) { @@ -2348,7 +2353,7 @@ void ChargePointImpl::handleGetCompositeScheduleRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleClearChargingProfileRequest(ocpp::Call call) { @@ -2374,7 +2379,7 @@ void ChargePointImpl::handleClearChargingProfileRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == ClearChargingProfileStatus::Accepted and this->signal_set_charging_profiles_callback != nullptr) { @@ -2416,7 +2421,7 @@ void ChargePointImpl::handleTriggerMessageRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (!valid) { return; @@ -2476,7 +2481,7 @@ void ChargePointImpl::handleGetDiagnosticsRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleUpdateFirmwareRequest(ocpp::Call call) { @@ -2486,7 +2491,7 @@ void ChargePointImpl::handleUpdateFirmwareRequest(ocpp::Callupdate_firmware_callback(call.msg); } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleExtendedTriggerMessageRequest(ocpp::Call call) { @@ -2532,7 +2537,7 @@ void ChargePointImpl::handleExtendedTriggerMessageRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (!valid) { return; @@ -2614,7 +2619,7 @@ void ChargePointImpl::sign_certificate(const ocpp::CertificateSigningUseEnum& ce req.csr = response.csr.value(); ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePointImpl::update_ocsp_cache() { @@ -2659,7 +2664,7 @@ void ChargePointImpl::handleCertificateSignedRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == CertificateSignedStatusEnumType::Rejected) { this->securityEventNotification( @@ -2704,7 +2709,7 @@ void ChargePointImpl::handleGetInstalledCertificateIdsRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleDeleteCertificateRequest(ocpp::Call call) { @@ -2719,7 +2724,7 @@ void ChargePointImpl::handleDeleteCertificateRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleInstallCertificateRequest(ocpp::Call call) { @@ -2744,7 +2749,7 @@ void ChargePointImpl::handleInstallCertificateRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == InstallCertificateStatusEnumType::Rejected) { this->securityEventNotification( @@ -2764,7 +2769,7 @@ void ChargePointImpl::handleGetLogRequest(ocpp::Call call) { } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleSignedUpdateFirmware(ocpp::Call call) { @@ -2776,11 +2781,11 @@ void ChargePointImpl::handleSignedUpdateFirmware(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } else { response.status = this->signed_update_firmware_callback(call.msg); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } if (response.status == UpdateFirmwareStatusEnumType::InvalidCertificate) { @@ -2813,7 +2818,7 @@ void ChargePointImpl::securityEventNotification(const CiString<50>& event_type, if (critical_security_event and !this->configuration->getDisableSecurityEventNotifications()) { ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } if (triggered_internally and this->security_event_callback != nullptr) { @@ -2834,7 +2839,7 @@ void ChargePointImpl::log_status_notification(UploadLogStatusEnumType status, in this->log_status_request_id = requestId; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePointImpl::signed_firmware_update_status_notification(FirmwareStatusEnumType status, int requestId, @@ -2861,7 +2866,7 @@ void ChargePointImpl::signed_firmware_update_status_notification(FirmwareStatusE this->signed_firmware_status_request_id = requestId; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); if (status == FirmwareStatusEnumType::InvalidSignature) { this->securityEventNotification(ocpp::security_events::INVALIDFIRMWARESIGNATURE, std::nullopt, true); @@ -2887,7 +2892,7 @@ void ChargePointImpl::handleReserveNowRequest(ocpp::Call call } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleCancelReservationRequest(ocpp::Call call) { @@ -2900,7 +2905,7 @@ void ChargePointImpl::handleCancelReservationRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleSendLocalListRequest(ocpp::Call call) { @@ -2954,7 +2959,7 @@ void ChargePointImpl::handleSendLocalListRequest(ocpp::Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handleGetLocalListVersionRequest(ocpp::Call call) { @@ -2976,7 +2981,7 @@ void ChargePointImpl::handleGetLocalListVersionRequest(ocpp::Callsend(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } catch (RequiredEntryNotFoundException& e) { try { @@ -2987,13 +2992,13 @@ void ChargePointImpl::handleGetLocalListVersionRequest(ocpp::Callsend(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } DataTransferResponse ChargePointImpl::handle_set_user_price(const std::optional& msg) { @@ -3215,58 +3220,6 @@ void ChargePointImpl::set_time_offset_timer(const std::string& date_time) { this->change_time_offset_timer->at(d.to_time_point()); } -template bool ChargePointImpl::send(ocpp::Call call, bool initiated_by_trigger_message) { - const auto message_type = conversions::string_to_messagetype(json(call).at(CALL_ACTION)); - const auto message_transmission_priority = get_message_transmission_priority( - is_boot_notification_message(message_type), initiated_by_trigger_message, - (this->registration_status == RegistrationStatus::Accepted), is_transaction_message(message_type), - this->configuration->getQueueAllMessages().value_or(false)); - switch (message_transmission_priority) { - case MessageTransmissionPriority::SendImmediately: - this->message_queue->push(call); - return true; - case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: - this->message_queue->push(call, true); - return true; - case MessageTransmissionPriority::Discard: - return false; - } - throw std::runtime_error("Missing handling for MessageTransmissionPriority"); -} - -template -std::future> ChargePointImpl::send_async(ocpp::Call call, - bool initiated_by_trigger_message) { - const auto message_type = conversions::string_to_messagetype(json(call).at(CALL_ACTION)); - const auto message_transmission_priority = get_message_transmission_priority( - is_boot_notification_message(message_type), initiated_by_trigger_message, - (this->registration_status == RegistrationStatus::Accepted), is_transaction_message(message_type), - this->configuration->getQueueAllMessages().value_or(false)); - - switch (message_transmission_priority) { - case MessageTransmissionPriority::SendImmediately: - return this->message_queue->push_async(call); - case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: - case MessageTransmissionPriority::Discard: - auto promise = std::promise>(); - auto enhanced_message = EnhancedMessage(); - enhanced_message.offline = true; - promise.set_value(enhanced_message); - return promise.get_future(); - } - throw std::runtime_error("Missing handling for MessageTransmissionPriority"); -} - -template bool ChargePointImpl::send(ocpp::CallResult call_result) { - this->message_queue->push(call_result); - return true; -} - -bool ChargePointImpl::send(CallError call_error) { - this->message_queue->push(call_error); - return true; -} - void ChargePointImpl::status_notification(const int32_t connector, const ChargePointErrorCode errorCode, const ChargePointStatus status, const ocpp::DateTime& timestamp, const std::optional>& info, @@ -3282,7 +3235,7 @@ void ChargePointImpl::status_notification(const int32_t connector, const ChargeP request.vendorId = vendor_id; request.vendorErrorCode = vendor_error_code; ocpp::Call call(request, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } // public API for Core profile @@ -3329,7 +3282,7 @@ IdTagInfo ChargePointImpl::authorize_id_token(CiString<20> idTag, const bool aut ocpp::Call call(req, this->message_queue->createMessageId()); - auto authorize_future = this->send_async(call); + auto authorize_future = this->message_dispatcher->dispatch_call_async(call); if (authorize_future.wait_for(DEFAULT_WAIT_FOR_FUTURE_TIMEOUT) == std::future_status::timeout) { EVLOG_warning << "Waiting for Authorize.conf future timed out!"; @@ -3542,7 +3495,7 @@ ocpp::v201::AuthorizeResponse ChargePointImpl::data_transfer_pnc_authorize( // Send the DataTransfer(Authorize) to the CSMS Call call(req, this->message_queue->createMessageId()); - auto authorize_future = this->send_async(call); + auto authorize_future = this->message_dispatcher->dispatch_call_async(call); if (authorize_future.wait_for(DEFAULT_WAIT_FOR_FUTURE_TIMEOUT) == std::future_status::timeout) { EVLOG_warning << "Waiting for DataTransfer.conf(Authorize) future timed out!"; @@ -3624,7 +3577,7 @@ void ChargePointImpl::data_transfer_pnc_sign_certificate() { req.data.emplace(json(csr_req).dump()); Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } void ChargePointImpl::data_transfer_pnc_get_15118_ev_certificate( @@ -3650,7 +3603,7 @@ void ChargePointImpl::data_transfer_pnc_get_15118_ev_certificate( req.data.emplace(json(cert_req).dump()); Call call(req, this->message_queue->createMessageId()); - auto future = this->send_async(call); + auto future = this->message_dispatcher->dispatch_call_async(call); if (future.wait_for(DEFAULT_WAIT_FOR_FUTURE_TIMEOUT) == std::future_status::timeout) { EVLOG_warning << "Waiting for DataTransfer.conf(Get15118EVCertificate) future timed out!"; @@ -3700,7 +3653,7 @@ void ChargePointImpl::data_transfer_pnc_get_certificate_status(const ocpp::v201: req.data.emplace(json(cert_status_req).dump()); Call call(req, this->message_queue->createMessageId()); - auto future = this->send_async(call); + auto future = this->message_dispatcher->dispatch_call_async(call); if (future.wait_for(DEFAULT_WAIT_FOR_FUTURE_TIMEOUT) == std::future_status::timeout) { EVLOG_warning << "Waiting for DataTransfer.conf(GetCertificateStatus) future timed out!"; @@ -3770,7 +3723,7 @@ void ChargePointImpl::handle_data_transfer_pnc_trigger_message(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == DataTransferStatus::Accepted) { // send sign certificate wrapped in data_transfer @@ -3820,7 +3773,7 @@ void ChargePointImpl::handle_data_transfer_pnc_certificate_signed(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (certificate_response.status == CertificateSignedStatusEnumType::Rejected) { this->securityEventNotification(ocpp::security_events::INVALIDCHARGEPOINTCERTIFICATE, @@ -3829,11 +3782,11 @@ void ChargePointImpl::handle_data_transfer_pnc_certificate_signed(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } catch (const std::exception& e) { EVLOG_error << "Unknown Error while handling DataTransfer message CertificateSigned.req: " << e.what(); CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } } @@ -3883,7 +3836,7 @@ void ChargePointImpl::handle_data_transfer_pnc_get_installed_certificates(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handle_data_transfer_delete_certificate(Call call) { @@ -3913,7 +3866,7 @@ void ChargePointImpl::handle_data_transfer_delete_certificate(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePointImpl::handle_data_transfer_install_certificate(Call call) { @@ -3941,7 +3894,7 @@ void ChargePointImpl::handle_data_transfer_install_certificate(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } std::optional ChargePointImpl::data_transfer(const CiString<255>& vendorId, @@ -3955,7 +3908,7 @@ std::optional ChargePointImpl::data_transfer(const CiStrin DataTransferResponse response; response.status = DataTransferStatus::Rejected; ocpp::Call call(req, this->message_queue->createMessageId()); - auto data_transfer_future = this->send_async(call); + auto data_transfer_future = this->message_dispatcher->dispatch_call_async(call); if (this->websocket == nullptr or !this->websocket->is_connected()) { EVLOG_warning << "Attempting to send DataTransfer.req but charging station is offline"; @@ -4057,7 +4010,7 @@ void ChargePointImpl::start_transaction(std::shared_ptr transaction transaction->set_start_transaction_message_id(message_id.get()); transaction->change_meter_values_sample_interval(this->configuration->getMeterValueSampleInterval()); - this->send(call); + this->message_dispatcher->dispatch_call(call); if (this->transaction_started_callback != nullptr) { this->transaction_started_callback(transaction->get_connector(), transaction->get_session_id()); @@ -4222,7 +4175,7 @@ void ChargePointImpl::stop_transaction(int32_t connector, Reason reason, std::op { std::lock_guard lock(this->stop_transaction_mutex); - this->send(call); + this->message_dispatcher->dispatch_call(call); } if (this->transaction_stopped_callback != nullptr) { @@ -4370,7 +4323,7 @@ void ChargePointImpl::diagnostic_status_notification(DiagnosticsStatus status, b this->diagnostics_status = status; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send_async(call, true); + this->message_dispatcher->dispatch_call_async(call, true); } void ChargePointImpl::firmware_status_notification(FirmwareStatus status, bool initiated_by_trigger_message) { @@ -4392,7 +4345,7 @@ void ChargePointImpl::firmware_status_notification(FirmwareStatus status, bool i this->firmware_status = status; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send_async(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call_async(call, initiated_by_trigger_message); if (this->firmware_update_is_pending) { this->change_all_connectors_to_unavailable_for_firmware_update(); diff --git a/lib/ocpp/v16/message_dispatcher.cpp b/lib/ocpp/v16/message_dispatcher.cpp new file mode 100644 index 000000000..6283c5e7b --- /dev/null +++ b/lib/ocpp/v16/message_dispatcher.cpp @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Pionix GmbH and Contributors to EVerest + +#include + +namespace ocpp { +namespace v16 { + +void MessageDispatcher::dispatch_call(const json& call, bool triggered) { + const auto message_type = conversions::string_to_messagetype(call.at(CALL_ACTION)); + const auto message_transmission_priority = get_message_transmission_priority( + is_boot_notification_message(message_type), triggered, + (this->registration_status == RegistrationStatus::Accepted), is_transaction_message(message_type), + this->configuration.getQueueAllMessages().value_or(false)); + switch (message_transmission_priority) { + case MessageTransmissionPriority::SendImmediately: + this->message_queue.push_call(call); + return; + case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: + this->message_queue.push_call(call, true); + return; + case MessageTransmissionPriority::Discard: + return; + } + throw std::runtime_error("Missing handling for MessageTransmissionPriority"); +} + +std::future> MessageDispatcher::dispatch_call_async(const json& call, + bool triggered) { + const auto message_type = conversions::string_to_messagetype(call.at(CALL_ACTION)); + const auto message_transmission_priority = get_message_transmission_priority( + is_boot_notification_message(message_type), triggered, + (this->registration_status == RegistrationStatus::Accepted), is_transaction_message(message_type), + this->configuration.getQueueAllMessages().value_or(false)); + + switch (message_transmission_priority) { + case MessageTransmissionPriority::SendImmediately: + return this->message_queue.push_call_async(call); + case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: + case MessageTransmissionPriority::Discard: + auto promise = std::promise>(); + auto enhanced_message = EnhancedMessage(); + enhanced_message.offline = true; + promise.set_value(enhanced_message); + return promise.get_future(); + } + throw std::runtime_error("Missing handling for MessageTransmissionPriority"); +} + +void MessageDispatcher::dispatch_call_result(const json& call_result) { + this->message_queue.push_call_result(call_result); +} + +void MessageDispatcher::dispatch_call_error(const json& call_error) { + this->message_queue.push_call_error(call_error); +} + +} // namespace v16 +} // namespace ocpp \ No newline at end of file diff --git a/lib/ocpp/v201/charge_point.cpp b/lib/ocpp/v201/charge_point.cpp index 6043d228c..3f9fda8cc 100644 --- a/lib/ocpp/v201/charge_point.cpp +++ b/lib/ocpp/v201/charge_point.cpp @@ -1,10 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Pionix GmbH and Contributors to EVerest +#include #include #include #include #include +#include #include #include #include @@ -19,7 +21,6 @@ using namespace std::chrono_literals; const auto DEFAULT_MAX_CUSTOMER_INFORMATION_DATA_LENGTH = 51200; const std::string VARIABLE_ATTRIBUTE_VALUE_SOURCE_INTERNAL = "internal"; const std::string VARIABLE_ATTRIBUTE_VALUE_SOURCE_CSMS = "csms"; -const auto DEFAULT_WAIT_FOR_FUTURE_TIMEOUT = std::chrono::seconds(60); const auto DEFAULT_PRICE_NUMBER_OF_DECIMALS = 3; using DatabaseException = ocpp::common::DatabaseException; @@ -74,7 +75,14 @@ ChargePoint::ChargePoint(const std::map& evse_connector_struct EVLOG_AND_THROW(std::invalid_argument("Database handler should not be null")); } + if (!this->message_queue) { + EVLOG_AND_THROW(std::invalid_argument("Message Queue should not be null")); + } + initialize(evse_connector_structure, message_log_path); + + this->message_dispatcher = + std::make_unique(*this->message_queue, *this->device_model, registration_status); } ChargePoint::ChargePoint(const std::map& evse_connector_structure, @@ -117,6 +125,9 @@ ChargePoint::ChargePoint(const std::map& evse_connector_struct message_types_discard_for_queueing, this->device_model->get_value(ControllerComponentVariables::MessageTimeout)}, this->database_handler); + + this->message_dispatcher = + std::make_unique(*this->message_queue, *this->device_model, registration_status); } ChargePoint::ChargePoint(const std::map& evse_connector_structure, @@ -233,7 +244,7 @@ void ChargePoint::on_firmware_update_status_notification(int32_t request_id, } ocpp::Call call(req, this->message_queue->createMessageId()); - this->send_async(call); + this->message_dispatcher->dispatch_call_async(call); if (req.status == FirmwareStatusEnum::Installed) { std::string firmwareVersionMessage = "New firmware succesfully installed! Version: "; @@ -262,7 +273,7 @@ void ChargePoint::on_firmware_update_status_notification(int32_t request_id, this->firmware_status = FirmwareStatusEnum::InstallScheduled; req.status = firmware_status; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send_async(call); + this->message_dispatcher->dispatch_call_async(call); } this->change_all_connectors_to_unavailable_for_firmware_update(); } @@ -287,7 +298,7 @@ ChargePoint::on_get_15118_ev_certificate_request(const Get15118EVCertificateRequ } EVLOG_debug << "Received Get15118EVCertificateRequest " << request; - auto future_res = this->send_async( + auto future_res = this->message_dispatcher->dispatch_call_async( ocpp::Call(request, this->message_queue->createMessageId())); if (future_res.wait_for(DEFAULT_WAIT_FOR_FUTURE_TIMEOUT) == std::future_status::timeout) { @@ -309,7 +320,7 @@ ChargePoint::on_get_15118_ev_certificate_request(const Get15118EVCertificateRequ } catch (const EnumConversionException& e) { EVLOG_error << "EnumConversionException during handling of message: " << e.what(); auto call_error = CallError(response_message.uniqueId, "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return response; } } @@ -1069,7 +1080,7 @@ void ChargePoint::on_log_status_notification(UploadLogStatusEnum status, int32_t this->upload_log_status_id = requestId; ocpp::Call call(request, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } void ChargePoint::on_security_event(const CiString<50>& event_type, const std::optional>& tech_info, @@ -1087,11 +1098,6 @@ void ChargePoint::on_variable_changed(const SetVariableData& set_variable_data) this->handle_variable_changed(set_variable_data); } -bool ChargePoint::send(CallError call_error) { - this->message_queue->push(call_error); - return true; -} - void ChargePoint::initialize(const std::map& evse_connector_structure, const std::string& message_log_path) { this->device_model->check_integrity(evse_connector_structure); @@ -1377,7 +1383,7 @@ void ChargePoint::handle_message(const EnhancedMessage& messa default: if (message.messageTypeId == MessageTypeId::CALL) { const auto call_error = CallError(message.uniqueId, "NotImplemented", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } break; } @@ -1390,7 +1396,8 @@ void ChargePoint::message_callback(const std::string& message) { } catch (const json::exception& e) { this->logging->central_system("Unknown", message); EVLOG_error << "JSON exception during reception of message: " << e.what(); - this->send(CallError(MessageId("-1"), "RpcFrameworkError", e.what(), json({}))); + this->message_dispatcher->dispatch_call_error( + CallError(MessageId("-1"), "RpcFrameworkError", e.what(), json({}))); const auto& security_event = ocpp::security_events::INVALIDMESSAGES; this->security_event_notification_req(CiString<50>(security_event), CiString<255>(message), true, utils::is_critical(security_event)); @@ -1398,7 +1405,7 @@ void ChargePoint::message_callback(const std::string& message) { } catch (const EnumConversionException& e) { EVLOG_error << "EnumConversionException during handling of message: " << e.what(); auto call_error = CallError(MessageId("-1"), "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); const auto& security_event = ocpp::security_events::INVALIDMESSAGES; this->security_event_notification_req(CiString<50>(security_event), CiString<255>(message), true, utils::is_critical(security_event)); @@ -1429,14 +1436,14 @@ void ChargePoint::message_callback(const std::string& message) { response.status = RequestStartStopStatusEnum::Rejected; const ocpp::CallResult call_result(response, enhanced_message.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } else if (enhanced_message.messageType == MessageType::RequestStopTransaction) { // Send rejected: B02.FR.05 RequestStopTransactionResponse response; response.status = RequestStartStopStatusEnum::Rejected; const ocpp::CallResult call_result(response, enhanced_message.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } else { std::string const call_error_message = "Received invalid MessageType: " + @@ -1446,7 +1453,7 @@ void ChargePoint::message_callback(const std::string& message) { // B02.FR.09 send CALLERROR SecurityError const auto call_error = CallError(enhanced_message.uniqueId, "SecurityError", call_error_message, json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } } } else if (this->registration_status == RegistrationStatusEnum::Rejected) { @@ -1462,37 +1469,37 @@ void ChargePoint::message_callback(const std::string& message) { "having received an accepted BootNotificationResponse"; EVLOG_warning << error_message; const auto call_error = CallError(enhanced_message.uniqueId, "SecurityError", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } } else { const auto error_message = "Received other message than BootNotificationResponse before " "having received an accepted BootNotificationResponse"; EVLOG_warning << error_message; const auto call_error = CallError(enhanced_message.uniqueId, "SecurityError", "", json({}, true)); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } } } catch (const EvseOutOfRangeException& e) { EVLOG_error << "Exception during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "OccurrenceConstraintViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } catch (const ConnectorOutOfRangeException& e) { EVLOG_error << "Exception during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "OccurrenceConstraintViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } catch (const EnumConversionException& e) { EVLOG_error << "EnumConversionException during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } catch (const TimePointParseException& e) { EVLOG_error << "Exception during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } catch (json::exception& e) { EVLOG_error << "JSON exception during handling of message: " << e.what(); if (json_message.is_array() and json_message.size() > MESSAGE_ID) { auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); } } } @@ -2014,7 +2021,7 @@ void ChargePoint::security_event_notification_req(const CiString<50>& event_type this->logging->security(json(req).dump()); if (critical) { ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } if (triggered_internally and this->callbacks.security_event_callback != nullptr) { this->callbacks.security_event_callback(event_type, tech_info); @@ -2089,7 +2096,7 @@ void ChargePoint::sign_certificate_req(const ocpp::CertificateSigningUseEnum& ce this->awaited_certificate_signing_use_enum = certificate_signing_use; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePoint::boot_notification_req(const BootReasonEnum& reason, const bool initiated_by_trigger_message) { @@ -2109,7 +2116,7 @@ void ChargePoint::boot_notification_req(const BootReasonEnum& reason, const bool req.chargingStation = charging_station; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePoint::notify_report_req(const int request_id, const std::vector& report_data) { @@ -2123,7 +2130,7 @@ void ChargePoint::notify_report_req(const int request_id, const std::vector call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } else { NotifyReportRequestsSplitter splitter{ req, @@ -2131,7 +2138,7 @@ void ChargePoint::notify_report_req(const int request_id, const std::vectormessage_queue->createMessageId(); }}; for (const auto& msg : splitter.create_call_payloads()) { - this->message_queue->push(msg); + this->message_queue->push_call(msg); } } } @@ -2151,7 +2158,7 @@ AuthorizeResponse ChargePoint::authorize_req(const IdToken id_token, const std:: } ocpp::Call call(req, this->message_queue->createMessageId()); - auto future = this->send_async(call); + auto future = this->message_dispatcher->dispatch_call_async(call); if (future.wait_for(DEFAULT_WAIT_FOR_FUTURE_TIMEOUT) == std::future_status::timeout) { EVLOG_warning << "Waiting for DataTransfer.conf(Authorize) future timed out!"; @@ -2170,7 +2177,7 @@ AuthorizeResponse ChargePoint::authorize_req(const IdToken id_token, const std:: } catch (const EnumConversionException& e) { EVLOG_error << "EnumConversionException during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return response; } } @@ -2184,7 +2191,7 @@ void ChargePoint::status_notification_req(const int32_t evse_id, const int32_t c req.connectorStatus = status; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePoint::heartbeat_req(const bool initiated_by_trigger_message) { @@ -2192,7 +2199,7 @@ void ChargePoint::heartbeat_req(const bool initiated_by_trigger_message) { heartbeat_request_time = std::chrono::steady_clock::now(); ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePoint::transaction_event_req(const TransactionEventEnum& event_type, const DateTime& timestamp, @@ -2248,7 +2255,7 @@ void ChargePoint::transaction_event_req(const TransactionEventEnum& event_type, remote_start_id_per_evse.erase(it); } - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); if (this->callbacks.transaction_event_callback.has_value()) { this->callbacks.transaction_event_callback.value()(req); @@ -2262,7 +2269,7 @@ void ChargePoint::meter_values_req(const int32_t evse_id, const std::vector call(req, this->message_queue->createMessageId()); - this->send(call, initiated_by_trigger_message); + this->message_dispatcher->dispatch_call(call, initiated_by_trigger_message); } void ChargePoint::report_charging_profile_req(const int32_t request_id, const int32_t evse_id, @@ -2276,12 +2283,12 @@ void ChargePoint::report_charging_profile_req(const int32_t request_id, const in req.tbc = tbc; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } void ChargePoint::report_charging_profile_req(const ReportChargingProfilesRequest& req) { ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } void ChargePoint::notify_event_req(const std::vector& events) { @@ -2291,7 +2298,7 @@ void ChargePoint::notify_event_req(const std::vector& events) { req.seqNo = 0; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } void ChargePoint::notify_customer_information_req(const std::string& data, const int32_t request_id) { @@ -2309,7 +2316,7 @@ void ChargePoint::notify_customer_information_req(const std::string& data, const }(); ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); pos += 512; seq_no++; @@ -2353,7 +2360,7 @@ void ChargePoint::handle_certificate_signed_req(Call c } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (result != ocpp::InstallCertificateResult::Accepted) { this->security_event_notification_req("InvalidChargingStationCertificate", @@ -2492,7 +2499,7 @@ void ChargePoint::handle_set_variables_req(Call call) { } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); // post handling of changed variables after the SetVariables.conf has been queued this->handle_variables_changed(set_variables_response); @@ -2511,7 +2518,7 @@ void ChargePoint::handle_get_variables_req(const EnhancedMessage max_variables_per_message) { // send a CALLERROR const auto call_error = CallError(call.uniqueId, "OccurenceConstraintViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -2519,7 +2526,7 @@ void ChargePoint::handle_get_variables_req(const EnhancedMessage max_bytes_per_message) { // send a CALLERROR const auto call_error = CallError(call.uniqueId, "FormatViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -2527,7 +2534,7 @@ void ChargePoint::handle_get_variables_req(const EnhancedMessageget_variables(msg.getVariableData); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_get_base_report_req(Call call) { @@ -2536,7 +2543,7 @@ void ChargePoint::handle_get_base_report_req(Call call) { response.status = GenericDeviceModelStatusEnum::Accepted; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == GenericDeviceModelStatusEnum::Accepted) { const auto report_data = this->device_model->get_base_report_data(msg.reportBase); @@ -2559,7 +2566,7 @@ void ChargePoint::handle_get_report_req(const EnhancedMessage if (msg.componentVariable.has_value() and msg.componentVariable->size() > max_items_per_message) { // send a CALLERROR const auto call_error = CallError(call.uniqueId, "OccurenceConstraintViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -2567,7 +2574,7 @@ void ChargePoint::handle_get_report_req(const EnhancedMessage if (message.message_size > max_bytes_per_message) { // send a CALLERROR const auto call_error = CallError(call.uniqueId, "FormatViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -2599,7 +2606,7 @@ void ChargePoint::handle_get_report_req(const EnhancedMessage } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == GenericDeviceModelStatusEnum::Accepted) { this->notify_report_req(msg.requestId, report_data); @@ -2615,7 +2622,7 @@ void ChargePoint::handle_set_network_profile_req(Call EVLOG_warning << "No callback registered to validate network profile"; response.status = SetNetworkProfileStatusEnum::Rejected; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -2624,7 +2631,7 @@ void ChargePoint::handle_set_network_profile_req(Call EVLOG_warning << "CSMS attempted to set a network profile with a lower securityProfile"; response.status = SetNetworkProfileStatusEnum::Rejected; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -2633,7 +2640,7 @@ void ChargePoint::handle_set_network_profile_req(Call EVLOG_warning << "CSMS attempted to set a network profile that could not be validated."; response.status = SetNetworkProfileStatusEnum::Rejected; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -2664,7 +2671,7 @@ void ChargePoint::handle_set_network_profile_req(Call EVLOG_warning << "CSMS attempted to set a network profile that could not be written to the device model"; response.status = SetNetworkProfileStatusEnum::Rejected; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -2678,7 +2685,7 @@ void ChargePoint::handle_set_network_profile_req(Call response.status = SetNetworkProfileStatusEnum::Accepted; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_reset_req(Call call) { @@ -2747,7 +2754,7 @@ void ChargePoint::handle_reset_req(Call call) { } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); // Reset response is sent, now set evse connectors to unavailable and / or // stop transaction (depending on reset type) @@ -2783,13 +2790,13 @@ void ChargePoint::handle_clear_cache_req(Call call) { } catch (DatabaseException& e) { auto call_error = CallError(call.uniqueId, "InternalError", "Database error while clearing authorization cache", json({}, true)); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_transaction_event_response(const EnhancedMessage& message) { @@ -2884,7 +2891,7 @@ void ChargePoint::handle_get_transaction_status(const Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_unlock_connector(Call call) { @@ -2904,7 +2911,7 @@ void ChargePoint::handle_unlock_connector(Call call) { } ocpp::CallResult call_result(unlock_response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_trigger_message(Call call) { @@ -3006,7 +3013,7 @@ void ChargePoint::handle_trigger_message(Call call) { } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status != TriggerMessageStatusEnum::Accepted) { return; @@ -3084,7 +3091,7 @@ void ChargePoint::handle_trigger_message(Call call) { } ocpp::Call call(request, this->message_queue->createMessageId()); - this->send(call, true); + this->message_dispatcher->dispatch_call(call, true); } break; case MessageTriggerEnum::FirmwareStatusNotification: { @@ -3103,7 +3110,7 @@ void ChargePoint::handle_trigger_message(Call call) { } ocpp::Call call(request, this->message_queue->createMessageId()); - this->send(call, true); + this->message_dispatcher->dispatch_call(call, true); } break; case MessageTriggerEnum::SignChargingStationCertificate: { @@ -3196,7 +3203,7 @@ void ChargePoint::handle_remote_start_transaction_request(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_remote_stop_transaction_request(Call call) { @@ -3218,7 +3225,7 @@ void ChargePoint::handle_remote_stop_transaction_request(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_change_availability_req(Call call) { @@ -3231,7 +3238,7 @@ void ChargePoint::handle_change_availability_req(Call EVLOG_warning << "CSMS requested ChangeAvailability for invalid evse id or connector id"; response.status = ChangeAvailabilityStatusEnum::Rejected; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -3263,7 +3270,7 @@ void ChargePoint::handle_change_availability_req(Call // Respond to the CSMS before performing any changes to avoid StatusNotification.req being sent before // the ChangeAvailabilityResponse. ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (!transaction_active) { // No transactions - execute the change now @@ -3311,7 +3318,7 @@ void ChargePoint::handle_costupdated_req(const Call call) { ocpp::CallResult call_result(response, call.uniqueId); if (!is_cost_enabled() or !this->callbacks.set_running_cost_callback.has_value()) { - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -3355,7 +3362,7 @@ void ChargePoint::handle_costupdated_req(const Call call) { this->device_model->get_value(ControllerComponentVariables::TariffCostCtrlrCurrency); this->callbacks.set_running_cost_callback.value()(running_cost, decimals, currency); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); // In OCPP 2.0.1, the chargepoint status trigger is not used. if (!triggers.at_energy_kwh.has_value() and !triggers.at_power_kw.has_value() and !triggers.at_time.has_value()) { @@ -3395,7 +3402,7 @@ void ChargePoint::handle_set_charging_profile_req(Callsend(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -3409,7 +3416,7 @@ void ChargePoint::handle_set_charging_profile_req(CalladditionalInfo->get(); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -3424,7 +3431,7 @@ void ChargePoint::handle_set_charging_profile_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_clear_charging_profile_req(Call call) { @@ -3448,7 +3455,7 @@ void ChargePoint::handle_clear_charging_profile_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_get_charging_profiles_req(Call call) { @@ -3462,7 +3469,7 @@ void ChargePoint::handle_get_charging_profiles_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == GetChargingProfileStatusEnum::NoProfiles) { return; @@ -3517,7 +3524,7 @@ void ChargePoint::handle_get_composite_schedule_req(Callget_composite_schedule_internal(call.msg); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_firmware_update_req(Call call) { @@ -3547,7 +3554,7 @@ void ChargePoint::handle_firmware_update_req(Call call) { } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if ((response.status == UpdateFirmwareStatusEnum::InvalidCertificate) or (response.status == UpdateFirmwareStatusEnum::RevokedCertificate)) { @@ -3595,7 +3602,7 @@ void ChargePoint::handle_get_installed_certificate_ids_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_install_certificate_req(Call call) { @@ -3617,7 +3624,7 @@ void ChargePoint::handle_install_certificate_req(Call } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_delete_certificate_req(Call call) { @@ -3640,14 +3647,14 @@ void ChargePoint::handle_delete_certificate_req(Call c } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_get_log_req(Call call) { const GetLogResponse response = this->callbacks.get_log_request_callback(call.msg); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_customer_information_req(Call call) { @@ -3667,7 +3674,7 @@ void ChargePoint::handle_customer_information_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == CustomerInformationStatusEnum::Accepted) { std::string data = ""; @@ -3719,7 +3726,7 @@ void ChargePoint::handle_set_monitoring_base_req(Call } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_set_monitoring_level_req(Call call) { @@ -3743,7 +3750,7 @@ void ChargePoint::handle_set_monitoring_level_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_set_variable_monitoring_req(const EnhancedMessage& message) { @@ -3759,13 +3766,13 @@ void ChargePoint::handle_set_variable_monitoring_req(const EnhancedMessage max_items_per_message) { const auto call_error = CallError(call.uniqueId, "OccurenceConstraintViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } if (message.message_size > max_bytes_message) { const auto call_error = CallError(call.uniqueId, "FormatViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -3776,7 +3783,7 @@ void ChargePoint::handle_set_variable_monitoring_req(const EnhancedMessage call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::notify_monitoring_report_req(const int request_id, @@ -3792,7 +3799,7 @@ void ChargePoint::notify_monitoring_report_req(const int request_id, req.tbc = false; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); } else { // Split for larger message sizes int32_t sequence_num = 0; @@ -3818,7 +3825,7 @@ void ChargePoint::notify_monitoring_report_req(const int request_id, req.monitor = sub_data; ocpp::Call call(req, this->message_queue->createMessageId()); - this->send(call); + this->message_dispatcher->dispatch_call(call); sequence_num++; } @@ -3836,7 +3843,7 @@ void ChargePoint::handle_get_monitoring_report_req(Call max_variable_components_per_message) { const auto call_error = CallError(call.uniqueId, "OccurenceConstraintViolation", "", json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } @@ -3857,7 +3864,7 @@ void ChargePoint::handle_get_monitoring_report_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); if (response.status == GenericDeviceModelStatusEnum::Accepted) { // Send the result with splits if required @@ -3876,7 +3883,7 @@ void ChargePoint::handle_clear_variable_monitoring_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_get_display_message(const Call call) { @@ -3884,7 +3891,7 @@ void ChargePoint::handle_get_display_message(const Callcallbacks.get_display_message_callback.has_value()) { response.status = GetDisplayMessagesStatusEnum::Unknown; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -3909,18 +3916,18 @@ void ChargePoint::handle_get_display_message(const Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } else { response.status = GetDisplayMessagesStatusEnum::Accepted; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } // Send display messages. The response is empty, so we don't have to get that back. // Sending multiple messages is not supported for now, because there is no need to split them up (yet). ocpp::Call request(messages_request, this->message_queue->createMessageId()); - this->send(request); + this->message_dispatcher->dispatch_call(request); } void ChargePoint::handle_set_display_message(const Call call) { @@ -3928,7 +3935,7 @@ void ChargePoint::handle_set_display_message(const Callcallbacks.set_display_message_callback.has_value()) { response.status = DisplayMessageStatusEnum::Rejected; ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } @@ -3992,14 +3999,14 @@ void ChargePoint::handle_set_display_message(const Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); return; } const DisplayMessage message = message_info_to_display_message(call.msg.message); response = this->callbacks.set_display_message_callback.value()({message}); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_clear_display_message(const Call call) { @@ -4008,12 +4015,12 @@ void ChargePoint::handle_clear_display_message(const Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } response = this->callbacks.clear_display_message_callback.value()(call.msg); ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_data_transfer_req(Call call) { @@ -4028,51 +4035,7 @@ void ChargePoint::handle_data_transfer_req(Call call) { } ocpp::CallResult call_result(response, call.uniqueId); - this->send(call_result); -} - -template bool ChargePoint::send(ocpp::Call call, const bool initiated_by_trigger_message) { - const auto message_type = conversions::string_to_messagetype(json(call).at(CALL_ACTION)); - const auto message_transmission_priority = get_message_transmission_priority( - is_boot_notification_message(message_type), initiated_by_trigger_message, - (this->registration_status == RegistrationStatusEnum::Accepted), is_transaction_message(message_type), - this->device_model->get_optional_value(ControllerComponentVariables::QueueAllMessages).value_or(false)); - switch (message_transmission_priority) { - case MessageTransmissionPriority::SendImmediately: - this->message_queue->push(call); - return true; - case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: - this->message_queue->push(call, true); - return true; - case MessageTransmissionPriority::Discard: - return false; - } - throw std::runtime_error("Missing handling for MessageTransmissionPriority"); -} - -template std::future> ChargePoint::send_async(ocpp::Call call) { - const auto message_type = conversions::string_to_messagetype(json(call).at(CALL_ACTION)); - const auto message_transmission_priority = get_message_transmission_priority( - is_boot_notification_message(message_type), false, - (this->registration_status == RegistrationStatusEnum::Accepted), is_transaction_message(message_type), - this->device_model->get_optional_value(ControllerComponentVariables::QueueAllMessages).value_or(false)); - switch (message_transmission_priority) { - case MessageTransmissionPriority::SendImmediately: - return this->message_queue->push_async(call); - case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: - case MessageTransmissionPriority::Discard: - auto promise = std::promise>(); - auto enhanced_message = EnhancedMessage(); - enhanced_message.offline = true; - promise.set_value(enhanced_message); - return promise.get_future(); - } - throw std::runtime_error("Missing handling for MessageTransmissionPriority"); -} - -template bool ChargePoint::send(ocpp::CallResult call_result) { - this->message_queue->push(call_result); - return true; + this->message_dispatcher->dispatch_call_result(call_result); } std::optional ChargePoint::data_transfer_req(const CiString<255>& vendorId, @@ -4091,7 +4054,7 @@ std::optional ChargePoint::data_transfer_req(const DataTra response.status = DataTransferStatusEnum::Rejected; ocpp::Call call(request, this->message_queue->createMessageId()); - auto data_transfer_future = this->send_async(call); + auto data_transfer_future = this->message_dispatcher->dispatch_call_async(call); if (this->connectivity_manager == nullptr or !this->connectivity_manager->is_websocket_connected()) { return std::nullopt; @@ -4110,7 +4073,7 @@ std::optional ChargePoint::data_transfer_req(const DataTra } catch (const EnumConversionException& e) { EVLOG_error << "EnumConversionException during handling of message: " << e.what(); auto call_error = CallError(enhanced_message.uniqueId, "FormationViolation", e.what(), json({})); - this->send(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return std::nullopt; } } @@ -4157,7 +4120,7 @@ void ChargePoint::handle_send_local_authorization_list_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::handle_get_local_authorization_list_version_req(Call call) { @@ -4170,7 +4133,7 @@ void ChargePoint::handle_get_local_authorization_list_version_req(Callsend(call_error); + this->message_dispatcher->dispatch_call_error(call_error); return; } } else { @@ -4178,7 +4141,7 @@ void ChargePoint::handle_get_local_authorization_list_version_req(Call call_result(response, call.uniqueId); - this->send(call_result); + this->message_dispatcher->dispatch_call_result(call_result); } void ChargePoint::scheduled_check_client_certificate_expiration() { diff --git a/lib/ocpp/v201/message_dispatcher.cpp b/lib/ocpp/v201/message_dispatcher.cpp new file mode 100644 index 000000000..1386d1752 --- /dev/null +++ b/lib/ocpp/v201/message_dispatcher.cpp @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Pionix GmbH and Contributors to EVerest + +#include +#include + +namespace ocpp { +namespace v201 { + +void MessageDispatcher::dispatch_call(const json& call, bool triggered) { + const auto message_type = conversions::string_to_messagetype(call.at(CALL_ACTION)); + const auto message_transmission_priority = get_message_transmission_priority( + is_boot_notification_message(message_type), triggered, + (this->registration_status == RegistrationStatusEnum::Accepted), is_transaction_message(message_type), + this->device_model.get_optional_value(ControllerComponentVariables::QueueAllMessages).value_or(false)); + switch (message_transmission_priority) { + case MessageTransmissionPriority::SendImmediately: + this->message_queue.push_call(call); + return; + case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: + this->message_queue.push_call(call, true); + return; + case MessageTransmissionPriority::Discard: + return; + } + throw std::runtime_error("Missing handling for MessageTransmissionPriority"); +} + +std::future> MessageDispatcher::dispatch_call_async(const json& call, + bool triggered) { + const auto message_type = conversions::string_to_messagetype(call.at(CALL_ACTION)); + const auto message_transmission_priority = get_message_transmission_priority( + is_boot_notification_message(message_type), false, + (this->registration_status == RegistrationStatusEnum::Accepted), is_transaction_message(message_type), + this->device_model.get_optional_value(ControllerComponentVariables::QueueAllMessages).value_or(false)); + switch (message_transmission_priority) { + case MessageTransmissionPriority::SendImmediately: + return this->message_queue.push_call_async(call); + case MessageTransmissionPriority::SendAfterRegistrationStatusAccepted: + case MessageTransmissionPriority::Discard: + auto promise = std::promise>(); + auto enhanced_message = EnhancedMessage(); + enhanced_message.offline = true; + promise.set_value(enhanced_message); + return promise.get_future(); + } + throw std::runtime_error("Missing handling for MessageTransmissionPriority"); +} + +void MessageDispatcher::dispatch_call_result(const json& call_result) { + this->message_queue.push_call_result(call_result); +} + +void MessageDispatcher::dispatch_call_error(const json& call_error) { + this->message_queue.push_call_error(call_error); +} + +} // namespace v201 +} // namespace ocpp \ No newline at end of file diff --git a/tests/lib/ocpp/common/test_message_queue.cpp b/tests/lib/ocpp/common/test_message_queue.cpp index dbf7f5f21..67e00131f 100644 --- a/tests/lib/ocpp/common/test_message_queue.cpp +++ b/tests/lib/ocpp/common/test_message_queue.cpp @@ -202,7 +202,7 @@ class MessageQueueTest : public ::testing::Test { call.msg.type = message_type; call.msg.data = identifier; call.uniqueId = identifier; - message_queue->push(call); + message_queue->push_call(call); return identifier; } @@ -246,7 +246,7 @@ TEST_F(MessageQueueTest, test_transactional_message_is_sent) { call.msg.type = TestMessageType::TRANSACTIONAL; call.msg.data = "test_data"; call.uniqueId = "0"; - message_queue->push(call); + message_queue->push_call(call); wait_for_calls(); } @@ -261,7 +261,7 @@ TEST_F(MessageQueueTest, test_non_transactional_message_is_sent) { call.msg.type = TestMessageType::NON_TRANSACTIONAL; call.msg.data = "test_data"; call.uniqueId = "0"; - message_queue->push(call); + message_queue->push_call(call); wait_for_calls(); } diff --git a/tests/lib/ocpp/v201/mocks/device_model_storage_interface_mock.hpp b/tests/lib/ocpp/v201/mocks/device_model_storage_interface_mock.hpp index 9dcf872bd..7589dce78 100644 --- a/tests/lib/ocpp/v201/mocks/device_model_storage_interface_mock.hpp +++ b/tests/lib/ocpp/v201/mocks/device_model_storage_interface_mock.hpp @@ -24,5 +24,7 @@ class DeviceModelStorageMock : public DeviceModelStorageInterface { MOCK_METHOD(ClearMonitoringStatusEnum, clear_variable_monitor, (int, bool)); MOCK_METHOD(int32_t, clear_custom_variable_monitors, ()); MOCK_METHOD(void, check_integrity, ()); + MOCK_METHOD(bool, update_monitoring_reference, (int32_t monitor_id, const std::string& reference_value), + (override)); }; } // namespace ocpp::v201 diff --git a/tests/lib/ocpp/v201/mocks/message_dispatcher_mock.hpp b/tests/lib/ocpp/v201/mocks/message_dispatcher_mock.hpp new file mode 100644 index 000000000..73155d41c --- /dev/null +++ b/tests/lib/ocpp/v201/mocks/message_dispatcher_mock.hpp @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2020 - 2024 Pionix GmbH and Contributors to EVerest + +#pragma once + +#include "gmock/gmock.h" + +#include + +using namespace ocpp::v201; + +class MockMessageDispatcher : public ocpp::MessageDispatcherInterface { +public: + MOCK_METHOD(void, dispatch_call, (const json& call, bool triggered), (override)); + MOCK_METHOD(std::future>, dispatch_call_async, + (const json& call, bool triggered), (override)); + MOCK_METHOD(void, dispatch_call_result, (const json& call_result), (override)); + MOCK_METHOD(void, dispatch_call_error, (const json& call_error), (override)); +}; \ No newline at end of file