forked from freehere107/go-workers
-
Notifications
You must be signed in to change notification settings - Fork 1
/
enqueue_test.go
122 lines (91 loc) · 3.41 KB
/
enqueue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package workers
import (
"encoding/json"
"github.com/customerio/gospec"
. "github.com/customerio/gospec"
"github.com/garyburd/redigo/redis"
)
func EnqueueSpec(c gospec.Context) {
was := Config.Namespace
Config.Namespace = "prod:"
c.Specify("Enqueue", func() {
conn := Config.Pool.Get()
defer conn.Close()
c.Specify("makes the queue available", func() {
Enqueue("enqueue1", "Add", []int{1, 2})
found, _ := redis.Bool(conn.Do("sismember", "prod:queues", "enqueue1"))
c.Expect(found, IsTrue)
})
c.Specify("adds a job to the queue", func() {
nb, _ := redis.Int(conn.Do("llen", "prod:queue:enqueue2"))
c.Expect(nb, Equals, 0)
Enqueue("enqueue2", "Add", []int{1, 2})
nb, _ = redis.Int(conn.Do("llen", "prod:queue:enqueue2"))
c.Expect(nb, Equals, 1)
})
c.Specify("saves the arguments", func() {
Enqueue("enqueue3", "Compare", []string{"foo", "bar"})
bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue3"))
var result map[string]interface{}
json.Unmarshal(bytes, &result)
c.Expect(result["class"], Equals, "Compare")
args := result["args"].([]interface{})
c.Expect(len(args), Equals, 2)
c.Expect(args[0], Equals, "foo")
c.Expect(args[1], Equals, "bar")
})
c.Specify("has a jid", func() {
Enqueue("enqueue4", "Compare", []string{"foo", "bar"})
bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue4"))
var result map[string]interface{}
json.Unmarshal(bytes, &result)
c.Expect(result["class"], Equals, "Compare")
jid := result["jid"].(string)
c.Expect(len(jid), Equals, 24)
})
c.Specify("has enqueued_at that is close to now", func() {
Enqueue("enqueue5", "Compare", []string{"foo", "bar"})
bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue5"))
var result map[string]interface{}
json.Unmarshal(bytes, &result)
c.Expect(result["class"], Equals, "Compare")
ea := result["enqueued_at"].(float64)
c.Expect(ea, Not(Equals), 0)
c.Expect(ea, IsWithin(0.1), nowToSecondsWithNanoPrecision())
})
c.Specify("has retry and retry_count when set", func() {
EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryCount: 13, Retry: true})
bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue6"))
var result map[string]interface{}
json.Unmarshal(bytes, &result)
c.Expect(result["class"], Equals, "Compare")
retry := result["retry"].(bool)
c.Expect(retry, Equals, true)
retryCount := int(result["retry_count"].(float64))
c.Expect(retryCount, Equals, 13)
})
})
c.Specify("EnqueueIn", func() {
scheduleQueue := "prod:" + SCHEDULED_JOBS_KEY
conn := Config.Pool.Get()
defer conn.Close()
c.Specify("has added a job in the scheduled queue", func() {
_, err := EnqueueIn("enqueuein1", "Compare", 10, map[string]interface{}{"foo": "bar"})
c.Expect(err, Equals, nil)
scheduledCount, _ := redis.Int(conn.Do("zcard", scheduleQueue))
c.Expect(scheduledCount, Equals, 1)
conn.Do("del", scheduleQueue)
})
c.Specify("has the correct 'queue'", func() {
_, err := EnqueueIn("enqueuein2", "Compare", 10, map[string]interface{}{"foo": "bar"})
c.Expect(err, Equals, nil)
var data EnqueueData
elem, err := conn.Do("zrange", scheduleQueue, 0, -1)
bytes, err := redis.Bytes(elem.([]interface{})[0], err)
json.Unmarshal(bytes, &data)
c.Expect(data.Queue, Equals, "enqueuein2")
conn.Do("del", scheduleQueue)
})
})
Config.Namespace = was
}