Skip to content

Commit

Permalink
Refactor networking and add TcpTests
Browse files Browse the repository at this point in the history
* Add sending multiple messages in one go
* io_uring networking improvements
* Removal of networking events
  • Loading branch information
Oipo committed Sep 20, 2024
1 parent 61f4b70 commit f9ca8ce
Show file tree
Hide file tree
Showing 27 changed files with 860 additions and 241 deletions.
1 change: 0 additions & 1 deletion examples/http_example/UsingHttpService.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <ichor/services/logging/Logger.h>
#include <ichor/services/timer/ITimerFactory.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/services/network/http/IHttpConnectionService.h>
#include <ichor/services/network/http/IHttpHostService.h>
#include <ichor/events/RunFunctionEvent.h>
Expand Down
1 change: 0 additions & 1 deletion examples/http_ping_pong/PingService.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <ichor/DependencyManager.h>
#include <ichor/services/logging/Logger.h>
#include <ichor/services/timer/ITimerFactory.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/services/network/http/IHttpConnectionService.h>
#include <ichor/services/network/http/IHttpHostService.h>
#include <ichor/services/timer/ITimerFactory.h>
Expand Down
1 change: 0 additions & 1 deletion examples/http_ping_pong/PongService.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <ichor/services/logging/Logger.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/services/network/http/IHttpConnectionService.h>
#include <ichor/services/network/http/IHttpHostService.h>
#include <ichor/services/serialization/ISerializer.h>
Expand Down
3 changes: 1 addition & 2 deletions examples/tcp_example/UsingTcpService.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <ichor/DependencyManager.h>
#include <ichor/services/logging/Logger.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/services/network/IConnectionService.h>
#include <ichor/dependency_management/AdvancedService.h>
#include <ichor/dependency_management/DependencyRegister.h>
Expand Down Expand Up @@ -50,7 +49,7 @@ class UsingTcpService final : public AdvancedService<UsingTcpService> {
ICHOR_LOG_INFO(_logger, "Removed serializer");
}

