From dc874cc40f4bcfb0c701ab27441e54cb2387b676 Mon Sep 17 00:00:00 2001 From: mezzode Date: Mon, 11 Nov 2019 21:02:33 -0800 Subject: [PATCH] Clean up and readme --- .gitignore | 1 - CMakeLists.txt | 10 +++---- CMakeSettings.json | 28 +++++++++++++++++++ README.md | 33 +++++++++++++++++++++- client.cpp | 21 ++++++++------ client.h | 31 ++++++++------------- common.cpp | 10 +++++++ common.h | 3 ++ exampleAsyncClient.cpp | 62 ++++++++++++++++++++++++++++-------------- serialization.h | 4 +-- server.cpp | 37 ++++++++++++------------- server.h | 5 +--- 12 files changed, 161 insertions(+), 84 deletions(-) create mode 100644 CMakeSettings.json create mode 100644 common.cpp diff --git a/.gitignore b/.gitignore index 13c64ae..2b4392f 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,3 @@ CTestTestfile.cmake _deps .vs out -CMakeSettings.json diff --git a/CMakeLists.txt b/CMakeLists.txt index 3b49da4..021ad9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,7 @@ -# CMakeList.txt : CMake project for StreamBase, include source and define -# project specific logic here. -# +project(StreamBase) cmake_minimum_required (VERSION 3.8) # Add source to this project's executable. -add_executable (server "server.cpp" "server.h") -add_executable (exampleClient "exampleClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h") -add_executable (exampleAsyncClient "exampleAsyncClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h") +add_executable (server "server.cpp" "server.h" "common.cpp" "common.h") +add_executable (exampleClient "exampleClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h" "common.cpp" "common.h") +add_executable (exampleAsyncClient "exampleAsyncClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h" "common.cpp" "common.h") diff --git a/CMakeSettings.json b/CMakeSettings.json new file mode 100644 index 0000000..71601c7 --- /dev/null +++ b/CMakeSettings.json @@ -0,0 +1,28 @@ +{ + "configurations": [ + { + "name": "x64-Debug", + "generator": "Ninja", + "configurationType": "Debug", + "inheritEnvironments": [ "msvc_x64_x64" ], + "buildRoot": "${projectDir}\\out\\build\\${name}", + "installRoot": "${projectDir}\\out\\install\\${name}", + "cmakeCommandArgs": "", + "buildCommandArgs": "-v", + "ctestCommandArgs": "", + "variables": [] + }, + { + "name": "x64-Release", + "generator": "Ninja", + "configurationType": "Release", + "buildRoot": "${projectDir}\\out\\build\\${name}", + "installRoot": "${projectDir}\\out\\install\\${name}", + "cmakeCommandArgs": "", + "buildCommandArgs": "-v", + "ctestCommandArgs": "", + "inheritEnvironments": [ "msvc_x64_x64" ], + "variables": [] + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index 219ddfb..2d36ab2 100644 --- a/README.md +++ b/README.md @@ -1 +1,32 @@ -StreamBase +# StreamBase + +Clients communicate with the server by initially sending a header message containing an Action which specifies whether they want to do a save or a get and what key to use. + +## Send +On receiving a send action, the server will expect and read a second message containing the binary archive of the data. +After inserting the data into the store, the server will send a message back to the client notifying it the save has been completed. + +## Get +On receiving a get action, the server will retrieve the binary archive for the given key from the store and send it back in a message to the client. +Note that the client-side `get()` function requires the type of the data being returned to be specified. + +## Data Store +The server stores data in a map from the given key to a binary archive of the provided object. +The map is guarded by a shared mutex to ensure thread-safe access. + +## Serialization + +This implementation of StreamBase depends on `cereal` to serialize data. +This means extra code may be required for certain types of data +e.g. for custom classes a `serialize` method that is a `friend` of the class may be required in order to serialize its attributes, as C++ lacks reflection. + +## Assumptions + +For simplicity it is assumed data being sent fits within a single message, and that cases will conform to the spec meaning there is currently no error handling +e.g. attempting to retrieve data which has not been saved is unhandled, clients expect the server to be running, etc. + +## Multithreading + +This implementation of StreamBase supports async via multithreading. The server uses a different thread and pipe instance for each client connection. + +Multithreaded code uses a custom `log()` function as `cout` is not thread-safe. diff --git a/client.cpp b/client.cpp index 0e33ca3..902ae79 100644 --- a/client.cpp +++ b/client.cpp @@ -1,7 +1,4 @@ -// client.cpp : Defines the entry point for the application. -// - -#include +#include #include #include #include @@ -12,11 +9,11 @@ using namespace std; void sendHeader(HANDLE h, string key) { - cout << "Sending header." << endl; + log("Sending header."); auto headerData{ serialize(Action { Type::Send, key }) }; DWORD bytesWritten{ 0 }; - const BOOL success = WriteFile( + const bool success = WriteFile( h, &headerData[0], headerData.size(), @@ -27,14 +24,20 @@ void sendHeader(HANDLE h, string key) { if (!success) { throw GetLastError(); } - cout << "Sent header." << endl; + log("Sent header."); } bool awaitSendSuccess(HANDLE h) { - std::vector buf(bufSize); + vector buf(bufSize); DWORD bytesRead; - const BOOL readSuccess = ReadFile(h, buf.data(), buf.size(), &bytesRead, nullptr); // Use ReadFileEx for async + const bool readSuccess = ReadFile( + h, + buf.data(), + buf.size(), + &bytesRead, + nullptr + ); if (!readSuccess) { throw GetLastError(); } diff --git a/client.h b/client.h index 0807141..3d0d3c4 100644 --- a/client.h +++ b/client.h @@ -1,12 +1,8 @@ -// client.h : Include file for standard system include files, -// or project specific include files. - -#pragma once +#pragma once #include #include #include -#include #include "common.h" #include "serialization.h" @@ -36,7 +32,8 @@ void send(string key, T data) { throw "Couldn't connect."; } - // else server hasnt yet made another instance for additional clients so wait + // server hasnt yet made another instance for additional clients + // so wait then try again auto instanceReady{ WaitNamedPipe(pipeName, NMPWAIT_WAIT_FOREVER) }; if (!instanceReady) { cout << "Couldn't connect: 0x" << hex << GetLastError() << endl; @@ -45,8 +42,6 @@ void send(string key, T data) { } } - cout << "CreateFile " << key << endl; - DWORD mode{ PIPE_READMODE_MESSAGE }; auto modeSet = SetNamedPipeHandleState( h, // pipe handle @@ -57,23 +52,21 @@ void send(string key, T data) { throw "Couldn't put pipe into message mode."; } - cout << "Mode set " << key << endl; - sendHeader(h, key); sendData(h, data); const bool success = awaitSendSuccess(h); CloseHandle(h); - cout << "Send " << (success ? "success" : "fail") << endl; + log(string{ "Send " } + (success ? "success" : "fail")); } template T get(string key) { - cout << "Getting data for key " << key << endl; + log("Getting data for key " + key); auto headerData{ serialize(Action { Type::Get, key }) }; - std::vector buf(bufSize); + vector buf(bufSize); DWORD bytesRead; - const BOOL success = CallNamedPipe( + const bool success = CallNamedPipe( pipeName, &headerData[0], headerData.size(), @@ -85,18 +78,18 @@ T get(string key) { if (!success) { throw GetLastError(); } - cout << "Got data for key " << key << endl; + log("Got data for key " + key); auto data{ deserialize(string{buf.data(), bytesRead}) }; return data; } template void sendData(HANDLE h, T data) { - cout << "Sending data." << endl; + log("Sending data"); auto writeStr{ serialize(data) }; DWORD bytesWritten{ 0 }; - const BOOL success = WriteFile( + const bool success = WriteFile( h, &writeStr[0], writeStr.size(), @@ -105,8 +98,8 @@ void sendData(HANDLE h, T data) { ); if (!success) { - // throw GetLastError(); cout << "0x" << hex << GetLastError() << endl; + throw GetLastError(); } - cout << "Sent data." << endl; + log("Sent data"); } diff --git a/common.cpp b/common.cpp new file mode 100644 index 0000000..ae289d9 --- /dev/null +++ b/common.cpp @@ -0,0 +1,10 @@ +#include +#include +#include "common.h" + +std::mutex coutLock; + +void log(std::string msg) { + std::lock_guard lg{ coutLock }; + std::cout << msg << std::endl; +} \ No newline at end of file diff --git a/common.h b/common.h index 87f3ff3..7c5d38a 100644 --- a/common.h +++ b/common.h @@ -1,6 +1,9 @@ #pragma once #include +#include + +void log(std::string msg); constexpr LPCSTR pipeName{ R"(\\.\pipe\StreamBase)" }; constexpr DWORD bufSize{ 512 }; diff --git a/exampleAsyncClient.cpp b/exampleAsyncClient.cpp index f36b86f..f1bd465 100644 --- a/exampleAsyncClient.cpp +++ b/exampleAsyncClient.cpp @@ -7,55 +7,75 @@ using namespace std; int main() { - cout << "I am the async client." << endl; + log("I am the async client."); - const auto policy{ std::launch::async }; + const auto policy{ launch::async }; + + const string intKey{ "MyKeyAsync" }; + const string strKey{ "She" }; + const string customKey{ "CustomClassAsync" }; // sends - auto intSend{ std::async(policy, []() { - cout << "Sending int" << endl; - send("mykeyasync", 101); + auto intSend{ async(policy, [intKey]() { + log("Sending int"); + try { + send(intKey, 101); + } + catch (char e[]) { + // manually catch errors since exceptions only propagate on .get() + log(e); + terminate(); + } + log("Sent int"); }) }; - auto strSend{ std::async(policy, []() { - cout << "Sending string" << endl; + auto strSend{ async(policy, [strKey]() { + log("Sending string"); try { - send("she", string{ "ra" }); + send(strKey, string{ "Ra" }); } catch (char e[]) { // manually catch errors since exceptions only propagate on .get() - cout << e << endl; - std::terminate(); + log(e); + terminate(); } - cout << "Sent string" << endl; + log("Sent string"); }) }; - auto customSend{ std::async(policy, []() { - cout << "Sending custom class" << endl; + auto customSend{ async(policy, [customKey]() { + log("Sending custom class"); CustomClass custom{ 42, 81 }; custom.incrementA(); custom.incrementB(); - send("mycustomclassasync", custom); + try { + send(customKey, custom); + } + catch (char e[]) { + // manually catch errors since exceptions only propagate on .get() + log(e); + terminate(); + } + log("Sent custom class"); }) }; // gets - auto intGet{ std::async(policy, [&intSend]() { + auto intGet{ async(policy, [intKey, &intSend]() { intSend.get(); // await send success - cout << get("mykeyasync") << endl; + log(intKey + " -> " + to_string(get(intKey))); }) }; - auto strGet{ std::async(policy, [&strSend]() { + auto strGet{ async(policy, [strKey, &strSend]() { strSend.get(); // await send success - cout << get("she") << endl; + log(strKey + " -> " + get(strKey)); }) }; - auto customGet{ std::async(policy, [&customSend]() { + auto customGet{ async(policy, [customKey, &customSend]() { customSend.get(); // await send success - auto savedCustom{ get("mycustomclassasync") }; + auto savedCustom{ get(customKey) }; savedCustom.incrementA(); savedCustom.incrementB(); - cout << savedCustom.getA() << endl << savedCustom.getB() << endl; + log("A=" + to_string(savedCustom.getA()) + ", B=" + to_string(savedCustom.getB())); }) }; getchar(); // wait before closing diff --git a/serialization.h b/serialization.h index 896bd9d..26a9005 100644 --- a/serialization.h +++ b/serialization.h @@ -1,7 +1,7 @@ #pragma once #include -#include "cereal/archives/binary.hpp" // assuming we can use appropriate 3rd party libs +#include "cereal/archives/binary.hpp" #include "cereal/types/string.hpp" template @@ -12,7 +12,6 @@ std::string serialize(T data) { cereal::BinaryOutputArchive oarchive{ out }; oarchive(data); } - // writeStr.c_str() returns a const char * so would need to copy it to get a non-const * return out.str(); } @@ -24,6 +23,5 @@ T deserialize(const std::string& serializedData) { cereal::BinaryInputArchive iarchive{ in }; iarchive(data); } - // writeStr.c_str() returns a const char * so would need to copy it to get a non-const * return data; } \ No newline at end of file diff --git a/server.cpp b/server.cpp index 1213d29..b4fd485 100644 --- a/server.cpp +++ b/server.cpp @@ -1,7 +1,4 @@ -// server.cpp : Defines the entry point for the application. -// - -#include +#include #include #include #include @@ -15,7 +12,7 @@ using namespace std; int main() { - cout << "I am the server." << endl; + log("I am the server"); Store store; @@ -35,18 +32,18 @@ int main() throw GetLastError(); } - const BOOL success = ConnectNamedPipe(h, nullptr); + const bool success = ConnectNamedPipe(h, nullptr); if (!success) { cout << "0x" << hex << GetLastError() << endl; throw GetLastError(); } - cout << "Connected." << endl; + log("Connected to a client"); // create a separate thread so can handle clients asynchronously thread{[h, &store]() { read(h, store); FlushFileBuffers(h); - const BOOL dSuccess = DisconnectNamedPipe(h); + const bool dSuccess = DisconnectNamedPipe(h); if (!dSuccess) { cout << "0x" << hex << GetLastError() << endl; throw GetLastError(); @@ -60,24 +57,24 @@ void read(HANDLE h, Store &store) { const auto action{ readHeader(h) }; switch (action.type) { case Type::Send: { - cout << "Saving data for key " << action.key << endl; + log("Saving data for key " + action.key); { // only one thread can write at a time - std::lock_guard lg{ store.lock }; + lock_guard lg{ store.lock }; store.records[action.key] = readData(h); } - cout << "Saved data for key " << action.key << endl; + log("Saved data for key " + action.key); saveSuccess(h); break; } case Type::Get: { - cout << "Getting data for key " << action.key << endl; + log("Getting data for key " + action.key); { // any number of threads can read - std::shared_lock lg{ store.lock }; + shared_lock lg{ store.lock }; returnData(h, store.records.at(action.key)); } - cout << "Got data for key " << action.key << endl; + log("Got data for key " + action.key); break; } } @@ -85,7 +82,7 @@ void read(HANDLE h, Store &store) { void saveSuccess(HANDLE h) { DWORD bytesWritten{ 0 }; - const BOOL success = WriteFile( + const bool success = WriteFile( h, &saveSuccessStatus[0], saveSuccessStatus.size(), @@ -101,7 +98,7 @@ void saveSuccess(HANDLE h) { void returnData(HANDLE h, string data) { DWORD bytesWritten{ 0 }; - const BOOL success = WriteFile( + const bool success = WriteFile( h, &data[0], data.size(), @@ -116,10 +113,10 @@ void returnData(HANDLE h, string data) { } string readData(HANDLE h) { - std::vector buf(bufSize); + vector buf(bufSize); DWORD bytesRead; - const BOOL readSuccess = ReadFile(h, buf.data(), buf.size(), &bytesRead, nullptr); // Use ReadFileEx for async + const bool readSuccess = ReadFile(h, buf.data(), buf.size(), &bytesRead, nullptr); if (!readSuccess) { throw GetLastError(); } @@ -128,10 +125,10 @@ string readData(HANDLE h) { } Action readHeader(HANDLE h) { - std::vector buf(bufSize); + vector buf(bufSize); DWORD bytesRead; - const BOOL readSuccess = ReadFile(h, buf.data(), buf.size(), &bytesRead, nullptr); // Use ReadFileEx for async + const bool readSuccess = ReadFile(h, buf.data(), buf.size(), &bytesRead, nullptr); if (!readSuccess) { throw GetLastError(); } diff --git a/server.h b/server.h index 390e7aa..d740f37 100644 --- a/server.h +++ b/server.h @@ -1,7 +1,4 @@ -// server.h : Include file for standard system include files, -// or project specific include files. - -#pragma once +#pragma once #include #include