Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconnect on read errors [HARVEY2-24] #6

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ The node `socketcan_bridge_ee` is also provided that uses the `EventsExecutor` t

#### Parameters

* `interface`
* `interface` (default=`can0`)
Name of the SocketCAN device, by default these devices are named can0 and upwards.
* `read_timeout`
Maximum duration to wait for data on the file descriptor
* `read_timeout` (default=`1.0`)
Maximum duration in seconds to wait for data on the file descriptor
* `reconnect_timeout` (default=`5.0`)
Sleep duration in seconds before reconnecting to the SocketCAN device

[can_msgs/Frame]: https://github.com/ros-industrial/ros_canopen/blob/dashing-devel/can_msgs/msg/Frame.msg

[socketcan_bridge]: https://wiki.ros.org/socketcan_bridge

[ros2_socketcan]: https://github.com/autowarefoundation/ros2_socketcan
[ros2_socketcan]: https://github.com/autowarefoundation/ros2_socketcan
9 changes: 7 additions & 2 deletions include/ros2_socketcan_bridge/socketcan_bridge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SocketCanBridge

SocketCanBridge(
const rclcpp::Logger & logger, rclcpp::Clock::SharedPtr clock, const std::string & interface,
double read_timeout, const CanCallback & receive_callback);
double read_timeout, double reconnect_timeout, const CanCallback & receive_callback);

~SocketCanBridge();

Expand All @@ -42,10 +42,15 @@ class SocketCanBridge
void close();

private:
void receive_loop(std::stop_token stoken) const;
void connect();
void ensure_connection(std::stop_token stoken);
void receive_loop(std::stop_token stoken);

rclcpp::Logger logger_;
rclcpp::Clock::SharedPtr clock_;
std::string interface_;
double read_timeout_;
double reconnect_timeout_;
int socket_;
CanCallback receive_callback_;
std::jthread receive_thread_;
Expand Down
110 changes: 77 additions & 33 deletions src/socketcan_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,15 @@ std::ostream & operator<<(std::ostream & os, const can_msgs::msg::Frame & msg)

SocketCanBridge::SocketCanBridge(
const rclcpp::Logger & logger, rclcpp::Clock::SharedPtr clock, const std::string & interface,
double read_timeout, const CanCallback & receive_callback)
: logger_(logger), clock_(clock), receive_callback_(receive_callback)
double read_timeout, double reconnect_timeout, const CanCallback & receive_callback)
: logger_(logger),
clock_(clock),
interface_(interface),
read_timeout_(read_timeout),
reconnect_timeout_(reconnect_timeout),
receive_callback_(receive_callback)
{
using std::placeholders::_1;

if ((socket_ = socket(PF_CAN, SOCK_RAW, CAN_RAW)) < 0) {
throw std::system_error(errno, std::generic_category());
}

// When SIGINT is received, the program needs to quit. But a `read()` call can't be interrupted.
// So I use SO_RCVTIMEO to make all reads return within `read_timeout` seconds.
auto microseconds = std::lround(read_timeout * 1'000'000);
timeval tv;
tv.tv_sec = microseconds / 1'000'000;
tv.tv_usec = microseconds - (tv.tv_sec * 1'000'000);
setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv);

ifreq ifr;
strncpy(ifr.ifr_name, interface.c_str(), sizeof(ifr.ifr_name));
if (ioctl(socket_, SIOCGIFINDEX, &ifr) < 0) {
throw std::system_error(errno, std::generic_category());
}

sockaddr_can addr;
memset(&addr, 0, sizeof(addr));
addr.can_family = AF_CAN;
addr.can_ifindex = ifr.ifr_ifindex;

if (bind(socket_, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
throw std::system_error(errno, std::generic_category());
}

receive_thread_ = std::jthread{std::bind(&SocketCanBridge::receive_loop, this, _1)};
}

Expand Down Expand Up @@ -98,15 +75,82 @@ void SocketCanBridge::close()
}
}

