-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththread-sync-example.cpp
130 lines (104 loc) · 3.32 KB
/
thread-sync-example.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//
// Created by kobi on 1/10/23.
//
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>
class Event {
public:
Event() = default;
Event(const Event&) = delete;
Event(Event&&) = delete;
Event& operator=(const Event&) = delete;
Event& operator=(Event&&) = delete;
class Awaiter;
Awaiter operator co_await() const noexcept;
void notify() noexcept;
private:
friend class Awaiter;
mutable std::atomic<void*> suspendedWaiter{nullptr};
mutable std::atomic<bool> notified{false};
};
class Event::Awaiter {
public:
Awaiter(const Event& eve): event(eve) {}
bool await_ready() const;
bool await_suspend(std::coroutine_handle<> corHandle) noexcept;
void await_resume() noexcept {}
private:
friend class Event;
const Event& event;
std::coroutine_handle<> coroutineHandle;
};
bool Event::Awaiter::await_ready() const {
// allow at most one waiter
if (event.suspendedWaiter.load() != nullptr) {
throw std::runtime_error("More than one waiter is not valid");
}
// event.notified == false; suspends the coroutine
// event.notified == true; the coroutine is executed like a normal function
return event.notified;
}
bool Event::Awaiter::await_suspend(std::coroutine_handle<>corHandle) noexcept {
coroutineHandle = corHandle;
if (event.notified) return false;
// store the waiter for later notification
event.suspendedWaiter.store(this);
return true;
}
void Event::notify() noexcept {
notified = true;
// try to load the waiter
auto* waiter = static_cast<Awaiter*>(suspendedWaiter.load());
// check if a waiter is available
if (waiter != nullptr) {
// resume the coroutine => await_resume
waiter->coroutineHandle.resume();
}
}
Event::Awaiter Event::operator co_await() const noexcept {
return Awaiter{ *this };
}
struct Task {
struct promise_type {
Task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
Task receiver(Event& event) {
auto start = std::chrono::high_resolution_clock::now();
co_await event;
std::cout << "Got the notification! " << '\n';
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start;
std::cout << "Waited " << elapsed.count() << " seconds." << '\n';
}
using namespace std::chrono_literals;
int main() {
std::cout << '\n';
std::cout << "Notification before waiting" << '\n';
Event event1{};
auto senderThread1 = std::thread([&event1]{ event1.notify(); }); // Notification
auto receiverThread1 = std::thread(receiver, std::ref(event1));
receiverThread1.join();
senderThread1.join();
std::cout << '\n';
std::cout << "Notification after 2 seconds waiting" << '\n';
Event event2{};
auto receiverThread2 = std::thread(receiver, std::ref(event2));
auto senderThread2 = std::thread([&event2] {
std::this_thread::sleep_for(2s);
event2.notify(); // Notification
});
receiverThread2.join();
senderThread2.join();
std::cout << '\n';
}