void addDependencyInstance(IConnectionService &connectionService, IService&) {
void addDependencyInstance(IConnectionService &connectionService, IService &svc) {
if(connectionService.isClient()) {
_clientService = &connectionService;
ICHOR_LOG_INFO(_logger, "Inserted clientService");
Expand Down
52 changes: 26 additions & 26 deletions examples/websocket_example/UsingWsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <ichor/DependencyManager.h>
#include <ichor/services/logging/Logger.h>
#include <ichor/services/timer/ITimerFactory.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/services/network/IConnectionService.h>
#include <ichor/services/network/IHostService.h>
#include <ichor/dependency_management/AdvancedService.h>
Expand Down Expand Up @@ -52,34 +51,35 @@ class UsingWsService final : public AdvancedService<UsingWsService> {
}

void addDependencyInstance(IConnectionService &connectionService, IService&) {
if(connectionService.isClient()) {
_clientService = &connectionService;
ICHOR_LOG_INFO(_logger, "Inserted clientService");
} else {
_hostService = &connectionService;
ICHOR_LOG_INFO(_logger, "Inserted _hostService");
_hostService->setReceiveHandler([this](std::span<uint8_t const> data) {
auto msg = _serializer->deserialize(data);
if (msg) {
if(connectionService.isClient()) {
_clientService = &connectionService;
ICHOR_LOG_INFO(_logger, "Inserted clientService");
} else {
_hostService = &connectionService;
ICHOR_LOG_INFO(_logger, "Inserted _hostService");
_hostService->setReceiveHandler([this](std::span<uint8_t const> data) {
auto msg = _serializer->deserialize(data);
if (msg) {
ICHOR_LOG_INFO(_logger, "Received TestMsg id {} val {}", msg->id, msg->val);
} else {
ICHOR_LOG_ERROR(_logger, "Couldn't deserialize message {}", std::string_view{reinterpret_cast<char const *>(data.data()), data.size()});
std::terminate();
}
} else {
ICHOR_LOG_ERROR(_logger, "Couldn't deserialize message {}", std::string_view{reinterpret_cast<char const *>(data.data()), data.size()});
std::terminate();
}
GetThreadLocalEventQueue().pushEvent<QuitEvent>(getServiceId());
});
}
});
}

if(_clientService != nullptr && _hostService != nullptr) {
GetThreadLocalEventQueue().pushEvent<RunFunctionEventAsync>(getServiceId(), [this]() -> AsyncGenerator<IchorBehaviour> {
auto ser = _serializer->serialize(TestMsg{11, "Hello World"});
auto ret = co_await _clientService->sendAsync(std::move(ser));
if(!ret) {
ICHOR_LOG_ERROR(_logger, "start() send error: {}", (int)ret.error());
}
co_return {};
});
}
if(_clientService != nullptr && _hostService != nullptr) {
ICHOR_LOG_ERROR(_logger, "addDependencyInstance push event");
GetThreadLocalEventQueue().pushEvent<RunFunctionEventAsync>(getServiceId(), [this]() -> AsyncGenerator<IchorBehaviour> {
auto ser = _serializer->serialize(TestMsg{11, "Hello World"});
auto ret = co_await _clientService->sendAsync(std::move(ser));
if(!ret) {
ICHOR_LOG_ERROR(_logger, "send error: {}", (int)ret.error());
}
co_return {};
});
}
}

void removeDependencyInstance(IConnectionService&, IService&) {
Expand Down
1 change: 1 addition & 0 deletions include/ichor/DependencyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ namespace Ichor {
void stop();
/// Called from the queue implementation
void addInternalServiceManager(std::unique_ptr<ILifecycleManager> svc);
void clearServiceRegistrations(std::vector<EventInterceptInfo> &allEventInterceptorsCopy, std::vector<EventInterceptInfo> &eventInterceptorsCopy, ServiceIdType svcId);
void removeInternalService(std::vector<EventInterceptInfo> &allEventInterceptorsCopy, std::vector<EventInterceptInfo> &eventInterceptorsCopy, ServiceIdType svcId);
/// Check if there is a coroutine for the given serviceId that is still waiting on something
/// \param serviceId
Expand Down
34 changes: 0 additions & 34 deletions include/ichor/events/InternalEvents.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,40 +208,6 @@ namespace Ichor {
static constexpr std::string_view NAME = typeName<RemoveTrackerEvent>();
};

struct UnrecoverableErrorEvent final : public Event {
UnrecoverableErrorEvent(uint64_t _id, uint64_t _originatingService, uint64_t _priority, uint64_t _errorType, std::string _error) noexcept : Event(_id, _originatingService, _priority), errorType(_errorType), error(std::move(_error)) {}
~UnrecoverableErrorEvent() final = default;

[[nodiscard]] std::string_view get_name() const noexcept final {
return NAME;
}
[[nodiscard]] NameHashType get_type() const noexcept final {
return TYPE;
}

uint64_t errorType;
std::string error;
static constexpr NameHashType TYPE = typeNameHash<UnrecoverableErrorEvent>();
static constexpr std::string_view NAME = typeName<UnrecoverableErrorEvent>();
};

struct RecoverableErrorEvent final : public Event {
RecoverableErrorEvent(uint64_t _id, uint64_t _originatingService, uint64_t _priority, uint64_t _errorType, std::string _error) noexcept : Event(_id, _originatingService, _priority), errorType(_errorType), error(std::move(_error)) {}
~RecoverableErrorEvent() final = default;

[[nodiscard]] std::string_view get_name() const noexcept final {
return NAME;
}
[[nodiscard]] NameHashType get_type() const noexcept final {
return TYPE;
}

uint64_t errorType;
std::string error;
static constexpr NameHashType TYPE = typeNameHash<RecoverableErrorEvent>();
static constexpr std::string_view NAME = typeName<RecoverableErrorEvent>();
};

struct ContinueCoroutineBroadcastEvent final : public Event {
ContinueCoroutineBroadcastEvent(uint64_t _id, uint64_t _originatingService, uint64_t _priority) noexcept : Event(_id, _originatingService, _priority) {}
~ContinueCoroutineBroadcastEvent() final = default;
Expand Down
10 changes: 8 additions & 2 deletions include/ichor/services/network/IConnectionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ namespace Ichor {
/**
* Awaitable send function.
* @param msg message to send
* @return id of message (deprecated)
* @return void on success, IOError otherwise
*/
virtual Ichor::Task<tl::expected<uint64_t, IOError>> sendAsync(std::vector<uint8_t>&& msg) = 0;
virtual Ichor::Task<tl::expected<void, IOError>> sendAsync(std::vector<uint8_t>&& msg) = 0;
/**
* Awaitable send function for multiple messages. Some implementations may use iovec to send all messages in one go.
* @param msgs messages to send
* @return void on success, IOError otherwise
*/
virtual Ichor::Task<tl::expected<void, IOError>> sendAsync(std::vector<std::vector<uint8_t>>&& msgs) = 0;
/**
* Send function with callback, in case many messages have to be sent without awaiting.
* @param msg message to send
Expand Down
61 changes: 0 additions & 61 deletions include/ichor/services/network/NetworkEvents.h

This file was deleted.

10 changes: 6 additions & 4 deletions include/ichor/services/network/boost/WsConnectionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

namespace Ichor {
namespace Detail {
struct WsConnectionOutboxMessage {
uint64_t msgId;
struct WsConnectionOutboxMessage final {
std::vector<uint8_t> msg;
AsyncManualResetEvent *evt;
bool *success;
};
}

Expand All @@ -30,7 +31,8 @@ namespace Ichor {
WsConnectionService(DependencyRegister &reg, Properties props);
~WsConnectionService() final = default;

Task<tl::expected<uint64_t, IOError>> sendAsync(std::vector<uint8_t>&& msg) final;
Task<tl::expected<void, IOError>> sendAsync(std::vector<uint8_t>&& msg) final;
Task<tl::expected<void, IOError>> sendAsync(std::vector<std::vector<uint8_t>>&& msgs) final;
void setPriority(uint64_t priority) final;
uint64_t getPriority() final;

Expand Down Expand Up @@ -59,7 +61,6 @@ namespace Ichor {
void read(net::yield_context &yield);

std::shared_ptr<websocket::stream<beast::tcp_stream>> _ws{};
uint64_t _msgIdCounter{};
std::atomic<uint64_t> _priority{};
std::atomic<bool> _connected{};
std::atomic<bool> _quit{};
Expand All @@ -69,6 +70,7 @@ namespace Ichor {
std::atomic<int64_t> _finishedListenAndRead{};
AsyncManualResetEvent _startStopEvent{};
boost::circular_buffer<Detail::WsConnectionOutboxMessage> _outbox{10};
RealtimeMutex _outboxMutex{};
IEventQueue *_queue;
std::vector<std::vector<uint8_t>> _queuedMessages{};
std::function<void(std::span<uint8_t const>)> _recvHandler;
Expand Down
7 changes: 0 additions & 7 deletions include/ichor/services/network/http/IHttpHostService.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,20 @@ namespace Ichor {
HttpRouteRegistration(HttpMethod method, RouteIdType id, IHttpHostService *service) : _method(method), _id(id), _service(service) {}
HttpRouteRegistration(const HttpRouteRegistration&) = delete;
HttpRouteRegistration(HttpRouteRegistration&& o) noexcept {
fmt::print("HttpRouteRegistration1\n");
reset();
fmt::print("HttpRouteRegistration2\n");
_method = o._method;
_id = o._id;
_service = o._service;
o._service = nullptr;
}

~HttpRouteRegistration() {
fmt::print("HttpRouteRegistration3\n");
reset();
fmt::print("HttpRouteRegistration4\n");
}

HttpRouteRegistration& operator=(const HttpRouteRegistration&) = delete;
HttpRouteRegistration& operator=(HttpRouteRegistration&& o) noexcept {
fmt::print("HttpRouteRegistration5\n");
reset();
fmt::print("HttpRouteRegistration6\n");
_method = o._method;
_id = o._id;
_service = o._service;
Expand All @@ -145,7 +139,6 @@ namespace Ichor {
}

void reset() {
fmt::print("HttpRouteRegistration reset {}\n", (void*)_service);
if(_service != nullptr) {
_service->removeRoute(_method, _id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace Ichor {
IOUringTcpConnectionService(DependencyRegister &reg, Properties props);
~IOUringTcpConnectionService() final = default;

Task<tl::expected<uint64_t, IOError>> sendAsync(std::vector<uint8_t>&& msg) final;
Task<tl::expected<void, IOError>> sendAsync(std::vector<uint8_t>&& msg) final;
Task<tl::expected<void, IOError>> sendAsync(std::vector<std::vector<uint8_t>>&& msgs) final;
void setPriority(uint64_t priority) final;
uint64_t getPriority() final;

Expand All @@ -58,14 +59,13 @@ namespace Ichor {
uint64_t _id;
int64_t _sendTimeout{250'000};
int64_t _recvTimeout{250'000};
uint32_t _bufferEntries{8};
uint32_t _bufferEntrySize{16'384};
uint32_t _bufferEntries{16};
uint32_t _bufferEntrySize{8192};
tl::optional<IOUringBuf> _buffer{};
bool _quit{};
IIOUringQueue *_q{};
ILogger *_logger{};
std::vector<uint8_t> _recvBuf{};
decltype(_recvBuf) _multipartRecvBuf{};
std::vector<decltype(_recvBuf)> _queuedMessages{};
std::function<void(std::span<uint8_t const>)> _recvHandler;
};
Expand Down
4 changes: 2 additions & 2 deletions include/ichor/services/network/tcp/TcpConnectionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ namespace Ichor {
TcpConnectionService(DependencyRegister &reg, Properties props);
~TcpConnectionService() final = default;

Task<tl::expected<uint64_t, IOError>> sendAsync(std::vector<uint8_t>&& msg) final;
Task<tl::expected<void, IOError>> sendAsync(std::vector<uint8_t>&& msg) final;
Task<tl::expected<void, IOError>> sendAsync(std::vector<std::vector<uint8_t>>&& msgs) final;
void setPriority(uint64_t priority) final;
uint64_t getPriority() final;

Expand All @@ -51,7 +52,6 @@ namespace Ichor {
uint64_t _id;
uint64_t _attempts;
uint64_t _priority;
uint64_t _msgIdCounter{};
int64_t _sendTimeout{250'000};
int64_t _recvTimeout{250'000};
bool _quit;
Expand Down
10 changes: 9 additions & 1 deletion src/ichor/DependencyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,15 @@ void Ichor::DependencyManager::processEvent(std::unique_ptr<Event> &uniqueEvt) {
std::terminate();
}
if (!_scopedEvents.empty()) {
ICHOR_EMERGENCY_LOG1(_logger, "Bug in Ichor, please submit a bug report.");
ICHOR_EMERGENCY_LOG1(_logger, "Bug in Ichor, please submit a bug report and include at least the following, if any:");
for(auto const &[promiseId, scopedEvt] : _scopedEvents) {
auto svcIt = _services.find(scopedEvt->originatingService);
std::string_view svcName = "UNKNOWN";
if(svcIt != _services.end()) {
svcName = svcIt->second->implementationName();
}
ICHOR_EMERGENCY_LOG2(_logger, "promise {} service {}:{} evt {}", promiseId, scopedEvt->originatingService, svcName, scopedEvt->get_name());
}
std::terminate();
}
if (!_scopedGenerators.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion src/services/network/boost/AsioContextService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Ichor::AsioContextService::AsioContextService(DependencyRegister &reg, Propertie
}
}

reg.registerDependency<ILogger>(this, DependencyFlags::REQUIRED);
reg.registerDependency<ILogger>(this, DependencyFlags::NONE);
}

Ichor::AsioContextService::~AsioContextService() {
Expand Down
3 changes: 1 addition & 2 deletions src/services/network/boost/HttpConnectionService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
#include <ichor/events/RunFunctionEvent.h>
#include <ichor/services/network/boost/HttpConnectionService.h>
#include <ichor/services/network/http/HttpScopeGuards.h>
#include <ichor/services/network/NetworkEvents.h>
#include <ichor/ScopeGuard.h>

Ichor::HttpConnectionService::HttpConnectionService(DependencyRegister &reg, Properties props) : AdvancedService(std::move(props)) {
reg.registerDependency<ILogger>(this, DependencyFlags::REQUIRED);
reg.registerDependency<ILogger>(this, DependencyFlags::NONE);
reg.registerDependency<IAsioContextService>(this, DependencyFlags::REQUIRED);
}

Expand Down
Loading

0 comments on commit f9ca8ce

Please sign in to comment.