Skip to content

Commit

Permalink
More formal fix for UDP communication.
Browse files Browse the repository at this point in the history
This should now make UDP sockets act almost exactly like the
TCP sockets.  The primary difference I notice at the moment
is that makeConnect is necessarily blocking for UDP, while for
TCP it just connects without the need for a handshake.

Here is a list of the changes:

 - UdpSocket now has its own read buffer. For rawReceiveBytes,
     it now only does a RECV_FROM to its UDP-specific buffer when there
     is no data in it.  Then it copies whatever it can from that buffer into the
     requested buffer.  The rawPoll operation acts almost the same as the previous poll,
     except it goes ahead and returns as ready when there is data left in the internal
     buffer. Thus, we should get almost the same functionality overall as the TCP
     code.

 - Removed original "fixes" I put in. receiveBytes acts the same as originally.

 - SimpleSocket::setConnected is now public (this is the only way you can
     encourage a socket to try and connect again, if you timeout in
     cyclic communication, the server should set this to false and
     try connecting again when it gets information on the point)

 - SimpleSocket::poll is now a virtual method named rawPoll to be implemented by *Socket
     (this is due to how UDP does reads now).  TCP's poll code is identical to the original.

 - UdpServer::makeConnect doesn't hard-block waiting for the handshake message anymore.

 - I have refactored most of the utest.cpp code to make it possible to run the same
     tests for both TCP and UDP.  Most of the code is identical, except that the makeConnect
     calls have to be run differently. Since for UDP, each call is blocking until the
     handshake is completed, you must call one of them in a thread.  It seems that this
     works fine for both TCP and UDP though.

 - I have added a new test which enables the UDP_TEST macro in the utest.cpp.
  • Loading branch information
kphawkins committed Aug 21, 2014
1 parent a0f2efd commit e00a9f7
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 249 deletions.
4 changes: 3 additions & 1 deletion simple_message/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ catkin_add_gtest(utest_float64 ${UTEST_SRC_FILES})
set_target_properties(utest_float64 PROPERTIES COMPILE_DEFINITIONS "TEST_PORT_BASE=13000;FLOAT64")
target_link_libraries(utest_float64 simple_message_float64)


catkin_add_gtest(utest_udp ${UTEST_SRC_FILES})
set_target_properties(utest_udp PROPERTIES COMPILE_DEFINITIONS "TEST_PORT_BASE=15000;UDP_TEST")
target_link_libraries(utest_udp simple_message)

install(
TARGETS simple_message simple_message_bswap simple_message_float64
Expand Down
33 changes: 16 additions & 17 deletions simple_message/include/simple_message/socket/simple_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
return connected_;
}

virtual void setConnected(bool connected)
{
this->connected_ = connected;
}

/**
* \brief returns true if socket data is ready to receive
*
Expand All @@ -164,7 +169,7 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
bool isReadyReceive(int timeout)
{
bool r, e;
poll(timeout, r, e);
rawPoll(timeout, r, e);
return r;
}

Expand Down Expand Up @@ -215,29 +220,13 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
{
this->sock_handle_ = sock_handle_;
}

virtual void setConnected(bool connected)
{
this->connected_ = connected;
}

void logSocketError(const char* msg, int rc)
{
int errno_ = errno;
LOG_ERROR("%s, rc: %d. Error: '%s' (errno: %d)", msg, rc, strerror(errno_), errno_);
}

/**
* \brief polls socket for data or error
*
* \param timeout (ms) negative or zero values result in blocking
* \param ready true if ready
* \param except true if exception
*
* \return true if function DID NOT timeout (must check flags)
*/
bool poll(int timeout, bool & ready, bool & error);

// Send/Receive functions (inherited classes should override raw methods
// Virtual
bool sendBytes(industrial::byte_array::ByteArray & buffer);
Expand All @@ -248,6 +237,16 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
industrial::shared_types::shared_int num_bytes)=0;
virtual int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes)=0;
/**
* \brief polls socket for data or error
*
* \param timeout (ms) negative or zero values result in blocking
* \param ready true if ready
* \param except true if exception
*
* \return true if function DID NOT timeout (must check flags)
*/
virtual bool rawPoll(int timeout, bool & ready, bool & error)=0;

};

Expand Down
1 change: 1 addition & 0 deletions simple_message/include/simple_message/socket/tcp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TcpSocket : public industrial::simple_socket::SimpleSocket
industrial::shared_types::shared_int num_bytes);
int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes);
bool rawPoll(int timeout, bool & ready, bool & error);

};

Expand Down
15 changes: 6 additions & 9 deletions simple_message/include/simple_message/socket/udp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,23 @@ class UdpSocket : public industrial::simple_socket::SimpleSocket
UdpSocket();
~UdpSocket();

// Override
// receive is overridden because the base class implementation assumed
// socket data could be read partially. UDP socket data is lost when
// only a portion of it is read. For that reason this receive method
// reads the entire data stream (assumed to be a single message).
bool receiveMsg(industrial::simple_message::SimpleMessage & message);


