diff --git a/include/ichor/services/network/tcp/TcpConnectionService.h b/include/ichor/services/network/tcp/TcpConnectionService.h index 9cdc4804..fe25bf1e 100644 --- a/include/ichor/services/network/tcp/TcpConnectionService.h +++ b/include/ichor/services/network/tcp/TcpConnectionService.h @@ -7,6 +7,18 @@ #include <ichor/services/timer/ITimerFactory.h> namespace Ichor { + + /** + * Service for managing a TCP connection + * + * Properties: + * - "Address" std::string - What address to connect to (required if Socket is not present) + * - "Port" uint16_t - What port to connect to (required if Socket is not present) + * - "Socket" int - An existing socket to manage (required if Address/Port are not present) + * - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY) + * - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000) + * - "TimeoutRecvUs" int64_t - Timeout in microseconds for recv calls (default 250'000) + */ class TcpConnectionService final : public IConnectionService, public AdvancedService<TcpConnectionService> { public: TcpConnectionService(DependencyRegister ®, Properties props); @@ -26,15 +38,22 @@ namespace Ichor { void addDependencyInstance(ITimerFactory &logger, IService &isvc); void removeDependencyInstance(ITimerFactory &logger, IService &isvc); + void recvHandler(); + friend DependencyRegister; + static uint64_t tcpConnId; int _socket; - int _attempts; + uint64_t _id; + uint64_t _attempts; uint64_t _priority; - uint64_t _msgIdCounter; + uint64_t _msgIdCounter{}; + int64_t _sendTimeout{250'000}; + int64_t _recvTimeout{250'000}; bool _quit; ILogger *_logger{}; ITimerFactory *_timerFactory{}; + ITimer *_timer{}; }; } diff --git a/include/ichor/services/network/tcp/TcpHostService.h b/include/ichor/services/network/tcp/TcpHostService.h index 85b630b3..675ab4f1 100644 --- a/include/ichor/services/network/tcp/TcpHostService.h +++ b/include/ichor/services/network/tcp/TcpHostService.h @@ -24,6 +24,16 @@ namespace Ichor { static constexpr std::string_view NAME = Ichor::typeName<NewSocketEvent>(); }; + /** + * Service for creating a TCP host + * + * Properties: + * - "Address" std::string - What address to bind to (default INADDR_ANY) + * - "Port" uint16_t - What port to bind to (required) + * - "Priority" uint64_t - Which priority to use for inserted events (default INTERNAL_EVENT_PRIORITY) + * - "TimeoutSendUs" int64_t - Timeout in microseconds for send calls (default 250'000) + * - "TimeoutRecvUs" int64_t - Timeout in microseconds for recv calls (default 250'000) + */ class TcpHostService final : public IHostService, public AdvancedService<TcpHostService> { public: TcpHostService(DependencyRegister ®, Properties props); @@ -43,6 +53,7 @@ namespace Ichor { void removeDependencyInstance(ITimerFactory &logger, IService &isvc); AsyncGenerator<IchorBehaviour> handleEvent(NewSocketEvent const &evt); + void acceptHandler(); friend DependencyRegister; friend DependencyManager; @@ -50,9 +61,12 @@ namespace Ichor { int _socket; int _bindFd; uint64_t _priority; + int64_t _sendTimeout{250'000}; + int64_t _recvTimeout{250'000}; bool _quit; ILogger *_logger{}; ITimerFactory *_timerFactory{}; + ITimer *_timer{}; std::vector<NeverNull<TcpConnectionService*>> _connections; EventHandlerRegistration _newSocketEventHandlerRegistration{}; }; diff --git a/include/ichor/services/timer/ITimer.h b/include/ichor/services/timer/ITimer.h index cc272868..985ade7e 100644 --- a/include/ichor/services/timer/ITimer.h +++ b/include/ichor/services/timer/ITimer.h @@ -20,6 +20,11 @@ namespace Ichor { /// Thread-safe. [[nodiscard]] virtual uint64_t getPriority() const noexcept = 0; /// Thread-safe. + /// \param fireOnce + virtual void setFireOnce(bool fireOnce) noexcept = 0; + /// Thread-safe. + [[nodiscard]] virtual bool getFireOnce() const noexcept = 0; + /// Thread-safe. [[nodiscard]] virtual uint64_t getTimerId() const noexcept = 0; diff --git a/include/ichor/services/timer/Timer.h b/include/ichor/services/timer/Timer.h index 6e01e434..34fb9740 100644 --- a/include/ichor/services/timer/Timer.h +++ b/include/ichor/services/timer/Timer.h @@ -37,20 +37,25 @@ namespace Ichor { /// Thread-safe. [[nodiscard]] uint64_t getPriority() const noexcept final; /// Thread-safe. + void setFireOnce(bool fireOnce) noexcept final; + /// Thread-safe. + [[nodiscard]] bool getFireOnce() const noexcept final; + /// Thread-safe. [[nodiscard]] uint64_t getTimerId() const noexcept final; private: /// /// \param timerId unique identifier for timer /// \param svcId unique identifier for svc using this timer - Timer(IEventQueue *queue, uint64_t timerId, uint64_t svcId) noexcept; + Timer(NeverNull<IEventQueue*> queue, uint64_t timerId, uint64_t svcId) noexcept; void insertEventLoop(bool fireImmediately); friend class TimerFactory; - IEventQueue *_queue{}; + NeverNull<IEventQueue*> _queue; uint64_t _timerId{}; + std::atomic<bool> _fireOnce{}; std::atomic<uint64_t> _intervalNanosec{1'000'000'000}; std::unique_ptr<std::thread> _eventInsertionThread{}; std::function<AsyncGenerator<IchorBehaviour>()> _fnAsync{}; diff --git a/src/services/network/tcp/TcpConnectionService.cpp b/src/services/network/tcp/TcpConnectionService.cpp index d28ca725..7c87932a 100644 --- a/src/services/network/tcp/TcpConnectionService.cpp +++ b/src/services/network/tcp/TcpConnectionService.cpp @@ -3,13 +3,19 @@ #include <ichor/DependencyManager.h> #include <ichor/services/network/tcp/TcpConnectionService.h> #include <ichor/services/network/NetworkEvents.h> +#include <ichor/events/RunFunctionEvent.h> +#include <ichor/ScopeGuard.h> #include <arpa/inet.h> #include <sys/socket.h> #include <netinet/tcp.h> #include <unistd.h> #include <fcntl.h> +#include <poll.h> +#include <thread> -Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { +uint64_t Ichor::TcpConnectionService::tcpConnId{}; + +Ichor::TcpConnectionService::TcpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)), _socket(-1), _id(tcpConnId++), _attempts(), _priority(INTERNAL_EVENT_PRIORITY), _quit() { reg.registerDependency<ILogger>(this, DependencyFlags::REQUIRED); reg.registerDependency<ITimerFactory>(this, DependencyFlags::REQUIRED); } @@ -18,22 +24,40 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpConnectionService:: if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) { _priority = Ichor::any_cast<uint64_t>(propIt->second); } + if(auto propIt = getProperties().find("TimeoutSendUs"); propIt != getProperties().end()) { + _sendTimeout = Ichor::any_cast<int64_t>(propIt->second); + } + if(auto propIt = getProperties().find("TimeoutRecvUs"); propIt != getProperties().end()) { + _recvTimeout = Ichor::any_cast<int64_t>(propIt->second); + } if(getProperties().contains("Socket")) { if(auto propIt = getProperties().find("Socket"); propIt != getProperties().end()) { _socket = Ichor::any_cast<int>(propIt->second); } - ICHOR_LOG_TRACE(_logger, "Starting TCP connection for existing socket"); + + int setting = 1; + ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting)); + + timeval timeout{}; + timeout.tv_usec = _recvTimeout; + setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + timeout.tv_usec = _sendTimeout; + setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); + + auto flags = ::fcntl(_socket, F_GETFL, 0); + ::fcntl(_socket, F_SETFL, flags | O_NONBLOCK); + ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for existing socket", _id); } else { auto addrIt = getProperties().find("Address"); auto portIt = getProperties().find("Port"); if(addrIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "Missing address"); + ICHOR_LOG_ERROR(_logger, "[{}] Missing address", _id); co_return tl::unexpected(StartError::FAILED); } if(portIt == getProperties().end()) { - ICHOR_LOG_ERROR(_logger, "Missing port"); + ICHOR_LOG_ERROR(_logger, "[{}] Missing port", _id); co_return tl::unexpected(StartError::FAILED); } @@ -47,6 +71,13 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpConnectionService:: int setting = 1; ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting)); + + timeval timeout{}; + timeout.tv_usec = _recvTimeout; + setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + timeout.tv_usec = _sendTimeout; + setsockopt(_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); + auto flags = ::fcntl(_socket, F_GETFL, 0); ::fcntl(_socket, F_SETFL, flags | O_NONBLOCK); @@ -60,38 +91,70 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpConnectionService:: throw std::runtime_error("inet_pton invalid address for given address family (has to be ipv4-valid address)"); } - while(connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0) - { - ICHOR_LOG_ERROR(_logger, "connect error {}", errno); - if(_attempts++ >= 5) { + bool connected{}; + while(!connected && connect(_socket, (struct sockaddr *)&address, sizeof(address)) < 0) { + ICHOR_LOG_ERROR(_logger, "[{}] connect error {}", _id, errno); + if(errno == EINPROGRESS) { + while(_attempts++ >= 5) { + pollfd pfd{}; + pfd.fd = _socket; + pfd.events = POLLOUT; + ret = poll(&pfd, 1, static_cast<int>(_sendTimeout)); + + if(ret < 0) { + ICHOR_LOG_ERROR(_logger, "[{}] poll error {}", _id, errno); + continue; + } + + // timeout + if(ret == 0) { + continue; + } + + if(pfd.revents & POLLERR) { + ICHOR_LOG_ERROR(_logger, "[{}] POLLERR {} {} {}", _id, pfd.revents); + } else if(pfd.revents & POLLHUP) { + ICHOR_LOG_ERROR(_logger, "[{}] POLLHUP {} {} {}", _id, pfd.revents); + } else if(pfd.revents & POLLOUT) { + int connect_result{}; + socklen_t result_len = sizeof(connect_result); + ret = getsockopt(_socket, SOL_SOCKET, SO_ERROR, &connect_result, &result_len); + + if(ret < 0) { + throw std::runtime_error("getsocketopt error: Couldn't connect"); + } + + // connect failed, retry + if(connect_result < 0) { + break; + } + connected = true; + break; + } + } + } else if(errno == EALREADY) { + std::this_thread::sleep_for(std::chrono::microseconds(_sendTimeout)); + } else { + _attempts++; + } + + // we don't want to increment attempts in the EINPROGRESS case, but we do want to check it here + if(_attempts >= 5) { throw std::runtime_error("Couldn't connect"); } } auto *ip = ::inet_ntoa(address.sin_addr); - ICHOR_LOG_TRACE(_logger, "Starting TCP connection for {}:{}", ip, ::ntohs(address.sin_port)); + ICHOR_LOG_TRACE(_logger, "[{}] Starting TCP connection for {}:{}", _id, ip, ::ntohs(address.sin_port)); } - auto &timer = _timerFactory->createTimer(); - timer.setChronoInterval(20ms); - timer.setCallback([this]() { - std::array<char, 1024> buf{}; - auto ret = recv(_socket, buf.data(), buf.size(), 0); - - if (ret == 0) { - return; - } - - if(ret < 0) { - ICHOR_LOG_ERROR(_logger, "Error receiving from socket: {}", errno); - GetThreadLocalEventQueue().pushEvent<RecoverableErrorEvent>(getServiceId(), 4u, "Error receiving from socket. errno = " + std::to_string(errno)); - return; - } - - GetThreadLocalEventQueue().pushPrioritisedEvent<NetworkDataEvent>(getServiceId(), _priority, std::vector<uint8_t>{buf.data(), buf.data() + ret}); - return; + _timer = &_timerFactory->createTimer(); + _timer->setFireOnce(true); + _timer->setChronoInterval(20ms); + _timer->setCallback([this]() { + recvHandler(); }); - timer.startTimer(); + _timer->startTimer(true); co_return {}; } @@ -111,15 +174,15 @@ void Ichor::TcpConnectionService::addDependencyInstance(ILogger &logger, IServic _logger = &logger; } -void Ichor::TcpConnectionService::removeDependencyInstance(ILogger &logger, IService&) { +void Ichor::TcpConnectionService::removeDependencyInstance(ILogger &, IService&) { _logger = nullptr; } -void Ichor::TcpConnectionService::addDependencyInstance(ITimerFactory &factory, IService &) { - _timerFactory = &factory; +void Ichor::TcpConnectionService::addDependencyInstance(ITimerFactory &timerFactory, IService &) { + _timerFactory = &timerFactory; } -void Ichor::TcpConnectionService::removeDependencyInstance(ITimerFactory &factory, IService&) { +void Ichor::TcpConnectionService::removeDependencyInstance(ITimerFactory &, IService&) { _timerFactory = nullptr; } @@ -127,9 +190,19 @@ tl::expected<uint64_t, Ichor::SendErrorReason> Ichor::TcpConnectionService::send auto id = ++_msgIdCounter; size_t sent_bytes = 0; + if(_quit) { + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no send", _id); + return tl::unexpected(SendErrorReason::QUITTING); + } + while(sent_bytes < msg.size()) { auto ret = ::send(_socket, msg.data() + sent_bytes, msg.size() - sent_bytes, 0); + if(_quit) { + ICHOR_LOG_TRACE(_logger, "[{}] quitting mid-send", _id); + return tl::unexpected(SendErrorReason::QUITTING); + } + if(ret < 0) { GetThreadLocalEventQueue().pushEvent<FailedSendMessageEvent>(getServiceId(), std::move(msg), id); break; @@ -149,4 +222,33 @@ uint64_t Ichor::TcpConnectionService::getPriority() { return _priority; } +void Ichor::TcpConnectionService::recvHandler() { + ScopeGuard sg{[this]() { + if(!_quit) { + _timer->startTimer(); + } else { + ICHOR_LOG_TRACE(_logger, "[{}] quitting, no push", _id); + } + }}; + std::array<char, 1024> buf{}; + auto ret = recv(_socket, buf.data(), buf.size(), 0); + + if (ret == 0) { + return; + } + + if (_quit) { + ICHOR_LOG_TRACE(_logger, "[{}] quitting", _id); + return; + } + + if(ret < 0) { + ICHOR_LOG_ERROR(_logger, "[{}] Error receiving from socket: {}", _id, errno); + GetThreadLocalEventQueue().pushEvent<RecoverableErrorEvent>(getServiceId(), 4u, "Error receiving from socket. errno = " + std::to_string(errno)); + return; + } + + GetThreadLocalEventQueue().pushPrioritisedEvent<NetworkDataEvent>(getServiceId(), _priority, std::vector<uint8_t>{buf.data(), buf.data() + ret}); +} + #endif diff --git a/src/services/network/tcp/TcpHostService.cpp b/src/services/network/tcp/TcpHostService.cpp index f92bc89b..3c986285 100644 --- a/src/services/network/tcp/TcpHostService.cpp +++ b/src/services/network/tcp/TcpHostService.cpp @@ -4,6 +4,8 @@ #include <ichor/services/network/IConnectionService.h> #include <ichor/services/network/tcp/TcpHostService.h> #include <ichor/services/network/tcp/TcpConnectionService.h> +#include <ichor/events/RunFunctionEvent.h> +#include <ichor/ScopeGuard.h> #include <arpa/inet.h> #include <sys/socket.h> #include <netinet/tcp.h> @@ -70,30 +72,13 @@ Ichor::Task<tl::expected<void, Ichor::StartError>> Ichor::TcpHostService::start( throw std::runtime_error("Couldn't listen on socket: errno = " + std::to_string(errno)); } - auto &timer = _timerFactory->createTimer(); - timer.setChronoInterval(20ms); - timer.setCallback([this]() { - sockaddr_in client_addr{}; - socklen_t client_addr_size = sizeof(client_addr); - int newConnection = ::accept(_socket, (sockaddr *) &client_addr, &client_addr_size); - - if (newConnection == -1) { - ICHOR_LOG_ERROR(_logger, "New connection but accept() returned {} errno {}", newConnection, errno); - if(errno == EINVAL) { - GetThreadLocalEventQueue().pushEvent<UnrecoverableErrorEvent>(getServiceId(), 4u, "Accept() generated error. errno = " + std::to_string(errno)); - return; - } - GetThreadLocalEventQueue().pushEvent<RecoverableErrorEvent>(getServiceId(), 4u, "Accept() generated error. errno = " + std::to_string(errno)); - return; - } - - auto *ip = ::inet_ntoa(client_addr.sin_addr); - ICHOR_LOG_TRACE(_logger, "new connection from {}:{}", ip, ::ntohs(client_addr.sin_port)); - - GetThreadLocalEventQueue().pushPrioritisedEvent<NewSocketEvent>(getServiceId(), _priority, newConnection); - return; + _timer = &_timerFactory->createTimer(); + _timer->setFireOnce(true); + _timer->setChronoInterval(20ms); + _timer->setCallback([this]() { + acceptHandler(); }); - timer.startTimer(); + _timer->startTimer(true); co_return {}; } @@ -115,15 +100,15 @@ void Ichor::TcpHostService::addDependencyInstance(ILogger &logger, IService &) { _logger = &logger; } -void Ichor::TcpHostService::removeDependencyInstance(ILogger &logger, IService&) { +void Ichor::TcpHostService::removeDependencyInstance(ILogger &, IService&) { _logger = nullptr; } -void Ichor::TcpHostService::addDependencyInstance(ITimerFactory &factory, IService &) { - _timerFactory = &factory; +void Ichor::TcpHostService::addDependencyInstance(ITimerFactory &timerFactory, IService &) { + _timerFactory = &timerFactory; } -void Ichor::TcpHostService::removeDependencyInstance(ITimerFactory &factory, IService&) { +void Ichor::TcpHostService::removeDependencyInstance(ITimerFactory &, IService&) { _timerFactory = nullptr; } @@ -139,9 +124,44 @@ Ichor::AsyncGenerator<Ichor::IchorBehaviour> Ichor::TcpHostService::handleEvent( Properties props{}; props.emplace("Priority", Ichor::make_any<uint64_t>(_priority)); props.emplace("Socket", Ichor::make_any<int>(evt.socket)); + props.emplace("TimeoutSendUs", Ichor::make_any<int64_t>(_sendTimeout)); + props.emplace("TimeoutRecvUs", Ichor::make_any<int64_t>(_recvTimeout)); _connections.emplace_back(GetThreadLocalManager().template createServiceManager<TcpConnectionService, IConnectionService>(std::move(props))); co_return {}; } +void Ichor::TcpHostService::acceptHandler() { + sockaddr_in client_addr{}; + socklen_t client_addr_size = sizeof(client_addr); + ScopeGuard sg{[this]() { + if(!_quit) { + _timer->startTimer(); + } else { + ICHOR_LOG_TRACE(_logger, "quitting, no push"); + } + }}; + int newConnection = ::accept(_socket, (sockaddr *) &client_addr, &client_addr_size); + + if(_quit) { + ICHOR_LOG_TRACE(_logger, "quitting"); + return; + } + + if (newConnection == -1) { + ICHOR_LOG_ERROR(_logger, "New connection but accept() returned {} errno {}", newConnection, errno); + if(errno == EINVAL) { + GetThreadLocalEventQueue().pushEvent<UnrecoverableErrorEvent>(getServiceId(), 4u, "Accept() generated error. errno = " + std::to_string(errno)); + return; + } + GetThreadLocalEventQueue().pushEvent<RecoverableErrorEvent>(getServiceId(), 4u, "Accept() generated error. errno = " + std::to_string(errno)); + return; + } + + auto *ip = ::inet_ntoa(client_addr.sin_addr); + ICHOR_LOG_TRACE(_logger, "new connection from {}:{}", ip, ::ntohs(client_addr.sin_port)); + + GetThreadLocalEventQueue().pushPrioritisedEvent<NewSocketEvent>(getServiceId(), _priority, newConnection); +} + #endif diff --git a/src/services/timer/Timer.cpp b/src/services/timer/Timer.cpp index e55736d4..203083ab 100644 --- a/src/services/timer/Timer.cpp +++ b/src/services/timer/Timer.cpp @@ -8,12 +8,15 @@ using namespace std::chrono_literals; -Ichor::Timer::Timer(Ichor::IEventQueue *queue, uint64_t timerId, uint64_t svcId) noexcept : _queue(queue), _timerId(timerId), _requestingServiceId(svcId) { +Ichor::Timer::Timer(NeverNull<IEventQueue*> queue, uint64_t timerId, uint64_t svcId) noexcept : _queue(queue), _timerId(timerId), _requestingServiceId(svcId) { stopTimer(); } Ichor::Timer::~Timer() noexcept { stopTimer(); + if(_eventInsertionThread && _eventInsertionThread->joinable()) { + _eventInsertionThread->join(); + } } void Ichor::Timer::startTimer() { @@ -27,6 +30,9 @@ void Ichor::Timer::startTimer(bool fireImmediately) { bool expected = true; if(_quit.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) { + if(_eventInsertionThread && _eventInsertionThread->joinable()) { + _eventInsertionThread->join(); + } _eventInsertionThread = std::make_unique<std::thread>([this, fireImmediately]() { this->insertEventLoop(fireImmediately); }); #if defined(__linux__) || defined(__CYGWIN__) pthread_setname_np(_eventInsertionThread->native_handle(), fmt::format("Tmr#{}", _timerId).c_str()); @@ -37,7 +43,9 @@ void Ichor::Timer::startTimer(bool fireImmediately) { void Ichor::Timer::stopTimer() { bool expected = false; if(_quit.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { - _eventInsertionThread->join(); + if(_eventInsertionThread->joinable()) { + _eventInsertionThread->join(); + } _eventInsertionThread = nullptr; } } @@ -77,6 +85,14 @@ uint64_t Ichor::Timer::getPriority() const noexcept { return _priority.load(std::memory_order_acquire); } +void Ichor::Timer::setFireOnce(bool fireOnce) noexcept { + _fireOnce.store(fireOnce, std::memory_order_release); +} + +bool Ichor::Timer::getFireOnce() const noexcept { + return _fireOnce.load(std::memory_order_acquire); +} + uint64_t Ichor::Timer::getTimerId() const noexcept { return _timerId; } @@ -89,11 +105,6 @@ void Ichor::Timer::insertEventLoop(bool fireImmediately) { SetThreadDescription(GetCurrentThread(), fmt::format(L"Tmr#{}", _timerId).c_str()); #endif - - while(!_quit.load(std::memory_order_acquire) && _queue == nullptr) { - std::this_thread::sleep_for(1ms); - } - auto now = std::chrono::steady_clock::now(); auto next = now; if(!fireImmediately) { @@ -115,6 +126,12 @@ void Ichor::Timer::insertEventLoop(bool fireImmediately) { _queue->pushPrioritisedEvent<RunFunctionEvent>(_requestingServiceId, getPriority(), _fn); } + if(_fireOnce.load(std::memory_order_acquire)) { + bool expected = false; + _quit.compare_exchange_strong(expected, true, std::memory_order_acq_rel); + break; + } + next += std::chrono::nanoseconds(_intervalNanosec.load(std::memory_order_acquire)); } }