From 2c9e00f2daa7e57be17963bc22365344e4a6e1ec Mon Sep 17 00:00:00 2001 From: Rein Appeldoorn Date: Thu, 6 Jun 2024 10:47:38 +0200 Subject: [PATCH] feat: reconnect on read errors --- README.md | 10 +- .../socketcan_bridge.hpp | 9 +- src/socketcan_bridge.cpp | 110 ++++++++++++------ src/socketcan_bridge_node.cpp | 2 +- 4 files changed, 91 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index fa0a435..fcf5433 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,15 @@ The node `socketcan_bridge_ee` is also provided that uses the `EventsExecutor` t #### Parameters -* `interface` +* `interface` (default=`can0`) Name of the SocketCAN device, by default these devices are named can0 and upwards. -* `read_timeout` - Maximum duration to wait for data on the file descriptor +* `read_timeout` (default=`1.0`) + Maximum duration in seconds to wait for data on the file descriptor +* `reconnect_timeout` (default=`5.0`) + Sleep duration in seconds before reconnecting to the SocketCAN device [can_msgs/Frame]: https://github.com/ros-industrial/ros_canopen/blob/dashing-devel/can_msgs/msg/Frame.msg [socketcan_bridge]: https://wiki.ros.org/socketcan_bridge -[ros2_socketcan]: https://github.com/autowarefoundation/ros2_socketcan \ No newline at end of file +[ros2_socketcan]: https://github.com/autowarefoundation/ros2_socketcan diff --git a/include/ros2_socketcan_bridge/socketcan_bridge.hpp b/include/ros2_socketcan_bridge/socketcan_bridge.hpp index aea7ab2..77f6c78 100644 --- a/include/ros2_socketcan_bridge/socketcan_bridge.hpp +++ b/include/ros2_socketcan_bridge/socketcan_bridge.hpp @@ -28,7 +28,7 @@ class SocketCanBridge SocketCanBridge( const rclcpp::Logger & logger, rclcpp::Clock::SharedPtr clock, const std::string & interface, - double read_timeout, const CanCallback & receive_callback); + double read_timeout, double reconnect_timeout, const CanCallback & receive_callback); ~SocketCanBridge(); @@ -42,10 +42,15 @@ class SocketCanBridge void close(); private: - void receive_loop(std::stop_token stoken) const; + void connect(); + void ensure_connection(std::stop_token stoken); + void receive_loop(std::stop_token stoken); rclcpp::Logger logger_; rclcpp::Clock::SharedPtr clock_; + std::string interface_; + double read_timeout_; + double reconnect_timeout_; int socket_; CanCallback receive_callback_; std::jthread receive_thread_; diff --git a/src/socketcan_bridge.cpp b/src/socketcan_bridge.cpp index 177388c..146aa31 100644 --- a/src/socketcan_bridge.cpp +++ b/src/socketcan_bridge.cpp @@ -28,38 +28,15 @@ std::ostream & operator<<(std::ostream & os, const can_msgs::msg::Frame & msg) SocketCanBridge::SocketCanBridge( const rclcpp::Logger & logger, rclcpp::Clock::SharedPtr clock, const std::string & interface, - double read_timeout, const CanCallback & receive_callback) -: logger_(logger), clock_(clock), receive_callback_(receive_callback) + double read_timeout, double reconnect_timeout, const CanCallback & receive_callback) +: logger_(logger), + clock_(clock), + interface_(interface), + read_timeout_(read_timeout), + reconnect_timeout_(reconnect_timeout), + receive_callback_(receive_callback) { using std::placeholders::_1; - - if ((socket_ = socket(PF_CAN, SOCK_RAW, CAN_RAW)) < 0) { - throw std::system_error(errno, std::generic_category()); - } - - // When SIGINT is received, the program needs to quit. But a `read()` call can't be interrupted. - // So I use SO_RCVTIMEO to make all reads return within `read_timeout` seconds. - auto microseconds = std::lround(read_timeout * 1'000'000); - timeval tv; - tv.tv_sec = microseconds / 1'000'000; - tv.tv_usec = microseconds - (tv.tv_sec * 1'000'000); - setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv); - - ifreq ifr; - strncpy(ifr.ifr_name, interface.c_str(), sizeof(ifr.ifr_name)); - if (ioctl(socket_, SIOCGIFINDEX, &ifr) < 0) { - throw std::system_error(errno, std::generic_category()); - } - - sockaddr_can addr; - memset(&addr, 0, sizeof(addr)); - addr.can_family = AF_CAN; - addr.can_ifindex = ifr.ifr_ifindex; - - if (bind(socket_, reinterpret_cast(&addr), sizeof(addr)) < 0) { - throw std::system_error(errno, std::generic_category()); - } - receive_thread_ = std::jthread{std::bind(&SocketCanBridge::receive_loop, this, _1)}; } @@ -98,15 +75,82 @@ void SocketCanBridge::close() } } -void SocketCanBridge::receive_loop(std::stop_token stoken) const +void SocketCanBridge::connect() +{ + RCLCPP_INFO(logger_, "Connecting to the CAN interface %s ..", interface_.c_str()); + + if ((socket_ = socket(PF_CAN, SOCK_RAW, CAN_RAW)) < 0) { + throw std::system_error(errno, std::generic_category()); + } + + // When SIGINT is received, the program needs to quit. But a `read()` call can't be interrupted. + // So I use SO_RCVTIMEO to make all reads return within `read_timeout` seconds. + auto microseconds = std::lround(read_timeout_ * 1'000'000); + timeval tv; + tv.tv_sec = microseconds / 1'000'000; + tv.tv_usec = microseconds - (tv.tv_sec * 1'000'000); + if (setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv) < 0) { + throw std::system_error(errno, std::generic_category()); + } + + ifreq ifr; + strncpy(ifr.ifr_name, interface_.c_str(), sizeof(ifr.ifr_name)); + if (ioctl(socket_, SIOCGIFINDEX, &ifr) < 0) { + throw std::system_error(errno, std::generic_category()); + } + + sockaddr_can addr; + memset(&addr, 0, sizeof(addr)); + addr.can_family = AF_CAN; + addr.can_ifindex = ifr.ifr_ifindex; + + if (bind(socket_, reinterpret_cast(&addr), sizeof(addr)) < 0) { + throw std::system_error(errno, std::generic_category()); + } + + RCLCPP_INFO(logger_, "Connected to the CAN interface %s", interface_.c_str()); +} + +void SocketCanBridge::ensure_connection(std::stop_token stoken) +{ + while (!stoken.stop_requested()) { + try { + connect(); + break; + } catch (const std::system_error & e) { + RCLCPP_ERROR( + logger_, "Error connecting to %s: %s, retrying in %.2f seconds", interface_.c_str(), + e.what(), reconnect_timeout_); + clock_->sleep_for(rclcpp::Duration::from_seconds(reconnect_timeout_)); + } + } +} + +void SocketCanBridge::receive_loop(std::stop_token stoken) { RCLCPP_INFO(logger_, "Receive loop started"); + + ensure_connection(stoken); + while (!stoken.stop_requested()) { + RCLCPP_DEBUG(logger_, "Waiting for a CAN frame .."); can_frame frame; auto nbytes = read(socket_, &frame, sizeof(can_frame)); + RCLCPP_DEBUG(logger_, "Received %zd bytes", nbytes); + if (nbytes < 0) { - if (errno == EAGAIN) continue; - throw std::system_error(errno, std::generic_category()); + if (errno == EAGAIN) { + RCLCPP_DEBUG( + logger_, "Error reading from the socket: %s (%d)", strerror(errno), + static_cast(errno)); + continue; + } + RCLCPP_ERROR( + logger_, "Error reading from the socket: %s (%d), reconnecting in %.2f seconds ..", + strerror(errno), static_cast(errno), reconnect_timeout_); + clock_->sleep_for(rclcpp::Duration::from_seconds(reconnect_timeout_)); + ensure_connection(stoken); + continue; } if (nbytes != sizeof(frame)) { RCLCPP_ERROR(logger_, "Incomplete CAN frame received, skipping"); diff --git a/src/socketcan_bridge_node.cpp b/src/socketcan_bridge_node.cpp index 50c4a56..aa0b4cf 100644 --- a/src/socketcan_bridge_node.cpp +++ b/src/socketcan_bridge_node.cpp @@ -14,7 +14,7 @@ SocketCanBridgeNode::SocketCanBridgeNode(const rclcpp::NodeOptions & options) can_pub(this->create_publisher("~/rx", 100)), bridge( this->get_logger(), this->get_clock(), this->declare_parameter("interface", "can0"), - this->declare_parameter("read_timeout", 1.0), + this->declare_parameter("read_timeout", 1.0), this->declare_parameter("reconnect_timeout", 5.0), [this](const can_msgs::msg::Frame & msg) { can_pub->publish(msg); }), can_sub(this->create_subscription( "~/tx", 100, [this](can_msgs::msg::Frame::ConstSharedPtr msg) { bridge.send(*msg); }))