Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce latency of YarpLoggerDevice #883

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Changelog
All notable changes to this project are documented in this file.

## [unreleased]
### Added
- Add a feature to reduce latency of `YarpLoggerDevice` (https://github.com/ami-iit/bipedal-locomotion-framework/pull/883)

### Changed
### Fixed
### Removed

## [0.19.0] - 2024-09-06
### Added
- Added Vector Collection Server for publishing information for real-time users in the YARPRobotLoggerDevice (https://github.com/ami-iit/bipedal-locomotion-framework/pull/796)
Expand Down
25 changes: 25 additions & 0 deletions devices/YarpRobotLoggerDevice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,28 @@ robot-log-visualizer
Then, you can open the mat file generated by the logger and explore the logged data as in the following video:

[robot-log-visualizer.webm](https://github.com/ami-iit/robot-log-visualizer/assets/16744101/3fd5c516-da17-4efa-b83b-392b5ce1383b)

## How to reduce latency
If you are experiencing some latency in logged data, you can try to enable different real time scheduling strategies through the `real_time_scheduling_strategy` in the [yarp-robot-logger.xml](https://github.com/ami-iit/bipedal-locomotion-framework/blob/master/devices/YarpRobotLoggerDevice/app/robots/ergoCubSN000/yarp-robot-logger.xml). For example:

```xml
<?xml version="1.0" encoding="UTF-8" ?>
<device xmlns:xi="http://www.w3.org/2001/XInclude" name="yarp-robot-logger" type="YarpRobotLoggerDevice">
<param name="robot">ergocub</param>
<param name="sampling_period_in_s">0.001</param>
<param name="real_time_scheduling_strategy">early_wakeup_and_fifo</param>
```

The available strategies are `none` (which is the default and means no strategy will be applied), `early_wakeup`, `fifo`, and `early_wakeup_and_fifo`.
The `early_wakeup` strategy makes the YarpLoggerDevice run thread wake up earlier and then busy wait until it is time to resume.
The `fifo` strategy increases the YarpLoggerDevice run thread priority and changes its scheduling policy to SCHED-FIFO.
The `early_wakeup_and_fifo` combines the `early_wakeup` and `fifo` strategies.

Note that the `fifo` and `early_wakeup_and_fifo` strategies are only available for Linux. Moreover, you should run the `YarpLoggerDevice` with elevated privileges when deploying them.
For example:

```console
sudo -E <yarprobotinterface_directory>/yarprobotinterface --config launch-yarp-robot-logger.xml
```

with `<yarprobotinterface_directory>` being the directory containing `yarprobotinterface`, that you can determine with the terminal command `which yarprobotinterface`.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class YarpRobotLoggerDevice : public yarp::dev::DeviceDriver,
private:
std::chrono::nanoseconds m_previousTimestamp;
std::chrono::nanoseconds m_acceptableStep{std::chrono::nanoseconds::max()};
std::chrono::nanoseconds m_resumeTime{std::chrono::nanoseconds(0)};
std::chrono::nanoseconds m_awakeningTime{std::chrono::microseconds(500)};
bool m_firstRun{true};

using ft_t = Eigen::Matrix<double, 6, 1>;
Expand Down Expand Up @@ -171,12 +173,23 @@ class YarpRobotLoggerDevice : public yarp::dev::DeviceDriver,
bool m_streamCartesianWrenches{false};
bool m_streamFTSensors{false};
bool m_streamTemperatureSensors{false};

enum class RealTimeSchedulingStrategy
{
None,
EarlyWakeUp,
FIFO,
EarlyWakeUpAndFIFO,
};
RealTimeSchedulingStrategy m_RealTimeSchedulingStrategy{RealTimeSchedulingStrategy::None};
std::vector<std::string> m_textLoggingSubnames;
std::vector<std::string> m_codeStatusCmdPrefixes;

std::mutex m_bufferManagerMutex;
robometry::BufferManager m_bufferManager;

bool threadInit() final;

void lookForNewLogs();
void lookForExogenousSignals();

Expand Down
136 changes: 126 additions & 10 deletions devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
#include <string>
#include <tuple>

#if defined(__linux__)
#include <pthread.h>
#include <sched.h>
#endif

#include <BipedalLocomotion/ParametersHandler/IParametersHandler.h>
#include <BipedalLocomotion/ParametersHandler/YarpImplementation.h>
#include <BipedalLocomotion/System/Clock.h>
Expand Down Expand Up @@ -124,6 +129,51 @@ YarpRobotLoggerDevice::YarpRobotLoggerDevice()

YarpRobotLoggerDevice::~YarpRobotLoggerDevice() = default;

bool YarpRobotLoggerDevice::threadInit()
{
constexpr auto logPrefix = "[YarpRobotLoggerDevice::threadInit]";

if (m_RealTimeSchedulingStrategy == RealTimeSchedulingStrategy::None
|| m_RealTimeSchedulingStrategy == RealTimeSchedulingStrategy::EarlyWakeUp)
{
return true;
}

// Set the scheduling policy to SCHED_FIFO and priority
#if defined(__linux__)
// Get the native handle
pthread_t native_handle = pthread_self();

// Set thread scheduling parameters
sched_param param;
param.sched_priority = 80;

// Set the scheduling policy to SCHED_FIFO and priority
int ret = pthread_setschedparam(native_handle, SCHED_FIFO, &param);
if (ret != 0)
{
log()->error("{} Failed to set scheduling policy, with error: {}", logPrefix, ret);
if (ret == EPERM)
{
log()->error("{} The calling thread does not have the appropriate privileges to set "
"the requested scheduling policy and parameters. Try to run the "
"YarpLoggerDevice with 'sudo -E'.",
logPrefix);
}
return false;
} else
{
log()->info("{} Scheduling policy set to SCHED_FIFO with priority {}",
logPrefix,
param.sched_priority);
return true;
}
#else
log()->warn("{} Real-time scheduling is not supported on this platform.", logPrefix);
#endif
return true;
}

bool YarpRobotLoggerDevice::open(yarp::os::Searchable& config)
{

Expand All @@ -146,9 +196,50 @@ bool YarpRobotLoggerDevice::open(yarp::os::Searchable& config)
log()->info("{} Real time logging not activated", logPrefix);
}

std::string realTimeSchedulingStrategy{"none"};
if (!params->getParameter("real_time_scheduling_strategy", realTimeSchedulingStrategy))
{
log()->info("{} The 'real_time_scheduling_strategy' parameter is not found. "
"YarpLoggerDevice will run without any real time strategy.",
logPrefix);
}
if (realTimeSchedulingStrategy == "none")
{
m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::None;
} else if (realTimeSchedulingStrategy == "early_wakeup")
{
m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::EarlyWakeUp;
} else if (realTimeSchedulingStrategy == "fifo")
{
m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::FIFO;
} else if (realTimeSchedulingStrategy == "early_wakeup_and_fifo")
{
m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::EarlyWakeUpAndFIFO;
} else
{
log()->error("{} The 'real_time_scheduling_strategy' parameter is not valid. Available "
"options are 'none', 'early_wakeup', 'fifo', 'early_wakeup_and_fifo'.",
logPrefix);
return false;
}

double devicePeriod{0.01};
if (params->getParameter("sampling_period_in_s", devicePeriod))
{
if (m_RealTimeSchedulingStrategy == RealTimeSchedulingStrategy::EarlyWakeUp
|| m_RealTimeSchedulingStrategy == RealTimeSchedulingStrategy::EarlyWakeUpAndFIFO)
{
if (devicePeriod > m_awakeningTime.count() * 1e-9)
{
devicePeriod = devicePeriod - m_awakeningTime.count() * 1e-9;
} else
{
log()->error("{} The sampling period is smaller than the awakening time. Cannot "
"use the 'early_wakeup' strategy.",
logPrefix);
return false;
}
}
this->setPeriod(devicePeriod);
}

Expand Down Expand Up @@ -1189,22 +1280,21 @@ void YarpRobotLoggerDevice::lookForExogenousSignals()
continue;
}

log()->info("[YarpRobotLoggerDevice::lookForExogenousSignals] Attempt to get the "
"metadata for the vectors collection signal named: {}",
log()->info("[YarpRobotLoggerDevice::lookForExogenousSignals] Attempt to get "
"the metadata for the vectors collection signal named: {}",
name);

if (!signal.client.getMetadata(signal.metadata))
{
log()->warn("[YarpRobotLoggerDevice::lookForExogenousSignals] Unable to get "
"the metadata for the signal named: {}. The exogenous signal will "
"not contain the metadata.",
log()->warn("[YarpRobotLoggerDevice::lookForExogenousSignals] Unable to "
"get the metadata for the signal named: {}. The exogenous "
"signal will not contain the metadata.",
name);
}
}
}

signal.connected = connectionDone;

}
};

Expand Down Expand Up @@ -1431,6 +1521,30 @@ void YarpRobotLoggerDevice::recordVideo(const std::string& cameraName, VideoWrit

void YarpRobotLoggerDevice::run()
{

bool wait = (m_RealTimeSchedulingStrategy == RealTimeSchedulingStrategy::EarlyWakeUp
|| m_RealTimeSchedulingStrategy == RealTimeSchedulingStrategy::EarlyWakeUpAndFIFO)
? true
: false;
std::chrono::nanoseconds now = BipedalLocomotion::clock().now();

while (wait)
{

if (now >= m_resumeTime)
{
wait = false;
} else
{
if ((m_resumeTime - now) > (m_awakeningTime / 2))
{
// still have time to sleep
BipedalLocomotion::clock().sleepFor(m_awakeningTime / 10);
}
}
now = BipedalLocomotion::clock().now();
}

auto logData = [this](const std::string& name, const auto& data, const double time) {
m_bufferManager.push_back(data, time, name);
std::string rtName = robotRtRootName + treeDelim + name;
Expand Down Expand Up @@ -1718,12 +1832,11 @@ void YarpRobotLoggerDevice::run()
yarp::os::Bottle* b = m_textLoggingPort.read(false);
if (b != nullptr)
{
msg = BipedalLocomotion::TextLoggingEntry::deserializeMessage(*b,
std::to_string(time));
msg = BipedalLocomotion::TextLoggingEntry::deserializeMessage(*b, std::to_string(time));
if (msg.isValid)
{
signalFullName = msg.portSystem + "::" + msg.portPrefix + "::" + msg.processName
+ "::p" + msg.processPID;
+ "::p" + msg.processPID;

// matlab does not support the character - as a key of a struct
findAndReplaceAll(signalFullName, "-", "_");
Expand All @@ -1736,7 +1849,7 @@ void YarpRobotLoggerDevice::run()
m_bufferManager.addChannel({signalFullName, {1, 1}});
m_textLogsStoredInManager.insert(signalFullName);
}
//Not using logData here because we don't want to stream the data to RT
// Not using logData here because we don't want to stream the data to RT
m_bufferManager.push_back(msg, time, signalFullName);
}
bufferportSize = m_textLoggingPort.getPendingReads();
Expand All @@ -1753,6 +1866,9 @@ void YarpRobotLoggerDevice::run()

m_previousTimestamp = t;
m_firstRun = false;

m_resumeTime = t + std::chrono::nanoseconds(static_cast<int64_t>(this->getPeriod() * 1e9))
+ m_awakeningTime;
}

bool YarpRobotLoggerDevice::saveCallback(const std::string& fileName,
Expand Down
Loading