void SocketCanBridge::receive_loop(std::stop_token stoken) const
void SocketCanBridge::connect()
{
RCLCPP_INFO(logger_, "Connecting to the CAN interface %s ..", interface_.c_str());

if ((socket_ = socket(PF_CAN, SOCK_RAW, CAN_RAW)) < 0) {
throw std::system_error(errno, std::generic_category());
}

// When SIGINT is received, the program needs to quit. But a `read()` call can't be interrupted.
// So I use SO_RCVTIMEO to make all reads return within `read_timeout` seconds.
auto microseconds = std::lround(read_timeout_ * 1'000'000);
timeval tv;
tv.tv_sec = microseconds / 1'000'000;
tv.tv_usec = microseconds - (tv.tv_sec * 1'000'000);
if (setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv) < 0) {
throw std::system_error(errno, std::generic_category());
}

ifreq ifr;
strncpy(ifr.ifr_name, interface_.c_str(), sizeof(ifr.ifr_name));
if (ioctl(socket_, SIOCGIFINDEX, &ifr) < 0) {
throw std::system_error(errno, std::generic_category());
}

sockaddr_can addr;
memset(&addr, 0, sizeof(addr));
addr.can_family = AF_CAN;
addr.can_ifindex = ifr.ifr_ifindex;

if (bind(socket_, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
throw std::system_error(errno, std::generic_category());
}

RCLCPP_INFO(logger_, "Connected to the CAN interface %s", interface_.c_str());
}

void SocketCanBridge::ensure_connection(std::stop_token stoken)
{
while (!stoken.stop_requested()) {
try {
connect();
break;
} catch (const std::system_error & e) {
RCLCPP_ERROR(
logger_, "Error connecting to %s: %s, retrying in %.2f seconds", interface_.c_str(),
e.what(), reconnect_timeout_);
clock_->sleep_for(rclcpp::Duration::from_seconds(reconnect_timeout_));
}
}
}

void SocketCanBridge::receive_loop(std::stop_token stoken)
{
RCLCPP_INFO(logger_, "Receive loop started");

ensure_connection(stoken);

while (!stoken.stop_requested()) {
RCLCPP_DEBUG(logger_, "Waiting for a CAN frame ..");
can_frame frame;
auto nbytes = read(socket_, &frame, sizeof(can_frame));
RCLCPP_DEBUG(logger_, "Received %zd bytes", nbytes);

if (nbytes < 0) {
if (errno == EAGAIN) continue;
throw std::system_error(errno, std::generic_category());
if (errno == EAGAIN) {
RCLCPP_DEBUG(
logger_, "Error reading from the socket: %s (%d)", strerror(errno),
static_cast<int>(errno));
continue;
}
RCLCPP_ERROR(
logger_, "Error reading from the socket: %s (%d), reconnecting in %.2f seconds ..",
strerror(errno), static_cast<int>(errno), reconnect_timeout_);
clock_->sleep_for(rclcpp::Duration::from_seconds(reconnect_timeout_));
ensure_connection(stoken);
continue;
}
if (nbytes != sizeof(frame)) {
RCLCPP_ERROR(logger_, "Incomplete CAN frame received, skipping");
Expand Down
2 changes: 1 addition & 1 deletion src/socketcan_bridge_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ SocketCanBridgeNode::SocketCanBridgeNode(const rclcpp::NodeOptions & options)
can_pub(this->create_publisher<can_msgs::msg::Frame>("~/rx", 100)),
bridge(
this->get_logger(), this->get_clock(), this->declare_parameter("interface", "can0"),
this->declare_parameter("read_timeout", 1.0),
this->declare_parameter("read_timeout", 1.0), this->declare_parameter("reconnect_timeout", 5.0),
[this](const can_msgs::msg::Frame & msg) { can_pub->publish(msg); }),
can_sub(this->create_subscription<can_msgs::msg::Frame>(
"~/tx", 100, [this](can_msgs::msg::Frame::ConstSharedPtr msg) { bridge.send(*msg); }))
Expand Down
Loading