This repository has been archived by the owner on Mar 9, 2023. It is now read-only.
forked from ruby-amqp/march_hare
-
Notifications
You must be signed in to change notification settings - Fork 0
/
basic_cancel_spec.rb
125 lines (100 loc) · 2.94 KB
/
basic_cancel_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
RSpec.describe 'A consumer' do
# Connection used by the examples below, obtained from MarchHare.connect.
let(:connection) { MarchHare.connect }

# Close the connection once each example has finished.
after(:each) { connection.close }
# Consumer registered with Queue#subscribe (no :block option): it handles
# deliveries in the background and is cancelled while messages are in flight.
context 'that does not block the caller' do
  it 'receives messages until cancelled' do
    exchange = connection.create_channel.default_exchange
    queue    = connection.create_channel.queue('', exclusive: true)

    received        = []
    subscriber_done = false
    subscription    = nil

    # Subscribe from a separate thread; the flag records that the thread got
    # past the subscribe call.
    subscriber = Thread.new do
      subscription = queue.subscribe do |_headers, payload|
        received << payload
        sleep 0.1 # deliberately slow handler
      end
      subscriber_done = true
    end

    # The queue is reached through the default exchange by using its name as
    # the routing key.
    publisher = Thread.new do
      20.times { exchange.publish('hello world', routing_key: queue.name) }
    end

    # Leave time for at least one delivery before cancelling.
    sleep 0.2
    subscription.cancel

    [subscriber, publisher].each(&:join)

    expect(received).not_to be_empty
    expect(subscriber_done).to eq(true)
  end
end
# Consumer attached with subscribe_with(..., block: true): per the context
# description the subscribing thread is held until the consumer is cancelled.
context 'that DOES block the caller' do
  it 'receives messages until cancelled' do
    exchange = connection.create_channel.default_exchange
    queue    = connection.create_channel.queue('', exclusive: true)

    received        = []
    subscriber_done = false
    subscription    = nil

    subscriber = Thread.new do
      # Assigned before subscribe_with so the main thread can reach the
      # consumer while this thread is still inside that call.
      subscription = queue.build_consumer do |_headers, payload|
        received << payload
        sleep 0.1 # deliberately slow handler
      end
      queue.subscribe_with(subscription, block: true)
      subscriber_done = true
    end

    publisher = Thread.new do
      20.times { exchange.publish('hello world', routing_key: queue.name) }
    end

    # Leave time for deliveries to arrive before cancelling.
    sleep 0.5
    subscription.cancel

    [subscriber, publisher].each(&:join)

    expect(received).not_to be_empty
    expect(subscriber_done).to eq(true)
  end
end
# A blocking consumer on a queue that nobody publishes to must still be
# cancellable from another thread.
context "that DOES block the caller and never receives any messages" do
  it 'can be cancelled' do
    q = connection.create_channel.queue("", :exclusive => true)

    consumer_exited = false
    consumer        = nil

    consumer_thread = Thread.new do
      # The handler is intentionally empty: nothing is ever published to
      # this queue, so it has no state to record.
      consumer = q.build_consumer(:block => true) { |_headers, _message| }
      # `consumer` is assigned before this call so the main thread can reach
      # it while this thread is still inside subscribe_with.
      q.subscribe_with(consumer, :block => true)
      consumer_exited = true
    end

    # Give the consumer thread time to enter subscribe_with before cancelling.
    sleep 1.0
    consumer.cancel
    consumer_thread.join

    expect(consumer_exited).to eq(true)
  end
end
# Cancelling a consumer that has already been cancelled must not raise.
context 'that is cancelled' do
  it 'will not raise errors when cancelled again' do
    queue    = connection.create_channel.queue('')
    consumer = queue.build_consumer(:block => true) { |_headers, _message| }

    thread = Thread.new do
      queue.subscribe_with(consumer, :block => true)
    end

    # Give the thread time to start consuming before cancelling.
    sleep 1

    begin
      # No rescue clause here: any exception is reported by the expectation
      # itself, without naming the JRuby-specific NativeException constant,
      # which newer JRuby releases no longer define.
      expect do
        consumer.cancel
        consumer.cancel
      end.not_to raise_error
    ensure
      # Always wait for the consumer thread, even when the expectation fails.
      thread.join
    end
  end
end
end