Skip to content

Commit

Permalink
Merge pull request ros-industrial#70 from gt-ros-pkg/hydro-devel
Browse files Browse the repository at this point in the history
Fixing receiveBytes for UDP
  • Loading branch information
shaun-edwards committed Nov 19, 2014
2 parents 9ea9ee4 + 35255d7 commit ce43961
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 246 deletions.
4 changes: 3 additions & 1 deletion simple_message/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ catkin_add_gtest(utest_float64 ${UTEST_SRC_FILES})
set_target_properties(utest_float64 PROPERTIES COMPILE_DEFINITIONS "TEST_PORT_BASE=13000;FLOAT64")
target_link_libraries(utest_float64 simple_message_float64)


catkin_add_gtest(utest_udp ${UTEST_SRC_FILES})
set_target_properties(utest_udp PROPERTIES COMPILE_DEFINITIONS "TEST_PORT_BASE=15000;UDP_TEST")
target_link_libraries(utest_udp simple_message)

install(
TARGETS simple_message simple_message_bswap simple_message_float64
Expand Down
41 changes: 24 additions & 17 deletions simple_message/include/simple_message/socket/simple_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
return connected_;
}

// Internally set the state of the connection to be disconnected.
// This is needed in UDP connections to signal when a timeout has occurred
// and the connection needs to be reestablished using the handshake protocol.
virtual void setDisconnected()
{
setConnected(false);
}

/**
* \brief returns true if socket data is ready to receive
*
Expand All @@ -166,7 +174,8 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
bool isReadyReceive(int timeout)
{
bool r, e;
return poll(timeout, r, e);
rawPoll(timeout, r, e);
return r;
}

protected:
Expand Down Expand Up @@ -216,29 +225,13 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
{
this->sock_handle_ = sock_handle_;
}

virtual void setConnected(bool connected)
{
this->connected_ = connected;
}

void logSocketError(const char* msg, int rc)
{
int errno_ = errno;
LOG_ERROR("%s, rc: %d. Error: '%s' (errno: %d)", msg, rc, strerror(errno_), errno_);
}

/**
* \brief polls socket for data or error
*
* \param timeout (ms) negative or zero values result in blocking
* \param ready true if ready
* \param except true if exception
*
* \return true if function DID NOT timeout (must check flags)
*/
bool poll(int timeout, bool & ready, bool & error);

// Send/Receive functions (inherited classes should override raw methods
// Virtual
bool sendBytes(industrial::byte_array::ByteArray & buffer);
Expand All @@ -249,6 +242,20 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
industrial::shared_types::shared_int num_bytes)=0;
virtual int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes)=0;
/**
* \brief polls socket for data or error
*
* \param timeout (ms) negative or zero values result in blocking
* \param ready true if ready
* \param except true if exception
*
* \return true if function DID NOT timeout (must check flags)
*/
virtual bool rawPoll(int timeout, bool & ready, bool & error)=0;
virtual void setConnected(bool connected)
{
this->connected_ = connected;
}

};

Expand Down
1 change: 1 addition & 0 deletions simple_message/include/simple_message/socket/tcp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TcpSocket : public industrial::simple_socket::SimpleSocket
industrial::shared_types::shared_int num_bytes);
int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes);
bool rawPoll(int timeout, bool & ready, bool & error);

};

Expand Down
15 changes: 6 additions & 9 deletions simple_message/include/simple_message/socket/udp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,23 @@ class UdpSocket : public industrial::simple_socket::SimpleSocket
UdpSocket();
~UdpSocket();

// Override
// receive is overridden because the base class implementation assumed
// socket data could be read partially. UDP socket data is lost when
// only a portion of it is read. For that reason this receive method
// reads the entire data stream (assumed to be a single message).
bool receiveMsg(industrial::simple_message::SimpleMessage & message);


protected:

/**
* \brief udp socket connect handshake value
*/
static const char CONNECT_HANDSHAKE = 255;
static const char CONNECT_HANDSHAKE = 142;

char udp_read_buffer_[MAX_BUFFER_SIZE + 1];
char* udp_read_head_;
size_t udp_read_len_;

