Skip to content

Commit

Permalink
refactor: send larger messages properly (#455)
Browse files Browse the repository at this point in the history
Until now, the solution to ensure that even large messages are fully sent out has been to flush the connection queues after each sending of a message, which is likely an unnecessary call (with unnecessary cost) in vast majority of cases, and which may block the connection from doing other work until the large message is fully sent out. This was a rather quick, hacky workaround.

Now, after the sending the message we check whether it has been sent out fully or not. If not (outbound queues are non-empty), then we send a wake-up signal to the connection event loop. The event loop thread then fetches new sd-bus timeouts and events and will see that there are pending outbound messages to process, and will process them together with any other prospective pending events, until there is nothing to process (i.e., the outbound message has been fully dispatched).
  • Loading branch information
sangelovic committed Oct 10, 2024
1 parent 107c6a1 commit c559070
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 57 deletions.
30 changes: 21 additions & 9 deletions src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ sd_bus_message* Connection::callMethod(sd_bus_message* sdbusMsg, uint64_t timeou

SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r);

// Wake up event loop to process messages that may have arrived in the meantime...
// Wake up event loop to process messages that may have arrived in the meantime,
// or to dispatch the outbound message that hasn't yet been fully sent out.
wakeUpEventLoopIfMessagesInQueue();

return sdbusReply;
Expand All @@ -669,7 +670,8 @@ Slot Connection::callMethodAsync(sd_bus_message* sdbusMsg, sd_bus_message_handle

// An event loop may wait in poll with timeout `t1', while in another thread an async call is made with
// timeout `t2'. If `t2' < `t1', then we have to wake up the event loop thread to update its poll timeout.
if (timeoutAfter < timeoutBefore)
// We also have to wake up the event loop to process the messages that may be in the read/write queues.
if (timeoutAfter < timeoutBefore || arePendingMessagesInQueues())
notifyEventLoopToWakeUpFromPoll();

return {slot, [this](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
Expand All @@ -679,6 +681,9 @@ void Connection::sendMessage(sd_bus_message* sdbusMsg)
{
auto r = sdbus_->sd_bus_send(nullptr, sdbusMsg, nullptr);

// Wake up event loop to continue dispatching the (fairly large) outbound message that hasn't yet been fully sent
wakeUpEventLoopIfMessagesInQueue();

SDBUS_THROW_ERROR_IF(r < 0, "Failed to send D-Bus message", -r);
}

Expand Down Expand Up @@ -758,11 +763,15 @@ void Connection::notifyEventLoopToWakeUpFromPoll()

void Connection::wakeUpEventLoopIfMessagesInQueue()
{
// When doing a sync call, other D-Bus messages may have arrived, waiting in the read queue.
// We need this in two cases:
// 1. When doing a sync call, other D-Bus messages may have arrived, waiting in the read queue.
// In case an event loop is inside a poll in another thread, or an external event loop polls in the
// same thread but as an unrelated event source, then we need to wake up the poll explicitly so the
// event loop 1. processes all messages in the read queue, 2. updates poll timeout before next poll.
if (arePendingMessagesInReadQueue())
// 2. Additionally, when sending out messages, these may be too long to be sent out entirely within
// the single sd_bus_send() or sd_bus_call_async() call, in which case they are queued in the write
// queue. We need to wake up the event loop to continue sending the message until it's fully sent.
if (arePendingMessagesInQueues())
notifyEventLoopToWakeUpFromPoll();
}

Expand Down Expand Up @@ -801,6 +810,8 @@ bool Connection::waitForNextEvent()
, {loopExitFd_.fd, POLLIN, 0} };
constexpr auto fdsCount = sizeof(fds)/sizeof(fds[0]);

// Are there pending messages in the inbound queue? Then sd-bus will set timeout to 0, so poll() will wake up right away.
// Are there pending messages in the outbound queue? Then sd-bus will add POLLOUT to events, so poll() will wake up right away.
auto timeout = sdbusPollData.getPollTimeout();
auto r = poll(fds, fdsCount, timeout);

Expand All @@ -814,7 +825,7 @@ bool Connection::waitForNextEvent()
{
auto cleared = eventFd_.clear();
SDBUS_THROW_ERROR_IF(!cleared, "Failed to read from the event descriptor", -errno);
// Go poll() again, but with up-to-date timeout (which will wake poll() up right away if there are messages to process)
// Go poll() again, but with freshly calculated, up-to-date timeout and with up-to-date events to watch
return waitForNextEvent();
}
// Loop exit notification
Expand All @@ -828,14 +839,15 @@ bool Connection::waitForNextEvent()
return true;
}

bool Connection::arePendingMessagesInReadQueue() const
bool Connection::arePendingMessagesInQueues() const
{
uint64_t readQueueSize{};
uint64_t writeQueueSize{};

auto r = sdbus_->sd_bus_get_n_queued_read(bus_.get(), &readQueueSize);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get number of pending messages in read queue", -r);
auto r = sdbus_->sd_bus_get_n_queued(bus_.get(), &readQueueSize, &writeQueueSize);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get number of pending messages in sd-bus queues", -r);

return readQueueSize > 0;
return readQueueSize > 0 || writeQueueSize > 0;
}

Message Connection::getCurrentlyProcessedMessage() const
Expand Down
2 changes: 1 addition & 1 deletion src/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ namespace sdbus::internal {
void finishHandshake(sd_bus* bus);
bool waitForNextEvent();

[[nodiscard]] bool arePendingMessagesInReadQueue() const;
[[nodiscard]] bool arePendingMessagesInQueues() const;

void notifyEventLoopToExit();
void notifyEventLoopToWakeUpFromPoll();
Expand Down
2 changes: 1 addition & 1 deletion src/ISdBus.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace sdbus::internal {
virtual int sd_bus_process(sd_bus *bus, sd_bus_message **r) = 0;
virtual sd_bus_message* sd_bus_get_current_message(sd_bus *bus) = 0;
virtual int sd_bus_get_poll_data(sd_bus *bus, PollData* data) = 0;
virtual int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) = 0;
virtual int sd_bus_get_n_queued(sd_bus *bus, uint64_t *read, uint64_t* write) = 0;
virtual int sd_bus_flush(sd_bus *bus) = 0;
virtual sd_bus *sd_bus_flush_close_unref(sd_bus *bus) = 0;
virtual sd_bus *sd_bus_close_unref(sd_bus *bus) = 0;
Expand Down
18 changes: 6 additions & 12 deletions src/SdBus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "SdBus.h"
#include <sdbus-c++/Error.h>
#include <algorithm>

namespace sdbus::internal {

Expand All @@ -52,11 +53,6 @@ int SdBus::sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *cookie)
if (r < 0)
return r;

// Make sure long messages are not only stored in outgoing queues but also really sent out
// TODO: This is a workaround. We should not block here until everything is physically sent out.
// Refactor: if sd_bus_get_n_queued_write() > 0 then wake up event loop through event fd
::sd_bus_flush(bus != nullptr ? bus : ::sd_bus_message_get_bus(m));

return r;
}

Expand All @@ -75,11 +71,6 @@ int SdBus::sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m,
if (r < 0)
return r;

// Make sure long messages are not only stored in outgoing queues but also really sent out
// TODO: This is a workaround. We should not block here until everything is physically sent out.
// Refactor: if sd_bus_get_n_queued_write() > 0 then wake up event loop through event fd
::sd_bus_flush(bus != nullptr ? bus : ::sd_bus_message_get_bus(m));

return r;
}

Expand Down Expand Up @@ -413,11 +404,14 @@ int SdBus::sd_bus_get_poll_data(sd_bus *bus, PollData* data)
return r;
}

