Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add an optional behaviour to flush all pending data in the internal b… #10

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 96 additions & 34 deletions src/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

using namespace std;
using namespace iodrivers_base;
using base::Time;

string Driver::printable_com(std::string const& str)
{ return printable_com(str.c_str(), str.size()); }
Expand Down Expand Up @@ -87,7 +88,8 @@ string Driver::binary_com(char const* str, size_t str_size)
Driver::Driver(int max_packet_size, bool extract_last)
: internal_buffer(new uint8_t[max_packet_size]), internal_buffer_size(0)
, MAX_PACKET_SIZE(max_packet_size)
, m_stream(0), m_auto_close(true), m_extract_last(extract_last)
, m_stream(0), m_auto_close(true), m_extract_last(extract_last)
, m_flush_on_timeout(false)
{
if(MAX_PACKET_SIZE <= 0)
std::runtime_error("Driver: max_packet_size cannot be smaller or equal to 0!");
Expand Down Expand Up @@ -129,6 +131,48 @@ void Driver::clear()
internal_buffer_size = 0;
}

bool Driver::getFlushOnTimeout() const
{
return m_flush_on_timeout;
}
void Driver::setFlushOnTimeout(bool flag)
{
m_flush_on_timeout = flag;
}

int Driver::flushReadBuffer(uint8_t* buffer, int bufsize)
{
do
{
try {
int packetSize = readPacket(buffer, bufsize, Time(), Time(), false);
return packetSize;
}
catch(TimeoutError) {
if (!internal_buffer_size)
return 0;
skipReadBytes(1);
}
} while (internal_buffer_size);
return 0;
}

void Driver::skipReadBytes(size_t byte_count)
{
if (byte_count > internal_buffer_size)
throw std::invalid_argument("skipReadBytes(): byte count provided is more than the amount of bytes in the internal read buffer");

m_stats.stamp = Time::now();
m_stats.bad_rx += byte_count;
internal_buffer_size -= byte_count;
memmove(internal_buffer, internal_buffer + byte_count, internal_buffer_size);
}

bool Driver::isReadBufferEmpty() const
{
return (internal_buffer_size == 0);
}

Status Driver::getStatus() const
{
m_stats.queued_bytes = internal_buffer_size;
Expand Down Expand Up @@ -539,7 +583,7 @@ std::pair<uint8_t const*, int> Driver::findPacket(uint8_t const* buffer, int buf

if (m_extract_last)
{
m_stats.stamp = base::Time::now();
m_stats.stamp = Time::now();
m_stats.bad_rx += packet_start;
m_stats.good_rx += packet_size;
}
Expand Down Expand Up @@ -579,7 +623,7 @@ int Driver::doPacketExtraction(uint8_t* buffer)
pair<uint8_t const*, int> packet = findPacket(internal_buffer, internal_buffer_size);
if (!m_extract_last)
{
m_stats.stamp = base::Time::now();
m_stats.stamp = Time::now();
m_stats.bad_rx += packet.first - internal_buffer;
m_stats.good_rx += packet.second;
}
Expand Down Expand Up @@ -670,30 +714,29 @@ bool Driver::hasPacket() const
return (packet.second > 0);
}

void Driver::setReadTimeout(base::Time const& timeout)
void Driver::setReadTimeout(Time const& timeout)
{ m_read_timeout = timeout; }
base::Time Driver::getReadTimeout() const
Time Driver::getReadTimeout() const
{ return m_read_timeout; }
int Driver::readPacket(uint8_t* buffer, int buffer_size)
{
return readPacket(buffer, buffer_size, getReadTimeout());
return readPacket(buffer, buffer_size, getReadTimeout(), Time(), getFlushOnTimeout());
}
int Driver::readPacket(uint8_t* buffer, int buffer_size,
base::Time const& packet_timeout)
int Driver::readPacket(uint8_t* buffer, int buffer_size, Time const& packet_timeout)
{
return readPacket(buffer, buffer_size, packet_timeout,
packet_timeout + base::Time::fromSeconds(1));
return readPacket(buffer, buffer_size, packet_timeout, Time(), getFlushOnTimeout());
}
int Driver::readPacket(uint8_t* buffer, int buffer_size,
base::Time const& packet_timeout, base::Time const& first_byte_timeout)
int Driver::readPacket(uint8_t* buffer, int buffer_size, Time const& packet_timeout, Time const& first_byte_timeout)
{
return readPacket(buffer, buffer_size, packet_timeout.toMilliseconds(),
first_byte_timeout.toMilliseconds());
return readPacket(buffer, buffer_size, packet_timeout, first_byte_timeout, getFlushOnTimeout());
}
int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int first_byte_timeout)
int Driver::readPacket(uint8_t* buffer, int buffer_size, Time const& packet_timeout, Time const& first_byte_timeout_unvalidated, bool flush_on_timeout)
{
if (first_byte_timeout > packet_timeout)
first_byte_timeout = -1;
Time first_byte_timeout;
if (first_byte_timeout_unvalidated > packet_timeout)
first_byte_timeout = Time();
else
first_byte_timeout = first_byte_timeout_unvalidated;

if (buffer_size < MAX_PACKET_SIZE)
throw length_error("readPacket(): provided buffer too small (got "
Expand All @@ -712,30 +755,35 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int
"readPacket(): no packet in the internal buffer and no FD to read from");
}

