Skip to content

Commit

Permalink
Bugfix/libwebsocket dtor deadlock (#896)
Browse files Browse the repository at this point in the history
* Added thread-safe queue for messages, removed extra mutexes
* Updated websocket connect function name to better clarify what it does
* Code reorder for less failure points on separate threads
* Clarified thread responsibility, internal mutex lock scope clarification
* Updated header docs
* Moved the safe queue to its own header

---------

Signed-off-by: AssemblyJohn <[email protected]>
Signed-off-by: Piet Gömpel <[email protected]>
Co-authored-by: Piet Gömpel <[email protected]>
  • Loading branch information
AssemblyJohn and Pietfried authored Dec 13, 2024
1 parent 21c83c9 commit 230ae1a
Show file tree
Hide file tree
Showing 11 changed files with 897 additions and 679 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.14)

project(ocpp
VERSION 0.21.0
VERSION 0.22.0
DESCRIPTION "A C++ implementation of the Open Charge Point Protocol"
LANGUAGES CXX
)
Expand Down
127 changes: 127 additions & 0 deletions include/ocpp/common/safe_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2024 Pionix GmbH and Contributors to EVerest

#pragma once

#include <condition_variable>
#include <mutex>
#include <queue>

namespace ocpp {

/// \brief Thread safe message queue. Holds a condition variable
/// that can be waited upon. Will wake up a waiting thread on each
/// operation of push/pop/clear
template <typename T> class SafeQueue {
    using safe_queue_reference = typename std::queue<T>::reference;
    using safe_queue_const_reference = typename std::queue<T>::const_reference;

public:
    /// \return True if the queue is empty
    inline bool empty() const {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.empty();
    }

    /// \return a reference to the first element in the queue. Undefined behavior if the queue is empty.
    /// The internal lock is only held while the reference is obtained, so the returned reference is
    /// not protected against a concurrent pop/clear issued from another thread
    inline safe_queue_reference front() {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.front();
    }

    /// \return a const reference to the first element in the queue. Undefined behavior if the queue
    /// is empty. The same lifetime caveat as for the non-const overload applies
    inline safe_queue_const_reference front() const {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.front();
    }

    /// \return retrieves and removes the first element in the queue. Undefined behavior if the queue is empty
    inline T pop() {
        std::unique_lock<std::mutex> lock(mutex);

        T value = std::move(queue.front());
        queue.pop();

        // Release the lock before notifying, so that the woken thread
        // does not immediately block on the mutex we are still holding
        lock.unlock();

        notify_waiting_thread();

        return value;
    }

    /// \brief Queues an element by moving it into the queue and notifies a thread
    /// waiting on the internal condition variable
    inline void push(T&& value) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            // 'value' is an lvalue inside this function, it has to be moved
            // explicitly, otherwise the copying overload of std::queue::push is chosen
            queue.push(std::move(value));
        }