int SdBus::sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret)
int SdBus::sd_bus_get_n_queued(sd_bus *bus, uint64_t *read, uint64_t* write)
{
std::lock_guard lock(sdbusMutex_);

return ::sd_bus_get_n_queued_read(bus, ret);
auto r1 = ::sd_bus_get_n_queued_read(bus, read);
auto r2 = ::sd_bus_get_n_queued_write(bus, write);

return std::min(r1, r2);
}

int SdBus::sd_bus_flush(sd_bus *bus)
Expand Down
2 changes: 1 addition & 1 deletion src/SdBus.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SdBus final : public ISdBus
virtual int sd_bus_process(sd_bus *bus, sd_bus_message **r) override;
virtual sd_bus_message* sd_bus_get_current_message(sd_bus *bus) override;
virtual int sd_bus_get_poll_data(sd_bus *bus, PollData* data) override;
virtual int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) override;
virtual int sd_bus_get_n_queued(sd_bus *bus, uint64_t *read, uint64_t* write) override;
virtual int sd_bus_flush(sd_bus *bus) override;
virtual sd_bus *sd_bus_flush_close_unref(sd_bus *bus) override;
virtual sd_bus *sd_bus_close_unref(sd_bus *bus) override;
Expand Down
26 changes: 25 additions & 1 deletion tests/integrationtests/DBusAsyncMethodsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TYPED_TEST(AsyncSdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTime
}
}