if(!m_stream)
throw std::runtime_error("Driver::writePacket : invalid stream, did you forget to call open ?");

Timeout time_out;
bool read_something = false;
while(true) {

pair<int, bool> read_state = readPacketInternal(buffer, buffer_size);

int packet_size = read_state.first;

read_something = read_something || read_state.second;

if (packet_size > 0)
return packet_size;

// if there was no data to read _and_ packet_timeout is zero, we'll throw
if (packet_timeout == 0)
if (packet_timeout.isNull())
{
if (flush_on_timeout && internal_buffer_size)
{
skipReadBytes(1);
int packet_size = flushReadBuffer(buffer, buffer_size);
if (packet_size)
return packet_size;
}

throw TimeoutError(TimeoutError::FIRST_BYTE,
"readPacket(): no data to read while a packet_timeout of 0 was given");
}

int timeout;
Time timeout;
TimeoutError::TIMEOUT_TYPE timeout_type;
if (first_byte_timeout != -1 && !read_something)
if (!first_byte_timeout.isNull() && !read_something)
{
timeout = first_byte_timeout;
timeout_type = TimeoutError::FIRST_BYTE;
Expand All @@ -754,11 +802,11 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int
}

// we still have time left to wait for arriving data. see how much
int remaining_timeout = time_out.timeLeft(timeout);
Time remaining_timeout = time_out.remaining(timeout);
try {
// calls select and waits until a new read can be actually performed (in the next
// while-iteration)
m_stream->waitRead(base::Time::fromMicroseconds(remaining_timeout * 1000));
m_stream->waitRead(remaining_timeout);
}
catch(TimeoutError& e)
{
Expand All @@ -770,18 +818,33 @@ int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int
}
}

void Driver::setWriteTimeout(base::Time const& timeout)



int Driver::readPacket(uint8_t* buffer, int buffer_size, int packet_timeout, int first_byte_timeout)
{
return readPacket(buffer, buffer_size,
Time::fromMilliseconds(packet_timeout),
Time::fromMilliseconds(first_byte_timeout),
getFlushOnTimeout());
}

void Driver::setWriteTimeout(Time const& timeout)
{ m_write_timeout = timeout; }
base::Time Driver::getWriteTimeout() const
Time Driver::getWriteTimeout() const
{ return m_write_timeout; }

bool Driver::writePacket(uint8_t const* buffer, int buffer_size)
{
return writePacket(buffer, buffer_size, getWriteTimeout());
}
bool Driver::writePacket(uint8_t const* buffer, int buffer_size, base::Time const& timeout)
{ return writePacket(buffer, buffer_size, timeout.toMilliseconds()); }

bool Driver::writePacket(uint8_t const* buffer, int buffer_size, int timeout)
{
return writePacket(buffer, buffer_size, Time::fromMilliseconds(timeout));
}

