Skip to content

Commit

Permalink
13213213
Browse files Browse the repository at this point in the history
  • Loading branch information
WtzLAS committed Dec 20, 2023
1 parent dc3469c commit 189a91b
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 16 deletions.
125 changes: 112 additions & 13 deletions NorthstarDLL/shared/kcpintegration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,12 @@ std::pair<std::shared_ptr<NetSink>, std::shared_ptr<NetSource>> connectionInitDe
{
std::shared_ptr<FecLayer> fec =
std::shared_ptr<FecLayer>(new FecLayer(Cvar_kcp_fec_send_data_shards->GetInt(), Cvar_kcp_fec_send_parity_shards->GetInt(), 1, 1));
fec->bindTop(std::static_pointer_cast<NetSink>(GameSink::instance()));
std::shared_ptr<KcpLayer> kcp = std::shared_ptr<KcpLayer>(new KcpLayer({s, addr}));
kcp->bindTop(std::static_pointer_cast<NetSink>(GameSink::instance()));
kcp->bindBottom(std::static_pointer_cast<NetSource>(fec));
fec->bindTop(std::static_pointer_cast<NetSink>(kcp));
fec->bindBottom(std::static_pointer_cast<NetSource>(UdpSource::instance()));
return std::make_pair(std::static_pointer_cast<NetSink>(fec), std::static_pointer_cast<NetSource>(fec));
return std::make_pair(std::static_pointer_cast<NetSink>(fec), std::static_pointer_cast<NetSource>(kcp));
}

std::pair<std::shared_ptr<NetSink>, std::shared_ptr<NetSource>> NetManager::initAndBind(const NetContext& ctx)
Expand Down Expand Up @@ -584,17 +587,29 @@ int FecLayer::input(const NetBuffer& buf, const NetContext& ctx)
nBuf.getU32H(); // drop seqid
nBuf.getU16H(); // drop flag
auto fecSize = nBuf.getU16H(); // drop size
nBuf.resize(fecSize - 2, 0);

auto inputResult = top->input(nBuf, ctx);
if (inputResult == SOCKET_ERROR)
if (fecSize < 2)
{
NS::log::NEW_NET.get()->error("[FEC] top->input {} error: {}", ctx, inputResult);
NS::log::NEW_NET.get()->warn("[FEC] top->input {} spurious input with fecSize < 2", ctx);
}
else
{
nBuf.resize(fecSize - 2, 0);

auto inputResult = top->input(nBuf, ctx);
if (inputResult == SOCKET_ERROR)
{
NS::log::NEW_NET.get()->error("[FEC] top->input {} error: {}", ctx, inputResult);
}
}
}
for (auto& rBuf : reconstructed)
{
auto fecSize = rBuf.getU16H();
if (fecSize < 2)
{
NS::log::NEW_NET.get()->warn("[FEC] top->input {} spurious reconstructed with fecSize < 2", ctx);
continue;
}
rBuf.resize(fecSize - 2, 0);

auto inputResult = top->input(rBuf, ctx);
Expand All @@ -606,7 +621,7 @@ int FecLayer::input(const NetBuffer& buf, const NetContext& ctx)
}
else
{
NS::log::NEW_NET.get()->warn("[FEC] {} sliently dropping non-FEC packet: invalid flag", ctx);
NS::log::NEW_NET.get()->warn("[FEC] top->input {} sliently dropping non-FEC packet: invalid flag", ctx);
}

return 0;
Expand Down Expand Up @@ -896,16 +911,100 @@ std::pair<int, int> FecLayer::AutoTuner::findPeriods()
return std::pair<int, int>(ds->first, ps->first);
}

void updateThreadPayload(std::stop_token stoken) {}
void updateThreadPayload(std::stop_token stoken, KcpLayer* layer)
{
while (!stoken.stop_requested())
{
if (!layer->initialized())
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
ikcp_update(layer->cb, iclock());
auto current = iclock();
auto next = ikcp_check(layer->cb, current);
auto duration = itimediff(next, current);
if (duration > 0)
{
std::unique_lock<std::mutex> lk(layer->updateCvMutex);
auto cvResult = layer->updateCv.wait_for(lk, std::chrono::milliseconds(duration));
}
}
}

