-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_queue.hpp
123 lines (104 loc) · 3.27 KB
/
concurrent_queue.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#ifndef LIME62_CONCURRENT_QUEUE_H
#define LIME62_CONCURRENT_QUEUE_H
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>
/*!
@brief namespace for JongYoon Lim
@see https://github.com/lime62
*/
namespace lime62 {
/*!
 @brief A simple C++11 concurrent queue built on top of std::queue.

 Every member function takes the internal mutex before touching the
 underlying queue. The accessors front(), back() and front_pop() block while
 the queue is empty; a blocked caller is released either by a push()/emplace()
 or by interrupt(), in which case it throws std::runtime_error("Interrupted").

 @note front() and back() return a reference into the underlying container
       after the internal lock has been released. The reference is only safe
       to use while no other thread pops or otherwise modifies the queue;
       prefer front_pop() when several consumers share the queue.
 @note The mutex and condition variable members make the class neither
       copyable nor movable.

 @tparam T         element type
 @tparam Container underlying sequence container handed to std::queue
 @ref std::queue class
 */
template<typename T, typename Container = std::deque<T> >
class concurrent_queue {
public:
    // Member types are taken from the queue type that is actually stored, so
    // they stay correct when a non-default Container is supplied.
    typedef typename std::queue<T, Container>::size_type size_type;
    typedef typename std::queue<T, Container>::reference reference;
    typedef typename std::queue<T, Container>::const_reference const_reference;

    /*!
     @brief Releases any thread still blocked in a waiting accessor.
     Callers must still make sure no thread is using the object once
     destruction has started.
     */
    ~concurrent_queue() {
        interrupt();
    }

    /*!
     @brief Wakes every thread currently blocked in front(), back() or
            front_pop(); each of them throws std::runtime_error("Interrupted").
     Calls to the blocking accessors that start after interrupt() returned are
     not affected.
     */
    void interrupt() {
        {
            // The state is changed under the mutex so that a waiter cannot
            // slip between its emptiness check and the actual wait and miss
            // the notification.
            std::lock_guard<std::mutex> lock(mutex_);
            interrupted_ = true;
            ++interrupt_epoch_;
        }
        // All waiters must be released, not just one of them.
        condition_variable_.notify_all();
    }

    /*!
     @brief Appends a copy of @p e and wakes one waiting consumer.
     */
    void push(T const &e) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(e);
        }
        // Notify after unlocking so the woken thread can take the mutex at once.
        condition_variable_.notify_one();
    }

    /*!
     @brief Appends @p e by moving it and wakes one waiting consumer.
     */
    void push(T &&e) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(e));
        }
        condition_variable_.notify_one();
    }

    /*!
     @brief Constructs a new element in place at the back of the queue from
            @p args and wakes one waiting consumer.
     */
    template<typename... Args>
    void emplace(Args&&... args) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.emplace(std::forward<Args>(args)...);
        }
        condition_variable_.notify_one();
    }

    /*!
     @brief Returns true when the queue holds no elements at the time of the call.
     */
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    /*!
     @brief Removes the front element if there is one; does nothing (and does
            not block) when the queue is empty.
     */
    void pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!queue_.empty())
            queue_.pop();
    }

    /*!
     @brief Blocks until an element is available, moves it into @p ret and
            removes it from the queue, all under a single lock acquisition.
     @throws std::runtime_error("Interrupted") if interrupt() is called while
             this call is blocked.
     */
    void front_pop(T& ret) {
        std::unique_lock<std::mutex> lock(mutex_);
        wait(lock);
        // The element is popped right below, so it can be moved from.
        ret = std::move(queue_.front());
        queue_.pop();
    }

    /*!
     @brief Returns the number of elements at the time of the call.
     */
    size_type size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

    /*!
     @brief Blocks until the queue is non-empty and returns a reference to the
            front element. See the class note about reference lifetime.
     @throws std::runtime_error("Interrupted") if interrupted while blocked.
     */
    reference front() {
        std::unique_lock<std::mutex> lock(mutex_);
        wait(lock);
        return queue_.front();
    }

    /*! @brief Const overload of front(). */
    const_reference front() const {
        std::unique_lock<std::mutex> lock(mutex_);
        wait(lock);
        return queue_.front();
    }

    /*!
     @brief Blocks until the queue is non-empty and returns a reference to the
            back element. See the class note about reference lifetime.
     @throws std::runtime_error("Interrupted") if interrupted while blocked.
     */
    reference back() {
        std::unique_lock<std::mutex> lock(mutex_);
        wait(lock);
        return queue_.back();
    }

    /*! @brief Const overload of back(). */
    const_reference back() const {
        std::unique_lock<std::mutex> lock(mutex_);
        wait(lock);
        return queue_.back();
    }

    /*!
     @brief Not supported; always throws.
     @throws std::logic_error("Not supported"). std::logic_error derives from
             std::exception, so handlers catching std::exception keep working.
     */
    void swap(concurrent_queue& q) {
        (void)q;  // intentionally unused
        // std::exception has no standard constructor taking a message.
        throw std::logic_error("Not supported");
    }

protected:
    std::queue<T, Container> queue_;
    // mutable: the const accessors have to lock and wait as well.
    mutable std::mutex mutex_;
    mutable std::condition_variable condition_variable_;
    // Set by interrupt() and cleared whenever a blocking accessor starts.
    // The decision to throw is based on interrupt_epoch_, not on this flag.
    mutable std::atomic_bool interrupted_{false};

private:
    // Incremented by interrupt() while mutex_ is held. A waiter compares it
    // with the value seen on entry, so a later caller cannot erase an
    // interrupt that an earlier waiter has not observed yet.
    mutable unsigned long interrupt_epoch_ = 0;

    /*!
     @brief Blocks on the condition variable until the queue is non-empty.
     @param lock a lock that currently owns mutex_
     @throws std::runtime_error("Interrupted") if interrupt() was called
             after this wait started and the call actually had to block.
     */
    void wait(std::unique_lock<std::mutex>& lock) const {
        interrupted_ = false;
        const unsigned long entry_epoch = interrupt_epoch_;
        while (queue_.empty()) {
            condition_variable_.wait(lock);
            if (interrupt_epoch_ != entry_epoch)
                throw std::runtime_error("Interrupted");
        }
    }
};
}
#endif //LIME62_CONCURRENT_QUEUE_H