Skip to content

Commit

Permalink
Large refactor of message handling:
Browse files Browse the repository at this point in the history
* Added new class MessageDispatcherInterface that can dispatch OCPP Call, CallResult, CallError
* MessageDispatcherInterface became member of ChargePoint classes. The templated methods for sending/dispatching Call, CallResult, CallError have been moved from ChargePoint classes to MessageDispatcher

Signed-off-by: Piet Gömpel <[email protected]>
  • Loading branch information
Pietfried committed Nov 14, 2024
1 parent b229082 commit 3a07d82
Show file tree
Hide file tree
Showing 17 changed files with 497 additions and 303 deletions.
Binary file added doc/message_dispatching.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
51 changes: 51 additions & 0 deletions doc/message_dispatching.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

@startuml

interface MessageDispatcherInterface {
+dispatch_call(const json& call, bool triggered = false)
+dispatch_call_async(const json& call, bool triggered = false): std::future<EnhancedMessage<T>>
+dispatch_call_result(const json& call_result)
+dispatch_call_error(const json& call_error)
}

class v16::MessageDispatcher {
- MessageQueue& message_queue
- ChargePointConfiguration& configuration
- RegistrationStatus& registration_status
}

class v201::MessageDispatcher {
- MessageQueue& message_queue
- DeviceModel& device_model
- ConnectivityManager& connectivity_manager
- RegistrationStatusEnum& registration_status
}

interface v201::DataTransferInterface {
+data_transfer_req(request: DataTransferRequest): std::optional<DataTransferResponse>
+handle_data_transfer_req(call: Call<DataTransferRequest>)
}

class v201::DataTransfer {
-MessageDispatcherInterface &message_dispatcher
-std::optional<std::function<DataTransferResponse(DataTransferRequest)>> data_transfer_callback
}

class v201::ChargePoint {
std::unique_ptr<MessageDispatcherInterface> message_dispatcher;
std::unique_ptr<v201::DataTransferInterface> data_transfer;
}

class v16::ChargePoint {
std::unique_ptr<MessageDispatcherInterface> message_dispatcher;
}

MessageDispatcherInterface <|-- v16::MessageDispatcher
MessageDispatcherInterface <|-- v201::MessageDispatcher
v201::DataTransferInterface <|-- v201::DataTransfer
MessageDispatcherInterface *-- v201::DataTransfer
MessageDispatcherInterface *-- v201::ChargePoint
v201::DataTransferInterface *-- v201::ChargePoint
MessageDispatcherInterface *-- v16::ChargePoint

@enduml
3 changes: 3 additions & 0 deletions include/ocpp/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <chrono>
#include <cstdint>

namespace ocpp {
Expand All @@ -22,4 +23,6 @@ constexpr float NO_LIMIT_SPECIFIED = -1.0;
constexpr std::int32_t NO_START_PERIOD = -1;
constexpr std::int32_t EVSEID_NOT_SET = -1;

constexpr std::chrono::seconds DEFAULT_WAIT_FOR_FUTURE_TIMEOUT = std::chrono::seconds(60);

} // namespace ocpp
39 changes: 39 additions & 0 deletions include/ocpp/common/message_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest

#pragma once

#include <ocpp/common/message_queue.hpp>

namespace ocpp {

/// \brief Interface for dispatching OCPP messages that shall be send over the websocket. This interface defines
/// dispatching of Call, CallResult and CallError messages.
/// \tparam T Type specifies the OCPP protocol version
template <typename T> class MessageDispatcherInterface {

public:
virtual ~MessageDispatcherInterface(){};

/// \brief Dispatches a Call message.
/// \param call the OCPP Call message.
/// \param triggered indicates if the call was triggered by a TriggerMessage. Default is false.
virtual void dispatch_call(const json& call, bool triggered = false) = 0;

/// \brief Dispatches a Call message asynchronously.
/// \param call the OCPP Call message.
/// \param triggered indicates if the call was triggered by a TriggerMessage. Default is false.
/// \return std::future<ocpp::EnhancedMessage<T>> Future object containing the enhanced message
/// result of type T.
virtual std::future<ocpp::EnhancedMessage<T>> dispatch_call_async(const json& call, bool triggered = false) = 0;

/// \brief Dispatches a CallResult message.
/// \param call_result the OCPP CallResult message.
virtual void dispatch_call_result(const json& call_result) = 0;

/// \brief Dispatches a CallError message.
/// \param call_result the OCPP CallError message.
virtual void dispatch_call_error(const json& call_error) = 0;
};

} // namespace ocpp
20 changes: 5 additions & 15 deletions include/ocpp/common/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,16 +630,7 @@ template <typename M> class MessageQueue {
}
}

/// \brief pushes a new \p call message onto the message queue
template <class T> void push(Call<T> call, const bool stall_until_accepted = false) {
if (!running) {
return;
}
json call_json = call;
push(call_json, stall_until_accepted);
}

