Skip to content

Commit

Permalink
Merge pull request #1323 from ros/backports_kinetic
Browse files Browse the repository at this point in the history
changes between 1.12.12 and 1.13.6 for backporting
  • Loading branch information
dirk-thomas authored Feb 21, 2018
2 parents 078481c + dfa607b commit e2f1023
Show file tree
Hide file tree
Showing 31 changed files with 344 additions and 100 deletions.
3 changes: 3 additions & 0 deletions clients/roscpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ catkin_package(

include(CheckIncludeFiles)
include(CheckFunctionExists)
include(CheckCXXSymbolExists)

# Not everybody has <ifaddrs.h> (e.g., embedded arm-linux)
CHECK_INCLUDE_FILES(ifaddrs.h HAVE_IFADDRS_H)
# Not everybody has trunc (e.g., Windows, embedded arm-linux)
CHECK_FUNCTION_EXISTS(trunc HAVE_TRUNC)
# Not everybody has epoll (e.g., Windows, BSD, embedded arm-linux)
CHECK_CXX_SYMBOL_EXISTS(epoll_wait "sys/epoll.h" HAVE_EPOLL)

# Output test results to config.h
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/libros/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
Expand Down
16 changes: 15 additions & 1 deletion clients/roscpp/include/ros/callback_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,28 @@
#ifndef ROSCPP_CALLBACK_QUEUE_H
#define ROSCPP_CALLBACK_QUEUE_H

// check if we might need to include our own backported version boost::condition_variable
// in order to use CLOCK_MONOTONIC for the condition variable
// the include order here is important!
#ifdef BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
#include <boost/version.hpp>
#if BOOST_VERSION < 106100
// use backported version of boost condition variable, see https://svn.boost.org/trac/boost/ticket/6377
#include "boost_161_condition_variable.h"
#else // Boost version is 1.61 or greater and has the steady clock fixes
#include <boost/thread/condition_variable.hpp>
#endif
#else // !BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
#include <boost/thread/condition_variable.hpp>
#endif // BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC

#include "ros/callback_queue_interface.h"
#include "ros/time.h"
#include "common.h"

#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/tss.hpp>

#include <list>
Expand Down
10 changes: 9 additions & 1 deletion clients/roscpp/include/ros/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,26 @@ namespace ros {
typedef struct pollfd socket_pollfd;
#endif

typedef boost::shared_ptr<std::vector<socket_pollfd> > pollfd_vector_ptr;

/*****************************************************************************
** Functions
*****************************************************************************/

ROSCPP_DECL int last_socket_error();
ROSCPP_DECL const char* last_socket_error_string();
ROSCPP_DECL bool last_socket_error_is_would_block();
ROSCPP_DECL int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout);
ROSCPP_DECL pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout);
ROSCPP_DECL int set_non_blocking(socket_fd_t &socket);
ROSCPP_DECL int close_socket(socket_fd_t &socket);
ROSCPP_DECL int create_signal_pair(signal_fd_t signal_pair[2]);

ROSCPP_DECL int create_socket_watcher();
ROSCPP_DECL void close_socket_watcher(int fd);
ROSCPP_DECL void add_socket_to_watcher(int epfd, int fd);
ROSCPP_DECL void del_socket_from_watcher(int epfd, int fd);
ROSCPP_DECL void set_events_on_socket(int epfd, int fd, int events);

/*****************************************************************************
** Inlines - almost direct api replacements, should stay fast.
*****************************************************************************/
Expand Down
2 changes: 2 additions & 0 deletions clients/roscpp/include/ros/poll_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class ROSCPP_DECL PollSet

boost::mutex signal_mutex_;
signal_fd_t signal_pipe_[2];

int epfd_;
};

}
Expand Down
17 changes: 15 additions & 2 deletions clients/roscpp/src/libros/callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,23 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

// Make sure we use CLOCK_MONOTONIC for the condition variable wait_for if not Apple.
#ifndef __APPLE__
#define BOOST_THREAD_HAS_CONDATTR_SET_CLOCK_MONOTONIC
#endif

#include "ros/callback_queue.h"
#include "ros/assert.h"
#include <boost/scope_exit.hpp>

