Skip to content

Commit

Permalink
Added thread safety to external usage of ConnectionData members
Browse files Browse the repository at this point in the history
Signed-off-by: AssemblyJohn <[email protected]>
  • Loading branch information
AssemblyJohn committed Dec 4, 2024
1 parent 37d4cb5 commit 8cd27e2
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions lib/ocpp/common/websocket/websocket_libwebsockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,36 +143,60 @@ struct ConnectionData {
}

public:
void init_connection_context(lws_context* lws_ctx) {
std::lock_guard lock(this->mutex);
this->lws_ctx = std::unique_ptr<lws_context>(lws_ctx);
}

void init_security_context(SSL_CTX* ssl_ctx) {
std::lock_guard lock(this->mutex);
this->sec_context = std::unique_ptr<SSL_CTX>(ssl_ctx);
}

void init_connection(lws* lws) {
std::lock_guard lock(this->mutex);
this->wsi = lws;
}

lws* get_conn() {
std::lock_guard lock(this->mutex);
return wsi;
}

lws_context* get_context() {
std::lock_guard lock(this->mutex);
return lws_ctx.get();
}

WebsocketLibwebsockets* get_owner() {
std::lock_guard lock(this->mutex);
return owner;
}

public:
// This public block will only be used from client loop thread, no locking needed
private:
// Openssl context, must be destroyed in this order
std::unique_ptr<SSL_CTX> sec_context;
// libwebsockets state
std::unique_ptr<lws_context> lws_ctx;

// Internal used WSI
lws* wsi;
// Owner, set on creation
WebsocketLibwebsockets* owner;

private:
std::mutex mutex;

bool is_running;
EConnectionState state;

WebsocketLibwebsockets* owner;

private:
/// \brief Websocket client thread ID
std::thread::id websocket_client_thread_id;
/// \brief Websocket received message thread ID
std::thread::id websocket_recv_thread_id;

// Required for access of state
friend class WebsocketLibwebsockets;
};

struct WebsocketMessage {
Expand Down Expand Up @@ -593,7 +617,7 @@ bool WebsocketLibwebsockets::initialize_connection_options(std::shared_ptr<Conne
info.provided_client_ssl_ctx = ssl_ctx;

// Connection acquire the contexts
local_data->sec_context = std::unique_ptr<SSL_CTX>(ssl_ctx);
local_data->init_security_context(ssl_ctx);
}

lws_context* lws_ctx = lws_create_context(&info);
Expand All @@ -603,8 +627,7 @@ bool WebsocketLibwebsockets::initialize_connection_options(std::shared_ptr<Conne
}

// Conn acquire the lws context
local_data->lws_ctx = std::unique_ptr<lws_context>(lws_ctx);

local_data->init_connection_context(lws_ctx);
return true;
}

Expand Down Expand Up @@ -644,6 +667,7 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr<Connec
}

auto& uri = this->connection_options.csms_uri;
lws* local_lws = nullptr;

// TODO: No idea who releases the strdup?
i.context = local_data->lws_ctx.get();
Expand All @@ -656,7 +680,7 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr<Connec
i.protocol =
strdup(conversions::ocpp_protocol_version_to_string(this->connection_options.ocpp_version).c_str());
i.local_protocol_name = local_protocol_name;
i.pwsi = &local_data->wsi; // Will set the local_data->wsi to a valid value in case of a successful connect
i.pwsi = &local_lws; // Will set the local_data->wsi to a valid value in case of a successful connect
i.userdata = local_data.get(); // See lws_context 'user'

if (this->connection_options.iface.has_value()) {
Expand All @@ -675,6 +699,8 @@ void WebsocketLibwebsockets::thread_websocket_client_loop(std::shared_ptr<Connec
local_data->update_state(EConnectionState::ERROR);
on_conn_fail();
} else {
local_data->init_connection(local_lws);

// Process while we're running
int n = 0;
bool processing = true;
Expand Down Expand Up @@ -995,13 +1021,17 @@ static bool send_internal(lws* wsi, WebsocketMessage* msg) {

void WebsocketLibwebsockets::request_write() {
if (this->m_is_connected) {
std::scoped_lock lock(this->connection_mutex);
std::shared_ptr<ConnectionData> local_data = conn_data;

if (local_data != nullptr) {
if (local_data->get_conn()) {
// Notify waiting processing thread to wake up. According to docs it is ok to call from another
// thread.
lws_cancel_service(local_data->lws_ctx.get());
if (!local_data->is_interupted() && local_data->get_conn()) {
// Notify waiting processing thread to wake up. According to docs
// it is ok to call from another thread.
auto* context = local_data->get_context();
if (context) {
lws_cancel_service(context);
}
}
}
} else {
Expand Down

0 comments on commit 8cd27e2

Please sign in to comment.