Skip to content

Commit

Permalink
Submit events in batches for IOUringQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Oipo committed Feb 24, 2024
1 parent 1e58222 commit 528d957
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
1 change: 1 addition & 0 deletions include/ichor/CommunicationChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ichor/DependencyManager.h>
#include <ichor/stl/RealtimeReadWriteMutex.h>
#include <shared_mutex>
#include <mutex>

#ifdef DEBUG_CHANNEL
#include <iostream>
Expand Down
1 change: 0 additions & 1 deletion include/ichor/DependencyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <memory>
#include <chrono>
#include <atomic>
#include <mutex>
#include <span>
#include <ichor/interfaces/IFrameworkLogger.h>
#include <ichor/dependency_management/AdvancedService.h>
Expand Down
3 changes: 2 additions & 1 deletion include/ichor/event_queues/IOUringQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Ichor {
[[nodiscard]] bool is_running() const noexcept final;

io_uring* createEventLoop(unsigned entriesCount = 2048);
void useEventLoop(io_uring *loop);
void useEventLoop(io_uring *loop, unsigned entriesCount);

void start(bool captureSigInt) final;
[[nodiscard]] bool shouldQuit() final;
Expand All @@ -38,6 +38,7 @@ namespace Ichor {
std::atomic<bool> _initializedQueue{false};
std::thread::id _threadId{};
uint64_t _quitTimeoutMs;
int _entriesCount;
long long _pollTimeoutNs;
std::chrono::steady_clock::time_point _whenQuitEventWasSent{};
std::atomic<bool> _quitEventSent{false};
Expand Down
33 changes: 29 additions & 4 deletions src/ichor/event_queues/IOUringQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,13 @@ namespace Ichor {
io_uring_sqe_set_data(sqe, procEvent);
io_uring_prep_nop(sqe);

auto ret = io_uring_submit(_eventQueuePtr);
if(ret != 1) [[unlikely]] {
auto space = io_uring_sq_space_left(_eventQueuePtr);
if(space == 0) {
auto ret = io_uring_submit(_eventQueuePtr);
if (ret != _entriesCount) [[unlikely]] {
// spdlog::info("io_uring_submit {}", ret);
throw std::runtime_error("submit wrong amount");
throw std::runtime_error("submit wrong amount");
}
}
}
}
Expand Down Expand Up @@ -164,6 +167,7 @@ namespace Ichor {
io_uring* IOUringQueue::createEventLoop(unsigned entriesCount) {
_eventQueue = std::make_unique<io_uring>();
_eventQueuePtr = _eventQueue.get();
_entriesCount = entriesCount;
io_uring_params p{};
p.flags = IORING_SETUP_DEFER_TASKRUN | IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER;
p.sq_thread_idle = 50;
Expand All @@ -183,8 +187,9 @@ namespace Ichor {
return _eventQueuePtr;
}

void IOUringQueue::useEventLoop(io_uring *event) {
void IOUringQueue::useEventLoop(io_uring *event, unsigned entriesCount) {
_eventQueuePtr = event;
_entriesCount = entriesCount;
_initializedQueue.store(true, std::memory_order_release);
}

Expand All @@ -206,6 +211,15 @@ namespace Ichor {

startDm();

{
auto space = io_uring_sq_space_left(_eventQueuePtr);
auto ret = io_uring_submit(_eventQueuePtr);
if (ret != _entriesCount - space) [[unlikely]] {
// spdlog::info("io_uring_submit {}", ret);
throw std::runtime_error("submit wrong amount");
}
}

while(!shouldQuit()) [[likely]] {
io_uring_cqe *cqe{};
__kernel_timespec ts{};
Expand All @@ -225,6 +239,17 @@ namespace Ichor {
//spdlog::info("processing {}", evt->get_name());
std::unique_ptr<Event> uniqueEvt{evt};
processEvent(uniqueEvt);

{
auto space = io_uring_sq_space_left(_eventQueuePtr);
if(space < _entriesCount) {
ret = io_uring_submit(_eventQueuePtr);
if (ret != _entriesCount - space) [[unlikely]] {
// spdlog::info("io_uring_submit {}", ret);
throw std::runtime_error("submit wrong amount");
}
}
}
}
}

Expand Down

0 comments on commit 528d957

Please sign in to comment.