From 230ae1a9da1af049978727a6a44d98a48d3d8168 Mon Sep 17 00:00:00 2001 From: John Date: Fri, 13 Dec 2024 11:30:34 +0200 Subject: [PATCH] Bugfix/libwebsocket dtor deadlock (#896) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added thread-safe queue for messages, removed extra mutexes * Updated websocket connect function name to better clarify what it does * Code reorder for fewer failure points on separate threads * Clarified thread responsibility, internal mutex lock scope clarification * Updated header docs * Moved the safe queue to its own header --------- Signed-off-by: AssemblyJohn Signed-off-by: Piet Gömpel Co-authored-by: Piet Gömpel --- CMakeLists.txt | 2 +- include/ocpp/common/safe_queue.hpp | 127 ++ include/ocpp/common/websocket/websocket.hpp | 12 +- .../ocpp/common/websocket/websocket_base.hpp | 11 +- .../websocket/websocket_libwebsockets.hpp | 76 +- include/ocpp/v201/connectivity_manager.hpp | 5 +- lib/ocpp/common/websocket/websocket.cpp | 13 +- lib/ocpp/common/websocket/websocket_base.cpp | 9 +- .../websocket/websocket_libwebsockets.cpp | 1303 +++++++++-------- lib/ocpp/v16/charge_point_impl.cpp | 10 +- lib/ocpp/v201/connectivity_manager.cpp | 8 +- 11 files changed, 897 insertions(+), 679 deletions(-) create mode 100644 include/ocpp/common/safe_queue.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 684ba2c24..952b162e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.14) project(ocpp - VERSION 0.21.0 + VERSION 0.22.0 DESCRIPTION "A C++ implementation of the Open Charge Point Protocol" LANGUAGES CXX ) diff --git a/include/ocpp/common/safe_queue.hpp b/include/ocpp/common/safe_queue.hpp new file mode 100644 index 000000000..933bdcf8b --- /dev/null +++ b/include/ocpp/common/safe_queue.hpp @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2020 - 2024 Pionix GmbH and Contributors to EVerest + +#pragma once + +#include +#include +#include 
+ +namespace ocpp { + +/// \brief Thread safe message queue. Holds a condition variable +/// that can be waited upon. Will wake up the waiting thread on each +/// operation of push/pop/clear +template class SafeQueue { + using safe_queue_reference = typename std::queue::reference; + using safe_queue_const_reference = typename std::queue::const_reference; + +public: + /// \return True if the queue is empty + inline bool empty() const { + std::lock_guard lock(mutex); + return queue.empty(); + } + + inline safe_queue_reference front() { + std::lock_guard lock(mutex); + return queue.front(); + } + + inline safe_queue_const_reference front() const { + std::lock_guard lock(mutex); + return queue.front(); + } + + /// \return retrieves and removes the first element in the queue. Undefined behavior if the queue is empty + inline T pop() { + std::unique_lock lock(mutex); + + T front = std::move(queue.front()); + queue.pop(); + + // Unlock here and notify + lock.unlock(); + + notify_waiting_thread(); + + return front; + } + + /// \brief Queues an element and notifies one thread waiting on the internal condition variable + inline void push(T&& value) { + { + std::lock_guard lock(mutex); + queue.push(value); + } + + notify_waiting_thread(); + } + + /// \brief Queues an element and notifies one thread waiting on the internal condition variable + inline void push(const T& value) { + { + std::lock_guard lock(mutex); + queue.push(value); + } + + notify_waiting_thread(); + } + + /// \brief Clears the queue + inline void clear() { + { + std::lock_guard lock(mutex); + + std::queue empty; + empty.swap(queue); + } + + notify_waiting_thread(); + } + + /// \brief Waits for the queue to receive an element + /// \param timeout to wait for an element, pass in a value <= 0 to wait indefinitely + inline void wait_on_queue_element(std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) { + wait_on_queue_element_or_predicate([]() { return false; }, timeout); + } + + /// \brief 
Same as 'wait_on_queue' but receives an additional predicate to wait upon + template + inline void wait_on_queue_element_or_predicate(Predicate pred, + std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) { + std::unique_lock lock(mutex); + + if (timeout.count() > 0) { + cv.wait_for(lock, timeout, [&]() { return (false == queue.empty()) or pred(); }); + } else { + cv.wait(lock, [&]() { return (false == queue.empty()) or pred(); }); + } + } + + /// \brief Waits on the queue for a custom event + /// \param timeout to wait for an element, pass in a value <= 0 to wait indefinitely + template + inline void wait_on_custom_event(Predicate pred, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) { + std::unique_lock lock(mutex); + + if (timeout.count() > 0) { + cv.wait_for(lock, timeout, [&]() { return pred(); }); + } else { + cv.wait(lock, [&]() { return pred(); }); + } + } + + /// \brief Notifies a single waiting thread to wake up + inline void notify_waiting_thread() { + cv.notify_one(); + } + +private: + std::queue queue; + + mutable std::mutex mutex; + std::condition_variable cv; +}; + +} // namespace ocpp \ No newline at end of file diff --git a/include/ocpp/common/websocket/websocket.hpp b/include/ocpp/common/websocket/websocket.hpp index 27717ef0a..4fd2ed9f0 100644 --- a/include/ocpp/common/websocket/websocket.hpp +++ b/include/ocpp/common/websocket/websocket.hpp @@ -18,7 +18,7 @@ class Websocket { std::unique_ptr websocket; std::function connected_callback; std::function disconnected_callback; - std::function closed_callback; + std::function stopped_connecting_callback; std::function message_callback; std::shared_ptr logging; @@ -28,8 +28,10 @@ class Websocket { std::shared_ptr evse_security, std::shared_ptr logging); ~Websocket(); - /// \brief connect to a websocket (TLS or non-TLS depending on the central system uri in the configuration). - bool connect(); + /// \brief Starts the connection attempts. 
It will init the websocket processing thread + /// \returns true if the websocket is successfully initialized, false otherwise. Does + /// not wait for a successful connection + bool start_connecting(); void set_connection_options(const WebsocketConnectionOptions& connection_options); @@ -48,9 +50,9 @@ class Websocket { /// \brief register a \p callback that is called when the websocket connection is disconnected void register_disconnected_callback(const std::function& callback); - /// \brief register a \p callback that is called when the websocket connection has been closed and will not attempt + /// \brief register a \p callback that is called when the websocket connection has been stopped and will not attempt /// to reconnect - void register_closed_callback(const std::function& callback); + void register_stopped_connecting_callback(const std::function& callback); /// \brief register a \p callback that is called when the websocket receives a message void register_message_callback(const std::function& callback); diff --git a/include/ocpp/common/websocket/websocket_base.hpp b/include/ocpp/common/websocket/websocket_base.hpp index 6c63ee530..57a0a8eaf 100644 --- a/include/ocpp/common/websocket/websocket_base.hpp +++ b/include/ocpp/common/websocket/websocket_base.hpp @@ -49,7 +49,7 @@ class WebsocketBase { WebsocketConnectionOptions connection_options; std::function connected_callback; std::function disconnected_callback; - std::function closed_callback; + std::function stopped_connecting_callback; std::function message_callback; std::function connection_failed_callback; std::shared_ptr reconnect_timer; @@ -90,9 +90,10 @@ class WebsocketBase { explicit WebsocketBase(); virtual ~WebsocketBase(); - /// \brief connect to a websocket - /// \returns true if the websocket is initialized and a connection attempt is made - virtual bool connect() = 0; + /// \brief Starts the connection attempts. 
It will init the websocket processing thread + /// \returns true if the websocket is successfully initialized, false otherwise. Does + /// not wait for a successful connection + virtual bool start_connecting() = 0; /// \brief sets this connection_options to the given \p connection_options and resets the connection_attempts virtual void set_connection_options(const WebsocketConnectionOptions& connection_options) = 0; @@ -118,7 +119,7 @@ class WebsocketBase { /// \brief register a \p callback that is called when the websocket connection has been closed and will not attempt /// to reconnect - void register_closed_callback(const std::function& callback); + void register_stopped_connecting_callback(const std::function& callback); /// \brief register a \p callback that is called when the websocket receives a message void register_message_callback(const std::function& callback); diff --git a/include/ocpp/common/websocket/websocket_libwebsockets.hpp b/include/ocpp/common/websocket/websocket_libwebsockets.hpp index acc178660..b27ca4f61 100644 --- a/include/ocpp/common/websocket/websocket_libwebsockets.hpp +++ b/include/ocpp/common/websocket/websocket_libwebsockets.hpp @@ -4,13 +4,11 @@ #define OCPP_WEBSOCKET_TLS_TPM_HPP #include +#include #include -#include #include -#include #include -#include #include struct ssl_ctx_st; @@ -31,59 +29,75 @@ class WebsocketLibwebsockets final : public WebsocketBase { void set_connection_options(const WebsocketConnectionOptions& connection_options) override; - /// \brief connect to a TLS websocket - /// \returns true if the websocket is initialized and a connection attempt is made - bool connect() override; + bool start_connecting() override; - /// \brief Reconnects the websocket using the delay, a reason for this reconnect can be provided with the - /// \param reason parameter - /// \param delay delay of the reconnect attempt void reconnect(long delay) override; - /// \brief closes the websocket void close(const WebsocketCloseReason code, 
const std::string& reason) override; - /// \brief send a \p message over the websocket - /// \returns true if the message was sent successfully bool send(const std::string& message) override; - /// \brief send a websocket ping void ping() override; + /// \brief Indicates if the websocket has a valid connection data and is trying to + /// connect/reconnect internally even if for the moment it might not be connected + /// \return True if the websocket is connected or trying to connect, false otherwise + bool is_trying_to_connect(); + public: int process_callback(void* wsi_ptr, int callback_reason, void* user, void* in, size_t len); private: + bool is_trying_to_connect_internal(); + void close_internal(const WebsocketCloseReason code, const std::string& reason); + +private: + /// \brief Initializes the connection options, including the security info + /// \return True if it was successful, false otherwise + bool initialize_connection_options(std::shared_ptr& new_connection_data); + bool tls_init(struct ssl_ctx_st* ctx, const std::string& path_chain, const std::string& path_key, bool custom_key, std::optional& password); - void client_loop(); - void recv_loop(); + + /// \brief Websocket processing thread loop + void thread_websocket_client_loop(std::shared_ptr local_data); + + /// \brief Function to handle received messages. 
Required since from the received message + /// callback we also send messages that must block and wait on the client thread + void thread_websocket_message_recv_loop(std::shared_ptr local_data); + + /// \brief Function to handle the deferred callbacks + void thread_deferred_callback_queue(); /// \brief Called when a TLS websocket connection is established, calls the connected callback - void on_conn_connected(); + void on_conn_connected(ConnectionData* conn_data); /// \brief Called when a TLS websocket connection is closed - void on_conn_close(); + void on_conn_close(ConnectionData* conn_data); /// \brief Called when a TLS websocket connection fails to be established - void on_conn_fail(); + void on_conn_fail(ConnectionData* conn_data); /// \brief When the connection can send data - void on_writable(); + void on_conn_writable(); /// \brief Called when a message is received over the TLS websocket, calls the message callback - void on_message(std::string&& message); + void on_conn_message(std::string&& message); + /// \brief Requests a message write, awakes the websocket loop from 'poll' void request_write(); void poll_message(const std::shared_ptr& msg); - /// \brief Function to handle the deferred callbacks - void handle_deferred_callback_queue(); - /// \brief Add a callback to the queue of callbacks to be executed. 
All will be executed from a single thread void push_deferred_callback(const std::function& callback); + /// \brief Safely closes the already running connection threads + void safe_close_threads(); + + /// \brief Clears all messages and message queues both incoming and outgoing + void clear_all_queues(); + private: std::shared_ptr evse_security; @@ -91,24 +105,16 @@ class WebsocketLibwebsockets final : public WebsocketBase { Everest::SteadyTimer reconnect_timer_tpm; std::unique_ptr websocket_thread; std::shared_ptr conn_data; - std::condition_variable conn_cv; - - std::mutex queue_mutex; - std::queue> message_queue; - std::condition_variable msg_send_cv; - std::mutex msg_send_cv_mutex; + // Queue of outgoing messages, notify thread only when we remove messages + SafeQueue> message_queue; std::unique_ptr recv_message_thread; - std::mutex recv_mutex; - std::queue recv_message_queue; - std::condition_variable recv_message_cv; + SafeQueue recv_message_queue; std::string recv_buffered_message; std::unique_ptr deferred_callback_thread; - std::queue> deferred_callback_queue; - std::mutex deferred_callback_mutex; - std::condition_variable deferred_callback_cv; + SafeQueue> deferred_callback_queue; std::atomic_bool stop_deferred_handler; OcppProtocolVersion connected_ocpp_version; diff --git a/include/ocpp/v201/connectivity_manager.hpp b/include/ocpp/v201/connectivity_manager.hpp index 5e0b929c3..064516b35 100644 --- a/include/ocpp/v201/connectivity_manager.hpp +++ b/include/ocpp/v201/connectivity_manager.hpp @@ -168,9 +168,10 @@ class ConnectivityManager { /// void on_websocket_disconnected(); - /// \brief Function invoked when the web socket closes + /// \brief Function invoked when the web socket stops connecting + /// and does not attempt to connect again /// - void on_websocket_closed(ocpp::WebsocketCloseReason reason); + void on_websocket_stopped_connecting(ocpp::WebsocketCloseReason reason); /// /// \brief Get the active network configuration slot in use. 
diff --git a/lib/ocpp/common/websocket/websocket.cpp b/lib/ocpp/common/websocket/websocket.cpp index 4ca4ba562..bd6ccdf6d 100644 --- a/lib/ocpp/common/websocket/websocket.cpp +++ b/lib/ocpp/common/websocket/websocket.cpp @@ -22,9 +22,9 @@ Websocket::Websocket(const WebsocketConnectionOptions& connection_options, std:: Websocket::~Websocket() { } -bool Websocket::connect() { +bool Websocket::start_connecting() { this->logging->sys("Connecting"); - return this->websocket->connect(); + return this->websocket->start_connecting(); } void Websocket::set_connection_options(const WebsocketConnectionOptions& connection_options) { @@ -63,10 +63,11 @@ void Websocket::register_disconnected_callback(const std::function& call }); } -void Websocket::register_closed_callback(const std::function& callback) { - this->closed_callback = callback; - this->websocket->register_closed_callback( - [this](const WebsocketCloseReason reason) { this->closed_callback(reason); }); +void Websocket::register_stopped_connecting_callback( + const std::function& callback) { + this->stopped_connecting_callback = callback; + this->websocket->register_stopped_connecting_callback( + [this](const WebsocketCloseReason reason) { this->stopped_connecting_callback(reason); }); } void Websocket::register_message_callback(const std::function& callback) { diff --git a/lib/ocpp/common/websocket/websocket_base.cpp b/lib/ocpp/common/websocket/websocket_base.cpp index e69f43caa..2cfa23113 100644 --- a/lib/ocpp/common/websocket/websocket_base.cpp +++ b/lib/ocpp/common/websocket/websocket_base.cpp @@ -10,7 +10,7 @@ namespace ocpp { WebsocketBase::WebsocketBase() : m_is_connected(false), connected_callback(nullptr), - closed_callback(nullptr), + stopped_connecting_callback(nullptr), message_callback(nullptr), reconnect_timer(nullptr), connection_attempts(1), @@ -44,8 +44,9 @@ void WebsocketBase::register_disconnected_callback(const std::function& this->disconnected_callback = callback; } -void 
WebsocketBase::register_closed_callback(const std::function& callback) { - this->closed_callback = callback; +void WebsocketBase::register_stopped_connecting_callback( + const std::function& callback) { + this->stopped_connecting_callback = callback; } void WebsocketBase::register_message_callback(const std::function& callback) { @@ -61,7 +62,7 @@ bool WebsocketBase::initialized() { EVLOG_error << "Not properly initialized: please register connected callback."; return false; } - if (this->closed_callback == nullptr) { + if (this->stopped_connecting_callback == nullptr) { EVLOG_error << "Not properly initialized: please closed_callback."; return false; } diff --git a/lib/ocpp/common/websocket/websocket_libwebsockets.cpp b/lib/ocpp/common/websocket/websocket_libwebsockets.cpp index 63cc04371..a28da5707 100644 --- a/lib/ocpp/common/websocket/websocket_libwebsockets.cpp +++ b/lib/ocpp/common/websocket/websocket_libwebsockets.cpp @@ -8,8 +8,11 @@ #include #include +#include +#include #include #include +#include #include #include @@ -25,6 +28,8 @@ #define SSL_CTX_new_ex(LIB, PROP, METHOD) SSL_CTX_new(METHOD) #endif +using namespace std::chrono_literals; + namespace { std::optional keylog_file; @@ -58,20 +63,24 @@ using evse_security::is_custom_private_key_file; using evse_security::OpenSSLProvider; enum class EConnectionState { - INITIALIZE, ///< Initialization state - CONNECTING, ///< Trying to connect - CONNECTED, ///< Successfully connected - ERROR, ///< We couldn't connect - FINALIZED, ///< We finalized the connection and we're never going to connect again + INITIALIZE, ///< Initialization state + CONNECTING, ///< Trying to connect + RECONNECTING, ///< After the first connect attempt, we'll change the state to reconnecting + CONNECTED, ///< Successfully connected + ERROR, ///< We couldn't connect, but we will try again soon internally + FINALIZED, ///< We finalized the connection and we're never going to connect again }; /// \brief Message to return in the callback 
to close the socket connection static constexpr int LWS_CLOSE_SOCKET_RESPONSE_MESSAGE = -1; -/// \brief Per thread connection data +/// \brief How much we wait for a message to be sent in seconds +static constexpr int MESSAGE_SEND_TIMEOUT_S = 1; + +/// \brief Current connection data, sets the internal state of the struct ConnectionData { - ConnectionData() : - wsi(nullptr), owner(nullptr), is_running(true), is_marked_close(false), state(EConnectionState::INITIALIZE) { + explicit ConnectionData(WebsocketLibwebsockets* owner) : + wsi(nullptr), owner(owner), is_running(true), is_stopped_run(false), state(EConnectionState::INITIALIZE) { } ~ConnectionData() { @@ -81,80 +90,176 @@ struct ConnectionData { owner = nullptr; } - void bind_thread(std::thread::id id) { - lws_thread_id = id; + void bind_thread_client(std::thread::id id) { + std::lock_guard lock(this->mutex); + websocket_client_thread_id = id; + } + + void bind_thread_message(std::thread::id id) { + std::lock_guard lock(this->mutex); + websocket_recv_thread_id = id; } - std::thread::id get_lws_thread_id() { - return lws_thread_id; + std::thread::id get_client_thread_id() { + std::lock_guard lock(this->mutex); + return websocket_client_thread_id; + } + + std::thread::id get_message_thread_id() { + std::lock_guard lock(this->mutex); + return websocket_recv_thread_id; } void update_state(EConnectionState in_state) { + std::lock_guard lock(this->mutex); state = in_state; } - void do_interrupt() { - is_running = false; + EConnectionState get_state() { + std::lock_guard lock(this->mutex); + return state; + } + + /// \brief Requests an active connection to awake from 'poll' and process again + void request_awake() { + if (std::this_thread::get_id() == this->websocket_client_thread_id) { + EVLOG_AND_THROW(std::runtime_error("Attempted to awake connection from websocket thread!")); + } + + std::lock_guard lock(this->mutex); + + if (lws_ctx) { + lws_cancel_service(lws_ctx.get()); + } } - void request_close() { - 
is_marked_close = true; + /// \brief Requests the threads that are processing to exit as soon as possible + /// in a ordered manner + void do_interrupt_and_exit() { + if (std::this_thread::get_id() == this->websocket_client_thread_id) { + EVLOG_AND_THROW(std::runtime_error("Attempted to interrupt connection from websocket thread!")); + } + + std::lock_guard lock(this->mutex); + + if (is_running) { + is_running = false; + + // Notify if we are on a different thread + if (lws_ctx) { + // Attempt to revive the running thread + lws_cancel_service(lws_ctx.get()); + } + } } bool is_interupted() { + std::lock_guard lock(this->mutex); return (is_running == false); } - bool is_connecting() { - return (state.load() == EConnectionState::CONNECTING); + void mark_stop_executed() { + std::lock_guard lock(this->mutex); + this->is_stopped_run = true; + } + + bool is_stop_executed() { + std::lock_guard lock(this->mutex); + return this->is_stopped_run; + } + +public: + /// \brief This should be used for a cleanup before calling the + /// init functions because releasing the unique ptrs has + /// as an effect the invocation of 'callback_minimal' during + /// '::lws_context_destroy(ptr);' and that causes a deadlock + void reset_connection_data() { + // Destroy them outside the lock scope + std::unique_ptr clear_sec; + std::unique_ptr clear_lws; + + { + std::lock_guard lock(this->mutex); + this->wsi = nullptr; + + if (this->sec_context) { + this->sec_context.swap(clear_sec); + } + + if (this->lws_ctx) { + this->lws_ctx.swap(clear_lws); + } + } } - bool is_close_requested() { - return is_marked_close; + void init_connection_context(lws_context* lws_ctx, SSL_CTX* ssl_ctx) { + std::lock_guard lock(this->mutex); + + if (this->lws_ctx || this->sec_context) { + EVLOG_AND_THROW(std::runtime_error("Cleanup must be called before re-initing a connection!")); + } + + // Reset the close status + is_stopped_run = false; + + // Causes a deadlock in callback_minimal if not reset + this->lws_ctx = 
std::unique_ptr(lws_ctx); + + if (ssl_ctx) { + this->sec_context = std::unique_ptr(ssl_ctx); + } } - auto get_state() { - return state.load(); + void init_connection(lws* lws) { + std::lock_guard lock(this->mutex); + this->wsi = lws; } lws* get_conn() { + std::lock_guard lock(this->mutex); return wsi; } - WebsocketLibwebsockets* get_owner() { - return owner.load(); + lws_context* get_context() { + std::lock_guard lock(this->mutex); + return lws_ctx.get(); } - void set_owner(WebsocketLibwebsockets* o) { - owner = o; + // No need for sync here since its set on construction + WebsocketLibwebsockets* get_owner() { + return owner; } -public: - // This public block will only be used from client loop thread, no locking needed +private: // Openssl context, must be destroyed in this order std::unique_ptr sec_context; // libwebsockets state std::unique_ptr lws_ctx; - + // Internal used WSI lws* wsi; + // Owner, set on creation + WebsocketLibwebsockets* owner; -private: - std::atomic owner; + // State variables + bool is_running; + bool is_stopped_run; + EConnectionState state; + + std::mutex mutex; - std::thread::id lws_thread_id; + /// \brief Websocket client thread ID + std::thread::id websocket_client_thread_id; + /// \brief Websocket received message thread ID + std::thread::id websocket_recv_thread_id; - std::atomic_bool is_running; - std::atomic_bool is_marked_close; - std::atomic state; + // Required for access of state + friend class WebsocketLibwebsockets; }; struct WebsocketMessage { WebsocketMessage() : sent_bytes(0), message_sent(false) { } - virtual ~WebsocketMessage() { - } - public: std::string payload; lws_write_protocol protocol; @@ -244,27 +349,28 @@ WebsocketLibwebsockets::WebsocketLibwebsockets(const WebsocketConnectionOptions& } WebsocketLibwebsockets::~WebsocketLibwebsockets() { - std::shared_ptr local_data = conn_data; - if (local_data != nullptr) { - local_data->do_interrupt(); - } - std::lock_guard lock(this->connection_mutex); - if 
(this->websocket_thread != nullptr && this->websocket_thread->joinable()) { - this->websocket_thread->join(); + std::shared_ptr local_conn_data = conn_data; + if (local_conn_data != nullptr) { + auto tid = std::this_thread::get_id(); + + if (tid == local_conn_data->get_client_thread_id() || tid == local_conn_data->get_message_thread_id()) { + EVLOG_error << "Trying to destruct websocket from utility thread!"; + std::terminate(); + } } - if (this->recv_message_thread != nullptr && this->recv_message_thread->joinable()) { - this->recv_message_thread->join(); + if (this->m_is_connected || is_trying_to_connect_internal()) { + this->close_internal(WebsocketCloseReason::Normal, "websocket destructor"); } + // In the dtor we must make sure the deferred callback thread + // finishes since the callbacks capture a reference to 'this' if (this->deferred_callback_thread != nullptr && this->deferred_callback_thread->joinable()) { - { - std::scoped_lock tmp_lock(this->deferred_callback_mutex); - this->stop_deferred_handler = true; - } - this->deferred_callback_cv.notify_one(); + this->stop_deferred_handler.store(true); + this->deferred_callback_queue.notify_waiting_thread(); + this->deferred_callback_thread->join(); } } @@ -305,9 +411,9 @@ static int callback_minimal(struct lws* wsi, enum lws_callback_reasons reason, v // Get user safely, since on some callbacks (void *user) can be different than what we set if (wsi != nullptr) { if (ConnectionData* data = reinterpret_cast(lws_wsi_user(wsi))) { - auto owner = data->get_owner(); + auto* owner = data->get_owner(); if (owner not_eq nullptr) { - return data->get_owner()->process_callback(wsi, static_cast(reason), user, in, len); + return owner->process_callback(wsi, static_cast(reason), user, in, len); } else { EVLOG_debug << "callback_minimal called, but data->owner is nullptr. 
Reason: " << reason; } @@ -448,12 +554,9 @@ bool WebsocketLibwebsockets::tls_init(SSL_CTX* ctx, const std::string& path_chai return true; } -void WebsocketLibwebsockets::recv_loop() { - std::shared_ptr local_data = conn_data; - +void WebsocketLibwebsockets::thread_websocket_message_recv_loop(std::shared_ptr local_data) { if (local_data == nullptr) { - EVLOG_error << "Failed recv loop context init!"; - return; + EVLOG_AND_THROW(std::runtime_error("Null 'ConnectionData' in message thread, fatal error!")); } EVLOG_debug << "Init recv loop with ID: " << std::hex << std::this_thread::get_id(); @@ -464,12 +567,10 @@ void WebsocketLibwebsockets::recv_loop() { std::string message{}; { - std::lock_guard lk(this->recv_mutex); if (recv_message_queue.empty()) break; - message = std::move(recv_message_queue.front()); - recv_message_queue.pop(); + message = recv_message_queue.pop(); } // Invoke our processing callback, that might trigger a send back that @@ -477,28 +578,19 @@ void WebsocketLibwebsockets::recv_loop() { this->message_callback(message); } - // While we are empty, sleep - { - std::unique_lock lock(this->recv_mutex); - recv_message_cv.wait_for(lock, std::chrono::seconds(1), - [&]() { return (false == recv_message_queue.empty()); }); + // While we are empty, sleep, only if we have not been interrupted in the + // message_callback. 
An interrupt can be caused in the message callback + // if we receive a certain message type that will cause the implementation + // in the charge point to attempt a reconnect (BasicAuthPass for example) + if (!local_data->is_interupted()) { + recv_message_queue.wait_on_queue_element(1s); } } EVLOG_debug << "Exit recv loop with ID: " << std::hex << std::this_thread::get_id(); } -void WebsocketLibwebsockets::client_loop() { - std::shared_ptr local_data = conn_data; - - if (local_data == nullptr) { - EVLOG_error << "Failed client loop context init!"; - return; - } - - // Bind thread for checks - local_data->bind_thread(std::this_thread::get_id()); - +bool WebsocketLibwebsockets::initialize_connection_options(std::shared_ptr& local_data) { // lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG | LLL_PARSER | LLL_HEADER | LLL_EXT | // LLL_CLIENT | LLL_LATENCY | LLL_THREAD | LLL_USER, nullptr); lws_set_log_level(LLL_ERR, nullptr); @@ -524,6 +616,7 @@ void WebsocketLibwebsockets::client_loop() { // Lifetime of this is important since we use the data from this in private_key_callback() std::optional private_key_password; + SSL_CTX* ssl_ctx = nullptr; if (this->connection_options.security_profile == 2 || this->connection_options.security_profile == 3) { // Setup context - need to know the key type first @@ -537,13 +630,7 @@ void WebsocketLibwebsockets::client_loop() { if (certificate_response.status != ocpp::GetCertificateInfoStatus::Accepted or !certificate_response.info.has_value()) { EVLOG_error << "Connecting with security profile 3 but no client side certificate is present or valid"; - - local_data->update_state(EConnectionState::ERROR); - on_conn_fail(); - - // Notify conn waiter - conn_cv.notify_one(); - return; + return false; } const auto& certificate_info = certificate_response.info.value(); @@ -554,20 +641,13 @@ void WebsocketLibwebsockets::client_loop() { path_chain = certificate_info.certificate_single_path.value(); } else { EVLOG_error << 
"Connecting with security profile 3 but no client side certificate is present or valid"; - - local_data->update_state(EConnectionState::ERROR); - on_conn_fail(); - - // Notify conn waiter - conn_cv.notify_one(); - return; + return false; } path_key = certificate_info.key_path; private_key_password = certificate_info.password; } - SSL_CTX* ssl_ctx = nullptr; bool custom_key = false; if (!path_key.empty()) { @@ -587,14 +667,8 @@ void WebsocketLibwebsockets::client_loop() { if (ssl_ctx == nullptr) { ERR_print_errors_fp(stderr); - EVLOG_error << "Unable to create ssl ctx"; - - local_data->update_state(EConnectionState::ERROR); - on_conn_fail(); - - // Notify conn waiter - conn_cv.notify_one(); - return; + EVLOG_error << "Unable to create ssl context"; + return false; } if (this->connection_options.enable_tls_keylog and this->connection_options.keylog_file.has_value()) { @@ -604,420 +678,398 @@ void WebsocketLibwebsockets::client_loop() { } // Init TLS data - if (tls_init(ssl_ctx, path_chain, path_key, custom_key, private_key_password) == false) { - EVLOG_error << "Unable to init tls"; - - local_data->update_state(EConnectionState::ERROR); - on_conn_fail(); - - // Notify conn waiter - conn_cv.notify_one(); - return; + if (!tls_init(ssl_ctx, path_chain, path_key, custom_key, private_key_password)) { + EVLOG_error << "Unable to init tls security options for websocket"; + return false; } // Setup our context info.provided_client_ssl_ctx = ssl_ctx; - - // Connection acquire the contexts - local_data->sec_context = std::unique_ptr(ssl_ctx); } lws_context* lws_ctx = lws_create_context(&info); if (nullptr == lws_ctx) { - EVLOG_error << "lws init failed!"; - local_data->update_state(EConnectionState::FINALIZED); - return; + EVLOG_error << "lws create context failed"; + return false; } - // Conn acquire the lws context - local_data->lws_ctx = std::unique_ptr(lws_ctx); + // Conn acquire the lws context and security context + local_data->init_connection_context(lws_ctx, ssl_ctx); 
+ return true; +} - lws_client_connect_info i; - memset(&i, 0, sizeof(lws_client_connect_info)); +void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr local_data) { + if (local_data == nullptr) { + EVLOG_AND_THROW(std::runtime_error("Null 'ConnectionData' in client thread, fatal error!")); + } - // No SSL - int ssl_connection = 0; + EVLOG_info << "Init client loop with ID: " << std::hex << std::this_thread::get_id(); + bool try_reconnect = true; - if (this->connection_options.security_profile == 2 || this->connection_options.security_profile == 3) { - ssl_connection = LCCSCF_USE_SSL; + do { + if (!initialize_connection_options(local_data)) { + EVLOG_error << "Could not initialize connection options."; - // Skip server hostname check - if (this->connection_options.verify_csms_common_name == false) { - ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; - } + local_data->update_state(EConnectionState::ERROR); + on_conn_fail(local_data.get()); + } else { + lws_client_connect_info i; + memset(&i, 0, sizeof(lws_client_connect_info)); - // TODO: Completely remove after test - // ssl_connection |= LCCSCF_ALLOW_SELFSIGNED; - // ssl_connection |= LCCSCF_ALLOW_INSECURE; - // ssl_connection |= LCCSCF_ALLOW_EXPIRED; - } + // No SSL + int ssl_connection = 0; - auto& uri = this->connection_options.csms_uri; + if (this->connection_options.security_profile == 2 || this->connection_options.security_profile == 3) { + ssl_connection = LCCSCF_USE_SSL; - std::string ocpp_versions; - bool first = true; - for (auto version : this->connection_options.ocpp_versions) { - if (!first) { - ocpp_versions += ", "; - } - first = false; - ocpp_versions += conversions::ocpp_protocol_version_to_string(version); - } - - // TODO: No idea who releases the strdup? 
- i.context = lws_ctx; - i.port = uri.get_port(); - i.address = strdup(uri.get_hostname().c_str()); // Base address, as resolved by getnameinfo - i.path = strdup((uri.get_path() + uri.get_chargepoint_id()).c_str()); // Path of resource - i.host = i.address; - i.origin = i.address; - i.ssl_connection = ssl_connection; - i.protocol = strdup(ocpp_versions.c_str()); - i.local_protocol_name = local_protocol_name; - i.pwsi = &local_data->wsi; - i.userdata = local_data.get(); // See lws_context 'user' + // Skip server hostname check + if (this->connection_options.verify_csms_common_name == false) { + ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; + } - if (this->connection_options.iface.has_value()) { - i.iface = this->connection_options.iface.value().c_str(); - } + // Debugging if required + // ssl_connection |= LCCSCF_ALLOW_SELFSIGNED; + // ssl_connection |= LCCSCF_ALLOW_INSECURE; + // ssl_connection |= LCCSCF_ALLOW_EXPIRED; + } - // Print data for debug - EVLOG_info << "LWS connect with info " - << "port: [" << i.port << "] address: [" << i.address << "] path: [" << i.path << "] protocol: [" - << i.protocol << "]"; + auto& uri = this->connection_options.csms_uri; + lws* local_lws = nullptr; - if (lws_client_connect_via_info(&i) == nullptr) { - EVLOG_error << "LWS connect failed!"; - // This condition can occur when connecting fails to an IP address - // retries need to be attempted - local_data->update_state(EConnectionState::ERROR); - on_conn_fail(); + std::string ocpp_versions; + bool first = true; + for (auto version : this->connection_options.ocpp_versions) { + if (!first) { + ocpp_versions += ", "; + } + first = false; + ocpp_versions += conversions::ocpp_protocol_version_to_string(version); + } - // Notify conn waiter - conn_cv.notify_one(); - } else { - EVLOG_debug << "Init client loop with ID: " << std::hex << std::this_thread::get_id(); + // TODO: No idea who releases the strdup? 
+ i.context = local_data->lws_ctx.get(); + i.port = uri.get_port(); + i.address = strdup(uri.get_hostname().c_str()); // Base address, as resolved by getnameinfo + i.path = strdup((uri.get_path() + uri.get_chargepoint_id()).c_str()); // Path of resource + i.host = i.address; + i.origin = i.address; + i.ssl_connection = ssl_connection; + i.protocol = strdup(ocpp_versions.c_str()); + i.local_protocol_name = local_protocol_name; + i.pwsi = &local_lws; // Will set the local_data->wsi to a valid value in case of a successful connect + i.userdata = local_data.get(); // See lws_context 'user' + + if (this->connection_options.iface.has_value()) { + i.iface = this->connection_options.iface.value().c_str(); + } - // Process while we're running - int n = 0; + // Print data for debug + EVLOG_info << "LWS connect with info " + << "port: [" << i.port << "] address: [" << i.address << "] path: [" << i.path << "] protocol: [" + << i.protocol << "]" + << " security profile: [" << this->connection_options.security_profile << "]"; - while (n >= 0 && (!local_data->is_interupted())) { - // Set to -1 for continuous servicing, of required, not recommended - n = lws_service(local_data->lws_ctx.get(), 0); + if (lws_client_connect_via_info(&i) == nullptr) { + EVLOG_error << "LWS connect failed!"; + // This condition can occur when connecting fails to an IP address + // retries need to be attempted + local_data->update_state(EConnectionState::ERROR); + on_conn_fail(local_data.get()); + } else { + local_data->init_connection(local_lws); - bool message_queue_empty; - { - std::lock_guard lock(this->queue_mutex); - message_queue_empty = message_queue.empty(); + // Process while we're running + int n = 0; + bool processing = true; + + do { + // We can grab the 'state' and 'lws_ctx' members here since we're only + // setting them from this thread and not from the exterior + + // Set to -1 for continuous servicing, if required, not recommended + n = lws_service(local_data->lws_ctx.get(), 0); + + 
auto state = local_data->state; + processing = (!local_data->is_interupted()) && + (state != EConnectionState::FINALIZED && state != EConnectionState::ERROR); + + if (processing && !message_queue.empty()) { + lws_callback_on_writable(local_data->get_conn()); + } + } while (n >= 0 && processing); + + // After this point no minimal_callback can be called, we have finished + // using the connection information and we will recreate it if required + local_data->reset_connection_data(); + } + } // End init connection + + long reconnect_delay = 0; + + if (local_data->is_interupted() || local_data->get_state() == EConnectionState::FINALIZED) { + EVLOG_info << "Connection interrupted or cleanly finalized, exiting websocket loop"; + try_reconnect = false; + } else if (local_data->get_state() != EConnectionState::CONNECTED) { + // Any other failure than a successful connect + + // -1 indicates to always attempt to reconnect + if (this->connection_options.max_connection_attempts == -1 or + this->connection_attempts <= this->connection_options.max_connection_attempts) { + local_data->update_state(EConnectionState::RECONNECTING); + reconnect_delay = this->get_reconnect_interval(); + try_reconnect = true; + + // Increment reconn attempts + this->connection_attempts += 1; + + EVLOG_info << "Connection not successful, attempting internal reconnect in: " << reconnect_delay + << "ms"; + } else { + local_data->update_state(EConnectionState::FINALIZED); + try_reconnect = false; + + EVLOG_info << "Connection reconnect attempts exhausted, exiting websocket loop, passing back control " + "to the application logic"; + } + } + + // Wait until new connection attempt + if (local_data->get_state() == EConnectionState::RECONNECTING) { + auto end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(reconnect_delay); + + while ((std::chrono::steady_clock::now() < end_time) && (false == local_data->is_interupted())) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } 
- if (!message_queue_empty) { - lws_callback_on_writable(local_data->get_conn()); + + if (true == local_data->is_interupted()) { + try_reconnect = false; + EVLOG_info << "Interrupred reconnect attempt, not reconnecting!"; + } else { + EVLOG_info << "Attempting reconnect after a wait of: " << reconnect_delay << "ms"; } } + } while (try_reconnect); // End trying to connect + + // Give back control to the application + if (false == local_data->is_stop_executed()) { + this->push_deferred_callback([this]() { + if (this->stopped_connecting_callback) { + this->stopped_connecting_callback(WebsocketCloseReason::Normal); + } else { + EVLOG_error << "Stopped connecting callback not registered!"; + } - // Client loop finished for our tid - EVLOG_debug << "Exit client loop with ID: " << std::hex << std::this_thread::get_id(); + if (this->disconnected_callback) { + this->disconnected_callback(); + } else { + EVLOG_error << "Disconnected callback not registered!"; + } + }); } + + // Client loop finished for our tid + EVLOG_info << "Exit websocket client loop with ID: " << std::hex << std::this_thread::get_id(); } -// Will be called from external threads as well -bool WebsocketLibwebsockets::connect() { - if (!this->initialized()) { - return false; - } +void WebsocketLibwebsockets::clear_all_queues() { + this->message_queue.clear(); + this->recv_buffered_message.clear(); + this->recv_message_queue.clear(); +} - // Clear shutting down so we allow to reconnect again as well - this->shutting_down = false; +void WebsocketLibwebsockets::safe_close_threads() { + // If we already have a connection attempt started for now shut it down first + std::shared_ptr local_conn_data = conn_data; + bool in_message_thread = false; - EVLOG_info << "Connecting to uri: " << this->connection_options.csms_uri.string() << " with security-profile " - << this->connection_options.security_profile - << (this->connection_options.use_tpm_tls ? 
" with TPM keys" : ""); + if (local_conn_data != nullptr) { + auto tid = std::this_thread::get_id(); - this->connected_ocpp_version = OcppProtocolVersion::Unknown; + if (tid == local_conn_data->get_client_thread_id()) { + EVLOG_AND_THROW(std::runtime_error("Trying to start/stop/reconnect from client thread!")); + } - // new connection context - std::shared_ptr local_data = std::make_shared(); - local_data->set_owner(this); + // TODO(ioan): reintroduce this check after we have solved the problem of a main processing + // loop in the libocpp. The problem is that a start/stop operation is executed from the + // message thread, because in message_callback we can initiate a reconnect when we receive + // a message of the type 'BasicAuthPass'. In turn, because there's no main processing + // loop with it's own thread, the response is polled immediately, and there's special behavior + // that requires a reconnect that is called from the message thread. + // The resulting problem is that we could have this thread dangling for a bit, since we are forced to detach - // Interrupt any previous connection - std::shared_ptr tmp_data = conn_data; - if (tmp_data != nullptr) { - tmp_data->do_interrupt(); + // else if (std::this_thread::get_id() == local_conn_data->get_message_thread_id()) { + // EVLOG_AND_THROW(std::runtime_error("Trying to start/stop/reconnect connection from message thread!")); } + + in_message_thread = (tid == local_conn_data->get_message_thread_id()); + local_conn_data->do_interrupt_and_exit(); } - // use new connection context - conn_data = local_data; + // Clear any pending outgoing/incoming messages on a new connection + clear_all_queues(); - { - std::scoped_lock lock(connection_mutex); - - // Wait old thread for a clean state - if (this->websocket_thread && this->websocket_thread->joinable()) { - // Awake libwebsockets thread to quickly exit - request_write(); - this->websocket_thread->join(); - this->websocket_thread.reset(); - } + // Wait old thread for a 
clean state + if (this->websocket_thread && this->websocket_thread->joinable()) { + // Awake libwebsockets thread to quickly exit + request_write(); + this->websocket_thread->join(); + this->websocket_thread.reset(); + } + if (in_message_thread) { + if (this->recv_message_thread) { + // See the note above 'in_message_thread' on why we detach + this->recv_message_thread->detach(); + this->recv_message_thread.reset(); + } + } else { if (this->recv_message_thread && this->recv_message_thread->joinable()) { // Awake the receiving message thread to finish - recv_message_cv.notify_one(); this->recv_message_thread->join(); this->recv_message_thread.reset(); } } +} - if (this->deferred_callback_thread == nullptr) { - this->deferred_callback_thread = - std::make_unique(&WebsocketLibwebsockets::handle_deferred_callback_queue, this); - } +bool WebsocketLibwebsockets::is_trying_to_connect() { + std::lock_guard lock(this->connection_mutex); + return is_trying_to_connect_internal(); +} - // Stop any pending reconnect timer - { - std::lock_guard lk(this->reconnect_mutex); - this->reconnect_timer_tpm.stop(); - } +bool WebsocketLibwebsockets::is_trying_to_connect_internal() { + std::shared_ptr local_conn_data = conn_data; + return (local_conn_data != nullptr) && (local_conn_data->get_state() != EConnectionState::FINALIZED); +} - // Clear any pending messages on a new connection - { - std::lock_guard lock(queue_mutex); - std::queue> empty; - empty.swap(message_queue); - } +// Will be called from external threads as well +bool WebsocketLibwebsockets::start_connecting() { + std::lock_guard lock(this->connection_mutex); - { - std::lock_guard lock(recv_mutex); - std::queue empty; - empty.swap(recv_message_queue); + if (!this->initialized()) { + EVLOG_error << "Websocket not properly initialized. 
A reconnect attempt will not be made."; + return false; } - bool timeouted = false; - bool connected = false; + // Clear shutting down so we allow to reconnect again as well + this->shutting_down = false; - { - std::unique_lock lock(connection_mutex); - - // Release other threads - this->websocket_thread = std::make_unique(&WebsocketLibwebsockets::client_loop, this); - - // TODO(ioan): remove this thread when the fix will be moved into 'MessageQueue' - // The reason for having a received message processing thread is that because - // if we dispatch a message receive from the client_loop thread, then the callback - // will send back another message, and since we're waiting for that message to be - // sent over the wire on the client_loop, not giving the opportunity to the loop to - // advance we will have a dead-lock - this->recv_message_thread = std::make_unique(&WebsocketLibwebsockets::recv_loop, this); - - // Wait until connect or timeout - timeouted = !conn_cv.wait_for(lock, std::chrono::seconds(60), [&]() { - return !local_data->is_connecting() && EConnectionState::INITIALIZE != local_data->get_state(); - }); + EVLOG_info << "Starting connection attempts to uri: " << this->connection_options.csms_uri.string() + << " with security-profile " << this->connection_options.security_profile + << (this->connection_options.use_tpm_tls ? 
" with TPM keys" : ""); + + this->connected_ocpp_version = OcppProtocolVersion::Unknown; - connected = (local_data->get_state() == EConnectionState::CONNECTED); + // If we already have a connection attempt started for now shut it down first + safe_close_threads(); + + // Create a new connection data (only created here, owner never changes) + conn_data = std::make_shared(this); + + // Stop any pending reconnect timer + { + std::lock_guard lk(this->reconnect_mutex); + this->reconnect_timer_tpm.stop(); } - if (!connected) { - EVLOG_info << "Connect failed with state: " << (int)local_data->get_state() << " Timeouted: " << timeouted; + this->connection_attempts = 1; // reset connection attempts - // If we timeouted the on_conn_fail was not dispatched, since it did not had the chance - if (timeouted && local_data->get_state() != EConnectionState::ERROR) { - EVLOG_error << "Conn failed with timeout, without disconnect dispatch, dispatching manually."; - on_conn_fail(); - } + // This should always be running, start it only once + if (this->deferred_callback_thread == nullptr) { + this->deferred_callback_thread = + std::make_unique(&WebsocketLibwebsockets::thread_deferred_callback_queue, this); + } - // Interrupt and drop the connection data - local_data->do_interrupt(); + // Release other threads + this->websocket_thread = + std::make_unique(&WebsocketLibwebsockets::thread_websocket_client_loop, this, this->conn_data); - // Also interrupt the latest conenction, if it was set by a parallel thread - auto local = conn_data; + // TODO(ioan): remove this thread when the fix will be moved into 'MessageQueue' + // The reason for having a received message processing thread is that because + // if we dispatch a message receive from the client_loop thread, then the callback + // will send back another message, and since we're waiting for that message to be + // sent over the wire on the client_loop, not giving the opportunity to the loop to + // advance we will have a dead-lock + 
this->recv_message_thread = std::make_unique( + &WebsocketLibwebsockets::thread_websocket_message_recv_loop, this, this->conn_data); - if (local != nullptr) { - local->do_interrupt(); - } + // Bind threads for various checks + this->conn_data->bind_thread_client(this->websocket_thread->get_id()); + this->conn_data->bind_thread_message(this->recv_message_thread->get_id()); - conn_data.reset(); - } + return true; +} - return (connected); +void WebsocketLibwebsockets::close(const WebsocketCloseReason code, const std::string& reason) { + std::lock_guard lock(this->connection_mutex); + close_internal(code, reason); } -void WebsocketLibwebsockets::reconnect(long delay) { - if (this->shutting_down) { - EVLOG_info << "Not reconnecting because the websocket is being shutdown."; +void WebsocketLibwebsockets::close_internal(const WebsocketCloseReason code, const std::string& reason) { + bool trying_connecting = is_trying_to_connect_internal() || this->m_is_connected; + + if (!trying_connecting) { + EVLOG_warning << "Trying to close inactive websocket with code: " << (int)code << " and reason: " << reason + << ", returning"; return; } - EVLOG_info << "Reconnecting in: " << delay << "ms" - << ", attempt: " << this->connection_attempts; + EVLOG_info << "Closing websocket with code: " << (int)code << " and reason: " << reason; - if (this->m_is_connected) { - this->close(WebsocketCloseReason::AbnormalClose, "before reconnecting"); - } + // Close any ongoing thread + safe_close_threads(); + + // Release the connection data and state + conn_data.reset(); + this->m_is_connected = false; { std::lock_guard lk(this->reconnect_mutex); - this->reconnect_timer_tpm.timeout( - [this]() { - // close connection before reconnecting - if (this->m_is_connected) { - this->close(WebsocketCloseReason::AbnormalClose, "before reconnecting"); - } - - this->connect(); - }, - std::chrono::milliseconds(delay)); + this->reconnect_timer_tpm.stop(); } } -void WebsocketLibwebsockets::close(const 
WebsocketCloseReason code, const std::string& reason) { - if (!reason.empty()) { - EVLOG_info << "Closing websocket: " << reason; - } +void WebsocketLibwebsockets::reconnect(long delay) { + std::lock_guard lock(this->connection_mutex); - { - std::lock_guard lk(this->reconnect_mutex); - this->reconnect_timer_tpm.stop(); + if (this->shutting_down) { + EVLOG_info << "Not reconnecting because the websocket is being shutdown."; + return; } - std::shared_ptr local_data = conn_data; - if (local_data != nullptr) { - // Set the trigger from us - local_data->request_close(); - local_data->do_interrupt(); + if (this->m_is_connected || is_trying_to_connect_internal()) { + this->close_internal(WebsocketCloseReason::AbnormalClose, "before manually reconnecting"); } - // Release the connection data - conn_data.reset(); - this->m_is_connected = false; + EVLOG_info << "Externally called reconnect in: " << delay << "ms" + << ", attempt: " << this->connection_attempts; - // Notify any message senders that are waiting, since we can't send messages any more - msg_send_cv.notify_all(); + std::lock_guard lk(this->reconnect_mutex); + this->reconnect_timer_tpm.timeout( + [this]() { + // close connection before reconnecting + if (this->m_is_connected || is_trying_to_connect()) { + this->close(WebsocketCloseReason::AbnormalClose, "before manually reconnecting"); + } - // Clear any irrelevant data after a DC - recv_buffered_message.clear(); + this->start_connecting(); + }, + std::chrono::milliseconds(delay)); +} - this->push_deferred_callback([this, code]() { - if (this->closed_callback) { - this->closed_callback(code); - } else { - EVLOG_error << "Closed callback not registered!"; - } +static bool send_internal(lws* wsi, WebsocketMessage* msg) { + static std::vector buff; - if (this->disconnected_callback) { - this->disconnected_callback(); - } else { - EVLOG_error << "Disconnected callback not registered!"; - } - }); -} + std::string& message = msg->payload; + size_t message_len = 
message.length(); + size_t buff_req_size = message_len + LWS_PRE; -void WebsocketLibwebsockets::on_conn_connected() { - EVLOG_info << "OCPP client successfully connected to server with version: " << this->connected_ocpp_version; + if (buff.size() < buff_req_size) + buff.resize(buff_req_size); - this->connection_attempts = 1; // reset connection attempts - this->m_is_connected = true; - this->reconnecting = false; + // Copy data in send buffer + memcpy(&buff[LWS_PRE], message.data(), message_len); - // Clear any irrelevant data after a DC - recv_buffered_message.clear(); - - this->push_deferred_callback([this]() { - if (connected_callback) { - this->connected_callback(this->connected_ocpp_version); - } else { - EVLOG_error << "Connected callback not registered!"; - } - }); -} - -void WebsocketLibwebsockets::on_conn_close() { - EVLOG_info << "OCPP client closed connection to server"; - - std::lock_guard lk(this->connection_mutex); - this->m_is_connected = false; - this->cancel_reconnect_timer(); - - // Notify any message senders that are waiting, since we can't send messages any more - msg_send_cv.notify_all(); - - // Clear any irrelevant data after a DC - recv_buffered_message.clear(); - - this->push_deferred_callback([this]() { - if (this->closed_callback) { - this->closed_callback(WebsocketCloseReason::Normal); - } else { - EVLOG_error << "Closed callback not registered!"; - } - - if (this->disconnected_callback) { - this->disconnected_callback(); - } else { - EVLOG_error << "Disconnected callback not registered!"; - } - }); -} - -void WebsocketLibwebsockets::on_conn_fail() { - EVLOG_error << "OCPP client connection to server failed"; - - std::lock_guard lk(this->connection_mutex); - if (this->m_is_connected) { - this->push_deferred_callback([this]() { - if (this->disconnected_callback) { - this->disconnected_callback(); - } else { - EVLOG_error << "Disconnected callback not registered!"; - } - }); - } - - this->m_is_connected = false; - 
recv_buffered_message.clear(); - - // -1 indicates to always attempt to reconnect - if (this->connection_options.max_connection_attempts == -1 or - this->connection_attempts <= this->connection_options.max_connection_attempts) { - this->reconnect(this->get_reconnect_interval()); - - // Increment reconn attempts - this->connection_attempts += 1; - } else { - this->close(WebsocketCloseReason::Normal, "reconnect attempts exhausted"); - } -} - -void WebsocketLibwebsockets::on_message(std::string&& message) { - if (!this->initialized()) { - EVLOG_error << "Message received but TLS websocket has not been correctly initialized. Discarding message."; - return; - } - - EVLOG_debug << "Received message over TLS websocket polling for process: " << message; - - { - std::lock_guard lock(this->recv_mutex); - recv_message_queue.push(std::move(message)); - } - - recv_message_cv.notify_one(); -} - -static bool send_internal(lws* wsi, WebsocketMessage* msg) { - static std::vector buff; - - std::string& message = msg->payload; - size_t message_len = message.length(); - size_t buff_req_size = message_len + LWS_PRE; - - if (buff.size() < buff_req_size) - buff.resize(buff_req_size); - - // Copy data in send buffer - memcpy(&buff[LWS_PRE], message.data(), message_len); - - // TODO (ioan): if we require certain sending over the wire, - // we'll have to send chunked manually something like this: + // TODO (ioan): if we require certain sending over the wire, + // we'll have to send chunked manually something like this: // Ethernet MTU: ~= 1500bytes // size_t BUFF_SIZE = (MTU * 2); @@ -1051,111 +1103,14 @@ static bool send_internal(lws* wsi, WebsocketMessage* msg) { return true; } -void WebsocketLibwebsockets::on_writable() { - if (!this->initialized() || !this->m_is_connected) { - EVLOG_error << "Message sending but TLS websocket has not been correctly initialized/connected."; - return; - } - - std::shared_ptr local_data = conn_data; - - if (local_data == nullptr) { - EVLOG_error << 
"Message sending TLS websocket with null connection data!"; - return; - } - - if (local_data->is_interupted() || local_data->get_state() == EConnectionState::FINALIZED) { - EVLOG_error << "Trying to write message to interrupted/finalized state!"; - return; - } - - // Execute while we have messages that were polled - while (true) { - WebsocketMessage* message = nullptr; - - { - std::lock_guard lock(this->queue_mutex); - - // Break if we have en empty queue - if (message_queue.empty()) - break; - - message = message_queue.front().get(); - } - - if (message == nullptr) { - EVLOG_AND_THROW(std::runtime_error("Null message in queue, fatal error!")); - } - - // This message was polled in a previous iteration - if (message->sent_bytes >= message->payload.length()) { - EVLOG_debug << "Websocket message fully written, popping processing thread from queue!"; - - // If we have written all bytes to libwebsockets it means that if we received - // this writable callback everything is sent over the wire, mark the message - // as 'sent' and remove it from the queue - { - std::lock_guard lock(this->queue_mutex); - message->message_sent = true; - message_queue.pop(); - } - - EVLOG_debug << "Notifying waiting thread!"; - // Notify any waiting thread to check it's state - msg_send_cv.notify_all(); - } else { - // If the message was not polled, we reached the first unpolled and break - break; - } - } - - // If we still have message ONLY poll a single one that can be processed in the invoke of the function - // libwebsockets is designed so that when a message is sent to the wire from the internal buffer it - // will invoke 'on_writable' again and we can execute the code above - bool any_message_polled; - { - std::lock_guard lock(this->queue_mutex); - any_message_polled = not message_queue.empty(); - } - - // Poll a single message - if (any_message_polled) { - EVLOG_debug << "Client writable, sending message part!"; - - WebsocketMessage* message = nullptr; - - { - std::lock_guard 
lock(this->queue_mutex); - message = message_queue.front().get(); - } - - if (message == nullptr) { - EVLOG_AND_THROW(std::runtime_error("Null message in queue, fatal error!")); - } - - if (message->sent_bytes >= message->payload.length()) { - EVLOG_AND_THROW(std::runtime_error("Already polled message should be handled above, fatal error!")); - } - - // Continue sending message part, for a single message only - bool sent = send_internal(local_data->get_conn(), message); - - // If we failed, attempt again later - if (!sent) { - message->sent_bytes = 0; - } - } -} - void WebsocketLibwebsockets::request_write() { - std::shared_ptr local_data = conn_data; if (this->m_is_connected) { + std::shared_ptr local_data = conn_data; + if (local_data != nullptr) { - if (local_data->get_conn()) { - // Notify waiting processing thread to wake up. According to docs it is ok to call from another - // thread. - lws_cancel_service(local_data->lws_ctx.get()); - } + // Notify waiting processing thread to wake up. According to docs + // it is ok to call from another thread. 
+ local_data->request_awake(); } } else { EVLOG_debug << "Requested write with offline TLS websocket!"; @@ -1171,7 +1126,7 @@ void WebsocketLibwebsockets::poll_message(const std::shared_ptr local_data = conn_data; if (local_data != nullptr) { - auto cd_tid = local_data->get_lws_thread_id(); + auto cd_tid = local_data->get_client_thread_id(); if (std::this_thread::get_id() == cd_tid) { EVLOG_AND_THROW(std::runtime_error("Deadlock detected, polling send from client lws thread!")); @@ -1185,22 +1140,18 @@ void WebsocketLibwebsockets::poll_message(const std::shared_ptrpayload; - - { - std::lock_guard lock(this->queue_mutex); - message_queue.emplace(msg); - } + message_queue.push(msg); // Request a write callback request_write(); - { - std::unique_lock lock(this->msg_send_cv_mutex); - if (msg_send_cv.wait_for(lock, std::chrono::seconds(20), [&] { return (true == msg->message_sent); })) { - EVLOG_debug << "Successfully sent last message over TLS websocket!"; - } else { - EVLOG_warning << "Could not send last message over TLS websocket!"; - } + message_queue.wait_on_custom_event([&] { return (true == msg->message_sent); }, + std::chrono::seconds(MESSAGE_SEND_TIMEOUT_S)); + + if (msg->message_sent) { + EVLOG_debug << "Successfully sent last message over TLS websocket!"; + } else { + EVLOG_warning << "Could not send last message over TLS websocket!"; } } @@ -1321,7 +1272,7 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: { std::string error_message = (in ? reinterpret_cast(in) : "(null)"); - EVLOG_error << "CLIENT_CONNECTION_ERROR: " << error_message; + EVLOG_error << "CLIENT_CONNECTION_ERROR: [" << error_message << "]. 
Attempting reconnect"; ERR_print_errors_fp(stderr); if (error_message.find("HS: ws upgrade unauthorized") != std::string::npos) { @@ -1334,13 +1285,10 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, }); } - if (data->get_state() == EConnectionState::CONNECTING) { - data->update_state(EConnectionState::ERROR); - conn_cv.notify_one(); - } + data->update_state(EConnectionState::ERROR); + on_conn_fail(data); - on_conn_fail(); - return -1; + return 0; } case LWS_CALLBACK_CONNECTING: EVLOG_debug << "Client connecting..."; @@ -1359,12 +1307,8 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, break; case LWS_CALLBACK_CLIENT_ESTABLISHED: - if (data->get_state() == EConnectionState::CONNECTING) { - data->update_state(EConnectionState::CONNECTED); - conn_cv.notify_one(); - } - - on_conn_connected(); + data->update_state(EConnectionState::CONNECTED); + on_conn_connected(data); // Attempt first write after connection lws_callback_on_writable(wsi); @@ -1375,31 +1319,29 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, unsigned char* pp = reinterpret_cast(in); unsigned short close_code = (unsigned short)((pp[0] << 8) | pp[1]); - EVLOG_info << "Unsolicited client close reason: " << close_reason << " close code: " << close_code; - - if (close_code != LWS_CLOSE_STATUS_NORMAL) { - data->update_state(EConnectionState::ERROR); - data->do_interrupt(); - on_conn_fail(); - } + // In the case that the websocket (server) has closed the + // connection we must ALWAYS try to reconnect + EVLOG_info << "Websocket peer initiated close with reason: [" << close_reason << "] close code: [" << close_code + << "]. 
Reconnecting"; + data->update_state(EConnectionState::ERROR); + on_conn_fail(data); // Return 0 to print peer close reason return 0; } case LWS_CALLBACK_CLIENT_CLOSED: - EVLOG_info << "Client closed, was requested: " << data->is_close_requested(); - // Determine if the close connection was requested or if the server went away - if (data->is_close_requested()) { + // case in which we receive a 'LWS_CALLBACK_CLIENT_CLOSED' that was not requested + if (data->is_interupted()) { + EVLOG_info << "Client closed, was requested internally, finalizing connection, not reconnecting"; data->update_state(EConnectionState::FINALIZED); - data->do_interrupt(); - on_conn_close(); + on_conn_close(data); } else { + EVLOG_info << "Client closed, was not requested internally, attempting reconnection"; // It means the server went away, attempt to reconnect data->update_state(EConnectionState::ERROR); - data->do_interrupt(); - on_conn_fail(); + on_conn_fail(data); } break; @@ -1408,26 +1350,14 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, break; case LWS_CALLBACK_CLIENT_WRITEABLE: - on_writable(); - { - bool message_queue_empty; - { - std::lock_guard lock(this->queue_mutex); - message_queue_empty = message_queue.empty(); - } - if (false == message_queue_empty) { - lws_callback_on_writable(wsi); - } + on_conn_writable(); + if (false == message_queue.empty()) { + lws_callback_on_writable(wsi); } break; case LWS_CALLBACK_CLIENT_RECEIVE_PONG: { - bool message_queue_empty; - { - std::lock_guard lock(this->queue_mutex); - message_queue_empty = message_queue.empty(); - } - if (false == message_queue_empty) { + if (false == message_queue.empty()) { lws_callback_on_writable(data->get_conn()); } } break; @@ -1437,29 +1367,17 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, // Message is complete if (lws_remaining_packet_payload(wsi) <= 0) { - on_message(std::move(recv_buffered_message)); + 
on_conn_message(std::move(recv_buffered_message)); recv_buffered_message.clear(); } - { - bool message_queue_empty; - { - std::lock_guard lock(this->queue_mutex); - message_queue_empty = message_queue.empty(); - } - if (false == message_queue_empty) { - lws_callback_on_writable(data->get_conn()); - } + if (false == message_queue.empty()) { + lws_callback_on_writable(data->get_conn()); } break; case LWS_CALLBACK_EVENT_WAIT_CANCELLED: { - bool message_queue_empty; - { - std::lock_guard lock(this->queue_mutex); - message_queue_empty = message_queue.empty(); - } - if (false == message_queue_empty) { + if (false == message_queue.empty()) { lws_callback_on_writable(data->get_conn()); } } break; @@ -1484,32 +1402,195 @@ int WebsocketLibwebsockets::process_callback(void* wsi_ptr, int callback_reason, return 0; } +void WebsocketLibwebsockets::on_conn_connected(ConnectionData* conn_data) { + // Called on the websocket client thread + EVLOG_info << "OCPP client successfully connected to server with version: " << this->connected_ocpp_version; + + this->connection_attempts = 1; // reset connection attempts + this->m_is_connected = true; + this->reconnecting = false; + + // Stop any dangling reconnect + { + std::lock_guard lk(this->reconnect_mutex); + this->reconnect_timer_tpm.stop(); + } + + // Clear any irrelevant data after a DC + clear_all_queues(); + + this->push_deferred_callback([this]() { + if (connected_callback) { + this->connected_callback(this->connected_ocpp_version); + } else { + EVLOG_error << "Connected callback not registered!"; + } + }); +} + +void WebsocketLibwebsockets::on_conn_close(ConnectionData* conn_data) { + // Called on the websocket client thread + EVLOG_info << "OCPP client closed connection to server"; + + this->m_is_connected = false; + + { + std::lock_guard lk(this->reconnect_mutex); + this->reconnect_timer_tpm.stop(); + } + + // Clear any irrelevant data after a DC + clear_all_queues(); + + this->push_deferred_callback([this]() { + if 
(this->stopped_connecting_callback) { + this->stopped_connecting_callback(WebsocketCloseReason::Normal); + } else { + EVLOG_error << "Stopped connecting callback not registered!"; + } + + if (this->disconnected_callback) { + this->disconnected_callback(); + } else { + EVLOG_error << "Disconnected callback not registered!"; + } + }); + + // We have polled the stopped connected + conn_data->mark_stop_executed(); +} + +void WebsocketLibwebsockets::on_conn_fail(ConnectionData* conn_data) { + // Called on the websocket client thread + EVLOG_error << "OCPP client connection to server failed"; + + if (this->m_is_connected) { + this->push_deferred_callback([this]() { + if (this->disconnected_callback) { + this->disconnected_callback(); + } else { + EVLOG_error << "Disconnected callback not registered!"; + } + }); + } + + this->m_is_connected = false; + + // Clear any irrelevant data after a DC + clear_all_queues(); + + // TODO: See if this is required for a faster fail + // lws_set_timeout(conn_data->get_conn(), (enum pending_timeout)1, LWS_TO_KILL_ASYNC); +} + +void WebsocketLibwebsockets::on_conn_message(std::string&& message) { + // Called on the websocket client thread + if (!this->initialized()) { + EVLOG_error << "Message received but TLS websocket has not been correctly initialized. 
Discarding message."; + return; + } + + EVLOG_debug << "Received message over TLS websocket polling for process: " << message; + recv_message_queue.push(std::move(message)); +} + +void WebsocketLibwebsockets::on_conn_writable() { + // Called on the websocket client thread + if (!this->initialized() || !this->m_is_connected) { + EVLOG_error << "Message sending but TLS websocket has not been correctly initialized/connected."; + return; + } + + std::shared_ptr local_data = conn_data; + + if (local_data == nullptr) { + EVLOG_error << "Message sending TLS websocket with null connection data!"; + return; + } + + if (local_data->is_interupted() || local_data->get_state() == EConnectionState::FINALIZED) { + EVLOG_error << "Trying to write message to interrupted/finalized state!"; + return; + } + + // Execute while we have messages that were polled + while (true) { + WebsocketMessage* message = nullptr; + + // Break if we have en empty queue + if (message_queue.empty()) + break; + + message = message_queue.front().get(); + + if (message == nullptr) { + EVLOG_AND_THROW(std::runtime_error("Null message in queue, fatal error!")); + } + + // This message was polled in a previous iteration + if (message->sent_bytes >= message->payload.length()) { + EVLOG_debug << "Websocket message fully written, popping processing thread from queue!"; + + // If we have written all bytes to libwebsockets it means that if we received + // this writable callback everything is sent over the wire, mark it as sent and remove + message->message_sent = true; + message_queue.pop(); + } else { + // If the message was not polled, we reached the first unpolled and break + break; + } + } + + // If we still have message ONLY poll a single one that can be processed in the invoke of the function + // libwebsockets is designed so that when a message is sent to the wire from the internal buffer it + // will invoke 'on_conn_writable' again and we can execute the code above + if (!message_queue.empty()) { + // 
Poll a single message + EVLOG_debug << "Client writable, sending message part!"; + + WebsocketMessage* message = message_queue.front().get(); + + if (message == nullptr) { + EVLOG_AND_THROW(std::runtime_error("Null message in queue, fatal error!")); + } + + if (message->sent_bytes >= message->payload.length()) { + EVLOG_AND_THROW(std::runtime_error("Already polled message should be handled above, fatal error!")); + } + + // Continue sending message part, for a single message only + bool sent = send_internal(local_data->get_conn(), message); + + // If we failed, attempt again later + if (!sent) { + message->sent_bytes = 0; + } + } +} + void WebsocketLibwebsockets::push_deferred_callback(const std::function& callback) { if (!callback) { EVLOG_error << "Attempting to push stale callback in deferred queue!"; return; } - std::scoped_lock tmp_lock(this->deferred_callback_mutex); this->deferred_callback_queue.push(callback); - this->deferred_callback_cv.notify_one(); } -void WebsocketLibwebsockets::handle_deferred_callback_queue() { +void WebsocketLibwebsockets::thread_deferred_callback_queue() { while (true) { std::function callback; { - std::unique_lock lock(this->deferred_callback_mutex); - this->deferred_callback_cv.wait( - lock, [this]() { return !this->deferred_callback_queue.empty() or stop_deferred_handler; }); + this->deferred_callback_queue.wait_on_queue_element_or_predicate( + [this]() { return this->stop_deferred_handler.load(); }); if (stop_deferred_handler and this->deferred_callback_queue.empty()) { break; } - callback = this->deferred_callback_queue.front(); - this->deferred_callback_queue.pop(); + callback = this->deferred_callback_queue.pop(); } + // This needs to be out of lock scope otherwise we still keep the mutex locked while executing the callback. 
// This would block the callers of push_deferred_callback() if (callback) { diff --git a/lib/ocpp/v16/charge_point_impl.cpp b/lib/ocpp/v16/charge_point_impl.cpp index 26f39d761..a8f2176dc 100644 --- a/lib/ocpp/v16/charge_point_impl.cpp +++ b/lib/ocpp/v16/charge_point_impl.cpp @@ -323,7 +323,7 @@ void ChargePointImpl::init_websocket() { this->signal_set_charging_profiles_callback(); } }); - this->websocket->register_closed_callback([this](const WebsocketCloseReason reason) { + this->websocket->register_stopped_connecting_callback([this](const WebsocketCloseReason reason) { if (this->switch_security_profile_callback != nullptr) { this->switch_security_profile_callback(); } @@ -397,8 +397,7 @@ WebsocketConnectionOptions ChargePointImpl::get_ws_connection_options() { void ChargePointImpl::connect_websocket() { if (!this->websocket->is_connected()) { - this->init_websocket(); - this->websocket->connect(); + this->websocket->start_connecting(); } } @@ -1080,7 +1079,7 @@ bool ChargePointImpl::start(const std::map& connector_st this->bootreason = bootreason; this->init_state_machine(connector_status_map); this->init_websocket(); - this->websocket->connect(); + this->websocket->start_connecting(); // push transaction messages including SecurityEventNotification.req onto the message queue this->message_queue->get_persisted_messages_from_db(this->configuration->getDisableSecurityEventNotifications()); this->boot_notification(); @@ -1840,12 +1839,11 @@ void ChargePointImpl::switchSecurityProfile(int32_t new_security_profile, int32_ // we need to reinitialize because it could be plain or tls websocket this->websocket_timer.timeout( [this, max_connection_attempts, new_security_profile]() { - this->init_websocket(); auto connection_options = this->get_ws_connection_options(); connection_options.security_profile = new_security_profile; connection_options.max_connection_attempts = max_connection_attempts; this->websocket->set_connection_options(connection_options); - 
this->websocket->connect(); + this->websocket->start_connecting(); }, WEBSOCKET_INIT_DELAY); } diff --git a/lib/ocpp/v201/connectivity_manager.cpp b/lib/ocpp/v201/connectivity_manager.cpp index 7949b80e5..856375c5e 100644 --- a/lib/ocpp/v201/connectivity_manager.cpp +++ b/lib/ocpp/v201/connectivity_manager.cpp @@ -237,8 +237,8 @@ void ConnectivityManager::try_connect_websocket() { [this](OcppProtocolVersion protocol) { this->on_websocket_connected(protocol); }); this->websocket->register_disconnected_callback( std::bind(&ConnectivityManager::on_websocket_disconnected, this)); - this->websocket->register_closed_callback( - std::bind(&ConnectivityManager::on_websocket_closed, this, std::placeholders::_1)); + this->websocket->register_stopped_connecting_callback( + std::bind(&ConnectivityManager::on_websocket_stopped_connecting, this, std::placeholders::_1)); } else { this->websocket->set_connection_options(connection_options.value()); } @@ -250,7 +250,7 @@ void ConnectivityManager::try_connect_websocket() { this->websocket->register_message_callback([this](const std::string& message) { this->message_callback(message); }); - this->websocket->connect(); + this->websocket->start_connecting(); } std::optional @@ -397,7 +397,7 @@ void ConnectivityManager::on_websocket_disconnected() { } } -void ConnectivityManager::on_websocket_closed(ocpp::WebsocketCloseReason reason) { +void ConnectivityManager::on_websocket_stopped_connecting(ocpp::WebsocketCloseReason reason) { EVLOG_warning << "Closed websocket of NetworkConfigurationPriority: " << this->active_network_configuration_priority + 1 << " which is configurationSlot " << this->get_active_network_configuration_slot();