int kcpOutput(const char* buf, int len, ikcpcb* kcp, void* user)
{
KcpLayer* layer = (KcpLayer*)user;
auto source = layer->bottom.lock();
if (!source->initialized())
{
NS::log::NEW_NET.get()->error("[KCP] kcpOutput: Uninitalized bottom");
return -1;
}
auto sendToResult = source->sendto(NetBuffer(buf, len), layer->remoteAddr);
if (sendToResult == SOCKET_ERROR)
{
NS::log::NEW_NET.get()->error("[KCP] kcpOutput: sendto error");
}
return sendToResult;
}

KcpLayer::KcpLayer(const NetContext& ctx)
{
cb = ikcp_create(0, this);
cb->output = kcpOutput;
ikcp_nodelay(cb, 1, Cvar_kcp_timer_resolution->GetInt(), 2, 1);

remoteAddr = ctx;
updateThread = std::jthread(updateThreadPayload, this);
updateThread.detach();
}

KcpLayer::KcpLayer()
KcpLayer::~KcpLayer()
{
cb = ikcp_create(0, );
updateThread.request_stop();
updateThread.join();

ikcp_release(cb);
cb = nullptr;
}

KcpLayer::~KcpLayer() {}
int KcpLayer::sendto(const NetBuffer& buf, const NetContext& ctx)
{
auto result = ikcp_send(cb, buf.data(), buf.size());
if (result < 0)
{
NS::log::NEW_NET.get()->error("[KCP] sendto {}: error {}", ctx, result);
}
std::unique_lock<std::mutex> lk(updateCvMutex);
updateCv.notify_all();
return result;
}

int KcpLayer::input(const NetBuffer& buf, const NetContext& ctx)
{
auto result = ikcp_input(cb, buf.data(), buf.size());
if (result < 0)
{
NS::log::NEW_NET.get()->error("[KCP] input {}: error {}", ctx, result);
}
std::unique_lock<std::mutex> lk(updateCvMutex);
updateCv.notify_all();
return result;
}

bool KcpLayer::initialized()
{
return top && !bottom.expired() && top->initialized() && bottom.lock()->initialized() && updateThread.joinable() && cb != nullptr && cb->user != nullptr;
return top && !bottom.expired() && top->initialized() && bottom.lock()->initialized() && updateThread.joinable() && cb != nullptr &&
cb->user != nullptr;
}

void KcpLayer::bindTop(std::shared_ptr<NetSink> top)
{
this->top = top;
}

void KcpLayer::bindBottom(std::weak_ptr<NetSource> bottom)
{
this->bottom = bottom;
}
13 changes: 10 additions & 3 deletions NorthstarDLL/shared/kcpintegration.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,17 +387,21 @@ class FecLayer : public NetSource, public NetSink
std::vector<NetBuffer> encode(const NetBuffer& buf);
};

class KcpLayer : public NetSource, NetSink
class KcpLayer : public NetSource, public NetSink
{
public:
KcpLayer();
KcpLayer(const NetContext& ctx);
~KcpLayer();

virtual int sendto(const NetBuffer& buf, const NetContext& ctx);
virtual int input(const NetBuffer& buf, const NetContext& ctx);
virtual bool initialized();

void startUpdateThread();
void bindTop(std::shared_ptr<NetSink> top);
void bindBottom(std::weak_ptr<NetSource> bottom);

friend void updateThreadPayload(std::stop_token stoken, KcpLayer* layer);
friend int kcpOutput(const char* buf, int len, ikcpcb* kcp, void* layer);

private:
std::shared_ptr<NetSink> top;
Expand All @@ -408,5 +412,8 @@ class KcpLayer : public NetSource, NetSink
std::condition_variable updateCv;
std::mutex updateCvMutex;

// Stored cause ikcpcb invokes udp_output callback indefinitely.
NetContext remoteAddr;

ikcpcb* cb;
};

0 comments on commit 189a91b

Please sign in to comment.