Skip to content

Commit

Permalink
Added polling check to socket read and muiltiple read calls in order …
Browse files Browse the repository at this point in the history
…to receive all desired bytes
  • Loading branch information
shaun-edwards committed Aug 8, 2013
1 parent ddc634a commit daf3fbc
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 129 deletions.
23 changes: 22 additions & 1 deletion simple_message/include/simple_message/socket/simple_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
* in order to avoid dynamic memory allocation)
*/
static const int MAX_BUFFER_SIZE = 1024;

/**
* \brief socket read timeout (ms)
*/
static const int SOCKET_READ_TO = 1000;

/**
* \brief internal data buffer for receiving
*/
Expand All @@ -206,14 +212,29 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection
LOG_ERROR("%s, rc: %d, errno: %d", msg, rc, errno);
}

/**
* \brief polls socket for data or error
*
* \param timeout (ms) negative or zero values result in blocking
* \param ready true if ready
* \param except true if exception
*
* \return true if function DID NOT timeout (must check flags)
*/
bool poll(int timeout, bool & ready, bool & error);

/**
* \brief returns true if socket data is ready to receive
*
* \param timeout (ms) negative or zero values result in blocking
*
* \return true if data is ready to recieve
*/
bool isReadyReceive(int timeout);
bool isReadyReceive(int timeout)
{
bool r, e;
return poll(timeout, r, e);
}

// Send/Receive functions (inherited classes should override raw methods
// Virtual
Expand Down
286 changes: 163 additions & 123 deletions simple_message/src/socket/simple_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,155 +42,195 @@ using namespace industrial::shared_types;

