forked from cksystemsgroup/scal
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flatcombining_queue.h
150 lines (121 loc) · 3.43 KB
/
flatcombining_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright (c) 2012-2013, the Scal Project Authors. All rights reserved.
// Please see the AUTHORS file for details. Use of this source code is governed
// by a BSD license that can be found in the LICENSE file.
// Implementing the queue from:
//
// D. Hendler, I. Incze, N. Shavit, and M. Tzafrir. Flat combining and the
// synchronization-parallelism tradeoff. In Proceedings of the 22nd ACM
// symposium on Parallelism in algorithms and architectures, SPAA ’10, pages
// 355–364, New York, NY, USA, 2010. ACM.
#ifndef SCAL_DATASTRUCTURES_FLATCOMBINING_QUEUE_H_
#define SCAL_DATASTRUCTURES_FLATCOMBINING_QUEUE_H_
#include "datastructures/queue.h"
#include "datastructures/single_list.h"
#include "util/allocation.h"
#include "util/lock.h"
#include "util/threadlocals.h"
namespace scal {
namespace detail {
// State of a request slot (see Operation::opcode).
// The enumerators keep the numeric values 0, 1 and 2.
enum Opcode {
  // No request pending. This is the value a default-constructed Operation
  // starts with and the value written back once a request has been applied.
  Done = 0,
  // The slot's data holds an element that is to be appended to the queue.
  Enqueue,
  // The slot's owner asks for an element to be removed from the queue.
  Dequeue
};
// A single request slot: the pending request kind (opcode) together with its
// argument or result (data).
//
// The type is aligned to 128 bytes, which also makes sizeof(Operation) a
// multiple of 128, so every element of an Operation array starts on its own
// 128-byte boundary. alignas is used instead of a hand-sized padding array
// because a pad computed as 128 - sizeof(T) - sizeof(Opcode) does not account
// for the alignment gap the compiler may insert between opcode and data
// (e.g. 4 bytes for an 8-byte T), which makes the struct larger than 128
// bytes and leaves array elements straddling 128-byte boundaries. It also
// stops compiling as soon as sizeof(T) + sizeof(Opcode) reaches 128.
template<typename T>
struct alignas(128) Operation {
  // Starts the slot with no pending request. data is left uninitialized.
  Operation() : opcode(Done) {}

  Opcode opcode;  // Request currently stored in this slot.
  T data;         // Argument of an enqueue or result of a dequeue.

  // Allocation goes through MallocAligned with a 128-byte alignment so the
  // storage matches the alignment requirement of the type.
  void* operator new(size_t size) {
    return MallocAligned(size, 128);
  }

  // Array form of the allocation function above.
  void* operator new[](size_t size) {
    return MallocAligned(size, 128);
  }
};
} // namespace detail
// Queue in which every thread publishes its request in a slot of
// operations_ (indexed by the thread id obtained from scal::ThreadContext)
// and whichever thread acquires global_lock_ applies all published requests
// to the underlying SingleList.
template<typename T>
class FlatCombiningQueue : public Queue<T> {
 public:
  // num_ops is the number of request slots that get allocated.
  explicit FlatCombiningQueue(uint64_t num_ops);

  // Publishes an enqueue request for item and waits until it was applied.
  bool enqueue(T item);

  // Publishes a dequeue request and stores the outcome in *item.
  bool dequeue(T *item);

 private:
  using Operation = detail::Operation<T>;
  using Opcode = detail::Opcode;

  // Walks over all slots and applies every pending request to queue_.
  void ScanCombineApply();

  // Writes a request (or its result) into slot `index`. The data field is
  // stored before the opcode field; the other methods poll the opcode field
  // to detect a posted or a completed request.
  inline void SetOp(uint64_t index, Opcode opcode, T data) {
    volatile Operation& slot = operations_[index];
    slot.data = data;
    slot.opcode = opcode;
  }

  // Taken via TryLock by the thread that applies the published requests.
  SpinLock<64> global_lock_;
  // Number of elements in operations_.
  uint64_t num_ops_;
  // Sequential list holding the queue's elements.
  SingleList<T> queue_;
  // Array of num_ops_ request slots.
  volatile Operation* operations_;
};
template<typename T>
FlatCombiningQueue<T>::FlatCombiningQueue(uint64_t num_ops)
: num_ops_(num_ops),
operations_(new Operation[num_ops]) {
}
// Visits every request slot once, in index order, and applies the request
// found there to queue_:
//   - Enqueue: the slot's data is appended and the slot is reset to Done.
//   - Dequeue: the slot is set to Done carrying the removed element, or
//     (T)NULL if the queue is empty.
// Slots holding any other opcode are skipped. Both callers in this file
// invoke this method while holding global_lock_.
template<typename T>
void FlatCombiningQueue<T>::ScanCombineApply() {
  for (uint64_t slot = 0; slot < num_ops_; ++slot) {
    if (operations_[slot].opcode == Opcode::Enqueue) {
      queue_.enqueue(operations_[slot].data);
      SetOp(slot, Opcode::Done, (T)NULL);
      continue;
    }

    if (operations_[slot].opcode != Opcode::Dequeue) {
      continue;  // Nothing pending in this slot.
    }

    if (queue_.is_empty()) {
      // An empty queue is reported to the requester through a NULL value.
      SetOp(slot, Opcode::Done, (T)NULL);
      continue;
    }

    T removed;
    queue_.dequeue(&removed);
    SetOp(slot, Opcode::Done, removed);
  }
}
// Appends item to the queue.
//
// The request is first published in the calling thread's slot. The thread
// then repeatedly tries to take global_lock_: on success it applies all
// published requests (including its own) itself; on failure it checks
// whether its slot has been set to Done in the meantime.
//
// Always returns true.
template<typename T>
bool FlatCombiningQueue<T>::enqueue(T item) {
  const uint64_t slot = scal::ThreadContext::get().thread_id();
  SetOp(slot, Opcode::Enqueue, item);

  for (;;) {
    if (global_lock_.TryLock()) {
      ScanCombineApply();
      global_lock_.Unlock();
      return true;
    }
    // Lock not acquired: see whether the request has been completed.
    if (operations_[slot].opcode == Opcode::Done) {
      return true;
    }
  }
}
// Removes an element from the queue and stores it in *item.
//
// The request is published in the calling thread's slot. The thread then
// repeatedly tries to take global_lock_: on success it applies all published
// requests itself; on failure it checks whether its slot has been set to
// Done. Afterwards the result is read from the slot and the slot is cleared.
//
// Returns false if the value read from the slot equals (T)NULL, which is
// what ScanCombineApply stores for an empty queue, and true otherwise.
// Consequently a stored element that compares equal to (T)NULL cannot be
// told apart from "queue empty".
template<typename T>
bool FlatCombiningQueue<T>::dequeue(T *item) {
  const uint64_t slot = scal::ThreadContext::get().thread_id();
  SetOp(slot, Opcode::Dequeue, (T)NULL);

  bool served = false;
  while (!served) {
    if (global_lock_.TryLock()) {
      ScanCombineApply();
      global_lock_.Unlock();
      served = true;
    } else {
      // Lock not acquired: see whether the request has been completed.
      served = (operations_[slot].opcode == Opcode::Done);
    }
  }

  *item = operations_[slot].data;
  SetOp(slot, Opcode::Done, (T)NULL);
  return !(*item == (T)NULL);
}
} // namespace scal
#endif // SCAL_DATASTRUCTURES_FLATCOMBINING_QUEUE_H_