// Virtual
int rawSendBytes(char *buffer,
industrial::shared_types::shared_int num_bytes);
int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes);
bool rawPoll(int timeout, bool & ready, bool & error);

};

Expand Down
58 changes: 1 addition & 57 deletions simple_message/src/socket/simple_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ namespace industrial
// Polling the socket results in an "interruptable" socket read. This
// allows Control-C to break out of a socket read. Without polling,
// a sig-term is required to kill a program in a socket read function.
if (this->poll(this->SOCKET_POLL_TO, ready, error))
if (this->rawPoll(this->SOCKET_POLL_TO, ready, error))
{
if(ready)
{
Expand Down Expand Up @@ -180,62 +180,6 @@ namespace industrial
return rtn;
}

bool SimpleSocket::poll(int timeout, bool & ready, bool & error)
{
timeval time;
fd_set read, write, except;
int rc = this->SOCKET_FAIL;
bool rtn = false;
ready = false;
error = false;

// The select function uses the timeval data structure
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;

FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&except);

FD_SET(this->getSockHandle(), &read);
FD_SET(this->getSockHandle(), &except);

rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time);

if (this->SOCKET_FAIL != rc)
{
if (0 == rc)
{
rtn = false;
}
else
{
if (FD_ISSET(this->getSockHandle(), &read))
{
ready = true;
rtn = true;
}
else if(FD_ISSET(this->getSockHandle(), &except))
{
error = true;
rtn = true;
}
else
{
LOG_WARN("Select returned, but no flags are set");
rtn = false;
}
}
}
else
{
this->logSocketError("Socket select function failed", rc);
rtn = false;
}

return rtn;
}

} //simple_socket
} //industrial

46 changes: 46 additions & 0 deletions simple_message/src/socket/tcp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,52 @@ int TcpSocket::rawReceiveBytes(char *buffer, shared_int num_bytes)
return rc;
}

bool TcpSocket::rawPoll(int timeout, bool & ready, bool & error)
{
timeval time;
fd_set read, write, except;
int rc = this->SOCKET_FAIL;
bool rtn = false;
ready = false;
error = false;

// The select function uses the timeval data structure
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;

FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&except);

FD_SET(this->getSockHandle(), &read);
FD_SET(this->getSockHandle(), &except);

rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time);

if (this->SOCKET_FAIL != rc) {
if (0 == rc)
rtn = false;
else {
if (FD_ISSET(this->getSockHandle(), &read)) {
ready = true;
rtn = true;
}
else if(FD_ISSET(this->getSockHandle(), &except)) {
error = true;
rtn = true;
}
else {
LOG_WARN("Select returned, but no flags are set");
rtn = false;
}
}
} else {
this->logSocketError("Socket select function failed", rc);
rtn = false;
}
return rtn;
}

} //tcp_socket
} //industrial

16 changes: 10 additions & 6 deletions simple_message/src/socket/udp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ bool UdpServer::makeConnect()
char sendHS = this->CONNECT_HANDSHAKE;
char recvHS = 0;
int bytesRcvd = 0;
const int timeout = 1000; // Time (ms) between handshake sends
bool rtn = false;

send.load((void*)&sendHS, sizeof(sendHS));
Expand All @@ -126,13 +127,16 @@ bool UdpServer::makeConnect()
{
ByteArray recv;
recvHS = 0;
bytesRcvd = this->rawReceiveBytes(this->buffer_, 0);

if (bytesRcvd > 0)
if (this->isReadyReceive(timeout))
{
LOG_DEBUG("UDP server received %d bytes while waiting for handshake", bytesRcvd);
recv.init(&this->buffer_[0], bytesRcvd);
recv.unload((void*)&recvHS, sizeof(recvHS));
bytesRcvd = this->rawReceiveBytes(this->buffer_, 0);

if (bytesRcvd > 0)
{
LOG_DEBUG("UDP server received %d bytes while waiting for handshake", bytesRcvd);
recv.init(&this->buffer_[0], bytesRcvd);
recv.unload((void*)&recvHS, sizeof(recvHS));
}
}

}
Expand Down
Loading

0 comments on commit ce43961

Please sign in to comment.