TYPED_TEST(AsyncSdbusTestObject, RunsServerSideAsynchoronousMethodAsynchronously)
TYPED_TEST(AsyncSdbusTestObject, RunsServerSideAsynchronousMethodAsynchronously)
{
// Yeah, this is kinda timing-dependent test, but times should be safe...
std::mutex mtx;
Expand Down Expand Up @@ -142,6 +142,19 @@ TYPED_TEST(AsyncSdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncM
ASSERT_THAT(resultCount, Eq(1500));
}

TYPED_TEST(AsyncSdbusTestObject, RunsServerSideAsynchronousMethodWithLargeMessage)
{
std::map<int32_t, std::string> largeMap;
for (int32_t i = 0; i < 40'000; ++i)
largeMap.emplace(i, "This is string nr. " + std::to_string(i+1));

auto result1 = this->m_proxy->doOperationAsyncWithLargeData(0, largeMap); // Sends large map back in the context of the callback (event loop thread)
auto result2 = this->m_proxy->doOperationAsyncWithLargeData(500, largeMap); // Sends large map back outside the context of the event loop thread

ASSERT_THAT(result1, Eq(largeMap));
ASSERT_THAT(result2, Eq(largeMap));
}

TYPED_TEST(AsyncSdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
{
std::promise<uint32_t> promise;
Expand Down Expand Up @@ -177,6 +190,17 @@ TYPED_TEST(AsyncSdbusTestObject, InvokesMethodAsynchronouslyOnClientSideWithFutu
ASSERT_THAT(returnValue, Eq(100));
}

TYPED_TEST(AsyncSdbusTestObject, InvokesMethodWithLargeDataAsynchronouslyOnClientSideWithFuture)
{
std::map<int32_t, std::string> largeMap;
for (int32_t i = 0; i < 40'000; ++i)
largeMap.emplace(i, "This is string nr. " + std::to_string(i+1));

auto future = this->m_proxy->doOperationWithLargeDataClientSideAsync(largeMap, sdbus::with_future);

ASSERT_THAT(future.get(), Eq(largeMap));
}

TYPED_TEST(AsyncSdbusTestObject, AnswersThatAsyncCallIsPendingIfItIsInProgress)
{
this->m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, std::optional<sdbus::Error> /*err*/){});
Expand Down
2 changes: 1 addition & 1 deletion tests/integrationtests/DBusGeneralTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ using ADirectConnection = TestFixtureWithDirectConnection;
/* -- TEST CASES -- */
/*-------------------------------------*/

TEST(AdaptorAndProxy, CanBeConstructedSuccesfully)
TEST(AdaptorAndProxy, CanBeConstructedSuccessfully)
{
auto connection = sdbus::createBusConnection();
connection->requestName(SERVICE_NAME);
Expand Down
52 changes: 38 additions & 14 deletions tests/integrationtests/DBusMethodsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ using namespace sdbus::test;
/* -- TEST CASES -- */
/*-------------------------------------*/

TYPED_TEST(SdbusTestObject, CallsEmptyMethodSuccesfully)
TYPED_TEST(SdbusTestObject, CallsEmptyMethodSuccessfully)
{
ASSERT_NO_THROW(this->m_proxy->noArgNoReturn());
}

TYPED_TEST(SdbusTestObject, CallsMethodsWithBaseTypesSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodsWithBaseTypesSuccessfully)
{
auto resInt = this->m_proxy->getInt();
ASSERT_THAT(resInt, Eq(INT32_VALUE));
Expand All @@ -68,14 +68,14 @@ TYPED_TEST(SdbusTestObject, CallsMethodsWithBaseTypesSuccesfully)
ASSERT_THAT(multiplyRes, Eq(INT64_VALUE * DOUBLE_VALUE));
}

TYPED_TEST(SdbusTestObject, CallsMethodsWithTuplesSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodsWithTuplesSuccessfully)
{
auto resTuple = this->m_proxy->getTuple();
ASSERT_THAT(std::get<0>(resTuple), Eq(UINT32_VALUE));
ASSERT_THAT(std::get<1>(resTuple), Eq(STRING_VALUE));
}

TYPED_TEST(SdbusTestObject, CallsMethodsWithStructSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodsWithStructSuccessfully)
{
sdbus::Struct<uint8_t, int16_t, double, std::string, std::vector<int16_t>> a{};
auto vectorRes = this->m_proxy->getInts16FromStruct(a);
Expand All @@ -88,21 +88,21 @@ TYPED_TEST(SdbusTestObject, CallsMethodsWithStructSuccesfully)
ASSERT_THAT(vectorRes, Eq(std::vector<int16_t>{INT16_VALUE, INT16_VALUE, -INT16_VALUE}));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithVariantSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithVariantSuccessfully)
{
sdbus::Variant v{DOUBLE_VALUE};
sdbus::Variant variantRes = this->m_proxy->processVariant(v);
ASSERT_THAT(variantRes.get<int32_t>(), Eq(static_cast<int32_t>(DOUBLE_VALUE)));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithStdVariantSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithStdVariantSuccessfully)
{
std::variant<int32_t, double, std::string> v{DOUBLE_VALUE};
auto variantRes = this->m_proxy->processVariant(v);
ASSERT_THAT(std::get<int32_t>(variantRes), Eq(static_cast<int32_t>(DOUBLE_VALUE)));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithStructVariantsAndGetMapSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithStructVariantsAndGetMapSuccessfully)
{
std::vector<int32_t> x{-2, 0, 2};
sdbus::Struct<sdbus::Variant, sdbus::Variant> y{false, true};
Expand All @@ -116,44 +116,44 @@ TYPED_TEST(SdbusTestObject, CallsMethodWithStructVariantsAndGetMapSuccesfully)
ASSERT_THAT(mapOfVariants[2].get<bool>(), Eq(res[2].get<bool>()));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithStructInStructSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithStructInStructSuccessfully)
{
auto val = this->m_proxy->getStructInStruct();
ASSERT_THAT(val.template get<0>(), Eq(STRING_VALUE));
ASSERT_THAT(std::get<0>(std::get<1>(val))[INT32_VALUE], Eq(INT32_VALUE));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithTwoStructsSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithTwoStructsSuccessfully)
{
auto val = this->m_proxy->sumStructItems({1, 2}, {3, 4});
ASSERT_THAT(val, Eq(1 + 2 + 3 + 4));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithTwoVectorsSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithTwoVectorsSuccessfully)
{
auto val = this->m_proxy->sumArrayItems({1, 7}, {2, 3, 4});
ASSERT_THAT(val, Eq(1 + 7 + 2 + 3 + 4));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithSignatureSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithSignatureSuccessfully)
{
auto resSignature = this->m_proxy->getSignature();
ASSERT_THAT(resSignature, Eq(SIGNATURE_VALUE));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithObjectPathSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithObjectPathSuccessfully)
{
auto resObjectPath = this->m_proxy->getObjPath();
ASSERT_THAT(resObjectPath, Eq(OBJECT_PATH_VALUE));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithUnixFdSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithUnixFdSuccessfully)
{
auto resUnixFd = this->m_proxy->getUnixFd();
ASSERT_THAT(resUnixFd.get(), Gt(UNIX_FD_VALUE));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithComplexTypeSuccesfully)
TYPED_TEST(SdbusTestObject, CallsMethodWithComplexTypeSuccessfully)
{
auto resComplex = this->m_proxy->getComplex();
ASSERT_THAT(resComplex.count(0), Eq(1));
Expand Down Expand Up @@ -265,6 +265,30 @@ TYPED_TEST(SdbusTestObject, CanAccessAssociatedMethodCallMessageInAsyncMethodCal
ASSERT_THAT(this->m_adaptor->m_methodName, Eq("doOperationAsync"));
}

TYPED_TEST(SdbusTestObject, CallsMethodWithLargeArgument)
{
std::map<int, std::string> collection;
//std::size_t totalSize{};
for (int i = 0; i < 400'000; i++)
{
collection[i] = ("This is a string of fifty characters. This is a string of fifty " + std::to_string(i));
//totalSize += sizeof(int) + collection[i].size();
}
//printf("Sending large message with collection of size %zu bytes\n", totalSize);
this->m_proxy->sendLargeMessage(collection);
}

TYPED_TEST(SdbusTestObject, CanSendCallsAndReceiveRepliesWithLargeData)
{
std::map<int32_t, std::string> largeMap;
for (int32_t i = 0; i < 40'000; ++i)
largeMap.emplace(i, "This is string nr. " + std::to_string(i+1));

auto returnedMap = this->m_proxy->doOperationWithLargeData(largeMap);

ASSERT_THAT(returnedMap, Eq(largeMap));
}

#if LIBSYSTEMD_VERSION>=240
TYPED_TEST(SdbusTestObject, CanSetGeneralMethodTimeoutWithLibsystemdVersionGreaterThan239)
{
Expand Down
4 changes: 2 additions & 2 deletions tests/integrationtests/DBusPropertiesTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ using namespace sdbus::test;
/* -- TEST CASES -- */
/*-------------------------------------*/

TYPED_TEST(SdbusTestObject, ReadsReadOnlyPropertySuccesfully)
TYPED_TEST(SdbusTestObject, ReadsReadOnlyPropertySuccessfully)
{
ASSERT_THAT(this->m_proxy->state(), Eq(DEFAULT_STATE_VALUE));
}
Expand All @@ -65,7 +65,7 @@ TYPED_TEST(SdbusTestObject, FailsWritingToReadOnlyProperty)
ASSERT_THROW(this->m_proxy->setStateProperty("new_value"), sdbus::Error);
}

TYPED_TEST(SdbusTestObject, WritesAndReadsReadWritePropertySuccesfully)
TYPED_TEST(SdbusTestObject, WritesAndReadsReadWritePropertySuccessfully)
{
uint32_t newActionValue = 5678;

Expand Down
Loading

0 comments on commit c559070

Please sign in to comment.