diff --git a/NorthstarDLL/client/igig/kcpstats.cpp b/NorthstarDLL/client/igig/kcpstats.cpp index 2651a2d07..2e9c6bf94 100644 --- a/NorthstarDLL/client/igig/kcpstats.cpp +++ b/NorthstarDLL/client/igig/kcpstats.cpp @@ -222,7 +222,7 @@ static void draw_kcp_stats() } auto ng = NetGraphSink::instance(); - std::shared_lock lk(ng->windowsMutex); + std::unique_lock lk(ng->windowsMutex); if (ng->windows.empty()) { diff --git a/NorthstarDLL/engine/netgraph.cpp b/NorthstarDLL/engine/netgraph.cpp index 868d09545..3fdbed164 100644 --- a/NorthstarDLL/engine/netgraph.cpp +++ b/NorthstarDLL/engine/netgraph.cpp @@ -19,17 +19,17 @@ void sendThreadPayload(std::stop_token stoken, NetGraphSink* ng) while (!stoken.stop_requested()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); - std::shared_lock lk(manager->routingTableMutex); + std::unique_lock lk1(manager->routingTableMutex); + std::unique_lock lk2(ng->windowsMutex); for (const auto& entry : manager->routingTable) { - if (itimediff(iclock(), entry.second.second) > lastSeenInterval) + if (itimediff(iclock(), std::get<2>(entry.second)) > lastSeenInterval) { continue; } NetBuffer buf(128, 0, 128); - std::shared_lock lk(ng->windowsMutex); std::get<0>(ng->windows[entry.first]).encode(buf); - entry.second.first.second->sendto(std::move(buf), entry.first, NetGraphSink::instance().get()); + std::get<1>(entry.second)->sendto(std::move(buf), entry.first, NetGraphSink::instance().get()); } } } @@ -49,7 +49,7 @@ int NetGraphSink::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* { NetStats s {}; s.decode(buf); - std::shared_lock lk(windowsMutex); + std::unique_lock lk(windowsMutex); std::get<3>(windows[ctx]).rotate(s); return 0; } @@ -67,33 +67,23 @@ std::shared_ptr NetGraphSink::instance() void NetStats::encode(NetBuffer& buf) const { - if (buf.size() < sizeof(float) + 5 * sizeof(IUINT64)) + if (buf.size() < sizeof(NetStats)) { - buf.resize(buf.size() + sizeof(float) + 5 * sizeof(IUINT64), 0); + buf.resize(buf.size() + sizeof(NetStats), 0); } - memcpy(buf.data(), &frameTime, sizeof(float)); - memcpy(buf.data() + sizeof(float), &outsegs, sizeof(IUINT64)); - memcpy(buf.data() + sizeof(float) + 1 * sizeof(IUINT64), &lostsegs, sizeof(IUINT64)); - memcpy(buf.data() + sizeof(float) + 2 * sizeof(IUINT64), &retranssegs, sizeof(IUINT64)); - memcpy(buf.data() + sizeof(float) + 3 * sizeof(IUINT64), &insegs, sizeof(IUINT64)); - memcpy(buf.data() + sizeof(float) + 4 * sizeof(IUINT64), &reconsegs, sizeof(IUINT64)); + memcpy(buf.data(), this, sizeof(NetStats)); } void NetStats::decode(const NetBuffer& buf) { - if (buf.size() < sizeof(float) + 5 * sizeof(IUINT64)) + if (buf.size() < sizeof(NetStats)) { NS::log::NEW_NET->warn("[NG] Spurious input of NetStats::decode"); return; } - memcpy(&frameTime, buf.data(), sizeof(float)); - memcpy(&outsegs, buf.data() + sizeof(float), sizeof(IUINT64)); - memcpy(&lostsegs, buf.data() + sizeof(float) + 1 * sizeof(IUINT64), sizeof(IUINT64)); - memcpy(&retranssegs, buf.data() + sizeof(float) + 2 * sizeof(IUINT64), sizeof(IUINT64)); - memcpy(&insegs, buf.data() + sizeof(float) + 3 * sizeof(IUINT64), sizeof(IUINT64)); - memcpy(&reconsegs, buf.data() + sizeof(float) + 4 * sizeof(IUINT64), sizeof(IUINT64)); + memcpy(this, buf.data(), sizeof(NetStats)); } void NetStats::sync(ikcpcb* cb) diff --git a/NorthstarDLL/engine/netgraph.h b/NorthstarDLL/engine/netgraph.h index 9778d9c49..b729ee95c 100644 --- a/NorthstarDLL/engine/netgraph.h +++ b/NorthstarDLL/engine/netgraph.h @@ -1,8 +1,6 @@ #pragma once #include "shared/kcpintegration.h" -#include -#include extern float* g_frameTime; @@ -68,8 +66,8 @@ class NetGraphSink : public NetSink static std::shared_ptr instance(); - std::shared_mutex windowsMutex; - concurrency::concurrent_unordered_map> windows; + std::mutex windowsMutex; + std::unordered_map> windows; private: NetGraphSink(); diff --git a/NorthstarDLL/shared/kcpintegration.cpp b/NorthstarDLL/shared/kcpintegration.cpp index 8bd678a5b..c6fef53f0 100644 --- a/NorthstarDLL/shared/kcpintegration.cpp +++ b/NorthstarDLL/shared/kcpintegration.cpp @@ -34,7 +34,7 @@ int(WSAAPI* orig_recvfrom)( _Out_writes_bytes_to_opt_(*fromlen, *fromlen) struct sockaddr FAR* from, _Inout_opt_ int FAR* fromlen) = nullptr; -int WSAAPI de_bind(_In_ SOCKET s, _In_reads_bytes_(namelen) const struct sockaddr FAR* name, _In_ int namelen) +static int WSAAPI de_bind(_In_ SOCKET s, _In_reads_bytes_(namelen) const struct sockaddr FAR* name, _In_ int namelen) { auto result = orig_bind(s, name, namelen); if (result != SOCKET_ERROR) @@ -52,14 +52,14 @@ int WSAAPI de_bind(_In_ SOCKET s, _In_reads_bytes_(namelen) const struct sockadd if (ntohs(bindAddr->sin6_port) == localPort) { - UdpSource::instance()->bindSocket(s); - NS::log::NEW_NET.get()->info("[UdpSource] Bind on localhost:{}@{}", localPort, s); + NetManager::instance()->localSocket = s; + NS::log::NEW_NET->info("[UdpSource] Bind on localhost:{}@{}", localPort, s); } } return result; } -int WSAAPI de_sendto( +static int WSAAPI de_sendto( _In_ SOCKET s, _In_reads_bytes_(len) const char FAR* buf, _In_ int len, @@ -67,16 +67,17 @@ int WSAAPI de_sendto( _In_reads_bytes_(tolen) const struct sockaddr FAR* to, _In_ int tolen) { - auto gsResult = GameSink::instance()->sendto(s, buf, len, flags, to, tolen); - if (gsResult != NET_HOOK_NOT_ALTERED) + if (NetManager::instance()->localSocket == s) { - return gsResult; + return GameSink::instance()->sendto(s, buf, len, flags, to, tolen); + } + else + { + return orig_sendto(s, buf, len, flags, to, tolen); } - auto result = orig_sendto(s, buf, len, flags, to, tolen); - return result; } -int WSAAPI de_recvfrom( +static int WSAAPI de_recvfrom( _In_ SOCKET s, _Out_writes_bytes_to_(len, return) __out_data_source(NETWORK) char FAR* buf, _In_ int len, @@ -84,13 +85,14 @@ int WSAAPI de_recvfrom( _Out_writes_bytes_to_opt_(*fromlen, *fromlen) struct sockaddr FAR* from, _Inout_opt_ int FAR* fromlen) { - auto gsResult = GameSink::instance()->recvfrom(s, buf, len, flags, from, fromlen); - if (gsResult != NET_HOOK_NOT_ALTERED) + if (NetManager::instance()->localSocket == s) { - return gsResult; + return GameSink::instance()->recvfrom(s, buf, len, flags, from, fromlen); + } + else + { + return orig_recvfrom(s, buf, len, flags, from, fromlen); } - auto result = orig_recvfrom(s, buf, len, flags, from, fromlen); - return result; } bool createAndEnableHook(LPCWSTR pszModule, LPCSTR pszProcName, LPVOID pDetour, LPVOID* ppOriginal) @@ -122,7 +124,7 @@ ON_DLL_LOAD_RELIESON("engine.dll", WSAHOOKS, ConVar, (CModule module)) fec_init(); - NS::log::NEW_NET.get()->info("[Hook] Initialize result: {}", enableWsaHooks()); + NS::log::NEW_NET->info("[Hook] Initialized: {}", enableWsaHooks()); Cvar_kcp_timer_resolution = new ConVar("kcp_timer_resolution", "5", FCVAR_NONE, "miliseconds between each thread wake, lower is better but consumes more CPU."); @@ -226,24 +228,24 @@ void recycleThreadPayload(std::stop_token stoken) { while (!stoken.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::unique_lock routingTableLock(NetManager::instance()->routingTableMutex); auto ng = NetGraphSink::instance(); std::unique_lock remoteStatsLock(ng->windowsMutex); std::vector removes; - auto current = iclock64(); + auto current = iclock(); for (const auto& entry : NetManager::instance()->routingTable) { - if (itimediff64(current, entry.second.second) > Cvar_kcp_conn_timeout->GetInt()) + if (itimediff64(current, std::get<2>(entry.second)) > Cvar_kcp_conn_timeout->GetInt()) { removes.push_back(entry.first); } } for (const auto& removal : removes) { - NS::log::NEW_NET.get()->info("[NetManager] Disconnecting with {}", removal); - NetManager::instance()->routingTable.unsafe_erase(removal); - ng->windows.unsafe_erase(removal); + NS::log::NEW_NET->info("[NetManager] Disconnecting with {}", removal); + NetManager::instance()->routingTable.erase(removal); + ng->windows.erase(removal); } } } @@ -265,9 +267,9 @@ NetManager* NetManager::instance() return singleton; } -std::pair, std::shared_ptr> connectionInitDefault(const SOCKET& s, const sockaddr_in6& addr) +static std::pair, std::shared_ptr> defaultConnectionInit(const NetContext& ctx) { - std::shared_ptr kcp = std::shared_ptr(new KcpLayer({s, addr})); + std::shared_ptr kcp = std::shared_ptr(new KcpLayer(ctx)); std::shared_ptr mux = std::shared_ptr(new MuxLayer()); mux->bindTop(0, std::static_pointer_cast(GameSink::instance())); @@ -284,45 +286,51 @@ std::pair, std::shared_ptr> connectionInitDe return std::make_pair(std::static_pointer_cast(fec), std::static_pointer_cast(mux)); } +static std::pair, std::shared_ptr> bypassConnectionInit(const NetContext& ctx) +{ + return std::make_pair( + std::static_pointer_cast(GameSink::instance()), std::static_pointer_cast(UdpSource::instance())); +} + std::pair, std::shared_ptr> NetManager::initAndBind(const NetContext& ctx) { - return initAndBind(ctx, connectionInitDefault); + return initAndBind(ctx, defaultConnectionInit); } std::pair, std::shared_ptr> NetManager::initAndBind( - const NetContext& ctx, - std::pair, std::shared_ptr> (*connectionInitFunc)(const SOCKET& s, const sockaddr_in6& addr)) + const NetContext& ctx, std::pair, std::shared_ptr> (*connectionInitFunc)(const NetContext& ctx)) { - std::shared_lock routingTableLock(routingTableMutex); - auto result = connectionInitFunc(ctx.socket, ctx.addr); - routingTable.insert(std::make_pair(ctx, std::make_pair(result, iclock64()))); + auto result = connectionInitFunc(ctx); + bind(ctx, result.first, result.second); + NS::log::NEW_NET->info("[NetManager] New connection with {}", ctx); return result; } void NetManager::bind(const NetContext& ctx, std::shared_ptr inboundDst, std::shared_ptr outboundDst) { - std::shared_lock routingTableLock(routingTableMutex); - routingTable.insert(std::make_pair(ctx, std::make_pair(std::make_pair(inboundDst, outboundDst), iclock64()))); + std::unique_lock routingTableLock(routingTableMutex); + routingTable.insert(std::make_pair(ctx, std::make_tuple(inboundDst, outboundDst, iclock()))); } std::optional, std::shared_ptr>> NetManager::route(const NetContext& ctx) { - std::shared_lock routingTableLock(routingTableMutex); - auto it = routingTable.find(ctx); + std::unique_lock routingTableLock(routingTableMutex); + const auto it = routingTable.find(ctx); if (it == routingTable.end()) { return std::optional, std::shared_ptr>>(); } - return std::optional, std::shared_ptr>>((*it).second.first); + return std::optional, std::shared_ptr>>( + std::make_pair(std::get<0>(it->second), std::get<1>(it->second))); } void NetManager::updateLastSeen(const NetContext& ctx) { - std::shared_lock routingTableLock(routingTableMutex); + std::unique_lock routingTableLock(routingTableMutex); auto it = routingTable.find(ctx); if (it != routingTable.end()) { - (*it).second.second = iclock64(); + std::get<2>(it->second) = iclock(); } } @@ -330,7 +338,8 @@ void selectThreadPayload(std::stop_token stoken) { while (!stoken.stop_requested()) { - if (!UdpSource::instance()->initialized(FROM_CAL)) + auto nm = NetManager::instance(); + if (nm->localSocket == NULL) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; @@ -340,17 +349,17 @@ void selectThreadPayload(std::stop_token stoken) timeval timeout {0, Cvar_kcp_select_timeout->GetInt() * 1000}; FD_ZERO(&sockets); - FD_SET(UdpSource::instance()->socket, &sockets); + FD_SET(nm->localSocket, &sockets); auto selectResult = select(NULL, &sockets, NULL, NULL, &timeout); if (selectResult == SOCKET_ERROR) { - NS::log::NEW_NET.get()->error("[UdpSource] select error @ {}: {}", UdpSource::instance()->socket, WSAGetLastError()); + NS::log::NEW_NET->error("[UdpSource] select error @ {}: {}", nm->localSocket, WSAGetLastError()); continue; } - if (!FD_ISSET(UdpSource::instance()->socket, &sockets)) + if (!FD_ISSET(nm->localSocket, &sockets)) { continue; } @@ -359,52 +368,30 @@ void selectThreadPayload(std::stop_token stoken) sockaddr_in6 from {}; int fromlen = sizeof(sockaddr_in6); - auto recvfromResult = orig_recvfrom(UdpSource::instance()->socket, buf.data(), buf.size(), 0, (sockaddr*)&from, &fromlen); + auto recvfromResult = orig_recvfrom(nm->localSocket, buf.data(), buf.size(), 0, (sockaddr*)&from, &fromlen); if (recvfromResult == SOCKET_ERROR) { auto lastError = WSAGetLastError(); if (lastError != WSAEWOULDBLOCK) { - NS::log::NEW_NET.get()->error("[UdpSource] recvfrom error @ {} : {}", UdpSource::instance()->socket, lastError); + NS::log::NEW_NET->error("[UdpSource] recvfrom error @ {} : {}", nm->localSocket, lastError); } continue; } buf.resize(recvfromResult); - NetContext ctx {UdpSource::instance()->socket, from}; - auto route = NetManager::instance()->route(ctx); - - if (route.has_value()) - { - if (!route->first->initialized(FROM_CAL)) - { - NS::log::NEW_NET.get()->warn("[UdpSource] Routed {} to uninitalized NetSink*", ctx); - continue; - } - auto inputResult = route->first->input(NetBuffer(buf), ctx, UdpSource::instance().get()); - if (inputResult != 0) - { - NS::log::NEW_NET.get()->error("[UdpSource] NetSink*->input {} error: {}", ctx, inputResult); - } - continue; - } - - // New connection - - NS::log::NEW_NET.get()->info("[UdpSource] Accepting new connection from {}", ctx); - auto newRoute = NetManager::instance()->initAndBind(ctx); + NetContext ctx {from}; + auto route = NetManager::instance()->route(ctx).value_or(NetManager::instance()->initAndBind(ctx)); - if (!newRoute.first->initialized(FROM_CAL)) + if (!route.first->initialized(FROM_CAL)) { - NS::log::NEW_NET.get()->warn("[UdpSource] Routed {} to uninitalized NetSink*", ctx); - continue; + NS::log::NEW_NET->warn("[UdpSource] Discarding data from {} cause uninitalized NetSink", ctx); } - auto inputResult = newRoute.first->input(NetBuffer(buf), ctx, UdpSource::instance().get()); - if (inputResult != 0) + else { - NS::log::NEW_NET.get()->error("[UdpSource] NetSink*->input {} error: {}", ctx, inputResult); + route.first->input(NetBuffer(buf), ctx, UdpSource::instance().get()); } } } @@ -422,26 +409,18 @@ UdpSource::~UdpSource() int UdpSource::sendto(NetBuffer&& buf, const NetContext& ctx, const NetSink* top) { - if (ctx.socket != socket) - { - return NET_HOOK_NOT_ALTERED; - } - auto sendtoResult = orig_sendto(socket, buf.data(), buf.size(), 0, (const sockaddr*)&ctx.addr, sizeof(sockaddr_in6)); + auto sendtoResult = + orig_sendto(NetManager::instance()->localSocket, buf.data(), buf.size(), 0, (const sockaddr*)&ctx.addr, sizeof(sockaddr_in6)); if (sendtoResult == SOCKET_ERROR) { - NS::log::NEW_NET.get()->error("[UdpSource] sendto {} error: {}", ctx, WSAGetLastError()); + NS::log::NEW_NET->error("[UdpSource] sendto {} error: {}", ctx, WSAGetLastError()); } return sendtoResult; } bool UdpSource::initialized(int from) { - return socket != NULL; -} - -void UdpSource::bindSocket(const SOCKET& s) -{ - socket = s; + return NetManager::instance()->localSocket != NULL; } std::shared_ptr UdpSource::instance() @@ -460,7 +439,7 @@ int GameSink::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bot bool GameSink::initialized(int from) { - return true; + return NetManager::instance()->localSocket != NULL; } int GameSink::recvfrom( @@ -471,24 +450,15 @@ int GameSink::recvfrom( _Out_writes_bytes_to_opt_(*fromlen, *fromlen) struct sockaddr FAR* from, _Inout_opt_ int FAR* fromlen) { - if (!UdpSource::instance()->initialized(FROM_CAL) || UdpSource::instance()->socket != s) - { - return NET_HOOK_NOT_ALTERED; - } - std::pair data; if (pendingData.try_pop(data)) { - if (data.second.socket != s) - { - pendingData.push(data); - return NET_HOOK_NOT_ALTERED; - } if (from == nullptr || fromlen == nullptr || *fromlen < sizeof(sockaddr_in6)) { WSASetLastError(WSAEFAULT); return SOCKET_ERROR; } + memcpy_s(from, *fromlen, &data.second.addr, sizeof(sockaddr_in6)); NetManager::instance()->updateLastSeen(data.second); @@ -523,51 +493,22 @@ int GameSink::sendto( WSASetLastError(WSAEFAULT); return SOCKET_ERROR; } + auto nm = NetManager::instance(); - auto converted = *(sockaddr_in6*)to; - NetContext ctx {s, converted}; - auto route = NetManager::instance()->route(ctx); + NetContext ctx {*(sockaddr_in6*)to}; + auto route = nm->route(ctx).value_or(nm->initAndBind(ctx)); - if (route.has_value()) + if (!route.second->initialized(FROM_CAL)) { - if (!route->second->initialized(FROM_CAL)) - { - NS::log::NEW_NET->warn("[GameSink] Routed {} to uninitalized NetSource*", ctx); - return NET_HOOK_NOT_ALTERED; - } - auto sendtoResult = route->second->sendto(NetBuffer(buf, len), ctx, GameSink::instance().get()); - if (sendtoResult < 0) - { - NS::log::NEW_NET->error("[GameSink] NetSource*->sendto {} error: {}", ctx, sendtoResult); - } - else - { - NetManager::instance()->updateLastSeen(ctx); - } - return sendtoResult; + NS::log::NEW_NET->warn("[GameSink] Routed {} to uninitalized NetSource*", ctx); + WSASetLastError(WSAENETDOWN); + return SOCKET_ERROR; } - - // New connection - - if (s == UdpSource::instance()->socket) + else { - NS::log::NEW_NET.get()->info("[GameSink] Initiating new connection to {}", ctx); - auto newRoute = NetManager::instance()->initAndBind(ctx); - - if (!newRoute.first->initialized(FROM_CAL)) - { - NS::log::NEW_NET.get()->warn("[GameSink] Routed {} to uninitalized NetSource*", ctx); - return NET_HOOK_NOT_ALTERED; - } - auto sendtoResult = newRoute.second->sendto(NetBuffer(buf, len), ctx, GameSink::instance().get()); - if (sendtoResult != 0) - { - NS::log::NEW_NET.get()->error("[GameSink] NetSource*->sendto {} error: {}", ctx, sendtoResult); - } - return sendtoResult; + NetManager::instance()->updateLastSeen(ctx); + return route.second->sendto(NetBuffer(buf, len), ctx, GameSink::instance().get()); } - - return NET_HOOK_NOT_ALTERED; } std::shared_ptr GameSink::instance() @@ -603,27 +544,29 @@ int FecLayer::sendto(NetBuffer&& buf, const NetContext& ctx, const NetSink* top) } auto encoded = encode(buf); + int result = 0; for (auto& nBuf : encoded) { auto sendtoResult = bottom.lock()->sendto(std::move(nBuf), ctx, this); if (sendtoResult == SOCKET_ERROR) { - NS::log::NEW_NET.get()->error("[FEC] bottom->sendto {} error: {}", ctx, sendtoResult); + result = SOCKET_ERROR; } } - return 0; + return result; } int FecLayer::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bottom) { + // Bypass non-FEC packets if (buf.size() < FEC_MIN_SIZE) { - NS::log::NEW_NET.get()->warn("[FEC] input {} sliently dropping non-FEC packet: insufficient length", ctx); - return 0; + return top->input(std::move(buf), ctx, this); } IUINT16 flag = 0; ikcp_decode16u(buf.data() + 4, &flag); + int result = 0; if (flag == FEC_TYPE_DATA || flag == FEC_TYPE_PARITY) { @@ -636,7 +579,7 @@ int FecLayer::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bot auto fecSize = nBuf.getU16H(); // drop size if (fecSize < 2) { - NS::log::NEW_NET.get()->warn("[FEC] top->input {} spurious input with fecSize < 2", ctx); + NS::log::NEW_NET->warn("[FEC] input {}: spurious input with fecSize < 2", ctx); } else { @@ -645,7 +588,7 @@ int FecLayer::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bot auto inputResult = top->input(std::move(nBuf), ctx, this); if (inputResult == SOCKET_ERROR) { - NS::log::NEW_NET.get()->error("[FEC] top->input {} error: {}", ctx, inputResult); + result = SOCKET_ERROR; } } } @@ -655,29 +598,25 @@ int FecLayer::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bot if (fecSize < 2) { - NS::log::NEW_NET.get()->warn("[FEC] top->input {} spurious reconstructed with fecSize < 2", ctx); + NS::log::NEW_NET->warn("[FEC] input {}: reconstructed spurious input with fecSize < 2", ctx); continue; } rBuf.resize(fecSize - 2, 0); - auto inputResult = top->input(std::move(rBuf), {ctx.socket, ctx.addr, true}, this); + auto inputResult = top->input(std::move(rBuf), {ctx.addr, true}, this); if (inputResult == SOCKET_ERROR) { - NS::log::NEW_NET.get()->error("[FEC] top->input {} error: {}", ctx, inputResult); + result = SOCKET_ERROR; } } } else { - auto inputResult = top->input(std::move(buf), ctx, this); - if (inputResult == SOCKET_ERROR) - { - NS::log::NEW_NET.get()->error("[FEC] top->input {} error: {}", ctx, inputResult); - } - return inputResult; + // Bypass non-FEC packets + return top->input(std::move(buf), ctx, this); } - return 0; + return result; } bool FecLayer::initialized(int from) @@ -934,7 +873,7 @@ void FecLayer::AutoTuner::sample(bool bit, IUINT32 seq) pulses[0] = {bit, seq}; } -std::pair FecLayer::AutoTuner::findPeriods() +std::pair FecLayer::AutoTuner::findPeriods() const { std::unordered_map dataShardsInterval; std::unordered_map parityShardsInterval; @@ -995,7 +934,7 @@ void updateThreadPayload(std::stop_token stoken, KcpLayer* layer) if (itimediff(current, lastStatsSync) >= 100) { auto ng = NetGraphSink::instance(); - std::shared_lock lk2(ng->windowsMutex); + std::unique_lock lk2(ng->windowsMutex); std::get<0>(ng->windows[layer->remoteAddr]).sync(layer->cb); std::get<1>(ng->windows[layer->remoteAddr]).rotate(layer->cb->rx_srtt); std::get<2>(ng->windows[layer->remoteAddr]).rotate(std::get<0>(ng->windows[layer->remoteAddr])); @@ -1020,15 +959,11 @@ int kcpOutput(const char* buf, int len, ikcpcb* kcp, void* user) auto source = layer->bottom.lock(); if (!source->initialized(FROM_CAL)) { - NS::log::NEW_NET.get()->error("[KCP] kcpOutput: Uninitalized bottom"); - return -1; - } - auto sendToResult = source->sendto(NetBuffer(buf, len), layer->remoteAddr, layer); - if (sendToResult == SOCKET_ERROR) - { - NS::log::NEW_NET.get()->error("[KCP] kcpOutput: sendto error"); + NS::log::NEW_NET->error("[KCP] kcpOutput: {}: uninitalized NetSource", layer->remoteAddr); + WSASetLastError(WSAENETDOWN); + return SOCKET_ERROR; } - return sendToResult; + return source->sendto(NetBuffer(buf, len), layer->remoteAddr, layer); } KcpLayer::KcpLayer(const NetContext& ctx) @@ -1036,7 +971,7 @@ KcpLayer::KcpLayer(const NetContext& ctx) cb = ikcp_create(0, this); cb->output = kcpOutput; - ikcp_wndsize(cb, 512, 512); + ikcp_wndsize(cb, 128, 256); ikcp_nodelay(cb, 1, 10, 2, 1); cb->interval = Cvar_kcp_timer_resolution->GetInt(); @@ -1063,7 +998,7 @@ int KcpLayer::sendto(NetBuffer&& buf, const NetContext& ctx, const NetSink* top) if (result < 0) { - NS::log::NEW_NET.get()->error("[KCP] sendto {}: error {}", ctx, result); + NS::log::NEW_NET->error("[KCP] sendto {}: error {}", ctx, result); } std::unique_lock lk2(updateCvMutex); @@ -1136,18 +1071,12 @@ int MuxLayer::sendto(NetBuffer&& buf, const NetContext& ctx, const NetSink* top) { if (!topInverseMap.contains((uintptr_t)top)) { - NS::log::NEW_NET.get()->error("[MUX] sendto {} silently dropping packets from unknown", ctx); + NS::log::NEW_NET->error("[Mux] sendto {}: dropping packets from unknown sink", ctx); return SOCKET_ERROR; } buf.putU8H(topInverseMap[(uintptr_t)top]); - auto sendtoResult = bottom.lock()->sendto(std::move(buf), ctx, this); - if (sendtoResult == SOCKET_ERROR) - { - NS::log::NEW_NET.get()->error("[MUX] sendto {} error: {}", ctx, sendtoResult); - } - - return sendtoResult; + return bottom.lock()->sendto(std::move(buf), ctx, this); } int MuxLayer::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bottom) @@ -1155,15 +1084,10 @@ int MuxLayer::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bot IUINT8 channelId = buf.getU8H(); if (!topMap.contains(channelId)) { - NS::log::NEW_NET.get()->error("[MUX] input {} silently dropping packets from unknown", ctx); + NS::log::NEW_NET.get()->error("[Mux] input {}: dropping packets from unknown source", ctx); return SOCKET_ERROR; } - auto inputResult = topMap[channelId]->input(std::move(buf), ctx, this); - if (inputResult == SOCKET_ERROR) - { - NS::log::NEW_NET.get()->error("[MUX] input {} error: {}", ctx, inputResult); - } - return inputResult; + return topMap[channelId]->input(std::move(buf), ctx, this); } bool MuxLayer::initialized(int from) @@ -1211,18 +1135,3 @@ void MuxLayer::bindBottom(std::weak_ptr bottom) { this->bottom = bottom; } - -DummySink::DummySink() {} - -DummySink::~DummySink() {} - -int DummySink::input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bottom) -{ - NS::log::NEW_NET.get()->error("[DUMMY] input {}: {} {}", ctx, buf.data()[0], buf.data()[1]); - return 0; -} - -bool DummySink::initialized(int from) -{ - return true; -} diff --git a/NorthstarDLL/shared/kcpintegration.h b/NorthstarDLL/shared/kcpintegration.h index f0f04439d..bcce1f1b8 100644 --- a/NorthstarDLL/shared/kcpintegration.h +++ b/NorthstarDLL/shared/kcpintegration.h @@ -1,11 +1,8 @@ #pragma once -#include #include #include "shared/ikcp.h" #include -#include -#include #define RS_MALLOC _malloc_base #define RS_FREE _free_base @@ -14,7 +11,6 @@ struct NetContext { - SOCKET socket; sockaddr_in6 addr; bool recon = false; }; @@ -30,7 +26,7 @@ template <> struct fmt::formatter { char addrStr[47] {'\0'}; inet_ntop(p.addr.sin6_family, &p.addr.sin6_addr, addrStr, 47); - return fmt::format_to(ctx.out(), "[{}]:{}@{}", addrStr, ntohs(p.addr.sin6_port), p.socket); + return fmt::format_to(ctx.out(), "[{}]:{}", addrStr, ntohs(p.addr.sin6_port)); } }; @@ -89,7 +85,7 @@ namespace std { bool operator()(const NetContext& l, const NetContext& r) const { - return equal_to()(l.socket, r.socket) && equal_to()(l.addr, r.addr); + return equal_to()(l.addr, r.addr); } }; @@ -97,10 +93,7 @@ namespace std { size_t operator()(const NetContext& k) const { - size_t res = 17; - res = res * 31 + hash()(k.socket); - res = res * 31 + hash()(k.addr); - return res; + return hash()(k.addr); } }; } // namespace std @@ -258,17 +251,17 @@ class NetManager std::pair, std::shared_ptr> initAndBind(const NetContext& ctx); std::pair, std::shared_ptr> initAndBind( const NetContext& ctx, - std::pair, std::shared_ptr> (*connectionInitFunc)(const SOCKET& s, const sockaddr_in6& addr)); + std::pair, std::shared_ptr> (*connectionInitFunc)(const NetContext& ctx)); void bind(const NetContext& ctx, std::shared_ptr inboundDst, std::shared_ptr outboundDst); std::optional, std::shared_ptr>> route(const NetContext& ctx); void updateLastSeen(const NetContext& ctx); friend void recycleThreadPayload(std::stop_token stoken); - // Only locked exclusively when disconnecting. - std::shared_mutex routingTableMutex; - Concurrency::concurrent_unordered_map, std::shared_ptr>, IINT64>> - routingTable; + std::mutex routingTableMutex; + std::unordered_map, std::shared_ptr, IUINT32>> routingTable; + + SOCKET localSocket = NULL; private: std::jthread recycleThread; @@ -319,15 +312,12 @@ class UdpSource : public NetSource virtual int sendto(NetBuffer&& buf, const NetContext& ctx, const NetSink* top); virtual bool initialized(int from); - void bindSocket(const SOCKET& s); - static std::shared_ptr instance(); friend GameSink; friend void selectThreadPayload(std::stop_token stoken); private: - SOCKET socket = NULL; std::jthread selectThread; UdpSource(); @@ -377,7 +367,7 @@ class FecLayer : public NetSource, public NetSink Pulse pulses[FEC_MAX_AUTO_TUNE_SAMPLE] {}; void sample(bool bit, IUINT32 seq); - std::pair findPeriods(); + std::pair findPeriods() const; }; // Decoder part @@ -450,13 +440,3 @@ class MuxLayer : public NetSource, public NetSink std::unordered_map> topMap; std::unordered_map topInverseMap; }; - -class DummySink : public NetSink -{ - public: - DummySink(); - ~DummySink(); - - virtual int input(NetBuffer&& buf, const NetContext& ctx, const NetSource* bottom); - virtual bool initialized(int from); -};