-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Synchronous Outlet for zero-copying socket writing #153
Conversation
Disabling the |
Note to self: disable synchronous-write-mode when data format is string or when byte order is not default. |
Thanks @tstenner for pointing out that the |
bad96d6
to
08024cc
Compare
d66ef0c
to
6cf0600
Compare
e59ffc3
to
d40eacd
Compare
d40eacd
to
6b14a1f
Compare
examples/SendData.cpp
Outdated
else { | ||
// Advanced: Push set of discontiguous buffers. | ||
array<float *, 2> bufs = {sample.data(), extra.data()}; | ||
outlet.push_numeric_bufs(reinterpret_cast<const char **>(const_cast<const float**>(bufs.data())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new push_numeric_bufs
should be part of a separate PR; as it is it's easy to misuse the API / provoke undefined behavior and the casts should be handled more safely by the C++ API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't disagree, but unfortunately I need this functionality from pure C. So it was either this, or add 14 functions to the C API. We can do that eventually, after all the inner workings are settled on.
@@ -170,4 +207,10 @@ template void stream_outlet_impl::enqueue<float>(const float *data, double, bool | |||
template void stream_outlet_impl::enqueue<double>(const double *data, double, bool); | |||
template void stream_outlet_impl::enqueue<std::string>(const std::string *data, double, bool); | |||
|
|||
void stream_outlet_impl::enqueue_sync_multi(std::vector<asio::const_buffer> buffs, double timestamp, bool pushthrough) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: remove enqueue_sync
and make this a template that takes a pair of asio::const_buffer
iterators/pointers
src/tcp_server.cpp
Outdated
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> buffs) { | ||
std::lock_guard<std::recursive_mutex> lock(inflight_mut_); | ||
std::size_t bytes_sent = 0; | ||
for (const auto &x : inflight_ready_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This waits for each preceding outlet to acknowledge that the complete data has been received before transmitting data to the next outlet. Suggestion: keep this code when there's only one inlet, start async writes to all inlets and wait for all writes to succeed before returning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know of an easy way to do this without semaphores (C++20). Do you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For sync
outlets you don't have a data transfer thread, so the calling thread is responsible for performing the IO, i.e. call send()
or send_async()
+io_context::run()
. In pseudocode:
if consumers.size == 0:
return
if consumers.size() == 1:
res = consumers[0].send(data)
if res != ok:
handle_error()
else:
io_ctx.restart()
for consumer in consumers:
consumer.send_async(data, [](err_t err) {
if res != ok:
handle_error()
})
steady_timer timer;
timer.wait(3s, [](err_t err) { if err != ok: … });
io_ctx.run();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming here that consumers
is a tcp::socket
, unfortunately socket.send(...)
often fails to send the entire set of buffers. I was getting garbled data until I switched to asio::write(socket, bufs)
.
I don't know about socket.send_async
vs asio::async_write
. I hope the latter can successfully send everything (eventually) without having to monitor bytes_sent and re-entering the send condition.
I don't know if that changes the interaction with io_ctx
. I'll give it a try when I get back to this. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right about socket.send_async()
, so that would need to be asio::async_write()
and also asio::write()
. It doesn't change anything for the io_ctx
, as asio::(async_)write
just keeps track of the data already sent and starts a new send operation if it isn't finished yet.
incorporate other suggestions from tstenner. Currently not working properly!
I made some changes based on your suggestions. However, the multiple-client code is probably still broken. I had to deprioritize that due to time constraints. I hope to get back to this soon. |
I discovered that |
Closing in favour of #170 |
This implements a code path that allows for synchronous (blocked) calls to
push_*
by setting an outlet constructor argumentdo_async
tofalse
or0
. In this path,liblsl
performs 0 (zero) copies on the data, compared to the 2 copies performed wen doing asynchronous writing. This can have dramatic speed / CPU usage improvements when using high bandwidth data (18%->5% in one of my tests).Note that this contains some commits from the
double_buflen
branch. Currently the only relevant commit to examine is c64aeb9.Some concerns:
push_*
depends on how many consumers are added. This can lead to unpredictable user experience unless the user application callspush_*
from a non-interactive thread.transfer_samples_thread
but this did not work out. So now we have a thread doing nothing. I will try again.do_async
flag on the very newlsl_create_outlet_ex
C method, and preserved the API of the existinglsl_create_outlet
method.#include <asio/buffer.hpp>
for private memberstd::vector<asio::const_buffer> sync_buffs_
. A future revision should change this tostd::vector<const_buffer_p>
defined elsewhere.push_buffers_directly(std::vector<buffer_type> buffers, double timestamp, std::vector<std::size_t> frame_indices)
whereframe_indices
gives the indices inbuffers
that start a new frame (frame := multi-channel sample). The goal is to enable the user to push discontiguous frames (e.g., disabled channels still allocated in device buffer) without first copying to a contiguous frame. I'm not sure I actually need this.