Skip to content

Commit

Permalink
Mod hashing + better download (#374)
Browse files Browse the repository at this point in the history
  • Loading branch information
lionkor authored Oct 4, 2024
2 parents 55f1a3c + 5d34090 commit 611e53b
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 139 deletions.
4 changes: 0 additions & 4 deletions include/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class TClient final {
std::string GetCarData(int Ident);
std::string GetCarPositionRaw(int Ident);
void SetUDPAddr(const ip::udp::endpoint& Addr) { mUDPAddress = Addr; }
void SetDownSock(ip::tcp::socket&& CSock) { mDownSocket = std::move(CSock); }
void SetTCPSock(ip::tcp::socket&& CSock) { mSocket = std::move(CSock); }
void Disconnect(std::string_view Reason);
bool IsDisconnected() const { return !mSocket.is_open(); }
Expand All @@ -75,8 +74,6 @@ class TClient final {
[[nodiscard]] const std::unordered_map<std::string, std::string>& GetIdentifiers() const { return mIdentifiers; }
[[nodiscard]] const ip::udp::endpoint& GetUDPAddr() const { return mUDPAddress; }
[[nodiscard]] ip::udp::endpoint& GetUDPAddr() { return mUDPAddress; }
[[nodiscard]] ip::tcp::socket& GetDownSock() { return mDownSocket; }
[[nodiscard]] const ip::tcp::socket& GetDownSock() const { return mDownSocket; }
[[nodiscard]] ip::tcp::socket& GetTCPSock() { return mSocket; }
[[nodiscard]] const ip::tcp::socket& GetTCPSock() const { return mSocket; }
[[nodiscard]] std::string GetRoles() const { return mRole; }
Expand Down Expand Up @@ -122,7 +119,6 @@ class TClient final {
SparseArray<std::string> mVehiclePosition;
std::string mName = "Unknown Client";
ip::tcp::socket mSocket;
ip::tcp::socket mDownSocket;
ip::udp::endpoint mUDPAddress {};
int mUnicycleID = -1;
std::string mRole;
Expand Down
3 changes: 1 addition & 2 deletions include/TNetwork.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class TNetwork {
std::mutex mOpenIDMutex;

std::vector<uint8_t> UDPRcvFromClient(ip::udp::endpoint& ClientEndpoint);
void HandleDownload(TConnection&& TCPSock);
void OnConnect(const std::weak_ptr<TClient>& c);
void TCPClient(const std::weak_ptr<TClient>& c);
void Looper(const std::weak_ptr<TClient>& c);
Expand All @@ -67,7 +66,7 @@ class TNetwork {
void Parse(TClient& c, const std::vector<uint8_t>& Packet);
void SendFile(TClient& c, const std::string& Name);
static bool TCPSendRaw(TClient& C, ip::tcp::socket& socket, const uint8_t* Data, size_t Size);
static void SplitLoad(TClient& c, size_t Sent, size_t Size, bool D, const std::string& Name);
static void SendFileToClient(TClient& c, size_t Size, const std::string& Name);
static const uint8_t* SendSplit(TClient& c, ip::tcp::socket& Socket, const uint8_t* DataPtr, size_t Size);
};

Expand Down
8 changes: 8 additions & 0 deletions include/TResourceManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once

#include "Common.h"
#include <nlohmann/json.hpp>

class TResourceManager {
public:
Expand All @@ -30,10 +31,17 @@ class TResourceManager {
[[nodiscard]] std::string FileSizes() const { return mFileSizes; }
[[nodiscard]] int ModsLoaded() const { return mModsLoaded; }

[[nodiscard]] std::string NewFileList() const;

void RefreshFiles();

private:
size_t mMaxModSize = 0;
std::string mFileSizes;
std::string mFileList;
std::string mTrimmedList;
int mModsLoaded = 0;

std::mutex mModsMutex;
nlohmann::json mMods;
};
1 change: 0 additions & 1 deletion src/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ void TClient::EnqueuePacket(const std::vector<uint8_t>& Packet) {
TClient::TClient(TServer& Server, ip::tcp::socket&& Socket)
: mServer(Server)
, mSocket(std::move(Socket))
, mDownSocket(ip::tcp::socket(Server.IoCtx()))
, mLastPingTime(std::chrono::high_resolution_clock::now()) {
}

Expand Down
150 changes: 24 additions & 126 deletions src/TNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ void TNetwork::Identify(TConnection&& RawConnection) {
if (Code == 'C') {
Client = Authentication(std::move(RawConnection));
} else if (Code == 'D') {
HandleDownload(std::move(RawConnection));
beammp_errorf("Old download packet detected - the client is wildly out of date, this will be ignored");
return;
} else if (Code == 'P') {
boost::system::error_code ec;
write(RawConnection.Socket, buffer("P"), ec);
Expand All @@ -249,7 +250,7 @@ void TNetwork::Identify(TConnection&& RawConnection) {
beammp_errorf("Invalid code got in Identify: '{}'", Code);
}
} catch (const std::exception& e) {
beammp_errorf("Error during handling of code {} - client left in invalid state, closing socket", Code);
beammp_errorf("Error during handling of code {} - client left in invalid state, closing socket: {}", Code, e.what());
boost::system::error_code ec;
RawConnection.Socket.shutdown(socket_base::shutdown_both, ec);
if (ec) {
Expand All @@ -262,27 +263,7 @@ void TNetwork::Identify(TConnection&& RawConnection) {
}
}

void TNetwork::HandleDownload(TConnection&& Conn) {
char D;
boost::system::error_code ec;
read(Conn.Socket, buffer(&D, 1), ec);
if (ec) {
Conn.Socket.shutdown(socket_base::shutdown_both, ec);
// ignore ec
return;
}
auto ID = uint8_t(D);
mServer.ForEachClient([&](const std::weak_ptr<TClient>& ClientPtr) -> bool {
ReadLock Lock(mServer.GetClientMutex());
if (!ClientPtr.expired()) {
auto c = ClientPtr.lock();
if (c->GetID() == ID) {
c->SetDownSock(std::move(Conn.Socket));
}
}
return true;
});
}


std::string HashPassword(const std::string& str) {
std::stringstream ret;
Expand Down Expand Up @@ -758,11 +739,11 @@ void TNetwork::Parse(TClient& c, const std::vector<uint8_t>& Packet) {
case 'S':
if (SubCode == 'R') {
beammp_debug("Sending Mod Info");
std::string ToSend = mResourceManager.FileList() + mResourceManager.FileSizes();
if (ToSend.empty())
ToSend = "-";
std::string ToSend = mResourceManager.NewFileList();
beammp_debugf("Mod Info: {}", ToSend);
if (!TCPSend(c, StringToVector(ToSend))) {
// TODO: error
ClientKick(c, "TCP Send 'SY' failed");
return;
}
}
return;
Expand All @@ -772,8 +753,6 @@ void TNetwork::Parse(TClient& c, const std::vector<uint8_t>& Packet) {
}

void TNetwork::SendFile(TClient& c, const std::string& UnsafeName) {
beammp_info(c.GetName() + " requesting : " + UnsafeName.substr(UnsafeName.find_last_of('/')));

if (!fs::path(UnsafeName).has_filename()) {
if (!TCPSend(c, StringToVector("CO"))) {
// TODO: handle
Expand All @@ -796,87 +775,9 @@ void TNetwork::SendFile(TClient& c, const std::string& UnsafeName) {
// TODO: handle
}

/// Wait for connections
int T = 0;
while (!c.GetDownSock().is_open() && T < 50) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
T++;
}

if (!c.GetDownSock().is_open()) {
beammp_error("Client doesn't have a download socket!");
if (!c.IsDisconnected())
c.Disconnect("Missing download socket");
return;
}

size_t Size = size_t(std::filesystem::file_size(FileName));
size_t MSize = Size / 2;

std::thread SplitThreads[2] {
std::thread([&] {
RegisterThread("SplitLoad_0");
SplitLoad(c, 0, MSize, false, FileName);
}),
std::thread([&] {
RegisterThread("SplitLoad_1");
SplitLoad(c, MSize, Size, true, FileName);
})
};

for (auto& SplitThread : SplitThreads) {
if (SplitThread.joinable()) {
SplitThread.join();
}
}
}

static std::pair<size_t /* count */, size_t /* last chunk */> SplitIntoChunks(size_t FullSize, size_t ChunkSize) {
if (FullSize < ChunkSize) {
return { 0, FullSize };
}
size_t Count = FullSize / (FullSize / ChunkSize);
size_t LastChunkSize = FullSize - (Count * ChunkSize);
return { Count, LastChunkSize };
}

TEST_CASE("SplitIntoChunks") {
size_t FullSize;
size_t ChunkSize;
SUBCASE("Normal case") {
FullSize = 1234567;
ChunkSize = 1234;
}
SUBCASE("Zero original size") {
FullSize = 0;
ChunkSize = 100;
}
SUBCASE("Equal full size and chunk size") {
FullSize = 125;
ChunkSize = 125;
}
SUBCASE("Even split") {
FullSize = 10000;
ChunkSize = 100;
}
SUBCASE("Odd split") {
FullSize = 13;
ChunkSize = 2;
}
SUBCASE("Large sizes") {
FullSize = 10 * GB;
ChunkSize = 125 * MB;
}
auto [Count, LastSize] = SplitIntoChunks(FullSize, ChunkSize);
CHECK((Count * ChunkSize) + LastSize == FullSize);
}

const uint8_t* /* end ptr */ TNetwork::SendSplit(TClient& c, ip::tcp::socket& Socket, const uint8_t* DataPtr, size_t Size) {
if (TCPSendRaw(c, Socket, DataPtr, Size)) {
return DataPtr + Size;
} else {
return nullptr;
}
SendFileToClient(c, Size, FileName);
}

#if defined(BEAMMP_LINUX)
Expand All @@ -886,8 +787,8 @@ const uint8_t* /* end ptr */ TNetwork::SendSplit(TClient& c, ip::tcp::socket& So
#include <unistd.h>
#include <signal.h>
#endif
void TNetwork::SplitLoad(TClient& c, size_t Offset, size_t End, bool D, const std::string& Name) {
TScopedTimer timer(fmt::format("Download of {}-{} for '{}'", Offset, End, Name));
void TNetwork::SendFileToClient(TClient& c, size_t Size, const std::string& Name) {
TScopedTimer timer(fmt::format("Download of '{}' for client {}", Name, c.GetID()));
#if defined(BEAMMP_LINUX)
signal(SIGPIPE, SIG_IGN);
// on linux, we can use sendfile(2)!
Expand All @@ -897,11 +798,11 @@ void TNetwork::SplitLoad(TClient& c, size_t Offset, size_t End, bool D, const st
return;
}
// native handle, needed in order to make native syscalls with it
int socket = D ? c.GetDownSock().native_handle() : c.GetTCPSock().native_handle();
int socket = c.GetTCPSock().native_handle();

ssize_t ret = 0;
auto ToSendTotal = End - Offset;
auto Start = Offset;
auto ToSendTotal = Size;
auto Start = 0;
while (ret < ssize_t(ToSendTotal)) {
auto SysOffset = off_t(Start + size_t(ret));
ret = sendfile(socket, fd, &SysOffset, ToSendTotal - size_t(ret));
Expand All @@ -915,35 +816,32 @@ void TNetwork::SplitLoad(TClient& c, size_t Offset, size_t End, bool D, const st
std::ifstream f(Name.c_str(), std::ios::binary);
uint32_t Split = 125 * MB;
std::vector<uint8_t> Data;
if (End > Split)
if (Size > Split)
Data.resize(Split);
else
Data.resize(End);
ip::tcp::socket* TCPSock { nullptr };
if (D)
TCPSock = &c.GetDownSock();
else
TCPSock = &c.GetTCPSock();
while (!c.IsDisconnected() && Offset < End) {
size_t Diff = End - Offset;
Data.resize(Size);
ip::tcp::socket* TCPSock = &c.GetTCPSock();
std::streamsize Sent = 0;
while (!c.IsDisconnected() && Sent < Size) {
size_t Diff = Size - Sent;
if (Diff > Split) {
f.seekg(Offset, std::ios_base::beg);
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Split);
if (!TCPSendRaw(c, *TCPSock, Data.data(), Split)) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (1)");
break;
}
Offset += Split;
Sent += Split;
} else {
f.seekg(Offset, std::ios_base::beg);
f.seekg(Sent, std::ios_base::beg);
f.read(reinterpret_cast<char*>(Data.data()), Diff);
if (!TCPSendRaw(c, *TCPSock, Data.data(), int32_t(Diff))) {
if (!c.IsDisconnected())
c.Disconnect("TCPSendRaw failed in mod download (2)");
break;
}
Offset += Diff;
Sent += Diff;
}
}
#endif
Expand Down
Loading

0 comments on commit 611e53b

Please sign in to comment.