-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconcurrent_queue.hpp
160 lines (143 loc) · 4.08 KB
/
concurrent_queue.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/*
* Copyright Stephan Hofmockel 2012.
* Distributed under the Boost Software License, Version 1.0.
* See accompanying file LICENSE_1_0.txt or copy at
* http://www.boost.org/LICENSE_1_0.txt
*/
#ifndef CONNCURENT_QUEUE_HPP
#define CONNCURENT_QUEUE_HPP
#include <deque>
#include <exception>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread_time.hpp>
class QueueEmpty : public std::exception {};
class QueueFull : public std::exception {};
class NoMoreTasks : public std::exception {};
template<typename T>
class ConcurrentQueue {
private:
boost::mutex mutex;
std::deque<T> queue;
boost::condition_variable empty_cond;
boost::condition_variable full_cond;
boost::condition_variable all_tasks_done_cond;
size_t maxsize;
boost::uint64_t unfinished_tasks;
public:
ConcurrentQueue(long maxsize=0);
size_t size();
size_t get_maxsize(){return this->maxsize;};
/* block=true and timeout=0 => same as block= false */
/* timeout=0 => timeout is *not* considered */
/* Timeout is in milliseconds */
void put(T&, bool block=true, boost::uint64_t timeout=0);
void pop(T&, bool block=true, boost::uint64_t timeout=0);
void task_done();
void join();
};
template<typename T>
ConcurrentQueue<T>::ConcurrentQueue(long maxsize)
{
if (maxsize < 0) {
this->maxsize = 0;
}
else {
this->maxsize = maxsize;
}
this->unfinished_tasks = 0;
}
template<typename T>
size_t
ConcurrentQueue<T>::size()
{
boost::lock_guard<boost::mutex> raii_lock(this->mutex);
return this->queue.size();
}
template<typename T>
void
ConcurrentQueue<T>::put(T &item, bool block, boost::uint64_t timeout)
{
boost::mutex::scoped_lock lock(this->mutex);
if ((this->queue.size() < this->maxsize) or (this->maxsize == 0)) {
/* Fall through the end of method */
}
else if (not block) {
throw QueueFull();
}
else {
if (timeout > 0) {
boost::system_time abs_timeout = boost::get_system_time();
abs_timeout += boost::posix_time::milliseconds(timeout);
while (this->queue.size() == this->maxsize) {
if (not this->full_cond.timed_wait(lock, abs_timeout)) {
throw QueueFull();
}
}
}
else {
while (this->queue.size() == this->maxsize) {
this->full_cond.wait(lock);
}
}
}
this->queue.push_back(item);
this->unfinished_tasks += 1;
lock.unlock();
this->empty_cond.notify_one();
}
template<typename T>
void
ConcurrentQueue<T>::pop(T & item, bool block, boost::uint64_t timeout)
{
boost::mutex::scoped_lock lock(this->mutex);
if (not this->queue.empty()) {
/* Fall through the end of method */
}
else if (not block){
throw QueueEmpty();
}
else {
if (timeout > 0) {
boost::system_time abs_timeout = boost::get_system_time();
abs_timeout += boost::posix_time::milliseconds(timeout);
while (this->queue.empty()) {
if (not this->empty_cond.timed_wait(lock, abs_timeout)) {
throw QueueEmpty();
}
}
}
else {
while (this->queue.empty()) {
this->empty_cond.wait(lock);
}
}
}
item = this->queue.front();
this->queue.pop_front();
lock.unlock();
this->full_cond.notify_one();
}
template<typename T>
void
ConcurrentQueue<T>::task_done()
{
boost::mutex::scoped_lock lock(this->mutex);
if (this->unfinished_tasks == 0) {
throw NoMoreTasks();
}
if (--this->unfinished_tasks == 0) {
this->all_tasks_done_cond.notify_all();
}
}
template<typename T>
void
ConcurrentQueue<T>::join()
{
boost::mutex::scoped_lock lock(this->mutex);
while (this->unfinished_tasks) {
this->all_tasks_done_cond.wait(lock);
}
}
#endif