bool Driver::writePacket(uint8_t const* buffer, int buffer_size, Time const& timeout)
{
if(!m_stream)
throw std::runtime_error("Driver::writePacket : invalid stream, did you forget to call open ?");
Expand All @@ -795,16 +858,15 @@ bool Driver::writePacket(uint8_t const* buffer, int buffer_size, int timeout)
written += c;

if (written == buffer_size) {
m_stats.stamp = base::Time::now();
m_stats.stamp = Time::now();
m_stats.tx += buffer_size;
return true;
}

if (time_out.elapsed())
throw TimeoutError(TimeoutError::PACKET, "writePacket(): timeout");

int remaining_timeout = time_out.timeLeft();
m_stream->waitWrite(base::Time::fromMicroseconds(remaining_timeout * 1000));
m_stream->waitWrite(time_out.remaining());
}
}

65 changes: 60 additions & 5 deletions src/Driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ class Driver
*/
bool m_extract_last;

/** Whether the driver should flush on timeout
*
* @see setFlushOnTimeout
*/
bool m_flush_on_timeout;

/** Default read timeout for readPacket
*
* @see getReadTimeout setReadTimeout readPacket
Expand Down Expand Up @@ -158,6 +164,9 @@ class Driver

void openIPClient(std::string const& hostname, int port, addrinfo const& hints);

/** Skip that amount of bytes from the internal buffer */
void skipReadBytes(size_t byte_count);

public:
/** Creates an Driver class for a packet-based protocol
*
Expand Down Expand Up @@ -189,6 +198,46 @@ class Driver
/** Removes all data that is pending on the file descriptor */
void clear();

/** Attempt to read one packet from the internal buffer, not waiting for
* anything.
*
* This is meant to be used in a loop, to read all pending packages from the
* internal buffer, ignoring anything that is not a full packet
*
* <code>
* while ((packet_size = flushReadBuffer(buffer, bufsize)))
* {
* // do something with the packet in 'buffer'
* }
* </code>
*
* @return a packet size if there is a packet and zero if there is none.
* When zero is returned, it is guaranteed that the underlying IO has been
* cleared of any pending data and that there is nothing left in the
* internal buffer.
*/
int flushReadBuffer(uint8_t* buffer, int bufsize);

/** Tests whether the internal read buffer is empty */
bool isReadBufferEmpty() const;

/** Checks the current behaviour of the driver on timeout
*
* @see setFlushOnTimeout
*/
bool getFlushOnTimeout() const;

/** Controls the behaviour of the driver on timeout
*
* If the underlying communication medium is known to be lossy, it is
* beneficial to flush the internal buffer on timeout, that is try to
* process as much data as is available, until nothing's left. This allows
* to skip a corrupted package, while losing as little data as possible.
*
* The default is to not flush.
*/
void setFlushOnTimeout(bool flush);

/** Returns the I/O statistics
*
* Use resetStats() to set them back to 0
Expand Down Expand Up @@ -361,12 +410,11 @@ class Driver
*/
int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout);

/** @overload @deprecated
/** @overload
*
* @arg packet_timeout in milliseconds, see readPacket for semantics
* @arg first_byte_timeout in milliseconds, see readPacket for semantics
* Calls readPacket without a first byte timeout
*/
int readPacket(uint8_t* buffer, int bufsize, int packet_timeout, int first_byte_timeout = -1);
int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout, base::Time const& first_byte_timeout);

/** Tries to read a packet from the file descriptor and to save it in the
* provided buffer. +packet_timeout+ is the timeout to receive a complete
Expand All @@ -386,7 +434,14 @@ class Driver
* @throws TimeoutError on timeout or no data, and UnixError on reading problems
* @returns the size of the packet
*/
int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout, base::Time const& first_byte_timeout);
int readPacket(uint8_t* buffer, int bufsize, base::Time const& packet_timeout, base::Time const& first_byte_timeout, bool flush_on_timeout);

/** @overload @deprecated
*
* @arg packet_timeout in milliseconds, see readPacket for semantics
* @arg first_byte_timeout in milliseconds, see readPacket for semantics
*/
int readPacket(uint8_t* buffer, int bufsize, int packet_timeout, int first_byte_timeout = -1);

/** @overload
*
Expand Down
Loading