protected:

/**
* \brief udp socket connect handshake value
*/
static const char CONNECT_HANDSHAKE = 255;
static const char CONNECT_HANDSHAKE = 142;

char udp_read_buffer_[MAX_BUFFER_SIZE + 1];
char* udp_read_head_;
size_t udp_read_len_;

// Virtual
int rawSendBytes(char *buffer,
industrial::shared_types::shared_int num_bytes);
int rawReceiveBytes(char *buffer,
industrial::shared_types::shared_int num_bytes);
bool rawPoll(int timeout, bool & ready, bool & error);

};

Expand Down
62 changes: 2 additions & 60 deletions simple_message/src/socket/simple_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ namespace industrial
if (this->isConnected())
{
buffer.init();
while (remainBytes > 0 || num_bytes == 0)
while (remainBytes > 0)
{
// Polling the socket results in an "interruptable" socket read. This
// allows Control-C to break out of a socket read. Without polling,
// a sig-term is required to kill a program in a socket read function.
if (this->poll(this->SOCKET_POLL_TO, ready, error))
if (this->rawPoll(0, ready, error))
{
if(ready)
{
Expand Down Expand Up @@ -165,8 +165,6 @@ namespace industrial
{
LOG_COMM("Socket poll timeout, trying again");
}
if(num_bytes == 0)
break;
}
}
else
Expand All @@ -182,62 +180,6 @@ namespace industrial
return rtn;
}

bool SimpleSocket::poll(int timeout, bool & ready, bool & error)
{
timeval time;
fd_set read, write, except;
int rc = this->SOCKET_FAIL;
bool rtn = false;
ready = false;
error = false;

// The select function uses the timeval data structure
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;

FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&except);

FD_SET(this->getSockHandle(), &read);
FD_SET(this->getSockHandle(), &except);

rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time);

if (this->SOCKET_FAIL != rc)
{
if (0 == rc)
{
rtn = false;
}
else
{
if (FD_ISSET(this->getSockHandle(), &read))
{
ready = true;
rtn = true;
}
else if(FD_ISSET(this->getSockHandle(), &except))
{
error = true;
rtn = true;
}
else
{
LOG_WARN("Select returned, but no flags are set");
rtn = false;
}
}
}
else
{
this->logSocketError("Socket select function failed", rc);
rtn = false;
}

return rtn;
}

} //simple_socket
} //industrial

46 changes: 46 additions & 0 deletions simple_message/src/socket/tcp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,52 @@ int TcpSocket::rawReceiveBytes(char *buffer, shared_int num_bytes)
return rc;
}

bool TcpSocket::rawPoll(int timeout, bool & ready, bool & error)
{
timeval time;
fd_set read, write, except;
int rc = this->SOCKET_FAIL;
bool rtn = false;
ready = false;
error = false;

// The select function uses the timeval data structure
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;

FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&except);

FD_SET(this->getSockHandle(), &read);
FD_SET(this->getSockHandle(), &except);

rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time);

if (this->SOCKET_FAIL != rc) {
if (0 == rc)
rtn = false;
else {
if (FD_ISSET(this->getSockHandle(), &read)) {
ready = true;
rtn = true;
}
else if(FD_ISSET(this->getSockHandle(), &except)) {
error = true;
rtn = true;
}
else {
LOG_WARN("Select returned, but no flags are set");
rtn = false;
}
}
} else {
this->logSocketError("Socket select function failed", rc);
rtn = false;
}
return rtn;
}

} //tcp_socket
} //industrial

16 changes: 10 additions & 6 deletions simple_message/src/socket/udp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ bool UdpServer::makeConnect()
char sendHS = this->CONNECT_HANDSHAKE;
char recvHS = 0;
int bytesRcvd = 0;
const int timeout = 1000; // Time (ms) between handshake sends
bool rtn = false;

send.load((void*)&sendHS, sizeof(sendHS));
Expand All @@ -126,13 +127,16 @@ bool UdpServer::makeConnect()
{
ByteArray recv;
recvHS = 0;
bytesRcvd = this->rawReceiveBytes(this->buffer_, 0);

if (bytesRcvd > 0)
if (this->isReadyReceive(timeout))
{
LOG_DEBUG("UDP server received %d bytes while waiting for handshake", bytesRcvd);
recv.init(&this->buffer_[0], bytesRcvd);
recv.unload((void*)&recvHS, sizeof(recvHS));
bytesRcvd = this->rawReceiveBytes(this->buffer_, 0);

if (bytesRcvd > 0)
{
LOG_DEBUG("UDP server received %d bytes while waiting for handshake", bytesRcvd);
recv.init(&this->buffer_[0], bytesRcvd);
recv.unload((void*)&recvHS, sizeof(recvHS));
}
}

}
Expand Down
Loading

0 comments on commit e00a9f7

Please sign in to comment.