// check if we have really included the backported boost condition variable
// just in case someone messes with the include order...
#if BOOST_VERSION < 106100
#ifndef USING_BACKPORTED_BOOST_CONDITION_VARIABLE
#error "needs boost version >= 1.61 or the backported headers!"
#endif
#endif

namespace ros
{

Expand Down Expand Up @@ -229,7 +242,7 @@ CallbackQueue::CallOneResult CallbackQueue::callOne(ros::WallDuration timeout)
{
if (!timeout.isZero())
{
condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
}

if (callbacks_.empty())
Expand Down Expand Up @@ -305,7 +318,7 @@ void CallbackQueue::callAvailable(ros::WallDuration timeout)
{
if (!timeout.isZero())
{
condition_.timed_wait(lock, boost::posix_time::microseconds(timeout.toSec() * 1000000.0f));
condition_.wait_for(lock, boost::chrono::nanoseconds(timeout.toNSec()));
}

if (callbacks_.empty() || !enabled_)
Expand Down
1 change: 1 addition & 0 deletions clients/roscpp/src/libros/config.h.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#cmakedefine HAVE_TRUNC
#cmakedefine HAVE_IFADDRS_H
#cmakedefine HAVE_EPOLL
25 changes: 14 additions & 11 deletions clients/roscpp/src/libros/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,6 @@ void start()
}
}

char* env_ipv6 = NULL;
#ifdef _MSC_VER
_dupenv_s(&env_ipv6, NULL, "ROS_IPV6");
#else
env_ipv6 = getenv("ROS_IPV6");
#endif

bool use_ipv6 = (env_ipv6 && strcmp(env_ipv6,"on") == 0);
TransportTCP::s_use_ipv6_ = use_ipv6;
XmlRpc::XmlRpcSocket::s_use_ipv6_ = use_ipv6;

#ifdef _MSC_VER
if (env_ipv6)
{
Expand Down Expand Up @@ -430,6 +419,19 @@ void start()
}
}

void check_ipv6_environment() {
char* env_ipv6 = NULL;
#ifdef _MSC_VER
_dupenv_s(&env_ipv6, NULL, "ROS_IPV6");
#else
env_ipv6 = getenv("ROS_IPV6");
#endif

bool use_ipv6 = (env_ipv6 && strcmp(env_ipv6,"on") == 0);
TransportTCP::s_use_ipv6_ = use_ipv6;
XmlRpc::XmlRpcSocket::s_use_ipv6_ = use_ipv6;
}

