Skip to content

Commit

Permalink
Clean up and fix race condition
Browse files Browse the repository at this point in the history
Clients tried to connect before server opened another pipe causing a silent failure since exceptions inside futures are only propagated on `get`
  • Loading branch information
mezzode committed Nov 12, 2019
1 parent 72c2780 commit b90532a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 18 deletions.
41 changes: 30 additions & 11 deletions client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,36 @@ bool awaitSendSuccess(HANDLE h);

template <class T>
void send(string key, T data) {
auto h = CreateFile(
pipeName,
GENERIC_READ | GENERIC_WRITE,
0,
nullptr,
OPEN_EXISTING,
0,
nullptr
);
if (h == INVALID_HANDLE_VALUE) {
throw "Couldn't connect.";
HANDLE h{ INVALID_HANDLE_VALUE };
while (h == INVALID_HANDLE_VALUE) {
h = CreateFile(
pipeName,
GENERIC_READ | GENERIC_WRITE,
0,
nullptr,
OPEN_EXISTING,
0,
nullptr
);

if (h == INVALID_HANDLE_VALUE) {
auto e = GetLastError();
if (e != ERROR_PIPE_BUSY) {
cout << "0x" << hex << GetLastError() << endl;
throw "Couldn't connect.";
}

// else server hasnt yet made another instance for additional clients so wait
auto instanceReady{ WaitNamedPipe(pipeName, NMPWAIT_WAIT_FOREVER) };
if (!instanceReady) {
cout << "Couldn't connect: 0x" << hex << GetLastError() << endl;
throw "Couldn't connect.";
}
}
}

cout << "CreateFile " << key << endl;

DWORD mode{ PIPE_READMODE_MESSAGE };
auto modeSet = SetNamedPipeHandleState(
h, // pipe handle
Expand All @@ -40,6 +57,8 @@ void send(string key, T data) {
throw "Couldn't put pipe into message mode.";
}

cout << "Mode set " << key << endl;

sendHeader(h, key);
sendData(h, data);
const bool success = awaitSendSuccess(h);
Expand Down
12 changes: 11 additions & 1 deletion exampleAsyncClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,24 @@ int main()
// sends

auto intSend{ std::async(policy, []() {
cout << "Sending int" << endl;
send("mykeyasync", 101);
}) };

auto strSend{ std::async(policy, []() {
send("she", string{ "ra" });
cout << "Sending string" << endl;
try {
send("she", string{ "ra" });
} catch (char e[]) {
// manually catch errors since exceptions only propagate on .get()
cout << e << endl;
std::terminate();
}
cout << "Sent string" << endl;
}) };

auto customSend{ std::async(policy, []() {
cout << "Sending custom class" << endl;
CustomClass custom{ 42, 81 };
custom.incrementA();
custom.incrementB();
Expand Down
19 changes: 14 additions & 5 deletions server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//

#include <windows.h>
#include <shared_mutex>
#include <thread>
#include <sstream>
#include <unordered_map>
Expand All @@ -15,9 +16,8 @@ using namespace std;
int main()
{
cout << "I am the server." << endl;

// TODO: add mutex. stl allows multiple threads reading and one writing
auto store = unordered_map<string, string>{};

Store store;

while (TRUE) {
const DWORD pipeMode{ PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT };
Expand Down Expand Up @@ -45,6 +45,7 @@ int main()
// create a separate thread so can handle clients asynchronously
thread{[h, &store]() {
read(h, store);
FlushFileBuffers(h);
const BOOL dSuccess = DisconnectNamedPipe(h);
if (!dSuccess) {
cout << "0x" << hex << GetLastError() << endl;
Expand All @@ -60,14 +61,22 @@ void read(HANDLE h, Store &store) {
switch (action.type) {
case Type::Send: {
cout << "Saving data for key " << action.key << endl;
store[action.key] = readData(h);
{
// only one thread can write at a time
std::lock_guard<std::shared_mutex> lg{ store.lock };
store.records[action.key] = readData(h);
}
cout << "Saved data for key " << action.key << endl;
saveSuccess(h);
break;
}
case Type::Get: {
cout << "Getting data for key " << action.key << endl;
returnData(h, store.at(action.key));
{
// any number of threads can read
std::shared_lock<std::shared_mutex> lg{ store.lock };
returnData(h, store.records.at(action.key));
}
cout << "Got data for key " << action.key << endl;
break;
}
Expand Down
6 changes: 5 additions & 1 deletion server.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@

using std::string;

using Store = std::unordered_map<string, string>;
struct Store {
// stl allows multiple threads reading and one writing so using a shared mutex
std::unordered_map<string, string> records;
std::shared_mutex lock;
};

void read(HANDLE h, Store &store);
Action readHeader(HANDLE h);
Expand Down

0 comments on commit b90532a

Please sign in to comment.