diff --git a/src/Driver.cpp b/src/Driver.cpp index 6e0ada9..70f4562 100644 --- a/src/Driver.cpp +++ b/src/Driver.cpp @@ -44,6 +44,7 @@ using namespace std; using namespace iodrivers_base; +using base::Time; string Driver::printable_com(std::string const& str) { return printable_com(str.c_str(), str.size()); } @@ -87,7 +88,8 @@ string Driver::binary_com(char const* str, size_t str_size) Driver::Driver(int max_packet_size, bool extract_last) : internal_buffer(new uint8_t[max_packet_size]), internal_buffer_size(0) , MAX_PACKET_SIZE(max_packet_size) - , m_stream(0), m_auto_close(true), m_extract_last(extract_last) + , m_stream(0), m_auto_close(true), m_extract_last(extract_last) + , m_flush_on_timeout(false) { if(MAX_PACKET_SIZE <= 0) std::runtime_error("Driver: max_packet_size cannot be smaller or equal to 0!"); @@ -129,6 +131,48 @@ void Driver::clear() internal_buffer_size = 0; } +bool Driver::getFlushOnTimeout() const +{ + return m_flush_on_timeout; +} +void Driver::setFlushOnTimeout(bool flag) +{ + m_flush_on_timeout = flag; +} + +int Driver::flushReadBuffer(uint8_t* buffer, int bufsize) +{ + do + { + try { + int packetSize = readPacket(buffer, bufsize, Time(), Time(), false); + return packetSize; + } + catch(TimeoutError) { + if (!internal_buffer_size) + return 0; + skipReadBytes(1); + } + } while (internal_buffer_size); + return 0; +} + +void Driver::skipReadBytes(size_t byte_count) +{ + if (byte_count > internal_buffer_size) + throw std::invalid_argument("skipReadBytes(): byte count provided is more than the amount of bytes in the internal read buffer"); + + m_stats.stamp = Time::now(); + m_stats.bad_rx += byte_count; + internal_buffer_size -= byte_count; + memmove(internal_buffer, internal_buffer + byte_count, internal_buffer_size); +} + +bool Driver::isReadBufferEmpty() const +{ + return (internal_buffer_size == 0); +} + Status Driver::getStatus() const { m_stats.queued_bytes = internal_buffer_size; @@ -539,7 +583,7 @@ std::pair Driver::findPacket(uint8_t const* buffer, int buf if (m_extract_last) { - m_stats.stamp = base::Time::now(); + m_stats.stamp = Time::now(); m_stats.bad_rx += packet_start; m_stats.good_rx += packet_size; } @@ -579,7 +623,7 @@ int Driver::doPacketExtraction(uint8_t* buffer) pair packet = findPacket(internal_buffer, internal_buffer_size); if (!m_extract_last) { - m_stats.stamp = base::Time::now(); + m_stats.stamp = Time::now(); m_stats.bad_rx += packet.first - internal_buffer; m_stats.good_rx += packet.second; } @@ -670,30 +714,29 @@ bool Driver::hasPacket() const return (packet.second > 0); } -void Driver::setReadTimeout(base::Time const& timeout) +void Driver::setReadTimeout(Time const& timeout) { m_read_timeout = timeout; } -base::Time Driver::getReadTimeout() const +Time Driver::getReadTimeout() const { return m_read_timeout; } int Driver::readPacket(uint8_t* buffer, int buffer_size) { - return readPacket(buffer, buffer_size, getReadTimeout()); + return readPacket(buffer, buffer_size, getReadTimeout(), Time(), getFlushOnTimeout()); } -int Driver::readPacket(uint8_t* buffer, int buffer_size, - base::Time const& packet_timeout) +int Driver::readPacket(uint8_t* buffer, int buffer_size, Time const& packet_timeout) { - return readPacket(buffer, buffer_size, packet_timeout, - packet_timeout + base::Time::fromSeconds(1)); + return readPacket(buffer, buffer_size, packet_timeout, Time(), getFlushOnTimeout()); } -int Driver::readPacket(uint8_t* buffer, int buffer_size, - base::Time const& packet_timeout, base::Time const& first_byte_timeout) +int Driver::readPacket(uint8_t* buffer, int buffer_size, Time const& packet_timeout, Time const& first_byte_timeout) { - return readPacket(buffer, buffer_size, packet_timeout.toMilliseconds(), - first_byte_timeout.toMilliseconds()); + return readPacket(buffer, buffer_size, packet_timeout, first_byte_timeout, getFlushOnTimeout()); } -int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int first_byte_timeout) +int Driver::readPacket(uint8_t* buffer, int buffer_size, Time const& packet_timeout, Time const& first_byte_timeout_unvalidated, bool flush_on_timeout) { - if (first_byte_timeout > packet_timeout) - first_byte_timeout = -1; + Time first_byte_timeout; + if (first_byte_timeout_unvalidated > packet_timeout) + first_byte_timeout = Time(); + else + first_byte_timeout = first_byte_timeout_unvalidated; if (buffer_size < MAX_PACKET_SIZE) throw length_error("readPacket(): provided buffer too small (got " @@ -712,30 +755,35 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int "readPacket(): no packet in the internal buffer and no FD to read from"); } - if(!m_stream) - throw std::runtime_error("Driver::writePacket : invalid stream, did you forget to call open ?"); - Timeout time_out; bool read_something = false; while(true) { - pair read_state = readPacketInternal(buffer, buffer_size); int packet_size = read_state.first; - read_something = read_something || read_state.second; if (packet_size > 0) return packet_size; // if there was no data to read _and_ packet_timeout is zero, we'll throw - if (packet_timeout == 0) + if (packet_timeout.isNull()) + { + if (flush_on_timeout && internal_buffer_size) + { + skipReadBytes(1); + int packet_size = flushReadBuffer(buffer, buffer_size); + if (packet_size) + return packet_size; + } + throw TimeoutError(TimeoutError::FIRST_BYTE, "readPacket(): no data to read while a packet_timeout of 0 was given"); + } - int timeout; + Time timeout; TimeoutError::TIMEOUT_TYPE timeout_type; - if (first_byte_timeout != -1 && !read_something) + if (!first_byte_timeout.isNull() && !read_something) { timeout = first_byte_timeout; timeout_type = TimeoutError::FIRST_BYTE; @@ -754,11 +802,11 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int } // we still have time left to wait for arriving data. see how much - int remaining_timeout = time_out.timeLeft(timeout); + Time remaining_timeout = time_out.remaining(timeout); try { // calls select and waits until a new read can be actually performed (in the next // while-iteration) - m_stream->waitRead(base::Time::fromMicroseconds(remaining_timeout * 1000)); + m_stream->waitRead(remaining_timeout); } catch(TimeoutError& e) { @@ -770,18 +818,33 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int } } -void Driver::setWriteTimeout(base::Time const& timeout) + + + +int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int first_byte_timeout) +{ + return readPacket(buffer, buffer_size, + Time::fromMilliseconds(packet_timeout), + Time::fromMilliseconds(first_byte_timeout), + getFlushOnTimeout()); +} + +void Driver::setWriteTimeout(Time const& timeout) { m_write_timeout = timeout; } -base::Time Driver::getWriteTimeout() const +Time Driver::getWriteTimeout() const { return m_write_timeout; } bool Driver::writePacket(uint8_t const* buffer, int buffer_size) { return writePacket(buffer, buffer_size, getWriteTimeout()); } -bool Driver::writePacket(uint8_t const* buffer, int buffer_size, base::Time const& timeout) -{ return writePacket(buffer, buffer_size, timeout.toMilliseconds()); } + bool Driver::writePacket(uint8_t const* buffer, int buffer_size, int timeout) +{ + return writePacket(buffer, buffer_size, Time::fromMilliseconds(timeout)); +} + +bool Driver::writePacket(uint8_t const* buffer, int buffer_size, Time const& timeout) { if(!m_stream) throw std::runtime_error("Driver::writePacket : invalid stream, did you forget to call open ?"); @@ -795,7 +858,7 @@ bool Driver::writePacket(uint8_t const* buffer, int buffer_size, int timeout) written += c; if (written == buffer_size) { - m_stats.stamp = base::Time::now(); + m_stats.stamp = Time::now(); m_stats.tx += buffer_size; return true; } @@ -803,8 +866,7 @@ bool Driver::writePacket(uint8_t const* buffer, int buffer_size, int timeout) if (time_out.elapsed()) throw TimeoutError(TimeoutError::PACKET, "writePacket(): timeout"); - int remaining_timeout = time_out.timeLeft(); - m_stream->waitWrite(base::Time::fromMicroseconds(remaining_timeout * 1000)); + m_stream->waitWrite(time_out.remaining()); } } diff --git a/src/Driver.hpp b/src/Driver.hpp index ef0940c..da54986 100644 --- a/src/Driver.hpp +++ b/src/Driver.hpp @@ -105,6 +105,12 @@ class Driver */ bool m_extract_last; + /** Whether the driver should flush on timeout + * + * @see setFlushOnTimeout + */ + bool m_flush_on_timeout; + /** Default read timeout for readPacket * * @see getReadTimeout setReadTimeout readPacket @@ -158,6 +164,9 @@ class Driver void openIPClient(std::string const& hostname, int port, addrinfo const& hints); + /** Skip that amount of bytes from the internal buffer */ + void skipReadBytes(size_t byte_count); + public: /** Creates an Driver class for a packet-based protocol * @@ -189,6 +198,46 @@ class Driver /** Removes all data that is pending on the file descriptor */ void clear(); + /** Attempt to read one packet from the internal buffer, not waiting for + * anything. + * + * This is meant to be used in a loop, to read all pending packages from the + * internal buffer, ignoring anything that is not a full packet + * + * + * while ((packet_size = flushReadBuffer(buffer, bufsize))) + * { + * // do something with the packet in 'buffer' + * } + * + * + * @return a packet size if there is a packet and zero if there is none. + * When zero is returned, it is guaranteed that the underlying IO has been + * cleared of any pending data and that there is nothing left in the + * internal buffer. + */ + int flushReadBuffer(uint8_t* buffer, int bufsize); + + /** Tests whether the internal read buffer is empty */ + bool isReadBufferEmpty() const; + + /** Checks the current behaviour of the driver on timeout + * + * @see setFlushOnTimeout + */ + bool getFlushOnTimeout() const; + + /** Controls the behaviour of the driver on timeout + * + * If the underlying communication medium is known to be lossy, it is + * beneficial to flush the internal buffer on timeout, that is try to + * process as much data as is available, until nothing's left. This allows + * to skip a corrupted package, while losing as little data as possible. + * + * The default is to not flush. + */ + void setFlushOnTimeout(bool flush); + /** Returns the I/O statistics * * Use resetStats() to set them back to 0 @@ -361,12 +410,11 @@ class Driver */ int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout); - /** @overload @deprecated + /** @overload * - * @arg packet_timeout in milliseconds, see readPacket for semantics - * @arg first_byte_timeout in milliseconds, see readPacket for semantics + * Calls readPacket without a first byte timeout */ - int readPacket(uint8_t* buffer, int bufsize, int packet_timeout, int first_byte_timeout = -1); + int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout, base::Time const& first_byte_timeout); /** Tries to read a packet from the file descriptor and to save it in the * provided buffer. +packet_timeout+ is the timeout to receive a complete @@ -386,7 +434,14 @@ class Driver * @throws TimeoutError on timeout or no data, and UnixError on reading problems * @returns the size of the packet */ - int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout, base::Time const& first_byte_timeout); + int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout, base::Time const& first_byte_timeout, bool flush_on_timeout); + + /** @overload @deprecated + * + * @arg packet_timeout in milliseconds, see readPacket for semantics + * @arg first_byte_timeout in milliseconds, see readPacket for semantics + */ + int readPacket(uint8_t* buffer, int bufsize, int packet_timeout, int first_byte_timeout = -1); /** @overload * diff --git a/src/Timeout.cpp b/src/Timeout.cpp index 74572e1..e40bcd0 100644 --- a/src/Timeout.cpp +++ b/src/Timeout.cpp @@ -2,13 +2,15 @@ using namespace iodrivers_base; -Timeout::Timeout(unsigned int timeout) - : timeout(timeout) { - gettimeofday(&start_time, 0); +Timeout::Timeout(base::Time timeout) + : timeout(timeout) + , start_time(base::Time::now()) +{ } -void Timeout::restart() { - gettimeofday(&start_time, 0); +void Timeout::restart() +{ + start_time = base::Time::now(); } bool Timeout::elapsed() const @@ -16,34 +18,43 @@ bool Timeout::elapsed() const return elapsed(timeout); } -bool Timeout::elapsed(unsigned int timeout) const +bool Timeout::elapsed(base::Time timeout) const +{ + return timeout <= (base::Time::now() - start_time); +} + +base::Time Timeout::remaining() const +{ + return remaining(timeout); +} + +base::Time Timeout::remaining(base::Time timeout) const +{ + base::Time elapsed = base::Time::now() - start_time; + if (timeout < elapsed) + return base::Time(); + else + return (timeout - elapsed); +} + +Timeout::Timeout(unsigned int timeout) + : timeout(base::Time::fromMilliseconds(timeout)) + , start_time(base::Time::now()) { - timeval current_time; - gettimeofday(¤t_time, 0); - unsigned int elapsed = - (current_time.tv_sec - start_time.tv_sec) * 1000 - + (static_cast(current_time.tv_usec) - - static_cast(start_time.tv_usec)) / 1000; - return timeout < elapsed; } unsigned int Timeout::timeLeft() const { - return timeLeft(timeout); + return remaining(timeout).toMilliseconds(); } unsigned int Timeout::timeLeft(unsigned int timeout) const { - timeval current_time; - gettimeofday(¤t_time, 0); - int elapsed = - (current_time.tv_sec - start_time.tv_sec) * 1000 - + (static_cast(current_time.tv_usec) - - static_cast(start_time.tv_usec)) / 1000; - if ((int)timeout < elapsed) - return 0; - return timeout - elapsed; + return remaining(base::Time::fromMilliseconds(timeout)).toMilliseconds(); } - +bool Timeout::elapsed(unsigned int timeout) const +{ + return elapsed(base::Time::fromMilliseconds(timeout)); +} diff --git a/src/Timeout.hpp b/src/Timeout.hpp index 813a627..33303ed 100644 --- a/src/Timeout.hpp +++ b/src/Timeout.hpp @@ -1,7 +1,7 @@ #ifndef IODRIVERS_BASE_TIMEOUT_HPP #define IODRIVERS_BASE_TIMEOUT_HPP -#include +#include namespace iodrivers_base { @@ -9,15 +9,15 @@ namespace iodrivers_base { */ class Timeout { private: - unsigned int timeout; - timeval start_time; + base::Time timeout; + base::Time start_time; public: + /** * Initializes and starts a timeout - * @param timeout time in ms */ - Timeout(unsigned int timeout = 0); + Timeout(base::Time timeout = base::Time()); /** * Restarts the timeout @@ -31,28 +31,46 @@ class Timeout { */ bool elapsed() const; + /** Returns the timeout set at construction time */ + base::Time getTimeout() const; + /** * Checks if the timeout is already elapsed. * This uses a syscall, so use sparingly and cache results * @param timeout a custom timeout * @returns true if the timeout is elapsed */ - bool elapsed(unsigned int timeout) const; + bool elapsed(base::Time timeout) const; /** * Calculates the time left for this timeout - * This uses a syscall, so use sparingly and cache results - * @returns number of milliseconds this timeout as left */ - unsigned int timeLeft() const; + base::Time remaining() const; /** - * Calculates the time left for this timeout - * This uses a syscall, so use sparingly and cache results - * @param timeout a custom timeout - * @returns number of milliseconds this timeout as left + * Calculates the time left before the given timeout expires + * + * @param timeout a custom timeout + * @returns the time left until the given timeout expires. It returns a null + * time if the timeout expired already. */ + base::Time remaining(base::Time timeout) const; + + /** @overloaded @deprecated */ + Timeout(unsigned int timeout); + + /** * @deprecated + * + * Use remaining instead + */ + unsigned int timeLeft() const; + + /** @overloaded @deprecated */ unsigned int timeLeft(unsigned int timeout) const; + + /** @overloaded @deprecated */ + bool elapsed(unsigned int timeout) const; + }; } diff --git a/test/suite.cpp b/test/suite.cpp index 43f7749..af38cdc 100644 --- a/test/suite.cpp +++ b/test/suite.cpp @@ -3,5 +3,3 @@ // only once (here) #define BOOST_TEST_MAIN #include - - diff --git a/test/test_Driver.cpp b/test/test_Driver.cpp index cb8aa78..5ec516b 100644 --- a/test/test_Driver.cpp +++ b/test/test_Driver.cpp @@ -29,7 +29,8 @@ class DriverTest : public Driver } }; -int setupDriver(Driver& driver) +template +int setupDriver(DriverType& driver) { int pipes[2]; BOOST_REQUIRE(pipe(pipes) == 0); @@ -45,7 +46,9 @@ int setupDriver(Driver& driver) void writeToDriver(Driver& driver, int tx, uint8_t const* data, int size) { - write(tx, data, size); + int tx_bytes = write(tx, data, size); + if (tx_bytes != size) + throw std::runtime_error("failed to write"); } BOOST_AUTO_TEST_SUITE(FileGuardSuite) @@ -372,4 +375,79 @@ BOOST_AUTO_TEST_CASE(test_send_from_bidirectional_udp) BOOST_REQUIRE((count == 4) && (memcmp(buffer, msg, count) == 0)); } -BOOST_AUTO_TEST_SUITE_END() +class FlushReadBufferTest : public Driver +{ +public: + FlushReadBufferTest() + : Driver(100) {} + + int extractPacket(uint8_t const* buffer, size_t buffer_size) const + { + if (buffer[0] != 0) + return -1; + int size = buffer[1]; + if (buffer_size < size) + return 0; + + if (buffer[size - 1] == 0) + return size; + return -1; + } +}; + +BOOST_AUTO_TEST_CASE(test_flushReadBuffer_skips_partial_packages_until_it_finds_one) +{ + FlushReadBufferTest test; + int tx = setupDriver(test); + FileGuard tx_guard(tx); + writeToDriver(test, tx, reinterpret_cast("\x0\xf\x0\xf\x0\x3\x0"), 7); + uint8_t buffer[100]; + size_t size = test.flushReadBuffer(buffer, 100); + BOOST_REQUIRE_EQUAL(3, size); + BOOST_REQUIRE(!memcmp(buffer, "\x0\x3\x0", 3)); +} + +BOOST_AUTO_TEST_CASE(test_flushReadBuffer_completely_empties_the_read_buffer_if_there_is_no_packet_in_it) +{ + FlushReadBufferTest test; + int tx = setupDriver(test); + FileGuard tx_guard(tx); + writeToDriver(test, tx, reinterpret_cast("\x0\xf\x0\xf\x0\x3"), 6); + uint8_t buffer[100]; + BOOST_REQUIRE_EQUAL(0, test.flushReadBuffer(buffer, 100)); + BOOST_REQUIRE(test.isReadBufferEmpty()); +} +BOOST_AUTO_TEST_CASE(test_flushReadBuffer_accounts_for_the_lost_bytes_as_bad_rx) +{ + FlushReadBufferTest test; + int tx = setupDriver(test); + FileGuard tx_guard(tx); + writeToDriver(test, tx, reinterpret_cast("\x0\xf\x0\xf\x0\x3\x0"), 7); + uint8_t buffer[100]; + test.flushReadBuffer(buffer, 100); + BOOST_REQUIRE_EQUAL(4, test.getStats().bad_rx); +} + +BOOST_AUTO_TEST_CASE(test_readPacket_will_flush_the_read_buffer_if_setFlushOnTimeout_is_set) +{ + FlushReadBufferTest test; + test.setFlushOnTimeout(true); + int tx = setupDriver(test); + FileGuard tx_guard(tx); + writeToDriver(test, tx, reinterpret_cast("\x0\xf\x0\xf\x0\x3\x0"), 7); + uint8_t buffer[100]; + size_t size = test.readPacket(buffer, 100); + BOOST_REQUIRE_EQUAL(3, size); + BOOST_REQUIRE(!memcmp(buffer, "\x0\x3\x0", 3)); +} + +BOOST_AUTO_TEST_CASE(test_readPacket_will_throw_a_timeout_error_if_setFlushOnTimeout_is_set_and_no_package_is_available) +{ + FlushReadBufferTest test; + test.setFlushOnTimeout(true); + int tx = setupDriver(test); + FileGuard tx_guard(tx); + writeToDriver(test, tx, reinterpret_cast("\x0\xf\x0\xff sfa\x0\x3"), 10); + uint8_t buffer[100]; + BOOST_REQUIRE_THROW(test.readPacket(buffer, 100), TimeoutError); +}