Skip to content

Commit

Permalink
add an optional behaviour to flush all pending data in the internal b…
Browse files Browse the repository at this point in the history
…uffer on timeout

This is required in cases where it is expected that corrupted data
would come in (i.e. corrupting medium). Otherwise, the driver might get
stuck at a broken packet, trying forever for data.

The behaviour, when enabled, is to try and read packets from the
immediately available data, discarding everything that is not a full
packet -- it would for instance discard a partial packet.
  • Loading branch information
doudou committed May 18, 2016
1 parent 178712f commit 401cec7
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 10 deletions.
61 changes: 60 additions & 1 deletion src/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ string Driver::binary_com(char const* str, size_t str_size)
Driver::Driver(int max_packet_size, bool extract_last)
: internal_buffer(new uint8_t[max_packet_size]), internal_buffer_size(0)
, MAX_PACKET_SIZE(max_packet_size)
, m_stream(0), m_auto_close(true), m_extract_last(extract_last)
, m_stream(0), m_auto_close(true), m_extract_last(extract_last)
, m_flush_on_timeout(false)
{
if(MAX_PACKET_SIZE <= 0)
std::runtime_error("Driver: max_packet_size cannot be smaller or equal to 0!");
Expand Down Expand Up @@ -118,10 +119,53 @@ void Driver::removeListener(IOListener* listener)

void Driver::clear()
{
internal_buffer_size = 0;
if (m_stream)
m_stream->clear();
}

bool Driver::getFlushOnTimeout() const
{
return m_flush_on_timeout;
}
void Driver::setFlushOnTimeout(bool flag)
{
m_flush_on_timeout = flag;
}

int Driver::flushReadBuffer(uint8_t* buffer, int bufsize)
{
do
{
try {
int packetSize = readPacket(buffer, bufsize, 0, 0, false);
return packetSize;
}
catch(TimeoutError) {
if (!internal_buffer_size)
return 0;
skipReadBytes(1);
}
} while (internal_buffer_size);
return 0;
}

void Driver::skipReadBytes(size_t byte_count)
{
if (byte_count > internal_buffer_size)
throw std::invalid_argument("skipReadBytes(): byte count provided is more than the amount of bytes in the internal read buffer");

m_stats.stamp = base::Time::now();
m_stats.bad_rx += byte_count;
internal_buffer_size -= byte_count;
memmove(internal_buffer, internal_buffer + byte_count, internal_buffer_size);
}

bool Driver::isReadBufferEmpty() const
{
return (internal_buffer_size == 0);
}

Status Driver::getStatus() const
{ return m_stats; }
void Driver::resetStatus()
Expand Down Expand Up @@ -631,6 +675,11 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size,
first_byte_timeout.toMilliseconds());
}
int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int first_byte_timeout)
{
return readPacket(buffer, buffer_size, packet_timeout, first_byte_timeout, getFlushOnTimeout());
}

