forked from torch/threads
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.lua
76 lines (62 loc) · 1.78 KB
/
queue.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
local clib = require 'libthreads'
local unpack = unpack or table.unpack
local Queue = clib.Queue
function Queue:addjob(callback, ...)
local args = {...}
local status, msg = pcall(
function()
self.mutex:lock()
while self.isfull == 1 do
self.notfull:wait(self.mutex)
end
local serialize = require(self.serialize)
self:callback(self.tail, serialize.save(callback))
self:arg(self.tail, serialize.save(args))
self.tail = self.tail + 1
if self.tail == self.size then
self.tail = 0
end
if self.tail == self.head then
self.isfull = 1
end
self.isempty = 0
self.mutex:unlock()
self.notempty:signal()
end
)
if not status then
print(string.format('FATAL THREAD PANIC: (addjob) %s', msg))
os.exit(-1)
end
end
function Queue:dojob()
local status, msg = pcall(
function()
local serialize = require(self.serialize)
self.mutex:lock()
while self.isempty == 1 do
self.notempty:wait(self.mutex)
end
local callback = serialize.load(self:callback(self.head))
local args = serialize.load(self:arg(self.head))
self.head = self.head + 1
if self.head == self.size then
self.head = 0
end
if self.head == self.tail then
self.isempty = 1
end
self.isfull = 0
self.mutex:unlock()
self.notfull:signal()
local res = {callback(unpack(args))} -- note: args is a table for sure
return res
end
)
if not status then
print(string.format('FATAL THREAD PANIC: (dojob) %s', msg))
os.exit(-1)
end
return unpack(msg)
end
return Queue