        notify_waiting_thread();
    }

    /// \brief Queues a copy of the element and notifies a thread waiting on the
    /// internal condition variable
    inline void push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(value);
        }

        notify_waiting_thread();
    }

    /// \brief Clears the queue and notifies a thread waiting on the internal condition variable
    inline void clear() {
        {
            std::lock_guard<std::mutex> lock(mutex);

            // Swap with an empty queue, the old elements are
            // destroyed when 'empty' goes out of scope
            std::queue<T> empty;
            empty.swap(queue);
        }

        notify_waiting_thread();
    }

    /// \brief Waits for the queue to contain an element
    /// \param timeout to wait for an element, pass in a value <= 0 to wait indefinitely
    inline void wait_on_queue_element(std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        wait_on_queue_element_or_predicate([]() { return false; }, timeout);
    }

    /// \brief Same as 'wait_on_queue_element' but receives an additional predicate to wait upon.
    /// Returns when the queue is not empty, when the predicate returns true or when the timeout expires
    /// \param pred predicate evaluated while the internal mutex is held, it must not call back into this queue
    /// \param timeout to wait for, pass in a value <= 0 to wait indefinitely
    template <class Predicate>
    inline void wait_on_queue_element_or_predicate(Predicate pred,
                                                   std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        std::unique_lock<std::mutex> lock(mutex);

        if (timeout.count() > 0) {
            cv.wait_for(lock, timeout, [&]() { return (false == queue.empty()) or pred(); });
        } else {
            cv.wait(lock, [&]() { return (false == queue.empty()) or pred(); });
        }
    }

    /// \brief Waits on the queue for a custom event, regardless of the queue content.
    /// Returns when the predicate returns true or when the timeout expires
    /// \param pred predicate evaluated while the internal mutex is held, it must not call back into this queue
    /// \param timeout to wait for the event, pass in a value <= 0 to wait indefinitely
    template <class Predicate>
    inline void wait_on_custom_event(Predicate pred, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
        std::unique_lock<std::mutex> lock(mutex);

        if (timeout.count() > 0) {
            cv.wait_for(lock, timeout, [&]() { return pred(); });
        } else {
            cv.wait(lock, [&]() { return pred(); });
        }
    }

    /// \brief Notifies a single waiting thread to wake up
    inline void notify_waiting_thread() {
        cv.notify_one();
    }

