Skip to content

Commit

Permalink
Clean up and readme
Browse files Browse the repository at this point in the history
  • Loading branch information
mezzode committed Nov 12, 2019
1 parent b90532a commit dc874cc
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 84 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ CTestTestfile.cmake
_deps
.vs
out
CMakeSettings.json
10 changes: 4 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# CMakeList.txt : CMake project for StreamBase, include source and define
# project specific logic here.
#
project(StreamBase)
cmake_minimum_required (VERSION 3.8)

# Add source to this project's executable.
add_executable (server "server.cpp" "server.h")
add_executable (exampleClient "exampleClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h")
add_executable (exampleAsyncClient "exampleAsyncClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h")
add_executable (server "server.cpp" "server.h" "common.cpp" "common.h")
add_executable (exampleClient "exampleClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h" "common.cpp" "common.h")
add_executable (exampleAsyncClient "exampleAsyncClient.cpp" "CustomClass.cpp" "CustomClass.h" "client.cpp" "client.h" "common.cpp" "common.h")
28 changes: 28 additions & 0 deletions CMakeSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"configurations": [
{
"name": "x64-Debug",
"generator": "Ninja",
"configurationType": "Debug",
"inheritEnvironments": [ "msvc_x64_x64" ],
"buildRoot": "${projectDir}\\out\\build\\${name}",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeCommandArgs": "",
"buildCommandArgs": "-v",
"ctestCommandArgs": "",
"variables": []
},
{
"name": "x64-Release",
"generator": "Ninja",
"configurationType": "Release",
"buildRoot": "${projectDir}\\out\\build\\${name}",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeCommandArgs": "",
"buildCommandArgs": "-v",
"ctestCommandArgs": "",
"inheritEnvironments": [ "msvc_x64_x64" ],
"variables": []
}
]
}
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,32 @@
StreamBase
# StreamBase

Clients communicate with the server by initially sending a header message containing an Action which specifies whether they want to do a save or a get and what key to use.

## Send
On receiving a send action, the server will expect and read a second message containing the binary archive of the data.
After inserting the data into the store, the server will send a message back to the client notifying it the save has been completed.

## Get
On receiving a get action, the server will retrieve the binary archive for the given key from the store and send it back in a message to the client.
Note that the client-side `get()` function requires the type of the data being returned to be specified.

## Data Store
The server stores data in a map from the given key to a binary archive of the provided object.
The map is guarded by a shared mutex to ensure thread-safe access.

## Serialization

This implementation of StreamBase depends on `cereal` to serialize data.
This means extra code may be required for certain types of data
e.g. for custom classes a `serialize` method that is a `friend` of the class may be required in order to serialize its attributes, as C++ lacks reflection.

## Assumptions

For simplicity it is assumed data being sent fits within a single message, and that cases will conform to the spec meaning there is currently no error handling
e.g. attempting to retrieve data which has not been saved is unhandled, clients expect the server to be running, etc.

## Multithreading

This implementation of StreamBase supports async via multithreading. The server uses a different thread and pipe instance for each client connection.

Multithreaded code uses a custom `log()` function as `cout` is not thread-safe.
21 changes: 12 additions & 9 deletions client.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
// client.cpp : Defines the entry point for the application.
//

#include <windows.h>
#include <windows.h>
#include <sstream>
#include <string>
#include <vector>
Expand All @@ -12,11 +9,11 @@
using namespace std;

