From 5303ee89cc4bc02152e36750b8c4f7f12ac68eb5 Mon Sep 17 00:00:00 2001 From: kadin Date: Wed, 12 Feb 2025 18:07:09 -0700 Subject: [PATCH] Implement Mr. Stanton's comments --- src/common/heartbeat2/CMakeLists.txt | 28 ++-- .../heartbeat2/include/heartbeat_client.hpp | 55 +++++++ .../heartbeat2/include/heartbeat_server.hpp | 55 +++++++ src/common/heartbeat2/package.xml | 6 +- src/common/heartbeat2/src/client.cpp | 137 ------------------ .../heartbeat2/src/heartbeat_client.cpp | 113 +++++++++++++++ .../heartbeat2/src/heartbeat_server.cpp | 76 ++++++++++ src/common/heartbeat2/src/server.cpp | 94 ------------ 8 files changed, 313 insertions(+), 251 deletions(-) create mode 100644 src/common/heartbeat2/include/heartbeat_client.hpp create mode 100644 src/common/heartbeat2/include/heartbeat_server.hpp delete mode 100644 src/common/heartbeat2/src/client.cpp create mode 100644 src/common/heartbeat2/src/heartbeat_client.cpp create mode 100644 src/common/heartbeat2/src/heartbeat_server.cpp delete mode 100644 src/common/heartbeat2/src/server.cpp diff --git a/src/common/heartbeat2/CMakeLists.txt b/src/common/heartbeat2/CMakeLists.txt index fa73f6a..78f3743 100644 --- a/src/common/heartbeat2/CMakeLists.txt +++ b/src/common/heartbeat2/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.8) project(heartbeat2) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) + add_compile_options(-Wall -Wpedantic) endif() # find dependencies @@ -11,42 +11,38 @@ find_package(ament_cmake REQUIRED) # further dependencies manually. # find_package( REQUIRED) find_package(rclcpp REQUIRED) -find_package(builtin_interfaces REQUIRED) find_package(csm_common_interfaces REQUIRED) -find_package(std_msgs REQUIRED) find_package(std_srvs REQUIRED) find_package(Boost 1.40 REQUIRED COMPONENTS container) # add executables for ros to recognize -add_executable(client src/client.cpp) -target_include_directories(client PUBLIC +add_executable(heartbeat_client src/heartbeat_client.cpp) +target_include_directories(heartbeat_client PUBLIC $ $) ament_target_dependencies( - client + heartbeat_client rclcpp csm_common_interfaces - builtin_interfaces - std_msgs + std_srvs ) -target_include_directories(client PUBLIC ${Boost_INCLUDE_DIR}) -target_link_libraries(client ${Boost_LIBRARIES}) +target_include_directories(heartbeat_client PUBLIC ${Boost_INCLUDE_DIR}) +target_link_libraries(heartbeat_client ${Boost_LIBRARIES}) -add_executable(server src/server.cpp) -target_include_directories(server PUBLIC +add_executable(heartbeat_server src/heartbeat_server.cpp) +target_include_directories(heartbeat_server PUBLIC $ $) ament_target_dependencies( - server + heartbeat_server rclcpp csm_common_interfaces - builtin_interfaces std_srvs ) -install(TARGETS client +install(TARGETS heartbeat_client DESTINATION lib/${PROJECT_NAME}) -install(TARGETS server +install(TARGETS heartbeat_server DESTINATION lib/${PROJECT_NAME}) diff --git a/src/common/heartbeat2/include/heartbeat_client.hpp b/src/common/heartbeat2/include/heartbeat_client.hpp new file mode 100644 index 0000000..d7b9e26 --- /dev/null +++ b/src/common/heartbeat2/include/heartbeat_client.hpp @@ -0,0 +1,55 @@ +#ifndef HEARTBEAT_CLIENT_HPP +#define HEARTBEAT_CLIENT_HPP + +#include +#include +#include + +#include "csm_common_interfaces/msg/e_stop.hpp" + +class HeartbeatClient : public rclcpp::Node { + public: + /** + * @brief Construct a new Heartbeat Client object + * + */ + HeartbeatClient(); + + /** + * @brief Establishes connection to heartbeat server + * + */ + void handshake(); + + private: + /** + * @brief Checks for handshake response + * + */ + void handshake_callback(); + + /** + * @brief Sends requests to server and checks if responses are sent + * + */ + void ping_callback(); + + + // Time since last ping was received + unsigned int _lastResponseTime; + boost::circular_buffer>> _futures; + + // heartbeat and handshake + rclcpp::Client::SharedPtr handshakeClient; + rclcpp::Client::SharedPtr heartbeatClient; + + // storing handshake futures and setting up timer to send requests and check for responses + std::shared_future> _handshakeFuture; + rclcpp::TimerBase::SharedPtr handshakeTimer; + rclcpp::TimerBase::SharedPtr pingTimer; + + // the estop + rclcpp::Publisher::SharedPtr estopPub; +}; + +#endif // (HEARTBEAT_CLIENT_H) \ No newline at end of file diff --git a/src/common/heartbeat2/include/heartbeat_server.hpp b/src/common/heartbeat2/include/heartbeat_server.hpp new file mode 100644 index 0000000..65f7e0b --- /dev/null +++ b/src/common/heartbeat2/include/heartbeat_server.hpp @@ -0,0 +1,55 @@ +#ifndef HEARTBEAT_SERVER_HPP +#define HEARTBEAT_SERVER_HPP + +#include +#include + +#include "csm_common_interfaces/msg/e_stop.hpp" + +class HeartbeatServer : public rclcpp::Node { + public: + /** + * @brief Construct a new Heartbeat Server object + * + */ + HeartbeatServer(); + + private: + /** + * @brief Establishes connection with heartbeat client + * + * @param request empty request + * @param response empty response + */ + void handshake(std_srvs::srv::Empty::Request::SharedPtr request, std_srvs::srv::Empty::Response::SharedPtr response); + + /** + * @brief Watches if client stops sending requests + * + */ + void watchdog_callback(); + + /** + * @brief Processes requests from client + * + * @param request empty request + * @param response empty response + */ + void heartbeat_callback(std_srvs::srv::Empty::Request::SharedPtr request, std_srvs::srv::Empty::Response::SharedPtr response); + + // last time request was received + unsigned int _lastPingTime; + + // handshake and heartbeat + rclcpp::Service::SharedPtr handshakeSrv; + rclcpp::Service::SharedPtr heartbeatSrv; + + // timer to periodically check for pings and send responses + rclcpp::TimerBase::SharedPtr watchdog; + + // estop + rclcpp::Publisher::SharedPtr estopPub; +}; + + +#endif // (HEARTBEAT_SERVER_H) \ No newline at end of file diff --git a/src/common/heartbeat2/package.xml b/src/common/heartbeat2/package.xml index edd3666..a5cace1 100644 --- a/src/common/heartbeat2/package.xml +++ b/src/common/heartbeat2/package.xml @@ -3,7 +3,7 @@ heartbeat2 0.0.0 - establish a heartbeat between drivestation and ROV + Establish a heartbeat between drivestation and ROV kadin Apache-2.0 @@ -13,11 +13,9 @@ ament_lint_common rclcpp - builtin_interfaces - std_msgs csm_common_interfaces std_srvs - boost/circular_buffer + boost diff --git a/src/common/heartbeat2/src/client.cpp b/src/common/heartbeat2/src/client.cpp deleted file mode 100644 index 48b0f39..0000000 --- a/src/common/heartbeat2/src/client.cpp +++ /dev/null @@ -1,137 +0,0 @@ -#include -#include -#include -#include -#include - -#include "csm_common_interfaces/srv/heartbeat.hpp" -#include "csm_common_interfaces/msg/e_stop.hpp" - -class Client : public rclcpp::Node { -public: - Client() : Node("heartbeat_client") { - // initializing circular buffer to store request futures - _futures = new boost::circular_buffer>>(10); - - // creating subscriber using built in time message interface because it was built in. - handshakeClient = this->create_client("handshake"); - heartbeatClient = this->create_client("heartbeat_srv"); - - // create estop publisher - estopPub = this->create_publisher("estop", 10); - - // creating timers - handshakeTimer = this->create_wall_timer(std::chrono::milliseconds(10), std::bind(&Client::handshake_callback, this)); - handshakeTimer->cancel(); - pingTimer = this->create_wall_timer(std::chrono::milliseconds(300), std::bind(&Client::ping_callback, this)); - pingTimer->cancel(); - } - - void handshake () { - // checking if service is available or waiting for service to be availble - RCLCPP_INFO(this->get_logger(), "Waiting for drivestation."); - while (!handshakeClient->wait_for_service(std::chrono::milliseconds(10))) { - if (!rclcpp::ok()) { - RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting"); - exit(1); - } - } - RCLCPP_INFO(this->get_logger(), "Found drivestation"); - - // Make handshake request and send - auto request = std::make_shared(); - request->milliseconds = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - _handshakeFuture = std::move(handshakeClient->async_send_request(request).future.share()); - - // wait for response - handshakeTimer->reset(); - } - -private: - void handshake_callback() { - // check for handshake response - if (_handshakeFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::ready) { - auto response = _handshakeFuture.get(); - _lastResponseTime = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - RCLCPP_INFO(this->get_logger(), "Connected to drivestation"); - handshakeTimer->cancel(); - pingTimer->reset(); - } - if (!rclcpp::ok()) { - RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting for response"); - exit(2); - } - } - - - void ping_callback() { - unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - // send a request every 300ms - if (heartbeatClient->wait_for_service(std::chrono::milliseconds(1))) { - auto request = std::make_shared(); - request->milliseconds = currentMillisec; - _futures->push_back(std::move(heartbeatClient->async_send_request(request).future.share())); - - // debug - // RCLCPP_INFO(this->get_logger(), "Sent ping at %u", currentMillisec); - } - - // check for responses - int futuresSize = _futures->size(); - int increment = 0; - std::shared_ptr response; - for (int i = 0; i < futuresSize; i++) { - if (_futures->at(increment).wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) { - // if future is ready, check if the time sent is latest, and update latest time - response = _futures->at(increment).get(); - _futures->erase(_futures->begin() + increment); - _lastResponseTime = currentMillisec; - // debug - // RCLCPP_INFO(this->get_logger(), "Received response at: %i", int(response->milliseconds)); - } else { - increment += 1; - } - } - - // check if last response has exceeded 1000ms second, the place where disconnection happens - if (currentMillisec - _lastResponseTime > 1000) { - // do something is drop was detected - RCLCPP_FATAL(this->get_logger(), "A drop has been detected, time since last ping: %ums", currentMillisec - _lastResponseTime); - csm_common_interfaces::msg::EStop estop; - estop.is_fatal = true; - estopPub->publish(estop); - exit(3); - } - } - - - // Time since last ping was received - unsigned int _lastResponseTime; - boost::circular_buffer>>* _futures; - - // heartbeat and handshake - rclcpp::Client::SharedPtr handshakeClient; - rclcpp::Client::SharedPtr heartbeatClient; - - // storing handshake futures and setting up timer to send requests and check for responses - std::shared_future> _handshakeFuture; - rclcpp::TimerBase::SharedPtr handshakeTimer; - rclcpp::TimerBase::SharedPtr pingTimer; - - // the estop - rclcpp::Publisher::SharedPtr estopPub; -}; - - -int main(int argc, char ** argv) { - rclcpp::init(argc, argv); - std::shared_ptr node = std::make_shared(); - - // start handshake to check connection to producer - node->handshake(); - - // spin the node - rclcpp::spin(node); - rclcpp::shutdown(); - return 0; -} \ No newline at end of file diff --git a/src/common/heartbeat2/src/heartbeat_client.cpp b/src/common/heartbeat2/src/heartbeat_client.cpp new file mode 100644 index 0000000..2e0a93c --- /dev/null +++ b/src/common/heartbeat2/src/heartbeat_client.cpp @@ -0,0 +1,113 @@ +#include +#include +#include +#include +#include +#include + +#include "csm_common_interfaces/msg/e_stop.hpp" +#include "heartbeat_client.hpp" + +HeartbeatClient::HeartbeatClient() : Node("heartbeat_client") { + // initializing circular buffer to store request futures + _futures = boost::circular_buffer>>(10); + + // creating subscriber using built in time message interface because it was built in. + handshakeClient = this->create_client("handshake"); + heartbeatClient = this->create_client("heartbeat_srv"); + + // create estop publisher + estopPub = this->create_publisher("estop", 10); + + // creating timers + handshakeTimer = this->create_wall_timer(std::chrono::milliseconds(10), std::bind(&HeartbeatClient::handshake_callback, this)); + handshakeTimer->cancel(); + pingTimer = this->create_wall_timer(std::chrono::milliseconds(300), std::bind(&HeartbeatClient::ping_callback, this)); + pingTimer->cancel(); +} + +void HeartbeatClient::handshake () { + // checking if service is available or waiting for service to be availble + RCLCPP_INFO(this->get_logger(), "Waiting for drivestation."); + while (!handshakeClient->wait_for_service(std::chrono::milliseconds(10))) { + if (!rclcpp::ok()) { + RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting"); + exit(1); + } + } + RCLCPP_INFO(this->get_logger(), "Found drivestation"); + + // Make handshake request and send + auto request = std::make_shared(); + _handshakeFuture = std::move(handshakeClient->async_send_request(request).future.share()); + + // wait for response + handshakeTimer->reset(); +} + +void HeartbeatClient::handshake_callback() { + // check for handshake response + if (_handshakeFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::ready) { + auto response = _handshakeFuture.get(); + _lastResponseTime = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + RCLCPP_INFO(this->get_logger(), "Connected to drivestation"); + handshakeTimer->cancel(); + pingTimer->reset(); + } + if (!rclcpp::ok()) { + RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting for response"); + exit(2); + } +} + + +void HeartbeatClient::ping_callback() { + unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + // send a request every 300ms + if (heartbeatClient->wait_for_service(std::chrono::milliseconds(1))) { + auto request = std::make_shared(); + _futures.push_back(std::move(heartbeatClient->async_send_request(request).future.share())); + + // debug + // RCLCPP_INFO(this->get_logger(), "Sent ping at %u", currentMillisec); + } + + // check for responses + int futuresSize = _futures.size(); + int increment = 0; + for (int i = 0; i < futuresSize; i++) { + if (_futures.at(increment).wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) { + // if future is ready, check if the time sent is latest, and update latest time + _futures.erase(_futures.begin() + increment); + _lastResponseTime = currentMillisec; + // debug + // RCLCPP_INFO(this->get_logger(), "Received response at: %i", currentMillisec); + } else { + increment += 1; + } + } + + // check if last response has exceeded 1000ms second, the place where disconnection happens + if (currentMillisec - _lastResponseTime > 1000) { + // do something is drop was detected + RCLCPP_FATAL(this->get_logger(), "A drop has been detected, time since last ping: %ums", currentMillisec - _lastResponseTime); + csm_common_interfaces::msg::EStop estop; + estop.is_fatal = true; + estopPub->publish(estop); + exit(3); + } +} + + +int main(int argc, char ** argv) { + rclcpp::init(argc, argv); + std::shared_ptr node = std::make_shared(); + + // start handshake to check connection to producer + node->handshake(); + + // spin the node + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} \ No newline at end of file diff --git a/src/common/heartbeat2/src/heartbeat_server.cpp b/src/common/heartbeat2/src/heartbeat_server.cpp new file mode 100644 index 0000000..6749a1d --- /dev/null +++ b/src/common/heartbeat2/src/heartbeat_server.cpp @@ -0,0 +1,76 @@ +#include +#include +#include +#include + +#include "csm_common_interfaces/msg/e_stop.hpp" +#include "heartbeat_server.hpp" + +HeartbeatServer::HeartbeatServer() : Node("heartbeat_producer") { + // service to receive pings + heartbeatSrv = this->create_service("heartbeat_srv", std::bind(&HeartbeatServer::heartbeat_callback, this, std::placeholders::_1, std::placeholders::_2)); + + // Setting up handshake + RCLCPP_INFO(this->get_logger(), "Waiting for ROV"); + handshakeSrv = this->create_service("handshake", std::bind(&HeartbeatServer::handshake, this, std::placeholders::_1, std::placeholders::_2)); + + // create timers + watchdog = this->create_wall_timer(std::chrono::milliseconds(300), std::bind(&HeartbeatServer::watchdog_callback, this)); + watchdog->cancel(); + + // create estop publisher + estopPub = this->create_publisher("estop", 10); +} + +void HeartbeatServer::handshake(std_srvs::srv::Empty::Request::SharedPtr request, std_srvs::srv::Empty::Response::SharedPtr response) { + // send response to handshake request to start heartbeat + unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + _lastPingTime = currentMillisec; + RCLCPP_INFO(this->get_logger(), "Connected to ROV"); + + // initialize the timer to watch the incoming pings, check if ROV has been momentarily disconnected and reconnected + if (watchdog->is_canceled() == true) { + watchdog->reset(); + } else { + RCLCPP_WARN(this->get_logger(), "A drop was detected, connection has been restablished"); + } +} + + +void HeartbeatServer::watchdog_callback() { + // check to see if the time since last ping received exceeds 1000ms. + unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + if (currentMillisec > _lastPingTime && currentMillisec - _lastPingTime > 1000) { + // do something if drop was detected + RCLCPP_FATAL(this->get_logger(), "A drop has been detected, time since last ping: %ims", currentMillisec - _lastPingTime); + csm_common_interfaces::msg::EStop estop; + estop.is_fatal = true; + estopPub->publish(estop); + watchdog->cancel(); + exit(3); + } +} + +void HeartbeatServer::heartbeat_callback(std_srvs::srv::Empty::Request::SharedPtr request, std_srvs::srv::Empty::Response::SharedPtr response) { + // check if any previous connection has been made + if (watchdog->is_canceled() == true) { + RCLCPP_INFO(this->get_logger(), "Connected to ROV"); + watchdog->reset(); + } + + // Receive request and log time request received + unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + _lastPingTime = currentMillisec; + + // debug + // RCLCPP_INFO(this->get_logger(), "Received ping at %i", currentMillisec); +} + + +int main(int argc, char ** argv) { + rclcpp::init(argc, argv); + rclcpp::spin(std::make_shared()); + rclcpp::shutdown(); + return 0; +} + diff --git a/src/common/heartbeat2/src/server.cpp b/src/common/heartbeat2/src/server.cpp deleted file mode 100644 index fa0fa28..0000000 --- a/src/common/heartbeat2/src/server.cpp +++ /dev/null @@ -1,94 +0,0 @@ -#include -#include -#include - -#include "csm_common_interfaces/msg/e_stop.hpp" -#include "csm_common_interfaces/srv/heartbeat.hpp" - -class Server : public rclcpp::Node { -public: - Server() : Node("heartbeat_producer") { - // service to receive pings - heartbeatSrv = this->create_service("heartbeat_srv", std::bind(&Server::heartbeat_callback, this, std::placeholders::_1, std::placeholders::_2)); - - // Setting up handshake - RCLCPP_INFO(this->get_logger(), "Waiting for ROV"); - handshakeSrv = this->create_service("handshake", std::bind(&Server::handshake, this, std::placeholders::_1, std::placeholders::_2)); - - // create timers - watchdog = this->create_wall_timer(std::chrono::milliseconds(300), std::bind(&Server::watchdog_callback, this)); - watchdog->cancel(); - - // create estop publisher - estopPub = this->create_publisher("estop", 10); - } - -private: - void handshake(csm_common_interfaces::srv::Heartbeat::Request::SharedPtr request, csm_common_interfaces::srv::Heartbeat::Response::SharedPtr response) { - // send response to handshake request to start heartbeat - unsigned int millisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - response->milliseconds = request->milliseconds; - _lastPingTime = millisec; - RCLCPP_INFO(this->get_logger(), "Connected to ROV"); - - // initialize the timer to watch the incoming pings, check if ROV has been momentarily disconnected and reconnected - if (watchdog->is_canceled() == true) { - watchdog->reset(); - } else { - RCLCPP_WARN(this->get_logger(), "A drop was detected, connection has been restablished"); - } - } - - - void watchdog_callback() { - // check to see if the time since last ping received exceeds 1000ms. - unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - if (currentMillisec > _lastPingTime && currentMillisec - _lastPingTime > 1000) { - // do something if drop was detected - RCLCPP_FATAL(this->get_logger(), "A drop has been detected, time since last ping: %ims", currentMillisec - _lastPingTime); - csm_common_interfaces::msg::EStop estop; - estop.is_fatal = true; - estopPub->publish(estop); - watchdog->cancel(); - exit(3); - } - } - - void heartbeat_callback(csm_common_interfaces::srv::Heartbeat::Request::SharedPtr request, csm_common_interfaces::srv::Heartbeat::Response::SharedPtr response) { - // check if any previous connection has been made - if (watchdog->is_canceled() == true) { - RCLCPP_INFO(this->get_logger(), "Connected to ROV"); - watchdog->reset(); - } - - // Receive request and log request - unsigned int currentMillisec = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - _lastPingTime = currentMillisec; - - // debug - // RCLCPP_INFO(this->get_logger(), "Received ping at %i --- Time Elapsed: %i", int(request->milliseconds), int(currentMillisec - request->milliseconds)); - - // send request time back to client - response->milliseconds = request->milliseconds; - } - - - unsigned int _lastPingTime; - csm_common_interfaces::srv::Heartbeat::Request _lastPing; - rclcpp::Service::SharedPtr handshakeSrv; - rclcpp::Service::SharedPtr heartbeatSrv; - - // timer to periodically check for pings and send responses - rclcpp::TimerBase::SharedPtr watchdog; - - // estop - rclcpp::Publisher::SharedPtr estopPub; -}; - -int main(int argc, char ** argv) { - rclcpp::init(argc, argv); - rclcpp::spin(std::make_shared()); - rclcpp::shutdown(); - return 0; -} -