private:
    std::queue<T> queue;

    // Mutable so that the const accessors can lock it
    mutable std::mutex mutex;
    std::condition_variable cv;
};

} // namespace ocpp
12 changes: 7 additions & 5 deletions include/ocpp/common/websocket/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Websocket {
std::unique_ptr<WebsocketBase> websocket;
std::function<void(OcppProtocolVersion protocol)> connected_callback;
std::function<void()> disconnected_callback;
std::function<void(const WebsocketCloseReason reason)> closed_callback;
std::function<void(const WebsocketCloseReason reason)> stopped_connecting_callback;
std::function<void(const std::string& message)> message_callback;
std::shared_ptr<MessageLogging> logging;

Expand All @@ -28,8 +28,10 @@ class Websocket {
std::shared_ptr<EvseSecurity> evse_security, std::shared_ptr<MessageLogging> logging);
~Websocket();

/// \brief connect to a websocket (TLS or non-TLS depending on the central system uri in the configuration).
bool connect();
/// \brief Starts the connection attempts. It will init the websocket processing thread
/// \returns true if the websocket is successfully initialized, false otherwise. Does
/// not wait for a successful connection
bool start_connecting();

void set_connection_options(const WebsocketConnectionOptions& connection_options);

Expand All @@ -48,9 +50,9 @@ class Websocket {
/// \brief register a \p callback that is called when the websocket connection is disconnected
void register_disconnected_callback(const std::function<void()>& callback);

/// \brief register a \p callback that is called when the websocket connection has been closed and will not attempt
/// \brief register a \p callback that is called when the websocket connection has been stopped and will not attempt
/// to reconnect
void register_closed_callback(const std::function<void(const WebsocketCloseReason)>& callback);
void register_stopped_connecting_callback(const std::function<void(const WebsocketCloseReason)>& callback);

/// \brief register a \p callback that is called when the websocket receives a message
void register_message_callback(const std::function<void(const std::string& message)>& callback);
Expand Down
11 changes: 6 additions & 5 deletions include/ocpp/common/websocket/websocket_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class WebsocketBase {
WebsocketConnectionOptions connection_options;
std::function<void(OcppProtocolVersion protocol)> connected_callback;
std::function<void()> disconnected_callback;
std::function<void(const WebsocketCloseReason reason)> closed_callback;
std::function<void(const WebsocketCloseReason reason)> stopped_connecting_callback;
std::function<void(const std::string& message)> message_callback;
std::function<void(ConnectionFailedReason)> connection_failed_callback;
std::shared_ptr<boost::asio::steady_timer> reconnect_timer;
Expand Down Expand Up @@ -90,9 +90,10 @@ class WebsocketBase {
explicit WebsocketBase();
virtual ~WebsocketBase();

/// \brief connect to a websocket
/// \returns true if the websocket is initialized and a connection attempt is made
virtual bool connect() = 0;
/// \brief Starts the connection attempts. It will init the websocket processing thread
/// \returns true if the websocket is successfully initialized, false otherwise. Does
/// not wait for a successful connection
virtual bool start_connecting() = 0;

/// \brief sets this connection_options to the given \p connection_options and resets the connection_attempts
virtual void set_connection_options(const WebsocketConnectionOptions& connection_options) = 0;
Expand All @@ -118,7 +119,7 @@ class WebsocketBase {

/// \brief register a \p callback that is called when the websocket connection has been closed and will not attempt
/// to reconnect
void register_closed_callback(const std::function<void(const WebsocketCloseReason reason)>& callback);
void register_stopped_connecting_callback(const std::function<void(const WebsocketCloseReason reason)>& callback);

/// \brief register a \p callback that is called when the websocket receives a message
void register_message_callback(const std::function<void(const std::string& message)>& callback);
Expand Down
76 changes: 41 additions & 35 deletions include/ocpp/common/websocket/websocket_libwebsockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
#define OCPP_WEBSOCKET_TLS_TPM_HPP

#include <ocpp/common/evse_security.hpp>
#include <ocpp/common/safe_queue.hpp>
#include <ocpp/common/websocket/websocket_base.hpp>

#include <condition_variable>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <string>

struct ssl_ctx_st;
Expand All @@ -31,84 +29,92 @@ class WebsocketLibwebsockets final : public WebsocketBase {

void set_connection_options(const WebsocketConnectionOptions& connection_options) override;

/// \brief connect to a TLS websocket
/// \returns true if the websocket is initialized and a connection attempt is made
bool connect() override;
bool start_connecting() override;

/// \brief Reconnects the websocket using the delay, a reason for this reconnect can be provided with the
/// \param reason parameter
/// \param delay delay of the reconnect attempt
void reconnect(long delay) override;

/// \brief closes the websocket
void close(const WebsocketCloseReason code, const std::string& reason) override;

/// \brief send a \p message over the websocket
/// \returns true if the message was sent successfully
bool send(const std::string& message) override;

/// \brief send a websocket ping
void ping() override;

/// \brief Indicates if the websocket has a valid connection data and is trying to
/// connect/reconnect internally even if for the moment it might not be connected
/// \return True if the websocket is connected or trying to connect, false otherwise
bool is_trying_to_connect();

public:
int process_callback(void* wsi_ptr, int callback_reason, void* user, void* in, size_t len);

private:
bool is_trying_to_connect_internal();
void close_internal(const WebsocketCloseReason code, const std::string& reason);

private:
/// \brief Initializes the connection options, including the security info
/// \return True if it was successful, false otherwise
bool initialize_connection_options(std::shared_ptr<ConnectionData>& new_connection_data);

bool tls_init(struct ssl_ctx_st* ctx, const std::string& path_chain, const std::string& path_key, bool custom_key,
std::optional<std::string>& password);
void client_loop();
void recv_loop();

/// \brief Websocket processing thread loop
void thread_websocket_client_loop(std::shared_ptr<ConnectionData> local_data);

/// \brief Function to handle received messages. Required since from the received message
/// callback we also send messages that must block and wait on the client thread
void thread_websocket_message_recv_loop(std::shared_ptr<ConnectionData> local_data);

/// \brief Function to handle the deferred callbacks
void thread_deferred_callback_queue();

/// \brief Called when a TLS websocket connection is established, calls the connected callback
void on_conn_connected();
void on_conn_connected(ConnectionData* conn_data);

/// \brief Called when a TLS websocket connection is closed
void on_conn_close();
void on_conn_close(ConnectionData* conn_data);

/// \brief Called when a TLS websocket connection fails to be established
void on_conn_fail();
void on_conn_fail(ConnectionData* conn_data);

/// \brief When the connection can send data
void on_writable();
void on_conn_writable();

/// \brief Called when a message is received over the TLS websocket, calls the message callback
void on_message(std::string&& message);
void on_conn_message(std::string&& message);

/// \brief Requests a message write, awakes the websocket loop from 'poll'
void request_write();

void poll_message(const std::shared_ptr<WebsocketMessage>& msg);

/// \brief Function to handle the deferred callbacks
void handle_deferred_callback_queue();

/// \brief Add a callback to the queue of callbacks to be executed. All will be executed from a single thread
void push_deferred_callback(const std::function<void()>& callback);

/// \brief Safely closes the already running connection threads
void safe_close_threads();

/// \brief Clears all messages and message queues both incoming and outgoing
void clear_all_queues();

private:
std::shared_ptr<EvseSecurity> evse_security;

// Connection related data
Everest::SteadyTimer reconnect_timer_tpm;
std::unique_ptr<std::thread> websocket_thread;
std::shared_ptr<ConnectionData> conn_data;
std::condition_variable conn_cv;

std::mutex queue_mutex;

std::queue<std::shared_ptr<WebsocketMessage>> message_queue;
std::condition_variable msg_send_cv;
std::mutex msg_send_cv_mutex;
// Queue of outgoing messages, notify thread only when we remove messages
SafeQueue<std::shared_ptr<WebsocketMessage>> message_queue;

std::unique_ptr<std::thread> recv_message_thread;
std::mutex recv_mutex;
std::queue<std::string> recv_message_queue;
std::condition_variable recv_message_cv;
SafeQueue<std::string> recv_message_queue;
std::string recv_buffered_message;

std::unique_ptr<std::thread> deferred_callback_thread;
std::queue<std::function<void()>> deferred_callback_queue;
std::mutex deferred_callback_mutex;
std::condition_variable deferred_callback_cv;
SafeQueue<std::function<void()>> deferred_callback_queue;
std::atomic_bool stop_deferred_handler;

OcppProtocolVersion connected_ocpp_version;
Expand Down
5 changes: 3 additions & 2 deletions include/ocpp/v201/connectivity_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ class ConnectivityManager {
///
void on_websocket_disconnected();

/// \brief Function invoked when the web socket closes
/// \brief Function invoked when the web socket stops connecting
/// and does not re-attempt to connect again
///
void on_websocket_closed(ocpp::WebsocketCloseReason reason);
void on_websocket_stopped_connecting(ocpp::WebsocketCloseReason reason);

///
/// \brief Get the active network configuration slot in use.
Expand Down
13 changes: 7 additions & 6 deletions lib/ocpp/common/websocket/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ Websocket::Websocket(const WebsocketConnectionOptions& connection_options, std::
Websocket::~Websocket() {
}

bool Websocket::connect() {
bool Websocket::start_connecting() {
this->logging->sys("Connecting");
return this->websocket->connect();
return this->websocket->start_connecting();
}

void Websocket::set_connection_options(const WebsocketConnectionOptions& connection_options) {
Expand Down Expand Up @@ -63,10 +63,11 @@ void Websocket::register_disconnected_callback(const std::function<void()>& call
});
}

void Websocket::register_closed_callback(const std::function<void(const WebsocketCloseReason reason)>& callback) {
this->closed_callback = callback;
this->websocket->register_closed_callback(
[this](const WebsocketCloseReason reason) { this->closed_callback(reason); });
void Websocket::register_stopped_connecting_callback(
const std::function<void(const WebsocketCloseReason reason)>& callback) {
this->stopped_connecting_callback = callback;
this->websocket->register_stopped_connecting_callback(
[this](const WebsocketCloseReason reason) { this->stopped_connecting_callback(reason); });
}

void Websocket::register_message_callback(const std::function<void(const std::string& message)>& callback) {
Expand Down
Loading

0 comments on commit 230ae1a

Please sign in to comment.