diff --git a/system/system_monitor/CMakeLists.txt b/system/system_monitor/CMakeLists.txt index 97eddcb590a5f..391c82a09c960 100644 --- a/system/system_monitor/CMakeLists.txt +++ b/system/system_monitor/CMakeLists.txt @@ -118,7 +118,8 @@ ament_auto_add_executable(hdd_reader ) ament_auto_add_executable(traffic_reader - reader/traffic_reader/traffic_reader.cpp + reader/traffic_reader/traffic_reader_main.cpp + reader/traffic_reader/traffic_reader_service.cpp ) find_library(NL3 nl-3 REQUIRED) diff --git a/system/system_monitor/README.md b/system/system_monitor/README.md index 87107d3937a82..6203ad45d4b3a 100644 --- a/system/system_monitor/README.md +++ b/system/system_monitor/README.md @@ -73,7 +73,8 @@ Every topic is published in 1 minute interval. | | HDD WriteIOPS | ✓ | ✓ | ✓ | | | | HDD Connection | ✓ | ✓ | ✓ | | | Memory Monitor | Memory Usage | ✓ | ✓ | ✓ | | -| Net Monitor | Network Usage | ✓ | ✓ | ✓ | | +| Net Monitor | Network Connection | ✓ | ✓ | ✓ | | +| | Network Usage | ✓ | ✓ | ✓ | Notification of usage only, normally error not generated. | | | Network CRC Error | ✓ | ✓ | ✓ | Warning occurs when the number of CRC errors in the period reaches the threshold value. The number of CRC errors that occur is the same as the value that can be confirmed with the ip command. | | | IP Packet Reassembles Failed | ✓ | ✓ | ✓ | | | NTP Monitor | NTP Offset | ✓ | ✓ | ✓ | | diff --git a/system/system_monitor/config/net_monitor.param.yaml b/system/system_monitor/config/net_monitor.param.yaml index cb7e1b48380a3..f2c68adda2efe 100644 --- a/system/system_monitor/config/net_monitor.param.yaml +++ b/system/system_monitor/config/net_monitor.param.yaml @@ -1,7 +1,6 @@ /**: ros__parameters: devices: ["*"] - traffic_reader_port: 7636 monitor_program: "greengrass" crc_error_check_duration: 1 crc_error_count_threshold: 1 diff --git a/system/system_monitor/docs/ros_parameters.md b/system/system_monitor/docs/ros_parameters.md index bba2c3fb40408..1d081d513c85c 100644 --- a/system/system_monitor/docs/ros_parameters.md +++ b/system/system_monitor/docs/ros_parameters.md @@ -61,14 +61,14 @@ mem_monitor: net_monitor: -| Name | Type | Unit | Default | Notes | -| :-------------------------------- | :----------: | :-----: | :-----: | :--------------------------------------------------------------------------------------------------------------------------------------------------- | -| devices | list[string] | n/a | none | The name of network interface to monitor. (e.g. eth0, \* for all network interfaces) | -| usage_warn | float | %(1e-2) | 0.95 | Generates warning when network usage reaches a specified value or higher. | -| crc_error_check_duration | int | sec | 1 | CRC error check duration. | -| crc_error_count_threshold | int | n/a | 1 | Generates warning when count of CRC errors during CRC error check duration reaches a specified value or higher. | -| reassembles_failed_check_duration | int | sec | 1 | IP packet reassembles failed check duration. | -| reassembles_failed_check_count | int | n/a | 1 | Generates warning when count of IP packet reassembles failed during IP packet reassembles failed check duration reaches a specified value or higher. | +| Name | Type | Unit | Default | Notes | +| :-------------------------------- | :----------: | :--: | :--------: | :--------------------------------------------------------------------------------------------------------------------------------------------------- | +| devices | list[string] | n/a | none | The name of network interface to monitor. (e.g. eth0, \* for all network interfaces) | +| monitor_program | string | n/a | greengrass | program name to be monitored by nethogs name. | +| crc_error_check_duration | int | sec | 1 | CRC error check duration. | +| crc_error_count_threshold | int | n/a | 1 | Generates warning when count of CRC errors during CRC error check duration reaches a specified value or higher. | +| reassembles_failed_check_duration | int | sec | 1 | IP packet reassembles failed check duration. | +| reassembles_failed_check_count | int | n/a | 1 | Generates warning when count of IP packet reassembles failed during IP packet reassembles failed check duration reaches a specified value or higher. | ## NTP Monitor diff --git a/system/system_monitor/docs/topics_net_monitor.md b/system/system_monitor/docs/topics_net_monitor.md index d223b359cdb08..edb067c0a5be7 100644 --- a/system/system_monitor/docs/topics_net_monitor.md +++ b/system/system_monitor/docs/topics_net_monitor.md @@ -1,5 +1,23 @@ # ROS topics: Net Monitor +## Network Connection + +/diagnostics/net_monitor: Network Connection + +[summary] + +| level | message | +| ----- | -------------- | +| OK | OK | +| WARN | no such device | + +[values] + +| key | value (example) | +| --------------------- | ------------------- | +| Network [0-9]: status | OK / no such device | +| HDD [0-9]: name | wlp82s0 | + ## Network Usage /diagnostics/net_monitor: Network Usage @@ -38,31 +56,19 @@ | ----- | ------- | | OK | OK | -[values] program +[values when specified program is detected] | key | value (example) | | -------------------------------- | ------------------------------------------- | -| nethogs [0-9]: PROGRAM | /lambda/greengrassSystemComponents/1384/999 | -| nethogs [0-9]: SENT (KB/Sec) | 1.13574 | -| nethogs [0-9]: RECEIVED (KB/Sec) | 0.261914 | - -[values] all - -| key | value (example) | -| --------------------- | -------------------------------------------------------------- | -| nethogs: all (KB/Sec) | python3.7/1520/999 0.274414 0.354883 | -| | /lambda/greengrassSystemComponents/1299/999 0.487305 0.0966797 | -| | sshd: muser@pts/5/15917/1002 0.396094 0.0585938 | -| | /usr/bin/python3.7/2371/999 0 0 | -| | /greengrass/ggc/packages/1.10.0/bin/daemon/906/0 0 0 | -| | python3.7/4362/999 0 0 | -| | unknown TCP/0/0 0 0 | +| nethogs [0-9]: program | /lambda/greengrassSystemComponents/1384/999 | +| nethogs [0-9]: sent (KB/Sec) | 1.13574 | +| nethogs [0-9]: received (KB/Sec) | 0.261914 | -[values] error +[values when error is occurring] -| key | value (example) | -| ----- | ----------------------------------------------------- | -| error | [nethogs -t] execve failed: No such file or directory | +| key | value (example) | +| ----- | ---------------------------------------- | +| error | execve failed: No such file or directory | ## Network CRC Error @@ -70,9 +76,10 @@ [summary] -| level | message | -| ----- | ------- | -| OK | OK | +| level | message | +| ----- | --------- | +| OK | OK | +| WARN | CRC error | [values] diff --git a/system/system_monitor/include/system_monitor/net_monitor/net_monitor.hpp b/system/system_monitor/include/system_monitor/net_monitor/net_monitor.hpp index 3ddb078fa00e1..ded9c398c6cd1 100644 --- a/system/system_monitor/include/system_monitor/net_monitor/net_monitor.hpp +++ b/system/system_monitor/include/system_monitor/net_monitor/net_monitor.hpp @@ -21,207 +21,290 @@ #define SYSTEM_MONITOR__NET_MONITOR__NET_MONITOR_HPP_ #include "system_monitor/net_monitor/nl80211.hpp" +#include "traffic_reader/traffic_reader_common.hpp" #include +#include + #include #include #include +#include #include #include -#define toMbit(X) (static_cast(X) / 1000000 * 8) +template +constexpr auto to_mbit(T value) +{ + return (static_cast(value) / 1000000 * 8); +} + +/** + * @brief Network information + */ +struct NetworkInfomation +{ + bool is_invalid; //!< @brief Valid network to be checked + int mtu_error_code; //!< @brief Error code set by ioctl() with SIOCGIFMTU + int ethtool_error_code; //!< @brief Error code set by ioctl() with SIOCETHTOOL + bool is_running; //!< @brief Resource allocated flag + std::string interface_name; //!< @brief Interface name + double speed; //!< @brief Network capacity + int mtu; //!< @brief MTU + double rx_traffic; //!< @brief Traffic received + double tx_traffic; //!< @brief Traffic transmitted + double rx_usage; //!< @brief Network capacity usage rate received + double tx_usage; //!< @brief Network capacity usage rate transmitted + unsigned int rx_bytes; //!< @brief Total bytes received + unsigned int rx_errors; //!< @brief Bad packets received + unsigned int tx_bytes; //!< @brief Total bytes transmitted + unsigned int tx_errors; //!< @brief Packet transmit problems + unsigned int collisions; //!< @brief Number of collisions during packet transmissions +}; /** * @brief Bytes information */ -typedef struct bytes +struct Bytes { - unsigned int rx_bytes; //!< @brief total bytes received - unsigned int tx_bytes; //!< @brief total bytes transmitted + unsigned int rx_bytes; //!< @brief Total bytes received + unsigned int tx_bytes; //!< @brief Total bytes transmitted +}; - bytes() : rx_bytes(0), tx_bytes(0) {} -} bytes; +/** + * @brief CRC errors information + */ +struct CrcErrors +{ + std::deque errors_queue{}; //!< @brief queue that holds count of CRC errors + unsigned int last_rx_crc_errors{0}; //!< @brief rx_crc_error at the time of the last monitoring +}; + +namespace local = boost::asio::local; class NetMonitor : public rclcpp::Node { public: /** - * @brief constructor + * @brief Constructor * @param [in] options Options associated with this node. */ explicit NetMonitor(const rclcpp::NodeOptions & options); + /** - * @brief destructor + * @brief Destructor */ - ~NetMonitor(); + ~NetMonitor() override; /** - * @brief Shutdown nl80211 object + * @brief Copy constructor */ - void shutdown_nl80211(); + NetMonitor(const NetMonitor &) = delete; + + /** + * @brief Copy assignment operator + */ + NetMonitor & operator=(const NetMonitor &) = delete; + + /** + * @brief Move constructor + */ + NetMonitor(const NetMonitor &&) = delete; + + /** + * @brief Move assignment operator + */ + NetMonitor & operator=(const NetMonitor &&) = delete; protected: using DiagStatus = diagnostic_msgs::msg::DiagnosticStatus; /** - * @brief check CPU usage - * @param [out] stat diagnostic message passed directly to diagnostic publish calls - * @note NOLINT syntax is needed since diagnostic_updater asks for a non-const reference - * to pass diagnostic message updated in this function to diagnostic publish calls. + * @brief Check network connection + * @param [out] status diagnostic message passed directly to diagnostic publish calls + */ + void check_connection(diagnostic_updater::DiagnosticStatusWrapper & status); + + /** + * @brief Check network usage + * @param [out] status diagnostic message passed directly to diagnostic publish calls + */ + void check_usage(diagnostic_updater::DiagnosticStatusWrapper & status); + + /** + * @brief Monitor network traffic + * @param [out] status diagnostic message passed directly to diagnostic publish calls + */ + void monitor_traffic(diagnostic_updater::DiagnosticStatusWrapper & status); + + /** + * @brief Check CRC error + * @param [out] status diagnostic message passed directly to diagnostic publish calls */ - void checkUsage( - diagnostic_updater::DiagnosticStatusWrapper & stat); // NOLINT(runtime/references) + void check_crc_error(diagnostic_updater::DiagnosticStatusWrapper & status); /** - * @brief monitor traffic - * @param [out] stat diagnostic message passed directly to diagnostic publish calls - * @note NOLINT syntax is needed since diagnostic_updater asks for a non-const reference - * to pass diagnostic message updated in this function to diagnostic publish calls. + * @brief Check IP packet reassembles failed + * @param [out] status diagnostic message passed directly to diagnostic publish calls */ - void monitorTraffic( - diagnostic_updater::DiagnosticStatusWrapper & stat); // NOLINT(runtime/references) + void check_reassembles_failed(diagnostic_updater::DiagnosticStatusWrapper & status); /** - * @brief check CRC error - * @param [out] stat diagnostic message passed directly to diagnostic publish calls - * @note NOLINT syntax is needed since diagnostic_updater asks for a non-const reference - * to pass diagnostic message updated in this function to diagnostic publish calls. + * @brief Timer callback */ - void checkCrcError( - diagnostic_updater::DiagnosticStatusWrapper & stat); // NOLINT(runtime/references) + void on_timer(); /** - * @brief check IP packet reassembles failed - * @param [out] stat diagnostic message passed directly to diagnostic publish calls - * @note NOLINT syntax is needed since diagnostic_updater asks for a non-const reference - * to pass diagnostic message updated in this function to diagnostic publish calls. + * @brief Determine if it is a supported network + * @param [in] network Network infomation + * @param [in] index Index of network infomation index + * @param [out] status Diagnostic message passed directly to diagnostic publish calls + * @param [out] error_message Error message */ - void checkReassemblesFailed( - diagnostic_updater::DiagnosticStatusWrapper & stat); // NOLINT(runtime/references) + static void make_invalid_diagnostic_status( + const NetworkInfomation & network, int index, + diagnostic_updater::DiagnosticStatusWrapper & status, std::string & error_message); /** - * @brief get wireless speed - * @param [in] ifa_name interface name - * @return wireless speed + * @brief Update list of network information */ - float getWirelessSpeed(const char * ifa_name); + void update_network_list(); /** - * @brief timer callback + * @brief Update network information by using socket + * @param [out] network Network information */ - void onTimer(); + void update_network_information_by_socket(NetworkInfomation & network); /** - * @brief update Network information list + * @brief Update network information about MTU + * @param [out] network Network information + * @param [in] socket File descriptor to socket */ - void updateNetworkInfoList(); + static void update_mtu(NetworkInfomation & network, int socket); /** - * @brief check NetMonitor General Infomation - * @param [out] stat diagnostic message passed directly to diagnostic publish calls - * @return check result + * @brief Update network information about network capacity + * @param [out] network Network information + * @param [in] socket File descriptor to socket */ - bool checkGeneralInfo(diagnostic_updater::DiagnosticStatusWrapper & stat); + void update_network_capacity(NetworkInfomation & network, int socket); /** - * @brief Network information + * @brief Update network information by using routing netlink stats + * @param [out] network Network information + * @param [in] data Pointer to routing netlink stats + * @param [in] duration Time from previous measurement */ - struct NetworkInfo - { - int mtu_errno; //!< @brief errno set by ioctl() with SIOCGIFMTU - int ethtool_errno; //!< @brief errno set by ioctl() with SIOCETHTOOL - bool is_running; //!< @brief resource allocated flag - std::string interface_name; //!< @brief interface name - float speed; //!< @brief network capacity - int mtu; //!< @brief MTU - float rx_traffic; //!< @brief traffic received - float tx_traffic; //!< @brief traffic transmitted - float rx_usage; //!< @brief network capacity usage rate received - float tx_usage; //!< @brief network capacity usage rate transmitted - unsigned int rx_bytes; //!< @brief total bytes received - unsigned int rx_errors; //!< @brief bad packets received - unsigned int tx_bytes; //!< @brief total bytes transmitted - unsigned int tx_errors; //!< @brief packet transmit problems - unsigned int collisions; //!< @brief number of collisions during packet transmissions + void update_network_information_by_routing_netlink( + NetworkInfomation & network, void * data, const rclcpp::Duration & duration); - NetworkInfo() - : mtu_errno(0), - ethtool_errno(0), - is_running(false), - interface_name(""), - speed(0.0), - mtu(0), - rx_traffic(0.0), - tx_traffic(0.0), - rx_usage(0.0), - tx_usage(0.0), - rx_bytes(0), - rx_errors(0), - tx_bytes(0), - tx_errors(0), - collisions(0) - { - } - }; + /** + * @brief Update network information about network traffic + * @param [out] network Network information + * @param [in] stats Pointer to routing netlink stats + * @param [in] duration Time from previous measurement + */ + void update_traffic( + NetworkInfomation & network, const struct rtnl_link_stats * stats, + const rclcpp::Duration & duration); + + /** + * @brief Update network information about CRC error + * @param [out] network Network information + * @param [in] stats Pointer to routing netlink stats + */ + void update_crc_error(NetworkInfomation & network, const struct rtnl_link_stats * stats); + + /** + * @brief Shutdown nl80211 object + */ + void shutdown_nl80211(); + + /** + * @brief Send request to start nethogs + */ + void send_start_nethogs_request(); + + /** + * @brief Get result of nethogs + * @param [out] result result of nethogs + */ + void get_nethogs_result(traffic_reader_service::Result & result); + + /** + * @brief Connect to traffic-reader service + * @return true on success, false on error + */ + bool connect_service(); + + /** + * @brief Send data to traffic-reader service + * @param [in] request Request to traffic-reader service + * @return true on success, false on error + */ + bool send_data(traffic_reader_service::Request request); /** - * @brief determine if it is a supported network - * @param [in] net_info network infomation - * @param [in] index index of network infomation index - * @param [out] stat diagnostic message passed directly to diagnostic publish calls - * @param [out] error_str error string - * @return result of determining whether it is a supported network + * @brief Send data to traffic-reader service with parameters + * @param [in] request Request to traffic-reader service + * @param [in] parameters List of parameters + * @param[in] program_name Filter by program name + * @return true on success, false on error */ - bool isSupportedNetwork( - const NetworkInfo & net_info, int index, diagnostic_updater::DiagnosticStatusWrapper & stat, - std::string & error_str); + bool send_data_with_parameters( + traffic_reader_service::Request request, std::vector & parameters, + std::string & program_name); /** - * @brief search column index of IP packet reassembles failed in /proc/net/snmp + * @brief Receive data from traffic-reader service + * @param [out] result Status from traffic-reader service */ - void searchReassemblesFailedColumnIndex(); + void receive_data(traffic_reader_service::Result & result); + + /** + * @brief Close connection with traffic-reader service + */ + void close_connection(); + + /** + * @brief Get column index of IP packet reassembles failed from `/proc/net/snmp` + */ + void get_reassembles_failed_column_index(); /** * @brief get IP packet reassembles failed * @param [out] reassembles_failed IP packet reassembles failed * @return execution result */ - bool getReassemblesFailed(uint64_t & reassembles_failed); + bool get_reassembles_failed(uint64_t & reassembles_failed); diagnostic_updater::Updater updater_; //!< @brief Updater class which advertises to /diagnostics rclcpp::TimerBase::SharedPtr timer_; //!< @brief timer to get Network information - char hostname_[HOST_NAME_MAX + 1]; //!< @brief host name - std::map bytes_; //!< @brief list of bytes - rclcpp::Time last_update_time_; //!< @brief last update time - std::vector device_params_; //!< @brief list of devices - NL80211 nl80211_; //!< @brief 802.11 netlink-based interface - int getifaddrs_errno_; //!< @brief errno set by getifaddrs() - std::vector net_info_list_; //!< @brief list of Network information + char hostname_[HOST_NAME_MAX + 1]; //!< @brief host name + std::map bytes_; //!< @brief list of bytes + rclcpp::Time last_update_time_; //!< @brief last update time + std::vector device_params_; //!< @brief list of devices + NL80211 nl80211_; //!< @brief 802.11 netlink-based interface + int getifaddrs_error_code_; //!< @brief Error code set by getifaddrs() + std::vector network_list_; //!< @brief List of Network information - /** - * @brief CRC errors information - */ - typedef struct crc_errors - { - std::deque errors_queue; //!< @brief queue that holds count of CRC errors - unsigned int last_rx_crc_errors; //!< @brief rx_crc_error at the time of the last monitoring + std::string monitor_program_; //!< @brief nethogs monitor program name + std::string socket_path_; //!< @brief Path of UNIX domain socket + boost::asio::io_service io_service_; //!< @brief Core I/O functionality + std::unique_ptr acceptor_; //!< @brief UNIX domain acceptor + std::unique_ptr socket_; //!< @brief UNIX domain socket - crc_errors() : last_rx_crc_errors(0) {} - } crc_errors; - std::map crc_errors_; //!< @brief list of CRC errors + std::map crc_errors_; //!< @brief list of CRC errors + unsigned int crc_error_check_duration_; //!< @brief CRC error check duration + unsigned int crc_error_count_threshold_; //!< @brief CRC error count threshold std::deque reassembles_failed_queue_; //!< @brief queue that holds count of IP packet reassembles failed uint64_t last_reassembles_failed_; //!< @brief IP packet reassembles failed at the time of the //!< last monitoring - - std::string monitor_program_; //!< @brief nethogs monitor program name - bool nethogs_all_; //!< @brief nethogs result all mode - int traffic_reader_port_; //!< @brief port number to connect to traffic_reader - unsigned int crc_error_check_duration_; //!< @brief CRC error check duration - unsigned int crc_error_count_threshold_; //!< @brief CRC error count threshold unsigned int reassembles_failed_check_duration_; //!< @brief IP packet reassembles failed check duration unsigned int @@ -229,10 +312,16 @@ class NetMonitor : public rclcpp::Node unsigned int reassembles_failed_column_index_; //!< @brief column index of IP Reassembles failed //!< in /proc/net/snmp + /** + * @brief Network connection status messages + */ + const std::map connection_messages_ = { + {DiagStatus::OK, "OK"}, {DiagStatus::WARN, "no such device"}, {DiagStatus::ERROR, "unused"}}; + /** * @brief Network usage status messages */ - const std::map usage_dict_ = { + const std::map usage_messages_ = { {DiagStatus::OK, "OK"}, {DiagStatus::WARN, "high load"}, {DiagStatus::ERROR, "down"}}; }; diff --git a/system/system_monitor/include/traffic_reader/traffic_reader.hpp b/system/system_monitor/include/traffic_reader/traffic_reader_common.hpp similarity index 63% rename from system/system_monitor/include/traffic_reader/traffic_reader.hpp rename to system/system_monitor/include/traffic_reader/traffic_reader_common.hpp index 7b0276dee4c07..9d84072a42fc3 100644 --- a/system/system_monitor/include/traffic_reader/traffic_reader.hpp +++ b/system/system_monitor/include/traffic_reader/traffic_reader_common.hpp @@ -17,21 +17,33 @@ * @brief traffic reader definitions */ -#ifndef TRAFFIC_READER__TRAFFIC_READER_HPP_ -#define TRAFFIC_READER__TRAFFIC_READER_HPP_ +#ifndef TRAFFIC_READER__TRAFFIC_READER_COMMON_HPP_ +#define TRAFFIC_READER__TRAFFIC_READER_COMMON_HPP_ #include #include +#include #include +namespace traffic_reader_service +{ + +static constexpr char socket_path[] = "/tmp/traffic_reader"; + +enum Request { + NONE = 0, + START_NETHOGS, + GET_RESULT, +}; + /** - * @brief traffic information + * @brief Result of nethogs */ -struct TrafficReaderResult +struct Result { - int error_code_; //!< @brief error code, 0 on success, otherwise error - std::string str_; //!< @brief nethogs result string + int error_code; //!< @brief Error code, 0 on success, otherwise error + std::string output; //!< @brief Result output of nethogs /** * @brief Load or save data members. @@ -43,13 +55,13 @@ struct TrafficReaderResult template void serialize(archive & ar, const unsigned /*version*/) // NOLINT(runtime/references) { - ar & error_code_; - ar & str_; + ar & error_code; + ar & output; } }; -constexpr std::string_view GET_ALL_STR{""}; //!< @brief nethogs result all request string +// constexpr std::string_view GET_ALL_STR{""}; //!< @brief nethogs result all request string -constexpr int TRAFFIC_READER_PORT = 7636; //!< @brief traffic reader port: 7634-7647 Unassigned +} // namespace traffic_reader_service -#endif // TRAFFIC_READER__TRAFFIC_READER_HPP_ +#endif // TRAFFIC_READER__TRAFFIC_READER_COMMON_HPP_ diff --git a/system/system_monitor/include/traffic_reader/traffic_reader_service.hpp b/system/system_monitor/include/traffic_reader/traffic_reader_service.hpp new file mode 100644 index 0000000000000..a66d0b121ed46 --- /dev/null +++ b/system/system_monitor/include/traffic_reader/traffic_reader_service.hpp @@ -0,0 +1,110 @@ +// Copyright 2022 The Autoware Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TRAFFIC_READER__TRAFFIC_READER_SERVICE_HPP_ +#define TRAFFIC_READER__TRAFFIC_READER_SERVICE_HPP_ + +#include "traffic_reader/traffic_reader_common.hpp" + +#include +#include + +#include +#include +#include +#include +#include + +namespace traffic_reader_service +{ + +namespace local = boost::asio::local; + +class TrafficReaderService +{ +public: + /** + * @brief Constructor + * @param[in] socket_path Path of UNIX domain socket + */ + explicit TrafficReaderService(std::string socket_path); + + /** + * @brief Initialization + * @return true on success, false on error + */ + bool initialize(); + + /** + * @brief Shutdown + */ + void shutdown(); + + /** + * @brief Main loop + */ + void run(); + +protected: + /** + * @brief Handle message + * @param[in] buffer Pointer to data received + */ + void handle_message(const char * buffer); + + /** + * @brief Start nethogs + * @param[in] archive Archive object for loading + */ + void start_nethogs(boost::archive::text_iarchive & archive); + + /** + * @brief Get command line of process from nethogs output + * @param[in] line nethogs output + * @return Command line of process + */ + static std::string get_command_line(const std::string & line); + + /** + * @brief Get command line of process from PID + * @param[in] pid PID + * @return Command line of process + */ + static std::string get_command_line_with_pid(pid_t pid); + + /** + * @brief Return result of nethogs + */ + void get_result(); + + /** + * @brief Execute nethogs + */ + void execute_nethogs(); + + std::string socket_path_; //!< @brief Path of UNIX domain socket + boost::asio::io_service io_service_; //!< @brief Core I/O functionality + std::unique_ptr acceptor_; //!< @brief UNIX domain acceptor + std::unique_ptr socket_; //!< @brief UNIX domain socket + std::thread thread_; //!< @brief Thread to run nethogs + std::mutex mutex_; //!< @brief Mutex guard for the flag + bool stop_; //!< @brief Flag to stop thread + std::vector devices_; //!< @brief List of devices + std::string program_name_; //!< @brief Filter by program name + traffic_reader_service::Result result_; //!< @brief Result of nethogs +}; + +} // namespace traffic_reader_service + +#endif // TRAFFIC_READER__TRAFFIC_READER_SERVICE_HPP_ diff --git a/system/system_monitor/reader/traffic_reader/traffic_reader.cpp b/system/system_monitor/reader/traffic_reader/traffic_reader.cpp deleted file mode 100644 index ebb6d976d9921..0000000000000 --- a/system/system_monitor/reader/traffic_reader/traffic_reader.cpp +++ /dev/null @@ -1,351 +0,0 @@ -// Copyright 2021 Autoware Foundation -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file traffic_reader.cpp - * @brief traffic information read class - */ - -#include - -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include -#include -#include - -namespace bp = boost::process; - -/** - * @brief thread safe nethogs result - */ -class NethogsResult -{ -public: - /** - * @brief get nethogs result string - * @param [out] str nethogs result string - * @return error code - */ - int get(std::string & str) - { - std::lock_guard lock(mutex_); - str = str_; - return err_; - } - /** - * @brief set nethogs result string and error code - * @param [in] str nethogs result string - * @param [in] err error code - * @return error code - */ - void set(const std::string & str, int err) - { - std::lock_guard lock(mutex_); - str_ = str; - err_ = err; - } - -private: - std::mutex mutex_; //!< @brief lock for thread safe - std::string str_; //!< @brief nethogs result string - int err_; //!< @brief error code, 0 on success, otherwise error -}; -static NethogsResult g_NethogsString; //!< @brief nethogs result -static volatile bool g_Running = false; //!< @brief run status, true running, false exit -static volatile int g_WatchDogCount = 0; //!< @brief executeNethogs watchdog, 0 refresh - -/** - * @brief print usage - */ -void usage() -{ - printf("Usage: traffic_reader [options]\n"); - printf(" -h --help : Display help\n"); - printf(" -p --port # : Port number to listen to.\n"); - printf("\n"); -} - -/** - * @brief thread function to execute nethogs - */ -void executeNethogs() -{ - const std::string cmd = "nethogs -t"; - // Output format of `nethogs -t` - // Blank Line | - // Start Line | Refreshing: - // Result Line 1 | PROGRAM/PID/UID SENT RECEIVED - // ... | ... - // Result Line n | PROGRAM/PID/UID SENT RECEIVED - // Last Line | unknown TCP/0/0 - std::string error_str; - std::string log_str; - - g_WatchDogCount = 0; - - while (g_Running) { - bp::ipstream is_out; - bp::ipstream is_err; - boost::process::child c; - try { - c = bp::child(cmd, bp::std_out > is_out, bp::std_err > is_err); - std::string result_buf; - std::string line; - while (std::getline(is_out, line)) { - if (!g_Running) { - // run() exit - return; - } - if (line.empty()) { - // Blank Line - if (!result_buf.empty()) { - g_NethogsString.set(result_buf, 0); - result_buf = ""; - } - } else if (line.find("unknown TCP/0/0") != std::string::npos) { - // Last Line - result_buf.append(line); - g_NethogsString.set(result_buf, 0); - result_buf = ""; - } else if (line.find("/0/0\t") != std::string::npos) { - // no-pid and root data skip. - } else if (line == std::string("Refreshing:")) { - // Start Line - g_WatchDogCount = 0; - if (!result_buf.empty()) { - g_NethogsString.set(result_buf, 0); - result_buf = ""; - } - if (!log_str.empty()) { - syslog(LOG_INFO, "[%s] command started.\n", cmd.c_str()); - log_str = ""; - } - } else { - // Result Line - result_buf.append(line + "\n"); - } - } // while - - c.wait(); - std::ostringstream os; - is_err >> os.rdbuf(); - error_str = "command terminate. " + os.str(); - } catch (boost::process::process_error & e) { - error_str = e.what(); - c.terminate(); - signal(SIGCHLD, SIG_IGN); - } - - int err_code = c.exit_code(); - // The err_code when killing a child process is usually 9. However, - // Because there were times when it was 0, 0 is changed to an error value. - if (err_code == 0) { - err_code = -1; - } - g_NethogsString.set("[" + cmd + "] " + error_str, err_code); - - if (log_str != error_str) { - log_str = error_str; - syslog(LOG_ERR, "[%s] err(%d) %s\n", cmd.c_str(), err_code, error_str.c_str()); - } - - for (int cnt = 10; cnt > 0 && g_Running; --cnt) { - g_WatchDogCount = 0; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } // while - - std::terminate(); -} - -/** - * @brief check NET temperature - * @param [in] port port to listen - */ -void run(int port) -{ - // Create a new socket - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - syslog(LOG_ERR, "Failed to create a new socket. %s\n", strerror(errno)); - return; - } - - // Allow address reuse - int ret = 0; - int opt = 1; - ret = setsockopt( - sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&opt), (socklen_t)sizeof(opt)); - if (ret < 0) { - syslog(LOG_ERR, "Failed to set socket FD's option. %s\n", strerror(errno)); - close(sock); - return; - } - - // Give the socket FD the local address ADDR - sockaddr_in addr{}; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr)); - if (ret < 0) { - syslog(LOG_ERR, "Failed to give the socket FD the local address ADDR. %s\n", strerror(errno)); - close(sock); - return; - } - - // Prepare to accept connections on socket FD - ret = listen(sock, 5); - if (ret < 0) { - syslog(LOG_ERR, "Failed to prepare to accept connections on socket FD. %s\n", strerror(errno)); - close(sock); - return; - } - - sockaddr_in client{}; - socklen_t len = sizeof(client); - - while (true) { - // Await a connection on socket FD - int new_sock = accept(sock, reinterpret_cast(&client), &len); - if (new_sock < 0) { - syslog( - LOG_ERR, "Failed to prepare to accept connections on socket FD. %s\n", strerror(errno)); - close(sock); - return; - } - - // Receive list of device from a socket - char buf[1024]{}; - ret = recv(new_sock, buf, sizeof(buf) - 1, 0); - if (ret < 0) { - syslog(LOG_ERR, "Failed to receive. %s\n", strerror(errno)); - close(new_sock); - close(sock); - return; - } - // No data received - if (ret == 0) { - syslog(LOG_ERR, "No data received. %s\n", strerror(errno)); - close(new_sock); - close(sock); - return; - } - - buf[sizeof(buf) - 1] = '\0'; - const std::string program_name = std::string(buf); - - // set result data - TrafficReaderResult result; - result.error_code_ = g_NethogsString.get(result.str_); - if (result.error_code_ == 0 && program_name != GET_ALL_STR) { - std::stringstream lines{result.str_}; - std::string line; - std::string result_str; - while (std::getline(lines, line)) { - if (line.find(program_name) != std::string::npos) { - result_str.append(line + "\n"); - } - } // while - result.str_ = result_str; - } - - std::ostringstream oss; - boost::archive::text_oarchive oa(oss); - oa << result; - // Write N bytes of BUF to FD - ret = write(new_sock, oss.str().c_str(), oss.str().length()); - if (ret < 0) { - syslog(LOG_ERR, "Failed to write N bytes of BUF to FD. %s\n", strerror(errno)); - } - - // Close the file descriptor FD - ret = close(new_sock); - if (ret < 0) { - syslog(LOG_ERR, "Failed to close the file descriptor FD. %s\n", strerror(errno)); - } - - if (++g_WatchDogCount >= 3) { - syslog(LOG_ERR, "nethogs command thread error\n"); - close(sock); - return; - } - } - - close(sock); -} - -int main(int argc, char ** argv) -{ - static struct option long_options[] = { - {"help", no_argument, nullptr, 'h'}, - {"port", required_argument, nullptr, 'p'}, - {nullptr, 0, nullptr, 0}}; - - // Parse command-line options - int c = 0; - int option_index = 0; - int port = TRAFFIC_READER_PORT; - while ((c = getopt_long(argc, argv, "hp:", long_options, &option_index)) != -1) { - switch (c) { - case 'h': - usage(); - return EXIT_SUCCESS; - - case 'p': - try { - port = boost::lexical_cast(optarg); - } catch (const boost::bad_lexical_cast & e) { - printf("Error: %s\n", e.what()); - return EXIT_FAILURE; - } - break; - - default: - break; - } - } - - // Put the program in the background - if (daemon(0, 0) < 0) { - printf("Failed to put the program in the background. %s\n", strerror(errno)); - return errno; - } - - // Open connection to system logger - openlog(nullptr, LOG_PID, LOG_DAEMON); - - g_Running = true; - std::thread th = std::thread(&executeNethogs); - th.detach(); - - run(port); - - g_Running = false; - - // Close descriptor used to write to system logger - closelog(); - - return EXIT_SUCCESS; -} diff --git a/system/system_monitor/reader/traffic_reader/traffic_reader_main.cpp b/system/system_monitor/reader/traffic_reader/traffic_reader_main.cpp new file mode 100644 index 0000000000000..0af3b0d89d312 --- /dev/null +++ b/system/system_monitor/reader/traffic_reader/traffic_reader_main.cpp @@ -0,0 +1,89 @@ +// Copyright 2022 Autoware Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include +#include + +/** + * @brief print usage + */ +void usage() +{ + printf("Usage: traffic_reader [options]\n"); + printf(" -h --help : Display help\n"); + printf(" -s --socket # : Path of UNIX domain socket.\n"); + printf("\n"); +} + +int main(int argc, char ** argv) +{ + static struct option long_options[] = { + {"help", no_argument, nullptr, 'h'}, + {"socket", required_argument, nullptr, 's'}, + {nullptr, 0, nullptr, 0}}; + + // Parse command-line options + int c = 0; + int option_index = 0; + std::string socket_path = traffic_reader_service::socket_path; + + while ((c = getopt_long(argc, argv, "hs:", long_options, &option_index)) != -1) { + switch (c) { + case 'h': + usage(); + return EXIT_SUCCESS; + + case 's': + socket_path = optarg; + break; + + default: + break; + } + } + + // Put the program in the background + if (daemon(0, 0) < 0) { + printf("Failed to put the program in the background. %s\n", strerror(errno)); + return errno; + } + + // Open connection to system logger + openlog(nullptr, LOG_PID, LOG_DAEMON); // NOLINT [hicpp-signed-bitwise] + + // Initialize traffic-reader service + traffic_reader_service::TrafficReaderService service(socket_path); + + if (!service.initialize()) { + service.shutdown(); + closelog(); + return EXIT_FAILURE; + } + + // Run main loop + service.run(); + + // Shutdown traffic-reader service + service.shutdown(); + + // Close descriptor used to write to system logger + closelog(); + + return EXIT_SUCCESS; +} diff --git a/system/system_monitor/reader/traffic_reader/traffic_reader_service.cpp b/system/system_monitor/reader/traffic_reader/traffic_reader_service.cpp new file mode 100644 index 0000000000000..90d8867235e37 --- /dev/null +++ b/system/system_monitor/reader/traffic_reader/traffic_reader_service.cpp @@ -0,0 +1,288 @@ +// Copyright 2022 Autoware Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file traffic_reader_service.cpp + * @brief traffic information read class + */ + +#include + +#include +#include + +#include +#include + +namespace process = boost::process; + +namespace traffic_reader_service +{ + +TrafficReaderService::TrafficReaderService(std::string socket_path) +: socket_path_(std::move(socket_path)), stop_(false) +{ +} + +bool TrafficReaderService::initialize() +{ + // Remove previous binding + remove(socket_path_.c_str()); + + local::stream_protocol::endpoint endpoint(socket_path_); + acceptor_ = std::make_unique(io_service_, endpoint); + + // Run I/O service processing loop + boost::system::error_code error_code; + io_service_.run(error_code); + if (error_code) { + syslog(LOG_ERR, "Failed to run I/O service. %s\n", error_code.message().c_str()); + return false; + } + + // Give permission to other users to access to socket + // NOLINTNEXTLINE [hicpp-signed-bitwise] + if (chmod(socket_path_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO) != 0) { + syslog(LOG_ERR, "Failed to give permission to unix domain socket. %s\n", strerror(errno)); + return false; + } + + return true; +} + +void TrafficReaderService::shutdown() { io_service_.stop(); } + +void TrafficReaderService::run() +{ + while (true) { + socket_ = std::make_unique(io_service_); + + // Accept a new connection + boost::system::error_code error_code; + acceptor_->accept(*socket_, error_code); + + if (error_code) { + syslog(LOG_ERR, "Failed to accept new connection. %s\n", error_code.message().c_str()); + socket_->close(); + continue; + } + + // Read data from socket + char buffer[1024]{}; + socket_->read_some(boost::asio::buffer(buffer, sizeof(buffer)), error_code); + + if (error_code) { + syslog(LOG_ERR, "Failed to read data from socket. %s\n", error_code.message().c_str()); + socket_->close(); + continue; + } + + // Handle message + handle_message(buffer); + + socket_->close(); + } +} + +void TrafficReaderService::handle_message(const char * buffer) +{ + uint8_t request_id = Request::NONE; + + // Restore request from ros node + std::istringstream in_stream(buffer); + boost::archive::text_iarchive archive(in_stream); + + try { + archive >> request_id; + } catch (const std::exception & e) { + syslog(LOG_ERR, "Failed to restore message. %s\n", e.what()); + return; + } + + switch (request_id) { + case Request::START_NETHOGS: + start_nethogs(archive); + break; + case Request::GET_RESULT: + get_result(); + break; + default: + syslog(LOG_WARNING, "Unknown message. %d\n", request_id); + break; + } +} + +void TrafficReaderService::start_nethogs(boost::archive::text_iarchive & archive) +{ + syslog(LOG_INFO, "Starting nethogs...\n"); + + // Restore list of devices from ros node + try { + archive >> devices_; + archive >> program_name_; + } catch (const std::exception & e) { + syslog(LOG_ERR, "Failed to restore optional commands. %s\n", e.what()); + devices_.clear(); + } + + // Stop thread if already running + if (thread_.joinable()) { + { + std::lock_guard lock(mutex_); + stop_ = true; + } + thread_.join(); + } + + // Run nethogs + stop_ = false; + thread_ = std::thread(&TrafficReaderService::execute_nethogs, this); +} + +void TrafficReaderService::get_result() +{ + // Inform ros node + std::ostringstream out_stream; + boost::archive::text_oarchive archive(out_stream); + archive & Request::GET_RESULT; + { + std::lock_guard lock(mutex_); + archive & result_; + } + + // Write data to socket + boost::system::error_code error_code; + socket_->write_some( + boost::asio::buffer(out_stream.str().c_str(), out_stream.str().length()), error_code); + + if (error_code) { + syslog(LOG_ERR, "Failed to write data to socket. %s\n", error_code.message().c_str()); + } +} + +/** + * @brief thread function to execute nethogs + */ +void TrafficReaderService::execute_nethogs() +{ + std::stringstream command; + command << "nethogs -t"; + + for (const auto & device : devices_) { + command << " " << device; + } + + syslog(LOG_INFO, "%s\n", command.str().c_str()); + + process::ipstream is_out; + process::ipstream is_error; + std::string line; + + boost::process::child c(command.str(), process::std_out > is_out, process::std_err > is_error); + + // Output format of `nethogs -t [device(s)]` + // + // Refreshing: + // [PROGRAM/PID/UID SENT RECEIVED] + // usr/share/code/code/3102772/1000 0.565234 1.06855 + // ... + // unknown TCP/0/0 0 0 + while (std::getline(is_out, line)) { + // Exit loop when stop is requested + std::lock_guard lock(mutex_); + if (stop_) break; + + // Skip if line is empty + if (line.empty()) { + continue; + } + // Start of output + if (line == "Refreshing:") { + result_.error_code = EXIT_SUCCESS; + result_.output.clear(); + continue; + } + // Skip if no PID + if (line.find("/0/0\t") != std::string::npos) { + continue; + } + // End of output + if (line == "unknown TCP/0/0\t0\t0") { + continue; + } + + std::string command_line = get_command_line(line); + if (!command_line.empty()) { + // Add nethogs output and command line + if (program_name_ == "*" || command_line.find(program_name_) != std::string::npos) { + result_.output.append(fmt::format("{}\t{}\n", line, command_line)); + } + } + } + + c.terminate(); + + std::ostringstream out_stream; + is_error >> out_stream.rdbuf(); + + // Remove new line from output + std::string message = out_stream.str(); + message.erase(std::remove(message.begin(), message.end(), '\n'), message.cend()); + syslog(LOG_INFO, "%s\n", message.c_str()); + + { + std::lock_guard lock(mutex_); + result_.error_code = c.exit_code(); + result_.output = message; + } +} + +std::string TrafficReaderService::get_command_line(const std::string & line) +{ + std::vector tag_tokens; + std::vector slash_tokens; + + // usr/share/code/code/3102772/1000 0.565234 1.06855 + // Get first token separated by tab + boost::split(tag_tokens, line, boost::is_any_of("\t"), boost::token_compress_on); + + // usr/share/code/code/3102772/1000 + // Get second from the last separated by slash + boost::split( + slash_tokens, tag_tokens[0].c_str(), boost::is_any_of("/"), boost::token_compress_on); + if (slash_tokens.size() >= 3) { + return get_command_line_with_pid(std::atoi(slash_tokens[slash_tokens.size() - 2].c_str())); + } + + return ""; +} + +std::string TrafficReaderService::get_command_line_with_pid(pid_t pid) +{ + std::ifstream file(fmt::format("/proc/{}/cmdline", pid)); + + if (!file) { + return ""; + } + + std::string line; + getline(file, line); + + // cmdline is null separated, replace with space + std::replace(line.begin(), line.end(), '\0', ' '); + + return line; +} + +} // namespace traffic_reader_service diff --git a/system/system_monitor/src/net_monitor/net_monitor.cpp b/system/system_monitor/src/net_monitor/net_monitor.cpp index d9a42f8c03835..4ceec63ca862c 100644 --- a/system/system_monitor/src/net_monitor/net_monitor.cpp +++ b/system/system_monitor/src/net_monitor/net_monitor.cpp @@ -21,499 +21,268 @@ #include "system_monitor/system_monitor_utility.hpp" -#include +#include +#include #include -#include -// #include // workaround for build errors +#include +#define FMT_HEADER_ONLY #include #include #include #include #include -#include -#include -#include - -#include -#include -#include -#include -#include NetMonitor::NetMonitor(const rclcpp::NodeOptions & options) : Node("net_monitor", options), updater_(this), + hostname_(), last_update_time_{0, 0, this->get_clock()->get_clock_type()}, device_params_( declare_parameter>("devices", std::vector())), - last_reassembles_failed_(0), + getifaddrs_error_code_(0), monitor_program_(declare_parameter("monitor_program", "greengrass")), - traffic_reader_port_(declare_parameter("traffic_reader_port", TRAFFIC_READER_PORT)), + socket_path_(declare_parameter("socket_path", traffic_reader_service::socket_path)), crc_error_check_duration_(declare_parameter("crc_error_check_duration", 1)), crc_error_count_threshold_(declare_parameter("crc_error_count_threshold", 1)), + last_reassembles_failed_(0), reassembles_failed_check_duration_( declare_parameter("reassembles_failed_check_duration", 1)), reassembles_failed_check_count_(declare_parameter("reassembles_failed_check_count", 1)), reassembles_failed_column_index_(0) { - using namespace std::literals::chrono_literals; - if (monitor_program_.empty()) { - monitor_program_ = GET_ALL_STR; - nethogs_all_ = true; - } else { - nethogs_all_ = false; + monitor_program_ = "*"; } gethostname(hostname_, sizeof(hostname_)); updater_.setHardwareID(hostname_); - updater_.add("Network Usage", this, &NetMonitor::checkUsage); - updater_.add("Network Traffic", this, &NetMonitor::monitorTraffic); - updater_.add("Network CRC Error", this, &NetMonitor::checkCrcError); - updater_.add("IP Packet Reassembles Failed", this, &NetMonitor::checkReassemblesFailed); + updater_.add("Network Connection", this, &NetMonitor::check_connection); + updater_.add("Network Usage", this, &NetMonitor::check_usage); + updater_.add("Network Traffic", this, &NetMonitor::monitor_traffic); + updater_.add("Network CRC Error", this, &NetMonitor::check_crc_error); + updater_.add("IP Packet Reassembles Failed", this, &NetMonitor::check_reassembles_failed); nl80211_.init(); - searchReassemblesFailedColumnIndex(); + // Run I/O service processing loop + boost::system::error_code error_code; + io_service_.run(error_code); + if (error_code) { + RCLCPP_WARN(get_logger(), "Failed to run I/O service. %s", error_code.message().c_str()); + } - // get Network information for the first time - updateNetworkInfoList(); + // Update list of network information + update_network_list(); + using namespace std::literals::chrono_literals; + timer_ = rclcpp::create_timer(this, get_clock(), 1s, std::bind(&NetMonitor::on_timer, this)); - timer_ = rclcpp::create_timer(this, get_clock(), 1s, std::bind(&NetMonitor::onTimer, this)); + // Get column index of IP packet reassembles failed from `/proc/net/snmp` + get_reassembles_failed_column_index(); + + // Send request to start nethogs + send_start_nethogs_request(); } NetMonitor::~NetMonitor() { shutdown_nl80211(); } -void NetMonitor::shutdown_nl80211() { nl80211_.shutdown(); } - -void NetMonitor::onTimer() { updateNetworkInfoList(); } - -// cspell: ignore ifas, ifrm, ifrc -void NetMonitor::updateNetworkInfoList() +void NetMonitor::check_connection(diagnostic_updater::DiagnosticStatusWrapper & status) { - net_info_list_.clear(); + // Remember start time to measure elapsed time + const auto t_start = SystemMonitorUtility::startMeasurement(); if (device_params_.empty()) { + status.summary(DiagStatus::ERROR, "invalid device parameter"); return; } - - const struct ifaddrs * ifa; - struct ifaddrs * ifas = nullptr; - - rclcpp::Duration duration = this->now() - last_update_time_; - - // Get network interfaces - getifaddrs_errno_ = 0; - if (getifaddrs(&ifas) < 0) { - getifaddrs_errno_ = errno; + if (getifaddrs_error_code_ != 0) { + status.summary(DiagStatus::ERROR, "getifaddrs error"); + status.add("getifaddrs", strerror(getifaddrs_error_code_)); return; } - for (ifa = ifas; ifa; ifa = ifa->ifa_next) { - // Skip no addr - if (!ifa->ifa_addr) { - continue; - } - // Skip loopback - if (ifa->ifa_flags & IFF_LOOPBACK) { - continue; - } - // Skip non AF_PACKET - if (ifa->ifa_addr->sa_family != AF_PACKET) { - continue; - } - // Skip device not specified - if ( - boost::find(device_params_, ifa->ifa_name) == device_params_.end() && - boost::find(device_params_, "*") == device_params_.end()) { - continue; - } - - int fd; - struct ifreq ifrm; - struct ifreq ifrc; - struct ethtool_cmd edata; - - net_info_list_.emplace_back(); - auto & net_info = net_info_list_.back(); - - net_info.interface_name = std::string(ifa->ifa_name); - - // Get MTU information - fd = socket(AF_INET, SOCK_DGRAM, 0); - strncpy(ifrm.ifr_name, ifa->ifa_name, IFNAMSIZ - 1); - if (ioctl(fd, SIOCGIFMTU, &ifrm) < 0) { - net_info.mtu_errno = errno; - close(fd); - continue; - } + int index = 0; + int whole_level = DiagStatus::OK; + std::string error_message; - // Get network capacity - strncpy(ifrc.ifr_name, ifa->ifa_name, IFNAMSIZ - 1); - ifrc.ifr_data = (caddr_t)&edata; - edata.cmd = ETHTOOL_GSET; - if (ioctl(fd, SIOCETHTOOL, &ifrc) < 0) { - // possibly wireless connection, get bitrate(MBit/s) - net_info.speed = nl80211_.getBitrate(ifa->ifa_name); - if (net_info.speed <= 0) { - net_info.ethtool_errno = errno; - close(fd); - continue; - } + for (const auto & network : network_list_) { + if (network.is_invalid) { + make_invalid_diagnostic_status(network, index, status, error_message); } else { - net_info.speed = edata.speed; - } - - net_info.is_running = (ifa->ifa_flags & IFF_RUNNING); - - auto * stats = (struct rtnl_link_stats *)ifa->ifa_data; - if (bytes_.find(net_info.interface_name) != bytes_.end()) { - net_info.rx_traffic = - toMbit(stats->rx_bytes - bytes_[net_info.interface_name].rx_bytes) / duration.seconds(); - net_info.tx_traffic = - toMbit(stats->tx_bytes - bytes_[net_info.interface_name].tx_bytes) / duration.seconds(); - net_info.rx_usage = net_info.rx_traffic / net_info.speed; - net_info.tx_usage = net_info.tx_traffic / net_info.speed; + status.add(fmt::format("Network {}: status", index), connection_messages_.at(DiagStatus::OK)); + status.add(fmt::format("Network {}: name", index), network.interface_name); } + ++index; + } - net_info.mtu = ifrm.ifr_mtu; - net_info.rx_bytes = stats->rx_bytes; - net_info.rx_errors = stats->rx_errors; - net_info.tx_bytes = stats->tx_bytes; - net_info.tx_errors = stats->tx_errors; - net_info.collisions = stats->collisions; - - close(fd); + // Check if specified device exists + for (const auto & device : device_params_) { + // Skip if device not specified + if (device == "*") continue; - bytes_[net_info.interface_name].rx_bytes = stats->rx_bytes; - bytes_[net_info.interface_name].tx_bytes = stats->tx_bytes; + // Check if device exists in detected networks + const auto object = std::find_if( + network_list_.begin(), network_list_.end(), + [&device](const auto & network) { return network.interface_name == device; }); - // Get the count of CRC errors - crc_errors & crc_ers = crc_errors_[net_info.interface_name]; - crc_ers.errors_queue.push_back(stats->rx_crc_errors - crc_ers.last_rx_crc_errors); - while (crc_ers.errors_queue.size() > crc_error_check_duration_) { - crc_ers.errors_queue.pop_front(); + if (object == network_list_.end()) { + whole_level = std::max(whole_level, static_cast(DiagStatus::WARN)); + error_message = "no such device"; + status.add( + fmt::format("Network {}: status", index), connection_messages_.at(DiagStatus::WARN)); + status.add(fmt::format("Network {}: name", index), device); } - crc_ers.last_rx_crc_errors = stats->rx_crc_errors; + + ++index; } - freeifaddrs(ifas); + if (!error_message.empty()) { + status.summary(whole_level, error_message); + } else { + status.summary(whole_level, connection_messages_.at(whole_level)); + } - last_update_time_ = this->now(); + // Measure elapsed time since start time and report + SystemMonitorUtility::stopMeasurement(t_start, status); } -void NetMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat) +void NetMonitor::check_usage(diagnostic_updater::DiagnosticStatusWrapper & status) { // Remember start time to measure elapsed time const auto t_start = SystemMonitorUtility::startMeasurement(); - if (!checkGeneralInfo(stat)) { - return; - } - int level = DiagStatus::OK; - int whole_level = DiagStatus::OK; int index = 0; std::string error_str; std::vector interface_names; - for (const auto & net_info : net_info_list_) { - if (!isSupportedNetwork(net_info, index, stat, error_str)) { - ++index; - interface_names.push_back(net_info.interface_name); - continue; - } - - level = net_info.is_running ? DiagStatus::OK : DiagStatus::ERROR; - - stat.add(fmt::format("Network {}: status", index), usage_dict_.at(level)); - stat.add(fmt::format("Network {}: interface name", index), net_info.interface_name); - stat.addf(fmt::format("Network {}: rx_usage", index), "%.2f%%", net_info.rx_usage * 1e+2); - stat.addf(fmt::format("Network {}: tx_usage", index), "%.2f%%", net_info.tx_usage * 1e+2); - stat.addf(fmt::format("Network {}: rx_traffic", index), "%.2f MBit/s", net_info.rx_traffic); - stat.addf(fmt::format("Network {}: tx_traffic", index), "%.2f MBit/s", net_info.tx_traffic); - stat.addf(fmt::format("Network {}: capacity", index), "%.1f MBit/s", net_info.speed); - stat.add(fmt::format("Network {}: mtu", index), net_info.mtu); - stat.add(fmt::format("Network {}: rx_bytes", index), net_info.rx_bytes); - stat.add(fmt::format("Network {}: rx_errors", index), net_info.rx_errors); - stat.add(fmt::format("Network {}: tx_bytes", index), net_info.tx_bytes); - stat.add(fmt::format("Network {}: tx_errors", index), net_info.tx_errors); - stat.add(fmt::format("Network {}: collisions", index), net_info.collisions); + for (const auto & network : network_list_) { + // Skip if network is not supported + if (network.is_invalid) continue; + + level = network.is_running ? DiagStatus::OK : DiagStatus::ERROR; + + status.add(fmt::format("Network {}: status", index), usage_messages_.at(level)); + status.add(fmt::format("Network {}: interface name", index), network.interface_name); + status.addf(fmt::format("Network {}: rx_usage", index), "%.2f%%", network.rx_usage * 1e+2); + status.addf(fmt::format("Network {}: tx_usage", index), "%.2f%%", network.tx_usage * 1e+2); + status.addf(fmt::format("Network {}: rx_traffic", index), "%.2f MBit/s", network.rx_traffic); + status.addf(fmt::format("Network {}: tx_traffic", index), "%.2f MBit/s", network.tx_traffic); + status.addf(fmt::format("Network {}: capacity", index), "%.1f MBit/s", network.speed); + status.add(fmt::format("Network {}: mtu", index), network.mtu); + status.add(fmt::format("Network {}: rx_bytes", index), network.rx_bytes); + status.add(fmt::format("Network {}: rx_errors", index), network.rx_errors); + status.add(fmt::format("Network {}: tx_bytes", index), network.tx_bytes); + status.add(fmt::format("Network {}: tx_errors", index), network.tx_errors); + status.add(fmt::format("Network {}: collisions", index), network.collisions); ++index; - - interface_names.push_back(net_info.interface_name); } - // Check if specified device exists - for (const auto & device : device_params_) { - // Skip if all devices specified - if (device == "*") { - continue; - } - // Skip if device already appended - if (boost::find(interface_names, device) != interface_names.end()) { - continue; - } - - stat.add(fmt::format("Network {}: status", index), "No Such Device"); - stat.add(fmt::format("Network {}: interface name", index), device); - error_str = "no such device"; - ++index; - } - - if (!error_str.empty()) { - stat.summary(DiagStatus::ERROR, error_str); - } else { - stat.summary(whole_level, usage_dict_.at(whole_level)); - } + status.summary(DiagStatus::OK, "OK"); // Measure elapsed time since start time and report - SystemMonitorUtility::stopMeasurement(t_start, stat); + SystemMonitorUtility::stopMeasurement(t_start, status); } -void NetMonitor::checkCrcError(diagnostic_updater::DiagnosticStatusWrapper & stat) +void NetMonitor::check_crc_error(diagnostic_updater::DiagnosticStatusWrapper & status) { // Remember start time to measure elapsed time const auto t_start = SystemMonitorUtility::startMeasurement(); - if (!checkGeneralInfo(stat)) { - return; - } - int whole_level = DiagStatus::OK; int index = 0; - std::string error_str; + std::string error_message; - for (const auto & net_info : net_info_list_) { - if (!isSupportedNetwork(net_info, index, stat, error_str)) { - ++index; - continue; - } + for (const auto & network : network_list_) { + // Skip if network is not supported + if (network.is_invalid) continue; - crc_errors & crc_ers = crc_errors_[net_info.interface_name]; + CrcErrors & crc_errors = crc_errors_[network.interface_name]; unsigned int unit_rx_crc_errors = 0; - for (auto errors : crc_ers.errors_queue) { + for (auto errors : crc_errors.errors_queue) { unit_rx_crc_errors += errors; } - stat.add(fmt::format("Network {}: interface name", index), net_info.interface_name); - stat.add(fmt::format("Network {}: total rx_crc_errors", index), crc_ers.last_rx_crc_errors); - stat.add(fmt::format("Network {}: rx_crc_errors per unit time", index), unit_rx_crc_errors); + status.add(fmt::format("Network {}: interface name", index), network.interface_name); + status.add( + fmt::format("Network {}: total rx_crc_errors", index), crc_errors.last_rx_crc_errors); + status.add(fmt::format("Network {}: rx_crc_errors per unit time", index), unit_rx_crc_errors); if (unit_rx_crc_errors >= crc_error_count_threshold_) { whole_level = std::max(whole_level, static_cast(DiagStatus::WARN)); - error_str = "CRC error"; + error_message = "CRC error"; } ++index; } - if (!error_str.empty()) { - stat.summary(whole_level, error_str); + if (!error_message.empty()) { + status.summary(whole_level, error_message); } else { - stat.summary(whole_level, "OK"); + status.summary(whole_level, "OK"); } // Measure elapsed time since start time and report - SystemMonitorUtility::stopMeasurement(t_start, stat); + SystemMonitorUtility::stopMeasurement(t_start, status); } -bool NetMonitor::checkGeneralInfo(diagnostic_updater::DiagnosticStatusWrapper & stat) -{ - if (device_params_.empty()) { - stat.summary(DiagStatus::ERROR, "invalid device parameter"); - return false; - } - - if (getifaddrs_errno_ != 0) { - stat.summary(DiagStatus::ERROR, "getifaddrs error"); - stat.add("getifaddrs", strerror(getifaddrs_errno_)); - return false; - } - return true; -} - -bool NetMonitor::isSupportedNetwork( - const NetworkInfo & net_info, int index, diagnostic_updater::DiagnosticStatusWrapper & stat, - std::string & error_str) -{ - // MTU information - if (net_info.mtu_errno != 0) { - if (net_info.mtu_errno == ENOTSUP) { - stat.add(fmt::format("Network {}: status", index), "Not Supported"); - } else { - stat.add(fmt::format("Network {}: status", index), "Error"); - error_str = "ioctl error"; - } - - stat.add(fmt::format("Network {}: interface name", index), net_info.interface_name); - stat.add("ioctl(SIOCGIFMTU)", strerror(net_info.mtu_errno)); - return false; - } - - // network capacity - if (net_info.speed <= 0) { - if (net_info.ethtool_errno == ENOTSUP) { - stat.add(fmt::format("Network {}: status", index), "Not Supported"); - } else { - stat.add(fmt::format("Network {}: status", index), "Error"); - error_str = "ioctl error"; - } - - stat.add(fmt::format("Network {}: interface name", index), net_info.interface_name); - stat.add("ioctl(SIOCETHTOOL)", strerror(net_info.ethtool_errno)); - return false; - } - return true; -} - -#include // workaround for build errors - -void NetMonitor::monitorTraffic(diagnostic_updater::DiagnosticStatusWrapper & stat) +void NetMonitor::monitor_traffic(diagnostic_updater::DiagnosticStatusWrapper & status) { // Remember start time to measure elapsed time const auto t_start = SystemMonitorUtility::startMeasurement(); - // Create a new socket - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - stat.summary(DiagStatus::ERROR, "socket error"); - stat.add("socket", strerror(errno)); - return; - } - - // Specify the receiving timeouts until reporting an error - struct timeval tv; - tv.tv_sec = 10; - tv.tv_usec = 0; - int ret = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - if (ret < 0) { - stat.summary(DiagStatus::ERROR, "setsockopt error"); - stat.add("setsockopt", strerror(errno)); - close(sock); - return; - } - - // Connect the socket referred to by the file descriptor - sockaddr_in addr; - memset(&addr, 0, sizeof(sockaddr_in)); - addr.sin_family = AF_INET; - addr.sin_port = htons(traffic_reader_port_); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr)); - if (ret < 0) { - stat.summary(DiagStatus::ERROR, "connect error"); - stat.add("connect", strerror(errno)); - close(sock); - return; - } - - // Write list of devices to FD - ret = write(sock, monitor_program_.c_str(), monitor_program_.length()); - if (ret < 0) { - stat.summary(DiagStatus::ERROR, "write error"); - stat.add("write", strerror(errno)); - RCLCPP_ERROR(get_logger(), "write error"); - close(sock); - return; - } - - // Receive messages from a socket - std::string rcv_str; - char buf[16 * 1024 + 1]; - do { - ret = recv(sock, buf, sizeof(buf) - 1, 0); - if (ret < 0) { - stat.summary(DiagStatus::ERROR, "recv error"); - stat.add("recv", strerror(errno)); - close(sock); - return; - } - if (ret > 0) { - buf[ret] = '\0'; - rcv_str += std::string(buf); - } - } while (ret > 0); - - // Close the file descriptor FD - ret = close(sock); - if (ret < 0) { - stat.summary(DiagStatus::ERROR, "close error"); - stat.add("close", strerror(errno)); - return; - } - - // No data received - if (rcv_str.length() == 0) { - stat.summary(DiagStatus::ERROR, "recv error"); - stat.add("recv", "No data received"); - return; - } - - // Restore information list - TrafficReaderResult result; - try { - std::istringstream iss(rcv_str); - boost::archive::text_iarchive oa(iss); - oa >> result; - } catch (const std::exception & e) { - stat.summary(DiagStatus::ERROR, "recv error"); - stat.add("recv", e.what()); - return; - } + // Get result of nethogs + traffic_reader_service::Result result; + get_nethogs_result(result); // traffic_reader result to output - if (result.error_code_ != 0) { - stat.summary(DiagStatus::ERROR, "traffic_reader error"); - stat.add("error", result.str_); + if (result.error_code != EXIT_SUCCESS) { + status.summary(DiagStatus::ERROR, "traffic_reader error"); + status.add("error", result.output); } else { - stat.summary(DiagStatus::OK, "OK"); + status.summary(DiagStatus::OK, "OK"); - if (result.str_.length() == 0) { - stat.add("nethogs: result", "nothing"); - } else if (nethogs_all_) { - stat.add("nethogs: all (KB/sec):", result.str_); + if (result.output.empty()) { + status.add("nethogs: result", fmt::format("No data monitored: {}", monitor_program_)); } else { - std::stringstream lines{result.str_}; + std::stringstream lines{result.output}; std::string line; std::vector list; - int idx = 0; + int index = 0; while (std::getline(lines, line)) { - if (line.empty()) { - continue; - } + if (line.empty()) continue; + boost::split(list, line, boost::is_any_of("\t"), boost::token_compress_on); if (list.size() >= 3) { - stat.add(fmt::format("nethogs {}: PROGRAM", idx), list[0].c_str()); - stat.add(fmt::format("nethogs {}: SENT (KB/sec)", idx), list[1].c_str()); - stat.add(fmt::format("nethogs {}: RECEIVED (KB/sec)", idx), list[2].c_str()); + status.add(fmt::format("nethogs {}: program", index), list[3].c_str()); + status.add(fmt::format("nethogs {}: sent (KB/s)", index), list[1].c_str()); + status.add(fmt::format("nethogs {}: received (KB/sec)", index), list[2].c_str()); } else { - stat.add(fmt::format("nethogs {}: result", idx), line); + status.add(fmt::format("nethogs {}: result", index), line); } - idx++; - } // while + ++index; + } } } // Measure elapsed time since start time and report - SystemMonitorUtility::stopMeasurement(t_start, stat); + SystemMonitorUtility::stopMeasurement(t_start, status); } -void NetMonitor::checkReassemblesFailed(diagnostic_updater::DiagnosticStatusWrapper & stat) +void NetMonitor::check_reassembles_failed(diagnostic_updater::DiagnosticStatusWrapper & status) { // Remember start time to measure elapsed time const auto t_start = SystemMonitorUtility::startMeasurement(); int whole_level = DiagStatus::OK; - std::string error_str; + std::string error_message; uint64_t total_reassembles_failed = 0; uint64_t unit_reassembles_failed = 0; - if (getReassemblesFailed(total_reassembles_failed)) { + if (get_reassembles_failed(total_reassembles_failed)) { reassembles_failed_queue_.push_back(total_reassembles_failed - last_reassembles_failed_); while (reassembles_failed_queue_.size() > reassembles_failed_check_duration_) { reassembles_failed_queue_.pop_front(); @@ -523,32 +292,226 @@ void NetMonitor::checkReassemblesFailed(diagnostic_updater::DiagnosticStatusWrap unit_reassembles_failed += reassembles_failed; } - stat.add(fmt::format("total packet reassembles failed"), total_reassembles_failed); - stat.add(fmt::format("packet reassembles failed per unit time"), unit_reassembles_failed); + status.add(fmt::format("total packet reassembles failed"), total_reassembles_failed); + status.add(fmt::format("packet reassembles failed per unit time"), unit_reassembles_failed); if (unit_reassembles_failed >= reassembles_failed_check_count_) { whole_level = std::max(whole_level, static_cast(DiagStatus::WARN)); - error_str = "reassembles failed"; + error_message = "reassembles failed"; } last_reassembles_failed_ = total_reassembles_failed; } else { reassembles_failed_queue_.push_back(0); whole_level = std::max(whole_level, static_cast(DiagStatus::ERROR)); - error_str = "failed to read /proc/net/snmp"; + error_message = "failed to read /proc/net/snmp"; } - if (!error_str.empty()) { - stat.summary(whole_level, error_str); + if (!error_message.empty()) { + status.summary(whole_level, error_message); } else { - stat.summary(whole_level, "OK"); + status.summary(whole_level, "OK"); } // Measure elapsed time since start time and report - SystemMonitorUtility::stopMeasurement(t_start, stat); + SystemMonitorUtility::stopMeasurement(t_start, status); } -void NetMonitor::searchReassemblesFailedColumnIndex() +void NetMonitor::on_timer() +{ + // Update list of network information + update_network_list(); +} + +void NetMonitor::shutdown_nl80211() { nl80211_.shutdown(); } + +void NetMonitor::make_invalid_diagnostic_status( + const NetworkInfomation & network, int index, + diagnostic_updater::DiagnosticStatusWrapper & status, std::string & error_message) +{ + // MTU information + if (network.mtu_error_code != 0) { + if (network.mtu_error_code == ENOTSUP) { + status.add(fmt::format("Network {}: status", index), "Not Supported"); + // Assume no error, no error message + } else { + status.add(fmt::format("Network {}: status", index), "Error"); + error_message = "ioctl error"; + } + + status.add(fmt::format("Network {}: interface name", index), network.interface_name); + status.add("ioctl(SIOCGIFMTU)", strerror(network.mtu_error_code)); + return; + } + + // network capacity + if (network.speed <= 0) { + if (network.ethtool_error_code == ENOTSUP) { + status.add(fmt::format("Network {}: status", index), "Not Supported"); + // Assume no error, no error message + } else { + status.add(fmt::format("Network {}: status", index), "Error"); + error_message = "ioctl error"; + } + + status.add(fmt::format("Network {}: interface name", index), network.interface_name); + status.add("ioctl(SIOCETHTOOL)", strerror(network.ethtool_error_code)); + } +} + +void NetMonitor::update_network_list() +{ + rclcpp::Duration duration = this->now() - last_update_time_; + + struct ifaddrs * interfaces = {}; + network_list_.clear(); + + // Get network interfaces + if (getifaddrs(&interfaces) < 0) { + getifaddrs_error_code_ = errno; + return; + } + + for (const auto * interface = interfaces; interface; interface = interface->ifa_next) { + // Skip no addr + if (!interface->ifa_addr) { + continue; + } + // Skip loopback + if (interface->ifa_flags & IFF_LOOPBACK) { + continue; + } + // Skip non AF_PACKET + if (interface->ifa_addr->sa_family != AF_PACKET) { + continue; + } + // Skip device not specified + const auto object = std::find_if( + device_params_.begin(), device_params_.end(), + [&interface](const auto & device) { return device == "*" || device == interface->ifa_name; }); + if (object == device_params_.end()) { + continue; + } + + NetworkInfomation network{}; + network.interface_name = interface->ifa_name; + network.is_running = (interface->ifa_flags & IFF_RUNNING); + + // Update network information using socket + update_network_information_by_socket(network); + + // Update network information using routing netlink stats + update_network_information_by_routing_netlink(network, interface->ifa_data, duration); + + network_list_.emplace_back(network); + } + + freeifaddrs(interfaces); + + last_update_time_ = this->now(); +} + +void NetMonitor::update_network_information_by_socket(NetworkInfomation & network) +{ + // Get MTU information + int fd = socket(AF_INET, SOCK_DGRAM, 0); + + // Update MTU information + update_mtu(network, fd); + + // Update network capacity + update_network_capacity(network, fd); + + close(fd); +} + +void NetMonitor::update_mtu(NetworkInfomation & network, int socket) +{ + struct ifreq request = {}; + + // NOLINTNEXTLINE [cppcoreguidelines-pro-type-union-access] + strncpy(request.ifr_name, network.interface_name.c_str(), IFNAMSIZ - 1); + if (ioctl(socket, SIOCGIFMTU, &request) < 0) { + network.is_invalid = true; + network.mtu_error_code = errno; + return; + } + + network.mtu = request.ifr_mtu; // NOLINT [cppcoreguidelines-pro-type-union-access] +} + +void NetMonitor::update_network_capacity(NetworkInfomation & network, int socket) +{ + struct ifreq request = {}; + struct ethtool_cmd ether_request = {}; + + // NOLINTNEXTLINE [cppcoreguidelines-pro-type-union-access] + strncpy(request.ifr_name, network.interface_name.c_str(), IFNAMSIZ - 1); + request.ifr_data = (caddr_t)ðer_request; // NOLINT [cppcoreguidelines-pro-type-cstyle-cast] + + ether_request.cmd = ETHTOOL_GSET; + if (ioctl(socket, SIOCETHTOOL, &request) >= 0) { + network.speed = ether_request.speed; + return; + } + + // Possibly wireless connection, get bitrate(MBit/s) + float ret = nl80211_.getBitrate(network.interface_name.c_str()); + if (ret <= 0) { + network.is_invalid = true; + network.ethtool_error_code = errno; + } else { + network.speed = ret; + } +} + +void NetMonitor::update_network_information_by_routing_netlink( + NetworkInfomation & network, void * data, const rclcpp::Duration & duration) +{ + auto * stats = static_cast(data); + + update_traffic(network, stats, duration); + + update_crc_error(network, stats); +} + +void NetMonitor::update_traffic( + NetworkInfomation & network, const struct rtnl_link_stats * stats, + const rclcpp::Duration & duration) +{ + network.rx_bytes = stats->rx_bytes; + network.rx_errors = stats->rx_errors; + network.tx_bytes = stats->tx_bytes; + network.tx_errors = stats->tx_errors; + network.collisions = stats->collisions; + + // Calculate traffic and usage if interface is entried in bytes + const auto bytes_entry = bytes_.find(network.interface_name); + if (bytes_entry != bytes_.end()) { + network.rx_traffic = + to_mbit(stats->rx_bytes - bytes_entry->second.rx_bytes) / duration.seconds(); + network.tx_traffic = + to_mbit(stats->tx_bytes - bytes_entry->second.tx_bytes) / duration.seconds(); + network.rx_usage = network.rx_traffic / network.speed; + network.tx_usage = network.tx_traffic / network.speed; + } + + bytes_[network.interface_name].rx_bytes = stats->rx_bytes; + bytes_[network.interface_name].tx_bytes = stats->tx_bytes; +} + +void NetMonitor::update_crc_error(NetworkInfomation & network, const struct rtnl_link_stats * stats) +{ + // Get the count of CRC errors + CrcErrors & crc_errors = crc_errors_[network.interface_name]; + crc_errors.errors_queue.push_back(stats->rx_crc_errors - crc_errors.last_rx_crc_errors); + while (crc_errors.errors_queue.size() > crc_error_check_duration_) { + crc_errors.errors_queue.pop_front(); + } + crc_errors.last_rx_crc_errors = stats->rx_crc_errors; +} + +void NetMonitor::get_reassembles_failed_column_index() { std::ifstream ifs("/proc/net/snmp"); if (!ifs) { @@ -556,45 +519,43 @@ void NetMonitor::searchReassemblesFailedColumnIndex() return; } - // /proc/net/snmp - // Ip: Forwarding DefaultTTL InReceives ... ReasmTimeout ReasmReqds ReasmOKs ReasmFails ... - // Ip: 2 64 5636471397 ... 135 2303339 216166 270 ... - std::string line; - // Find column index of 'ReasmFails' + std::string line; if (!std::getline(ifs, line)) { - RCLCPP_WARN(get_logger(), "Failed to get /proc/net/snmp first line."); + RCLCPP_WARN(get_logger(), "Failed to get header of /proc/net/snmp."); return; } - std::vector title_list; - boost::split(title_list, line, boost::is_space()); + // /proc/net/snmp + // Ip: Forwarding DefaultTTL InReceives ... ReasmTimeout ReasmReqds ReasmOKs ReasmFails ... + // Ip: 2 64 5636471397 ... 135 2303339 216166 270 .. + std::vector header_list; + boost::split(header_list, line, boost::is_space()); - if (title_list.empty()) { - RCLCPP_WARN(get_logger(), "/proc/net/snmp first line is empty."); + if (header_list.empty()) { + RCLCPP_WARN(get_logger(), "Failed to get header list of /proc/net/snmp."); return; } - if (title_list[0] != "Ip:") { + if (header_list[0] != "Ip:") { RCLCPP_WARN( - get_logger(), "/proc/net/snmp line title column is invalid. : %s", title_list[0].c_str()); + get_logger(), "Header column is invalid in /proc/net/snmp. %s", header_list[0].c_str()); return; } int index = 0; - for (auto itr = title_list.begin(); itr != title_list.end(); ++itr, ++index) { - if (*itr == "ReasmFails") { + for (const auto & header : header_list) { + if (header == "ReasmFails") { reassembles_failed_column_index_ = index; break; } + ++index; } } -bool NetMonitor::getReassemblesFailed(uint64_t & reassembles_failed) +bool NetMonitor::get_reassembles_failed(uint64_t & reassembles_failed) { if (reassembles_failed_column_index_ == 0) { - RCLCPP_WARN( - get_logger(), "reassembles failed column index is invalid. : %d", - reassembles_failed_column_index_); + RCLCPP_WARN(get_logger(), "Column index is invalid. : %d", reassembles_failed_column_index_); return false; } @@ -606,15 +567,15 @@ bool NetMonitor::getReassemblesFailed(uint64_t & reassembles_failed) std::string line; - // Skip title row + // Skip header row if (!std::getline(ifs, line)) { - RCLCPP_WARN(get_logger(), "Failed to get /proc/net/snmp first line."); + RCLCPP_WARN(get_logger(), "Failed to get header of /proc/net/snmp."); return false; } // Find a value of 'ReasmFails' if (!std::getline(ifs, line)) { - RCLCPP_WARN(get_logger(), "Failed to get /proc/net/snmp second line."); + RCLCPP_WARN(get_logger(), "Failed to get a line of /proc/net/snmp."); return false; } @@ -634,5 +595,143 @@ bool NetMonitor::getReassemblesFailed(uint64_t & reassembles_failed) return true; } +void NetMonitor::send_start_nethogs_request() +{ + // Connect to boot/shutdown service + if (!connect_service()) { + close_connection(); + return; + } + + diagnostic_updater::DiagnosticStatusWrapper stat; + std::vector interface_names; + + for (const auto & network : network_list_) { + // Skip if network is not supported + if (network.is_invalid) continue; + + interface_names.push_back(network.interface_name); + } + + // Send data to traffic-reader service + send_data_with_parameters( + traffic_reader_service::START_NETHOGS, interface_names, monitor_program_); + + // Close connection with traffic-reader service + close_connection(); +} + +void NetMonitor::get_nethogs_result(traffic_reader_service::Result & result) +{ + // Connect to traffic-reader service + if (!connect_service()) { + close_connection(); + return; + } + + // Send data to traffic-reader service + if (!send_data(traffic_reader_service::Request::GET_RESULT)) { + close_connection(); + return; + } + + // Receive data from traffic-reader service + receive_data(result); + + // Close connection with traffic-reader service + close_connection(); +} + +bool NetMonitor::connect_service() +{ + local::stream_protocol::endpoint endpoint(socket_path_); + socket_ = std::make_unique(io_service_); + + // Connect socket + boost::system::error_code error_code; + socket_->connect(endpoint, error_code); + + if (error_code) { + RCLCPP_ERROR(get_logger(), "Failed to connect socket. %s", error_code.message().c_str()); + return false; + } + + return true; +} + +bool NetMonitor::send_data(traffic_reader_service::Request request) +{ + std::ostringstream out_stream; + boost::archive::text_oarchive archive(out_stream); + archive & request; + + // Write data to socket + boost::system::error_code error_code; + socket_->write_some( + boost::asio::buffer(out_stream.str().c_str(), out_stream.str().length()), error_code); + + if (error_code) { + RCLCPP_ERROR(get_logger(), "Failed to write data to socket. %s", error_code.message().c_str()); + return false; + } + + return true; +} + +bool NetMonitor::send_data_with_parameters( + traffic_reader_service::Request request, std::vector & parameters, + std::string & program_name) +{ + std::ostringstream out_stream; + boost::archive::text_oarchive archive(out_stream); + archive & request; + archive & parameters; + archive & program_name; + + // Write data to socket + boost::system::error_code error_code; + socket_->write_some( + boost::asio::buffer(out_stream.str().c_str(), out_stream.str().length()), error_code); + + if (error_code) { + RCLCPP_ERROR(get_logger(), "Failed to write data to socket. %s", error_code.message().c_str()); + return false; + } + + return true; +} + +void NetMonitor::receive_data(traffic_reader_service::Result & result) +{ + uint8_t request_id = traffic_reader_service::Request::NONE; + + // Read data from socket + char buffer[10240]{}; + boost::system::error_code error_code; + socket_->read_some(boost::asio::buffer(buffer, sizeof(buffer)), error_code); + + if (error_code) { + RCLCPP_ERROR( + get_logger(), "Failed to read data from socket. %s\n", error_code.message().c_str()); + return; + } + + // Restore device status list + try { + std::istringstream in_stream(buffer); + boost::archive::text_iarchive archive(in_stream); + archive >> request_id; + archive >> result; + } catch (const std::exception & e) { + RCLCPP_ERROR(get_logger(), "Failed to restore message. %s\n", e.what()); + } +} + +void NetMonitor::close_connection() +{ + // Close socket + socket_->close(); +} + #include RCLCPP_COMPONENTS_REGISTER_NODE(NetMonitor)