From daf3fbcf1b970c592b9e7bf7e2b596f0f40b7994 Mon Sep 17 00:00:00 2001 From: Shaun Edwards Date: Wed, 7 Aug 2013 22:10:31 -0500 Subject: [PATCH] Added polling check to socket read and muiltiple read calls in order to receive all desired bytes --- .../simple_message/socket/simple_socket.h | 23 +- simple_message/src/socket/simple_socket.cpp | 286 ++++++++++-------- simple_message/test/utest.cpp | 20 +- 3 files changed, 200 insertions(+), 129 deletions(-) diff --git a/simple_message/include/simple_message/socket/simple_socket.h b/simple_message/include/simple_message/socket/simple_socket.h index 61df3f59..5ab5234d 100644 --- a/simple_message/include/simple_message/socket/simple_socket.h +++ b/simple_message/include/simple_message/socket/simple_socket.h @@ -181,6 +181,12 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection * in order to avoid dynamic memory allocation) */ static const int MAX_BUFFER_SIZE = 1024; + + /** + * \brief socket read timeout (ms) + */ + static const int SOCKET_READ_TO = 1000; + /** * \brief internal data buffer for receiving */ @@ -206,6 +212,17 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection LOG_ERROR("%s, rc: %d, errno: %d", msg, rc, errno); } + /** + * \brief polls socket for data or error + * + * \param timeout (ms) negative or zero values result in blocking + * \param ready true if ready + * \param except true if exception + * + * \return true if function DID NOT timeout (must check flags) + */ + bool poll(int timeout, bool & ready, bool & error); + /** * \brief returns true if socket data is ready to receive * @@ -213,7 +230,11 @@ class SimpleSocket : public industrial::smpl_msg_connection::SmplMsgConnection * * \return true if data is ready to recieve */ - bool isReadyReceive(int timeout); + bool isReadyReceive(int timeout) + { + bool r, e; + return poll(timeout, r, e); + } // Send/Receive functions (inherited classes should override raw methods // Virtual diff --git a/simple_message/src/socket/simple_socket.cpp b/simple_message/src/socket/simple_socket.cpp index c050c674..7bb95f20 100644 --- a/simple_message/src/socket/simple_socket.cpp +++ b/simple_message/src/socket/simple_socket.cpp @@ -42,155 +42,195 @@ using namespace industrial::shared_types; namespace industrial { -namespace simple_socket -{ - - -bool SimpleSocket::sendBytes(ByteArray & buffer) -{ - int rc = this->SOCKET_FAIL; - bool rtn = false; - - if (this->isConnected()) + namespace simple_socket { - // Nothing restricts the ByteArray from being larger than the what the socket - // can handle. - if (this->MAX_BUFFER_SIZE > (int)buffer.getBufferSize()) + + bool SimpleSocket::sendBytes(ByteArray & buffer) { - - rc = rawSendBytes(buffer.getRawDataPtr(), buffer.getBufferSize()); - if (this->SOCKET_FAIL != rc) + int rc = this->SOCKET_FAIL; + bool rtn = false; + + if (this->isConnected()) { - rtn = true; + // Nothing restricts the ByteArray from being larger than the what the socket + // can handle. + if (this->MAX_BUFFER_SIZE > (int)buffer.getBufferSize()) + { + + rc = rawSendBytes(buffer.getRawDataPtr(), buffer.getBufferSize()); + if (this->SOCKET_FAIL != rc) + { + rtn = true; + } + else + { + rtn = false; + logSocketError("Socket sendBytes failed", rc); + } + + } + else + { + LOG_ERROR("Buffer size: %u, is greater than max socket size: %u", buffer.getBufferSize(), this->MAX_BUFFER_SIZE); + rtn = false; + } + } else { rtn = false; - logSocketError("Socket sendBytes failed", rc); + LOG_WARN("Not connected, bytes not sent"); } - + + if (!rtn) + { + this->setConnected(false); } - else - { - LOG_ERROR("Buffer size: %u, is greater than max socket size: %u", buffer.getBufferSize(), this->MAX_BUFFER_SIZE); - rtn = false; - } - } - else - { - rtn = false; - LOG_WARN("Not connected, bytes not sent"); - } + return rtn; - if (!rtn) - { - this->setConnected(false); } - return rtn; - -} + bool SimpleSocket::receiveBytes(ByteArray & buffer, shared_int num_bytes) + { + int rc = this->SOCKET_FAIL; + bool rtn = false; + shared_int remainBytes = num_bytes; + bool ready, error; -bool SimpleSocket::receiveBytes(ByteArray & buffer, shared_int num_bytes) -{ - int rc = this->SOCKET_FAIL; - bool rtn = false; - - // Reset the buffer (this is not required since the buffer length should - // ensure that we don't read any of the garbage that may be left over from - // a previous read), but it is good practice. - - memset(&this->buffer_, 0, sizeof(this->buffer_)); - - // Doing a sanity check to determine if the byte array buffer is larger than - // what can be sent in the socket. This should not happen and might be indicative - // of some code synchronization issues between the client and server base. - if (this->MAX_BUFFER_SIZE < (int)buffer.getMaxBufferSize()) - { - LOG_WARN("Socket buffer max size: %u, is larger than byte array buffer: %u", - this->MAX_BUFFER_SIZE, buffer.getMaxBufferSize()); - } - if (this->isConnected()) - { - rc = rawReceiveBytes(this->buffer_, num_bytes); + // Reset the buffer (this is not required since the buffer length should + // ensure that we don't read any of the garbage that may be left over from + // a previous read), but it is good practice. - if (this->SOCKET_FAIL != rc) - { - if (rc > 0) + memset(&this->buffer_, 0, sizeof(this->buffer_)); + + // Doing a sanity check to determine if the byte array buffer is larger than + // what can be sent in the socket. This should not happen and might be indicative + // of some code synchronization issues between the client and server base. + if (this->MAX_BUFFER_SIZE < (int)buffer.getMaxBufferSize()) { - LOG_COMM("Byte array receive, bytes read: %u", rc); - buffer.init(&this->buffer_[0], rc); - rtn = true; + LOG_WARN("Socket buffer max size: %u, is larger than byte array buffer: %u", + this->MAX_BUFFER_SIZE, buffer.getMaxBufferSize()); + } + if (this->isConnected()) + { + buffer.init(); + while (remainBytes > 0) + { + if (this->poll(this->SOCKET_READ_TO, ready, error)) + { + if(ready) + { + rc = rawReceiveBytes(this->buffer_, num_bytes); + if (this->SOCKET_FAIL == rc) + { + this->logSocketError("Socket received failed", rc); + rtn = false; + break; + } + else if (0 == rc) + { + LOG_WARN("Recieved zero bytes: %u", rc); + rtn = false; + break; + } + else + { + remainBytes = num_bytes - rc; + LOG_COMM("Byte array receive, bytes read: %u, bytes left: %u", + rc, remainBytes); + buffer.load(&this->buffer_, rc); + rtn = true; + } + } + else if(error) + { + LOG_ERROR("Socket poll returned an error"); + rtn = false; + break; + } + else + { + LOG_ERROR("Uknown error from socket poll"); + rtn = false; + break; + } + } + else + { + LOG_WARN("Socket poll timeout, trying again"); + } + } } else { - LOG_WARN("Recieved zero bytes: %u", rc); + LOG_WARN("Not connected, bytes not sent"); rtn = false; } + + if (!rtn) + { + this->setConnected(false); + } + return rtn; } - else + + bool SimpleSocket::poll(int timeout, bool & ready, bool & error) { - this->logSocketError("Socket received failed", rc); - rtn = false; - } - } - else - { - rtn = false; - LOG_WARN("Not connected, bytes not sent"); - } + timeval time; + fd_set read, write, except; + int rc = this->SOCKET_FAIL; + bool rtn = false; + ready = false; + error = false; - if (!rtn) - { - this->setConnected(false); - } - return rtn; -} + // The select function uses the timeval data structure + time.tv_sec = timeout / 1000; + time.tv_usec = (timeout % 1000) * 1000; -bool SimpleSocket::isReadyReceive(int timeout) -{ - timeval time; - fd_set read, write, except; - int rc = this->SOCKET_FAIL; - bool rtn = false; - - // The select function uses the timeval data structure - time.tv_sec = timeout/1000; - time.tv_usec = (timeout%1000)*1000; - - FD_ZERO(&read); - FD_ZERO(&write); - FD_ZERO(&except); - - FD_SET(this->getSockHandle(), &read); - - rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time); - - if (this->SOCKET_FAIL != rc) - { - if (0==rc) - { - LOG_DEBUG("Socket select timed out"); - rtn = false; - } - else - { - LOG_DEBUG("Data is ready for reading"); - rtn = true; - } - } - else - { - this->logSocketError("Socket select function failed", rc); - rtn = false; - } - - return rtn; -} + FD_ZERO(&read); + FD_ZERO(&write); + FD_ZERO(&except); + + FD_SET(this->getSockHandle(), &read); + FD_SET(this->getSockHandle(), &except); + rc = SELECT(this->getSockHandle() + 1, &read, &write, &except, &time); + if (this->SOCKET_FAIL != rc) + { + if (0 == rc) + { + rtn = false; + } + else + { + if (FD_ISSET(this->getSockHandle(), &read)) + { + ready = true; + rtn = true; + } + else if(FD_ISSET(this->getSockHandle(), &except)) + { + error = true; + rtn = true; + } + else + { + LOG_WARN("Select returned, but no flags are set"); + rtn = false; + } + } + } + else + { + this->logSocketError("Socket select function failed", rc); + rtn = false; + } + + return rtn; + } -} //simple_socket -} //industrial + } //simple_socket +} //industrial diff --git a/simple_message/test/utest.cpp b/simple_message/test/utest.cpp index 9e8d057b..86dc8691 100644 --- a/simple_message/test/utest.cpp +++ b/simple_message/test/utest.cpp @@ -255,7 +255,7 @@ class TestTcpServer : public TcpServer return TcpServer::receiveBytes(buffer, num_bytes); } }; -TEST(SocketSuite, issue25) +TEST(SocketSuite, read) { const int tcpPort = TEST_PORT_BASE; char ipAddr[] = "127.0.0.1"; @@ -264,7 +264,8 @@ TEST(SocketSuite, issue25) TestTcpServer tcpServer; ByteArray send, recv; shared_int DATA = 99; - shared_int RECV_BYTES = 2 * sizeof(shared_int); + shared_int TWO_INTS = 2 * sizeof(shared_int); + shared_int ONE_INTS = 1 * sizeof(shared_int); // Construct server ASSERT_TRUE(tcpServer.init(tcpPort)); @@ -277,12 +278,21 @@ TEST(SocketSuite, issue25) ASSERT_TRUE(send.load(DATA)); + // Send just right amount ASSERT_TRUE(tcpClient.sendBytes(send)); + ASSERT_TRUE(tcpClient.sendBytes(send)); + ASSERT_TRUE(tcpServer.receiveBytes(recv, TWO_INTS)); + ASSERT_EQ(TWO_INTS, recv.getBufferSize()); - // Give incorrect byte length to receive bytes - ASSERT_TRUE(tcpServer.receiveBytes(recv, RECV_BYTES)); - ASSERT_EQ(RECV_BYTES, recv.getBufferSize()); + // Send too many bytes + ASSERT_TRUE(tcpClient.sendBytes(send)); + ASSERT_TRUE(tcpClient.sendBytes(send)); + ASSERT_TRUE(tcpClient.sendBytes(send)); + ASSERT_TRUE(tcpServer.receiveBytes(recv, TWO_INTS)); + ASSERT_EQ(TWO_INTS, recv.getBufferSize()); + ASSERT_TRUE(tcpServer.receiveBytes(recv, ONE_INTS)); + ASSERT_EQ(ONE_INTS, recv.getBufferSize()); }