namespace industrial
{
namespace simple_socket
{


bool SimpleSocket::sendBytes(ByteArray & buffer)
{
int rc = this->SOCKET_FAIL;
bool rtn = false;

if (this->isConnected())
namespace simple_socket
{
// Nothing restricts the ByteArray from being larger than the what the socket
// can handle.
if (this->MAX_BUFFER_SIZE > (int)buffer.getBufferSize())

bool SimpleSocket::sendBytes(ByteArray & buffer)
{

rc = rawSendBytes(buffer.getRawDataPtr(), buffer.getBufferSize());
if (this->SOCKET_FAIL != rc)
int rc = this->SOCKET_FAIL;
bool rtn = false;

if (this->isConnected())
{
rtn = true;
// Nothing restricts the ByteArray from being larger than the what the socket
// can handle.
if (this->MAX_BUFFER_SIZE > (int)buffer.getBufferSize())
{

rc = rawSendBytes(buffer.getRawDataPtr(), buffer.getBufferSize());
if (this->SOCKET_FAIL != rc)
{
rtn = true;
}
else
{
rtn = false;
logSocketError("Socket sendBytes failed", rc);
}

}
else
{
LOG_ERROR("Buffer size: %u, is greater than max socket size: %u", buffer.getBufferSize(), this->MAX_BUFFER_SIZE);
rtn = false;
}

}
else
{
rtn = false;
logSocketError("Socket sendBytes failed", rc);
LOG_WARN("Not connected, bytes not sent");
}


if (!rtn)
{
this->setConnected(false);
}
else
{
LOG_ERROR("Buffer size: %u, is greater than max socket size: %u", buffer.getBufferSize(), this->MAX_BUFFER_SIZE);
rtn = false;
}

}
else
{
rtn = false;
LOG_WARN("Not connected, bytes not sent");
}
return rtn;

if (!rtn)
{
this->setConnected(false);
}

return rtn;

}
bool SimpleSocket::receiveBytes(ByteArray & buffer, shared_int num_bytes)
{
int rc = this->SOCKET_FAIL;
bool rtn = false;
shared_int remainBytes = num_bytes;
bool ready, error;

bool SimpleSocket::receiveBytes(ByteArray & buffer, shared_int num_bytes)
{
int rc = this->SOCKET_FAIL;
bool rtn = false;

// Reset the buffer (this is not required since the buffer length should
// ensure that we don't read any of the garbage that may be left over from
// a previous read), but it is good practice.

memset(&this->buffer_, 0, sizeof(this->buffer_));

// Doing a sanity check to determine if the byte array buffer is larger than
// what can be sent in the socket. This should not happen and might be indicative
// of some code synchronization issues between the client and server base.
if (this->MAX_BUFFER_SIZE < (int)buffer.getMaxBufferSize())
{
LOG_WARN("Socket buffer max size: %u, is larger than byte array buffer: %u",
this->MAX_BUFFER_SIZE, buffer.getMaxBufferSize());
}
if (this->isConnected())
{
rc = rawReceiveBytes(this->buffer_, num_bytes);
// Reset the buffer (this is not required since the buffer length should
// ensure that we don't read any of the garbage that may be left over from
// a previous read), but it is good practice.

if (this->SOCKET_FAIL != rc)
{
if (rc > 0)
memset(&this->buffer_, 0, sizeof(this->buffer_));

// Doing a sanity check to determine if the byte array buffer is larger than
// what can be sent in the socket. This should not happen and might be indicative
// of some code synchronization issues between the client and server base.
if (this->MAX_BUFFER_SIZE < (int)buffer.getMaxBufferSize())
{
LOG_COMM("Byte array receive, bytes read: %u", rc);
buffer.init(&this->buffer_[0], rc);
rtn = true;
LOG_WARN("Socket buffer max size: %u, is larger than byte array buffer: %u",
this->MAX_BUFFER_SIZE, buffer.getMaxBufferSize());
}
if (this->isConnected())
{
buffer.init();
while (remainBytes > 0)
{
if (this->poll(this->SOCKET_READ_TO, ready, error))
{
if(ready)
{
rc = rawReceiveBytes(this->buffer_, num_bytes);
if (this->SOCKET_FAIL == rc)
{
this->logSocketError("Socket received failed", rc);
rtn = false;
break;
}
else if (0 == rc)
{
LOG_WARN("Recieved zero bytes: %u", rc);
rtn = false;
break;
}
else
{
remainBytes = num_bytes - rc;
LOG_COMM("Byte array receive, bytes read: %u, bytes left: %u",
rc, remainBytes);
buffer.load(&this->buffer_, rc);
rtn = true;
}
}
else if(error)
{
LOG_ERROR("Socket poll returned an error");
rtn = false;
break;
}
else
{
LOG_ERROR("Uknown error from socket poll");
rtn = false;
break;
}
}
else
{
LOG_WARN("Socket poll timeout, trying again");
}
}
}
else
{
LOG_WARN("Recieved zero bytes: %u", rc);
LOG_WARN("Not connected, bytes not sent");
rtn = false;
}

if (!rtn)
{
this->setConnected(false);
}
return rtn;
}
else

bool SimpleSocket::poll(int timeout, bool & ready, bool & error)
{
this->logSocketError("Socket received failed", rc);
rtn = false;
}
}
else
{
rtn = false;
LOG_WARN("Not connected, bytes not sent");
}
timeval time;
fd_set read, write, except;
int rc = this->SOCKET_FAIL;
bool rtn = false;
ready = false;
error = false;

if (!rtn)
{
this->setConnected(false);
}
return rtn;
}
// The select function uses the timeval data structure
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;

bool SimpleSocket::isReadyReceive(int timeout)
{
timeval time;
fd_set read, write, except;
int rc = this->SOCKET_FAIL;
bool rtn = false;

// The select function uses the timeval data structure
time.tv_sec = timeout/1000;
time.tv_usec = (timeout%1000)*1000;

FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&except);

FD_SET(this->getSockHandle(), &read);

rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time);

if (this->SOCKET_FAIL != rc)
{
if (0==rc)
{
LOG_DEBUG("Socket select timed out");
rtn = false;
}
else
{
LOG_DEBUG("Data is ready for reading");
rtn = true;
}
}
else
{
this->logSocketError("Socket select function failed", rc);
rtn = false;
}

return rtn;
}
FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&except);

FD_SET(this->getSockHandle(), &read);
FD_SET(this->getSockHandle(), &except);

rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time);

if (this->SOCKET_FAIL != rc)
{
if (0 == rc)
{
rtn = false;
}
else
{
if (FD_ISSET(this->getSockHandle(), &read))
{
ready = true;
rtn = true;
}
else if(FD_ISSET(this->getSockHandle(), &except))
{
error = true;
rtn = true;
}
else
{
LOG_WARN("Select returned, but no flags are set");
rtn = false;
}
}
}
else
{
this->logSocketError("Socket select function failed", rc);
rtn = false;
}

return rtn;
}

} //simple_socket
} //industrial
} //simple_socket
} //industrial

Loading

0 comments on commit daf3fbc

Please sign in to comment.