int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int first_byte_timeout, bool flush_on_timeout)
{
if (first_byte_timeout > packet_timeout)
first_byte_timeout = -1;
Expand Down Expand Up @@ -670,8 +719,18 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int

// if there was no data to read _and_ packet_timeout is zero, we'll throw
if (packet_timeout == 0)
{
if (flush_on_timeout && internal_buffer_size)
{
skipReadBytes(1);
int packet_size = flushReadBuffer(buffer, buffer_size);
if (packet_size)
return packet_size;
}

throw TimeoutError(TimeoutError::FIRST_BYTE,
"readPacket(): no data to read while a packet_timeout of 0 was given");
}

int timeout;
TimeoutError::TIMEOUT_TYPE timeout_type;
Expand Down
56 changes: 56 additions & 0 deletions src/Driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class Driver
*/
bool m_extract_last;

/** Whether the driver should flush on timeout
*
* @see setFlushOnTimeout
*/
bool m_flush_on_timeout;

/** Default read timeout for readPacket
*
* @see getReadTimeout setReadTimeout readPacket
Expand Down Expand Up @@ -158,6 +164,9 @@ class Driver
void openIPServer(int port, addrinfo const& hints);
void openIPClient(std::string const& hostname, int port, addrinfo const& hints);

/** Skip that amount of bytes from the internal buffer */
void skipReadBytes(size_t byte_count);

public:
/** Creates an Driver class for a packet-based protocol
*
Expand Down Expand Up @@ -189,6 +198,46 @@ class Driver
/** Removes all data that is pending on the file descriptor */
void clear();

/** Attempt to read one packet from the internal buffer, not waiting for
* anything.
*
* This is meant to be used in a loop, to read all pending packages from the
* internal buffer, ignoring anything that is not a full packet
*
* <code>
* while ((packet_size = flushReadBuffer(buffer, bufsize)))
* {
* // do something with the packet in 'buffer'
* }
* </code>
*
* @return a packet size if there is a packet and zero if there is none.
* When zero is returned, it is guaranteed that the underlying IO has been
* cleared of any pending data and that there is nothing left in the
* internal buffer.
*/
int flushReadBuffer(uint8_t* buffer, int bufsize);

/** Tests whether the internal read buffer is empty */
bool isReadBufferEmpty() const;

/** Checks the current behaviour of the driver on timeout
*
* @see setFlushOnTimeout
*/
bool getFlushOnTimeout() const;

/** Controls the behaviour of the driver on timeout
*
* If the underlying communication medium is known to be lossy, it is
* beneficial to flush the internal buffer on timeout, that is try to
* process as much data as is available, until nothing's left. This allows
* to skip a corrupted package, while losing as little data as possible.
*
* The default is to not flush.
*/
void setFlushOnTimeout(bool flush);

/** Returns the I/O statistics
*
* Use resetStats() to set them back to 0
Expand Down Expand Up @@ -358,6 +407,13 @@ class Driver
*/
int readPacket(uint8_t* buffer, int bufsize, int packet_timeout, int first_byte_timeout = -1);

/** @overload @deprecated
*
* @arg packet_timeout in milliseconds, see readPacket for semantics
* @arg first_byte_timeout in milliseconds, see readPacket for semantics
*/
int readPacket(uint8_t* buffer, int bufsize, int packet_timeout, int first_byte_timeout, bool flush_on_timeout);

/** Tries to read a packet from the file descriptor and to save it in the
* provided buffer. +packet_timeout+ is the timeout to receive a complete
* packet. There is no infinite timeout value, and 0 is non-blocking at all
Expand Down
2 changes: 1 addition & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
rock_testsuite(test_suite test.cpp
rock_testsuite(test_suite suite.cpp test.cpp
DEPS iodrivers_base)

rock_executable(test_tcp_read test_tcp_read.cpp
Expand Down
5 changes: 5 additions & 0 deletions test/suite.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Do NOT add anything to this file
// This header from boost takes ages to compile, so we make sure it is compiled
// only once (here)
#define BOOST_TEST_MAIN
#include <boost/test/unit_test.hpp>
92 changes: 84 additions & 8 deletions test/test.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MAIN
#define BOOST_TEST_MODULE "iodrivers"
#define BOOST_AUTO_TEST_MAIN
#include <boost/test/auto_unit_test.hpp>
#include <boost/test/unit_test.hpp>

#include <sys/types.h>
Expand Down Expand Up @@ -34,10 +29,12 @@ class DriverTest : public Driver
}
};

int setupDriver(Driver& driver)
template<typename DriverType>
int setupDriver(DriverType& driver)
{
int pipes[2];
pipe(pipes);
if (pipe(pipes) == -1)
throw std::runtime_error("failed to create the pipe for the test driver");
int rx = pipes[0];
int tx = pipes[1];

Expand All @@ -50,7 +47,9 @@ int setupDriver(Driver& driver)

void writeToDriver(Driver& driver, int tx, uint8_t const* data, int size)
{
write(tx, data, size);
int tx_bytes = write(tx, data, size);
if (tx_bytes != size)
throw std::runtime_error("failed to write");
}


Expand Down Expand Up @@ -310,3 +309,80 @@ BOOST_AUTO_TEST_CASE(test_hasPacket_returns_false_on_internal_buffer_with_garbag
BOOST_REQUIRE_EQUAL(4, test.readPacket(buffer, 100, 10, 1));
BOOST_REQUIRE(!test.hasPacket());
}

class FlushReadBufferTest : public Driver
{
public:
FlushReadBufferTest()
: Driver(100) {}

int extractPacket(uint8_t const* buffer, size_t buffer_size) const
{
if (buffer[0] != 0)
return -1;
int size = buffer[1];
if (buffer_size < size)
return 0;

if (buffer[size - 1] == 0)
return size;
return -1;
}
};

BOOST_AUTO_TEST_CASE(test_flushReadBuffer_skips_partial_packages_until_it_finds_one)
{
FlushReadBufferTest test;
int tx = setupDriver(test);
FileGuard tx_guard(tx);
writeToDriver(test, tx, reinterpret_cast<uint8_t const*>("\x0\xf\x0\xf\x0\x3\x0"), 7);
uint8_t buffer[100];
size_t size = test.flushReadBuffer(buffer, 100);
BOOST_REQUIRE_EQUAL(3, size);
BOOST_REQUIRE(!memcmp(buffer, "\x0\x3\x0", 3));
}

BOOST_AUTO_TEST_CASE(test_flushReadBuffer_completely_empties_the_read_buffer_if_there_is_no_packet_in_it)
{
FlushReadBufferTest test;
int tx = setupDriver(test);
FileGuard tx_guard(tx);
writeToDriver(test, tx, reinterpret_cast<uint8_t const*>("\x0\xf\x0\xf\x0\x3"), 6);
uint8_t buffer[100];
BOOST_REQUIRE_EQUAL(0, test.flushReadBuffer(buffer, 100));
BOOST_REQUIRE(test.isReadBufferEmpty());
}
BOOST_AUTO_TEST_CASE(test_flushReadBuffer_accounts_for_the_lost_bytes_as_bad_rx)
{
FlushReadBufferTest test;
int tx = setupDriver(test);
FileGuard tx_guard(tx);
writeToDriver(test, tx, reinterpret_cast<uint8_t const*>("\x0\xf\x0\xf\x0\x3\x0"), 7);
uint8_t buffer[100];
test.flushReadBuffer(buffer, 100);
BOOST_REQUIRE_EQUAL(4, test.getStats().bad_rx);
}

BOOST_AUTO_TEST_CASE(test_readPacket_will_flush_the_read_buffer_if_setFlushOnTimeout_is_set)
{
FlushReadBufferTest test;
test.setFlushOnTimeout(true);
int tx = setupDriver(test);
FileGuard tx_guard(tx);
writeToDriver(test, tx, reinterpret_cast<uint8_t const*>("\x0\xf\x0\xf\x0\x3\x0"), 7);
uint8_t buffer[100];
size_t size = test.readPacket(buffer, 100);
BOOST_REQUIRE_EQUAL(3, size);
BOOST_REQUIRE(!memcmp(buffer, "\x0\x3\x0", 3));
}

BOOST_AUTO_TEST_CASE(test_readPacket_will_throw_a_timeout_error_if_setFlushOnTimeout_is_set_and_no_package_is_available)
{
FlushReadBufferTest test;
test.setFlushOnTimeout(true);
int tx = setupDriver(test);
FileGuard tx_guard(tx);
writeToDriver(test, tx, reinterpret_cast<uint8_t const*>("\x0\xf\x0\xff sfa\x0\x3"), 10);
uint8_t buffer[100];
BOOST_REQUIRE_THROW(test.readPacket(buffer, 100), TimeoutError);
}

0 comments on commit 401cec7

Please sign in to comment.