void init(const M_string& remappings, const std::string& name, uint32_t options)
{
if (!g_atexit_registered)
Expand All @@ -453,6 +455,7 @@ void init(const M_string& remappings, const std::string& name, uint32_t options)
#ifndef WIN32
signal(SIGPIPE, SIG_IGN);
#endif
check_ipv6_environment();
network::init(remappings);
master::init(remappings);
// names:: namespace is initialized by this_node
Expand Down
154 changes: 142 additions & 12 deletions clients/roscpp/src/libros/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
** Includes
*****************************************************************************/

#include "config.h"

#include <ros/io.h>
#include <ros/assert.h> // don't need if we dont call the pipe functions.
#include <errno.h> // for EFAULT and co.
Expand All @@ -46,6 +48,16 @@
#include <fcntl.h> // for non-blocking configuration
#endif

#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif

/*****************************************************************************
** Macros
*****************************************************************************/

#define UNUSED(expr) do { (void)(expr); } while (0)

/*****************************************************************************
** Namespaces
*****************************************************************************/
Expand Down Expand Up @@ -87,6 +99,69 @@ bool last_socket_error_is_would_block() {
#endif
}

int create_socket_watcher() {
int epfd = -1;
#if defined(HAVE_EPOLL)
epfd = ::epoll_create1(0);
if (epfd < 0)
{
ROS_ERROR("Unable to create epoll watcher: %s", strerror(errno));
}
#endif
return epfd;
}

void close_socket_watcher(int fd) {
if (fd >= 0)
::close(fd);
}

void add_socket_to_watcher(int epfd, int fd) {
#if defined(HAVE_EPOLL)
struct epoll_event ev;
ev.events = 0;
ev.data.fd = fd;

if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
{
ROS_ERROR("Unable to add FD to epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
#endif
}

void del_socket_from_watcher(int epfd, int fd) {
#if defined(HAVE_EPOLL)
if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
{
ROS_ERROR("Unable to remove FD to epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
#endif
}

void set_events_on_socket(int epfd, int fd, int events) {
#if defined(HAVE_EPOLL)
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
{
ROS_ERROR("Unable to modify FD epoll: %s", strerror(errno));
}
#else
UNUSED(epfd);
UNUSED(fd);
UNUSED(events);
#endif
}



/*****************************************************************************
** Service Robotics/Libssh Functions
*****************************************************************************/
Expand All @@ -96,22 +171,26 @@ bool last_socket_error_is_would_block() {
* Windows doesn't have a polling function until Vista (WSAPoll) and even then
* its implementation is not supposed to be great. This works for a broader
* range of platforms and will suffice.
* @param epfd - the socket watcher to poll on.
* @param fds - the socket set (descriptor's and events) to poll for.
* @param nfds - the number of descriptors to poll for.
* @param timeout - timeout in milliseconds.
* @return int : -1 on error, 0 on timeout, +ve number of structures with received events.
* @return pollfd_vector_ptr : NULL on error, empty on timeout, a list of structures with received events.
*/
int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {
pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) {
#if defined(WIN32)
fd_set readfds, writefds, exceptfds;
struct timeval tv, *ptv;
socket_fd_t max_fd;
int rc;
nfds_t i;
boost::shared_ptr<std::vector<socket_pollfd> > ofds;

UNUSED(epfd);

if (fds == NULL) {
errno = EFAULT;
return -1;
return ofds;
}

FD_ZERO (&readfds);
Expand Down Expand Up @@ -146,7 +225,7 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {

if (rc == -1) {
errno = EINVAL;
return -1;
return ofds;
}
/*********************
** Setting the timeout
Expand All @@ -166,9 +245,11 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {

rc = select (max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
if (rc < 0) {
return -1;
} else if ( rc == 0 ) {
return 0;
return ofds;
}
ofds.reset(new std::vector<socket_pollfd>);
if ( rc == 0 ) {
return ofds;
}

for (rc = 0, i = 0; i < nfds; i++) {
Expand Down Expand Up @@ -213,18 +294,67 @@ int poll_sockets(socket_pollfd *fds, nfds_t nfds, int timeout) {
} else {
fds[i].revents = POLLNVAL;
}
ofds->push_back(fds[i]);
}
return rc;
return ofds;
#elif defined (HAVE_EPOLL)
UNUSED(nfds);
UNUSED(fds);
struct epoll_event ev[nfds];
pollfd_vector_ptr ofds;

int fd_cnt = ::epoll_wait(epfd, ev, nfds, timeout);

if (fd_cnt < 0)
{
// EINTR means that we got interrupted by a signal, and is not an error
if(errno != EINTR) {
ROS_ERROR("Error in epoll_wait! %s", strerror(errno));
ofds.reset();
}
}
else
{
ofds.reset(new std::vector<socket_pollfd>);
for (int i = 0; i < fd_cnt; i++)
{
socket_pollfd pfd;
pfd.fd = ev[i].data.fd;
pfd.revents = ev[i].events;
ofds->push_back(pfd);
}
}
return ofds;
#else
UNUSED(epfd);
pollfd_vector_ptr ofds(new std::vector<socket_pollfd>);
// Clear the `revents` fields
for (nfds_t i = 0; i < nfds; i++)
{
fds[i].revents = 0;
}

// use an existing poll implementation
int result = poll(fds, nfds, timeout);
if ( result < 0 ) {
if ( result < 0 )
{
// EINTR means that we got interrupted by a signal, and is not an error
if(errno == EINTR) {
result = 0;
if(errno != EINTR)
{
ROS_ERROR("Error in poll! %s", strerror(errno));
ofds.reset();
}
} else {
for (nfds_t i = 0; i < nfds; i++)
{
if (fds[i].revents)
{
ofds->push_back(fds[i]);
fds[i].revents = 0;
}
}
}
return result;
return ofds;
#endif // poll_sockets functions
}
/*****************************************************************************
Expand Down
Loading

0 comments on commit e2f1023

Please sign in to comment.