Skip to content

Commit

Permalink
Libwebsockets: Fix message loss on reconnect (#457)
Browse files Browse the repository at this point in the history
* Libwebsockets: Fix message loss on reconnect
* Run closing callback in extra thread
* add more threads around callbacks
* Fixed message sending queue logic
* Clear pending message queue on a new connection
* Removed some unecesary mutex locks
* Stopping background timer on a new connect

Signed-off-by: Cornelius Claussen <[email protected]>
Signed-off-by: AssemblyJohn <[email protected]>
  • Loading branch information
corneliusclaussen authored Feb 14, 2024
1 parent 94902c2 commit a1fe09b
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 71 deletions.
10 changes: 5 additions & 5 deletions include/ocpp/common/websocket/websocket_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ enum class ConnectionFailedReason {
///
class WebsocketBase {
protected:
bool m_is_connected;
std::atomic_bool m_is_connected;
WebsocketConnectionOptions connection_options;
std::function<void(const int security_profile)> connected_callback;
std::function<void()> disconnected_callback;
Expand All @@ -59,11 +59,11 @@ class WebsocketBase {
websocketpp::connection_hdl handle;
std::mutex reconnect_mutex;
std::mutex connection_mutex;
long reconnect_backoff_ms;
std::atomic_int reconnect_backoff_ms;
websocketpp::transport::timer_handler reconnect_callback;
int connection_attempts;
bool shutting_down;
bool reconnecting;
std::atomic_int connection_attempts;
std::atomic_bool shutting_down;
std::atomic_bool reconnecting;

/// \brief Indicates if the required callbacks are registered
/// \returns true if the websocket is properly initialized
Expand Down
4 changes: 3 additions & 1 deletion include/ocpp/common/websocket/websocket_tls_tpm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class WebsocketTlsTPM final : public WebsocketBase {

void request_write();

void poll_message(const std::shared_ptr<WebsocketMessage>& msg, bool wait_sendaf);
void poll_message(const std::shared_ptr<WebsocketMessage>& msg);

private:
std::shared_ptr<EvseSecurity> evse_security;
Expand All @@ -79,8 +79,10 @@ class WebsocketTlsTPM final : public WebsocketBase {
std::condition_variable conn_cv;

std::mutex queue_mutex;

std::queue<std::shared_ptr<WebsocketMessage>> message_queue;
std::condition_variable msg_send_cv;
std::mutex msg_send_cv_mutex;

std::unique_ptr<std::thread> recv_message_thread;
std::mutex recv_mutex;
Expand Down
Loading

0 comments on commit a1fe09b

Please sign in to comment.