Skip to content

Commit

Permalink
Fixed reconnect attempt, fixed sys error on timer dtor (#443)
Browse files Browse the repository at this point in the history
Signed-off-by: AssemblyJohn <[email protected]>
  • Loading branch information
AssemblyJohn authored Feb 2, 2024
1 parent fbc95ee commit 72c7a1f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 49 deletions.
2 changes: 1 addition & 1 deletion include/ocpp/common/websocket/websocket_tls_tpm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class WebsocketTlsTPM final : public WebsocketBase {
std::shared_ptr<EvseSecurity> evse_security;

// Connection related data
std::unique_ptr<Everest::SteadyTimer> reconnect_timer_tpm;
Everest::SteadyTimer reconnect_timer_tpm;
std::unique_ptr<std::thread> websocket_thread;
std::shared_ptr<ConnectionData> conn_data;
std::condition_variable conn_cv;
Expand Down
117 changes: 69 additions & 48 deletions lib/ocpp/common/websocket/websocket_tls_tpm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ static constexpr int LWS_CLOSE_SOCKET_RESPONSE_MESSAGE = -1;

/// \brief Per thread connection data
struct ConnectionData {
ConnectionData() : is_running(true), state(EConnectionState::INITIALIZE), wsi(nullptr), owner(nullptr) {
ConnectionData() :
is_running(true), is_marked_close(false), state(EConnectionState::INITIALIZE), wsi(nullptr), owner(nullptr) {
}

~ConnectionData() {
Expand All @@ -77,6 +78,10 @@ struct ConnectionData {
is_running = false;
}

void request_close() {
is_marked_close = true;
}

bool is_interupted() {
return (is_running == false);
}
Expand All @@ -85,6 +90,10 @@ struct ConnectionData {
return (state == EConnectionState::CONNECTING);
}

bool is_close_requested() {
return is_marked_close;
}

auto get_state() {
return state;
}
Expand All @@ -111,6 +120,7 @@ struct ConnectionData {
private:
std::thread::id lws_thread_id;
bool is_running;
bool is_marked_close;
EConnectionState state;
};

Expand Down Expand Up @@ -488,6 +498,25 @@ bool WebsocketTlsTPM::connect() {
this->recv_message_thread->join();
}

// Bind reconnect callback
this->reconnect_callback = [this](const websocketpp::lib::error_code& ec) {
EVLOG_info << "Reconnecting to TLS websocket at uri: " << this->connection_options.csms_uri.string()
<< " with security profile: " << this->connection_options.security_profile;

// close connection before reconnecting
if (this->m_is_connected) {
EVLOG_info << "Closing websocket connection before reconnecting";
this->close(websocketpp::close::status::abnormal_close, "Reconnect");
}

{
std::lock_guard<std::mutex> lk(this->reconnect_mutex);
this->reconnect_timer_tpm.stop();
}

this->connect();
};

std::unique_lock<std::mutex> lock(connection_mutex);

// Release other threads
Expand Down Expand Up @@ -517,31 +546,12 @@ bool WebsocketTlsTPM::connect() {
this->conn_data.reset();
}

this->reconnect_callback = [this](const websocketpp::lib::error_code& ec) {
EVLOG_info << "Reconnecting to TLS websocket at uri: " << this->connection_options.csms_uri.string()
<< " with security profile: " << this->connection_options.security_profile;

// close connection before reconnecting
if (this->m_is_connected) {
EVLOG_info << "Closing websocket connection before reconnecting";
this->close(websocketpp::close::status::abnormal_close, "Reconnect");
}

{
std::lock_guard<std::mutex> lk(this->reconnect_mutex);
if (this->reconnect_timer_tpm) {
this->reconnect_timer_tpm->stop();
}
this->reconnect_timer_tpm = nullptr;
}

this->connect();
};

return (connected);
}

void WebsocketTlsTPM::reconnect(std::error_code reason, long delay) {
EVLOG_info << "Attempting TLS TPM reconnect with reason: " << reason << " and delay:" << delay;

if (this->shutting_down) {
EVLOG_info << "Not reconnecting because the websocket is being shutdown.";
return;
Expand All @@ -553,31 +563,25 @@ void WebsocketTlsTPM::reconnect(std::error_code reason, long delay) {
this->close(websocketpp::close::status::abnormal_close, "Reconnect");
}

if (!this->reconnect_timer_tpm) {
EVLOG_info << "Reconnecting in: " << delay << "ms"
<< ", attempt: " << this->connection_attempts;
EVLOG_info << "Reconnecting in: " << delay << "ms"
<< ", attempt: " << this->connection_attempts;

this->reconnect_timer_tpm = std::make_unique<Everest::SteadyTimer>(
[this]() { this->reconnect_callback(websocketpp::lib::error_code()); });
this->reconnect_timer_tpm->timeout(std::chrono::seconds(delay));
} else {
EVLOG_info << "Reconnect timer already running";
}
this->reconnect_timer_tpm.timeout([this]() { this->reconnect_callback(websocketpp::lib::error_code()); },
std::chrono::milliseconds(delay));
}

void WebsocketTlsTPM::close(websocketpp::close::status::value code, const std::string& reason) {
EVLOG_info << "Closing TLS TPM websocket with reason: " << reason;

{
std::lock_guard<std::mutex> lk(this->reconnect_mutex);
if (this->reconnect_timer_tpm) {
this->reconnect_timer_tpm->stop();
}
this->reconnect_timer_tpm = nullptr;
this->reconnect_timer_tpm.stop();
}

if (conn_data) {
if (auto* data = conn_data.get()) {
// Set the trigger from us
data->request_close();
data->do_interrupt();
}

Expand Down Expand Up @@ -617,6 +621,7 @@ void WebsocketTlsTPM::on_conn_fail() {
if (this->m_is_connected) {
this->disconnected_callback();
}

this->m_is_connected = false;
this->connection_attempts += 1;

Expand Down Expand Up @@ -837,15 +842,6 @@ int WebsocketTlsTPM::process_callback(void* wsi_ptr, int callback_reason, void*
return LWS_CLOSE_SOCKET_RESPONSE_MESSAGE;
}

// If we are interrupted, close the socket cleanly
if (data->is_interupted()) {
EVLOG_info << "Conn interrupted/closed, closing socket!";

// Set the normal reason if we are interrupted
lws_close_reason(data->get_conn(), LWS_CLOSE_STATUS_NORMAL, NULL, 0);
return LWS_CLOSE_SOCKET_RESPONSE_MESSAGE;
}

switch (reason) {
// TODO: If required in the future
case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
Expand Down Expand Up @@ -918,15 +914,34 @@ int WebsocketTlsTPM::process_callback(void* wsi_ptr, int callback_reason, void*
std::string close_reason(reinterpret_cast<char*>(in), len);
unsigned char* pp = reinterpret_cast<unsigned char*>(in);
unsigned short close_code = (unsigned short)((pp[0] << 8) | pp[1]);
EVLOG_info << "Client close reason: " << close_reason << " close code: " << close_code;

EVLOG_info << "Unsolicited client close reason: " << close_reason << " close code: " << close_code;

if (close_code != LWS_CLOSE_STATUS_NORMAL) {
data->update_state(EConnectionState::ERROR);
data->do_interrupt();
on_conn_fail();
}

// Return 0 to print peer close reason
return 0;
}

case LWS_CALLBACK_CLIENT_CLOSED:
data->update_state(EConnectionState::FINALIZED);
data->do_interrupt();
on_conn_close();
EVLOG_info << "Client closed, was requested: " << data->is_close_requested();

// Determine if the close connection was requested or if the server went away
if (data->is_close_requested()) {
data->update_state(EConnectionState::FINALIZED);
data->do_interrupt();
on_conn_close();
} else {
// It means the server went away, attempt to reconnect
data->update_state(EConnectionState::ERROR);
data->do_interrupt();
on_conn_fail();
}

break;

case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL:
Expand Down Expand Up @@ -965,6 +980,12 @@ int WebsocketTlsTPM::process_callback(void* wsi_ptr, int callback_reason, void*
break;
}

// If we are interrupted, close the socket cleanly
if (data->is_interupted()) {
EVLOG_info << "Conn interrupted/closed, closing socket!";
return LWS_CLOSE_SOCKET_RESPONSE_MESSAGE;
}

// Return -1 on fatal error (-1 is request to close the socket)
return 0;
}
Expand Down

0 comments on commit 72c7a1f

Please sign in to comment.