void push(const json& message, const bool stall_until_accepted = false) {
void push_call(const json& message, const bool stall_until_accepted = false) {
if (!running) {
return;
}
Expand All @@ -664,16 +655,15 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_result message over the websocket
template <class T> void push(CallResult<T> call_result) {
void push_call_result(const json& call_result) {
if (!running) {
return;
}

this->send_callback(call_result);
{
std::lock_guard<std::recursive_mutex> lk(this->next_message_mutex);
if (next_message_to_send.has_value()) {
if (next_message_to_send.value() == call_result.uniqueId) {
if (next_message_to_send.value() == call_result.at(MESSAGE_ID)) {
next_message_to_send.reset();
}
}
Expand All @@ -683,7 +673,7 @@ template <typename M> class MessageQueue {
}

/// \brief Sends a new \p call_error message over the websocket
void push(CallError call_error) {
void push_call_error(CallError call_error) {
if (!running) {
return;
}
Expand All @@ -703,7 +693,7 @@ template <typename M> class MessageQueue {

/// \brief pushes a new \p call message onto the message queue
/// \returns a future from which the CallResult can be extracted
template <class T> std::future<EnhancedMessage<M>> push_async(Call<T> call) {
std::future<EnhancedMessage<M>> push_call_async(const json& call) {
auto message = std::make_shared<ControlMessage<M>>(call);

if (!running) {
Expand Down
10 changes: 4 additions & 6 deletions include/ocpp/v16/charge_point_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

#include <ocpp/common/aligned_timer.hpp>
#include <ocpp/common/charging_station_base.hpp>
#include <ocpp/common/message_dispatcher.hpp>
#include <ocpp/common/message_queue.hpp>
#include <ocpp/common/schemas.hpp>
#include <ocpp/common/types.hpp>
#include <ocpp/common/websocket/websocket.hpp>
#include <ocpp/v16/charge_point_configuration.hpp>
#include <ocpp/v16/connector.hpp>
#include <ocpp/v16/database_handler.hpp>
#include <ocpp/v16/message_dispatcher.hpp>
#include <ocpp/v16/messages/Authorize.hpp>
#include <ocpp/v16/messages/BootNotification.hpp>
#include <ocpp/v16/messages/CancelReservation.hpp>
Expand Down Expand Up @@ -87,14 +89,15 @@ class ChargePointImpl : ocpp::ChargingStationBase {
BootReasonEnum bootreason;
ChargePointConnectionState connection_state;
bool boot_notification_callerror;
RegistrationStatus registration_status;
std::atomic<RegistrationStatus> registration_status;
DiagnosticsStatus diagnostics_status;
FirmwareStatus firmware_status;
bool firmware_update_is_pending = false;
UploadLogStatusEnumType log_status;
std::string message_log_path;

std::unique_ptr<Websocket> websocket;
std::unique_ptr<ocpp::MessageDispatcherInterface<MessageType>> message_dispatcher;
Everest::SteadyTimer websocket_timer;
std::unique_ptr<MessageQueue<v16::MessageType>> message_queue;
std::map<int32_t, std::shared_ptr<Connector>> connectors;
Expand Down Expand Up @@ -205,11 +208,6 @@ class ChargePointImpl : ocpp::ChargingStationBase {
std::unique_ptr<ocpp::MessageQueue<v16::MessageType>> create_message_queue();
void message_callback(const std::string& message);
void handle_message(const EnhancedMessage<v16::MessageType>& message);
template <class T> bool send(Call<T> call, bool initiated_by_trigger_message = false);
template <class T>
std::future<EnhancedMessage<v16::MessageType>> send_async(Call<T> call, bool initiated_by_trigger_message = false);
template <class T> bool send(CallResult<T> call_result);
bool send(CallError call_error);
void heartbeat(bool initiated_by_trigger_message = false);
void boot_notification(bool initiated_by_trigger_message = false);
void clock_aligned_meter_values_sample();
Expand Down
30 changes: 30 additions & 0 deletions include/ocpp/v16/message_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest

#pragma once

#include <ocpp/common/message_dispatcher.hpp>
#include <ocpp/v16/charge_point_configuration.hpp>

namespace ocpp {
namespace v16 {

class MessageDispatcher : public MessageDispatcherInterface<MessageType> {

public:
MessageDispatcher(ocpp::MessageQueue<MessageType>& message_queue, ChargePointConfiguration& configuration,
std::atomic<RegistrationStatus>& registration_status) :
message_queue(message_queue), configuration(configuration), registration_status(registration_status){};
void dispatch_call(const json& call, bool triggered = false) override;
std::future<ocpp::EnhancedMessage<MessageType>> dispatch_call_async(const json& call, bool triggered) override;
void dispatch_call_result(const json& call_result) override;
void dispatch_call_error(const json& call_error) override;

private:
ocpp::MessageQueue<MessageType>& message_queue;
ChargePointConfiguration& configuration;
std::atomic<RegistrationStatus>& registration_status;
};

} // namespace v16
} // namespace ocpp
17 changes: 6 additions & 11 deletions include/ocpp/v201/charge_point.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <memory>
#include <set>

#include <ocpp/common/message_dispatcher.hpp>

#include <ocpp/common/charging_station_base.hpp>

#include <ocpp/v201/average_meter_values.hpp>
Expand Down Expand Up @@ -373,6 +375,8 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
std::unique_ptr<EvseManager> evse_manager;
std::unique_ptr<ConnectivityManager> connectivity_manager;

std::unique_ptr<MessageDispatcherInterface<MessageType>> message_dispatcher;

// utility
std::shared_ptr<MessageQueue<v201::MessageType>> message_queue;
std::shared_ptr<DatabaseHandler> database_handler;
Expand Down Expand Up @@ -401,7 +405,7 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
std::atomic_bool stop_auth_cache_cleanup_handler;

// states
RegistrationStatusEnum registration_status;
std::atomic<RegistrationStatusEnum> registration_status;
FirmwareStatusEnum firmware_status;
// The request ID in the last firmware update status received
std::optional<int32_t> firmware_status_id;
Expand Down Expand Up @@ -453,8 +457,6 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
/// \brief optional delay to resumption of message queue after reconnecting to the CSMS
std::chrono::seconds message_queue_resume_delay = std::chrono::seconds(0);

bool send(CallError call_error);

// internal helper functions
void initialize(const std::map<int32_t, int32_t>& evse_connector_structure, const std::string& message_log_path);
void init_certificate_expiration_check_timers();
Expand Down Expand Up @@ -748,20 +750,13 @@ class ChargePoint : public ChargePointInterface, private ocpp::ChargingStationBa
// Functional Block P: DataTransfer
void handle_data_transfer_req(Call<DataTransferRequest> call);

// general message handling
template <class T> bool send(ocpp::Call<T> call, const bool initiated_by_trigger_message = false);

template <class T> std::future<EnhancedMessage<v201::MessageType>> send_async(ocpp::Call<T> call);

template <class T> bool send(ocpp::CallResult<T> call_result);

// Generates async sending callbacks
template <class RequestType, class ResponseType>
std::function<ResponseType(RequestType)> send_callback(MessageType expected_response_message_type) {
return [this, expected_response_message_type](auto request) {
MessageId message_id = MessageId(to_string(this->uuid_generator()));
const auto enhanced_response =
this->send_async<RequestType>(ocpp::Call<RequestType>(request, message_id)).get();
this->message_dispatcher->dispatch_call_async(ocpp::Call<RequestType>(request, message_id)).get();
if (enhanced_response.messageType != expected_response_message_type) {
throw UnexpectedMessageTypeFromCSMS(
std::string("Got unexpected message type from CSMS, expected: ") +
Expand Down
31 changes: 31 additions & 0 deletions include/ocpp/v201/message_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Pionix GmbH and Contributors to EVerest

#pragma once

#include <ocpp/common/message_dispatcher.hpp>
#include <ocpp/v201/connectivity_manager.hpp>
#include <ocpp/v201/device_model.hpp>

namespace ocpp {
namespace v201 {

class MessageDispatcher : public MessageDispatcherInterface<MessageType> {

public:
MessageDispatcher(ocpp::MessageQueue<MessageType>& message_queue, DeviceModel& device_model,
std::atomic<RegistrationStatusEnum>& registration_status) :
message_queue(message_queue), device_model(device_model), registration_status(registration_status){};
void dispatch_call(const json& call, bool triggered = false) override;
std::future<ocpp::EnhancedMessage<MessageType>> dispatch_call_async(const json& call, bool triggered) override;
void dispatch_call_result(const json& call_result) override;
void dispatch_call_error(const json& call_error) override;

private:
ocpp::MessageQueue<MessageType>& message_queue;
DeviceModel& device_model;
std::atomic<RegistrationStatusEnum>& registration_status;
};

} // namespace v201
} // namespace ocpp
2 changes: 2 additions & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ if(LIBOCPP_ENABLE_V16)
ocpp/v16/charge_point.cpp
ocpp/v16/database_handler.cpp
ocpp/v16/charge_point_impl.cpp
ocpp/v16/message_dispatcher.cpp
ocpp/v16/smart_charging.cpp
ocpp/v16/charge_point_configuration.cpp
ocpp/v16/charge_point_state_machine.cpp
Expand Down Expand Up @@ -80,6 +81,7 @@ if(LIBOCPP_ENABLE_V201)
ocpp/v201/utils.cpp
ocpp/v201/component_state_manager.cpp
ocpp/v201/connectivity_manager.cpp
ocpp/v201/message_dispatcher.cpp
)
add_subdirectory(ocpp/v201/messages)
endif()
Expand Down
Loading

0 comments on commit 3a07d82

Please sign in to comment.