void sendHeader(HANDLE h, string key) {
cout << "Sending header." << endl;
log("Sending header.");

auto headerData{ serialize(Action { Type::Send, key }) };
DWORD bytesWritten{ 0 };
const BOOL success = WriteFile(
const bool success = WriteFile(
h,
&headerData[0],
headerData.size(),
Expand All @@ -27,14 +24,20 @@ void sendHeader(HANDLE h, string key) {
if (!success) {
throw GetLastError();
}
cout << "Sent header." << endl;
log("Sent header.");
}

bool awaitSendSuccess(HANDLE h) {
std::vector<char> buf(bufSize);
vector<char> buf(bufSize);

DWORD bytesRead;
const BOOL readSuccess = ReadFile(h, buf.data(), buf.size(), &bytesRead, nullptr); // Use ReadFileEx for async
const bool readSuccess = ReadFile(
h,
buf.data(),
buf.size(),
&bytesRead,
nullptr
);
if (!readSuccess) {
throw GetLastError();
}
Expand Down
31 changes: 12 additions & 19 deletions client.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
// client.h : Include file for standard system include files,
// or project specific include files.

#pragma once
#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <windows.h>
#include "common.h"
#include "serialization.h"

Expand Down Expand Up @@ -36,7 +32,8 @@ void send(string key, T data) {
throw "Couldn't connect.";
}

// else server hasnt yet made another instance for additional clients so wait
// server hasnt yet made another instance for additional clients
// so wait then try again
auto instanceReady{ WaitNamedPipe(pipeName, NMPWAIT_WAIT_FOREVER) };
if (!instanceReady) {
cout << "Couldn't connect: 0x" << hex << GetLastError() << endl;
Expand All @@ -45,8 +42,6 @@ void send(string key, T data) {
}
}

cout << "CreateFile " << key << endl;

DWORD mode{ PIPE_READMODE_MESSAGE };
auto modeSet = SetNamedPipeHandleState(
h, // pipe handle
Expand All @@ -57,23 +52,21 @@ void send(string key, T data) {
throw "Couldn't put pipe into message mode.";
}

cout << "Mode set " << key << endl;

sendHeader(h, key);
sendData(h, data);
const bool success = awaitSendSuccess(h);

CloseHandle(h);
cout << "Send " << (success ? "success" : "fail") << endl;
log(string{ "Send " } + (success ? "success" : "fail"));
}

template<class T>
T get(string key) {
cout << "Getting data for key " << key << endl;
log("Getting data for key " + key);
auto headerData{ serialize(Action { Type::Get, key }) };
std::vector<char> buf(bufSize);
vector<char> buf(bufSize);
DWORD bytesRead;
const BOOL success = CallNamedPipe(
const bool success = CallNamedPipe(
pipeName,
&headerData[0],
headerData.size(),
Expand All @@ -85,18 +78,18 @@ T get(string key) {
if (!success) {
throw GetLastError();
}
cout << "Got data for key " << key << endl;
log("Got data for key " + key);
auto data{ deserialize<T>(string{buf.data(), bytesRead}) };
return data;
}

template<class T>
void sendData(HANDLE h, T data) {
cout << "Sending data." << endl;
log("Sending data");
auto writeStr{ serialize(data) };

DWORD bytesWritten{ 0 };
const BOOL success = WriteFile(
const bool success = WriteFile(
h,
&writeStr[0],
writeStr.size(),
Expand All @@ -105,8 +98,8 @@ void sendData(HANDLE h, T data) {
);

if (!success) {
// throw GetLastError();
cout << "0x" << hex << GetLastError() << endl;
throw GetLastError();
}
cout << "Sent data." << endl;
log("Sent data");
}
10 changes: 10 additions & 0 deletions common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include <iostream>
#include <mutex>
#include "common.h"

std::mutex coutLock;

void log(std::string msg) {
std::lock_guard<std::mutex> lg{ coutLock };
std::cout << msg << std::endl;
}
3 changes: 3 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <string>
#include <windows.h>

void log(std::string msg);

constexpr LPCSTR pipeName{ R"(\\.\pipe\StreamBase)" };
constexpr DWORD bufSize{ 512 };
Expand Down
62 changes: 41 additions & 21 deletions exampleAsyncClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,75 @@ using namespace std;

int main()
{
cout << "I am the async client." << endl;
log("I am the async client.");

const auto policy{ std::launch::async };
const auto policy{ launch::async };

const string intKey{ "MyKeyAsync" };
const string strKey{ "She" };
const string customKey{ "CustomClassAsync" };

// sends

auto intSend{ std::async(policy, []() {
cout << "Sending int" << endl;
send("mykeyasync", 101);
auto intSend{ async(policy, [intKey]() {
log("Sending int");
try {
send(intKey, 101);
}
catch (char e[]) {
// manually catch errors since exceptions only propagate on .get()
log(e);
terminate();
}
log("Sent int");
}) };

auto strSend{ std::async(policy, []() {
cout << "Sending string" << endl;
auto strSend{ async(policy, [strKey]() {
log("Sending string");
try {
send("she", string{ "ra" });
send(strKey, string{ "Ra" });
} catch (char e[]) {
// manually catch errors since exceptions only propagate on .get()
cout << e << endl;
std::terminate();
log(e);
terminate();
}
cout << "Sent string" << endl;
log("Sent string");
}) };

auto customSend{ std::async(policy, []() {
cout << "Sending custom class" << endl;
auto customSend{ async(policy, [customKey]() {
log("Sending custom class");
CustomClass custom{ 42, 81 };
custom.incrementA();
custom.incrementB();
send("mycustomclassasync", custom);
try {
send(customKey, custom);
}
catch (char e[]) {
// manually catch errors since exceptions only propagate on .get()
log(e);
terminate();
}
log("Sent custom class");
}) };

// gets

auto intGet{ std::async(policy, [&intSend]() {
auto intGet{ async(policy, [intKey, &intSend]() {
intSend.get(); // await send success
cout << get<int>("mykeyasync") << endl;
log(intKey + " -> " + to_string(get<int>(intKey)));
}) };

auto strGet{ std::async(policy, [&strSend]() {
auto strGet{ async(policy, [strKey, &strSend]() {
strSend.get(); // await send success
cout << get<string>("she") << endl;
log(strKey + " -> " + get<string>(strKey));
}) };

auto customGet{ std::async(policy, [&customSend]() {
auto customGet{ async(policy, [customKey, &customSend]() {
customSend.get(); // await send success
auto savedCustom{ get<CustomClass>("mycustomclassasync") };
auto savedCustom{ get<CustomClass>(customKey) };
savedCustom.incrementA();
savedCustom.incrementB();
cout << savedCustom.getA() << endl << savedCustom.getB() << endl;
log("A=" + to_string(savedCustom.getA()) + ", B=" + to_string(savedCustom.getB()));
}) };

getchar(); // wait before closing
Expand Down
4 changes: 1 addition & 3 deletions serialization.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <string>
#include "cereal/archives/binary.hpp" // assuming we can use appropriate 3rd party libs
#include "cereal/archives/binary.hpp"
#include "cereal/types/string.hpp"

template<class T>
Expand All @@ -12,7 +12,6 @@ std::string serialize(T data) {
cereal::BinaryOutputArchive oarchive{ out };
oarchive(data);
}
// writeStr.c_str() returns a const char * so would need to copy it to get a non-const *
return out.str();
}

Expand All @@ -24,6 +23,5 @@ T deserialize(const std::string& serializedData) {
cereal::BinaryInputArchive iarchive{ in };
iarchive(data);
}
// writeStr.c_str() returns a const char * so would need to copy it to get a non-const *
return data;
}
Loading

0 comments on commit dc874cc

Please sign in to comment.