forked from jakubkulhan/bunny
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreconnect.php
55 lines (50 loc) · 1.71 KB
/
reconnect.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<?php
require "vendor/autoload.php";
// Demo script: connect to a local RabbitMQ broker, retry the connection when
// it fails, then publish a random message at a fixed interval on the channel.

// Delay applied to each connection error before it triggers a retry (ms).
$retryDelayMs = 2000;
// Period between two published messages (ms).
$publishIntervalMs = 2000;

$loop = \React\EventLoop\Factory::create();
// One scheduler bound to the loop, shared by both subscriptions below.
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

$mq = new \Bunny\Async\Client($loop, [
    'host' => '127.0.0.1',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    // Optional settings, disabled here:
    //'heartbeat' => 6,
    //'timeout'=>2.0
]);

// Connect is refactored to be retryable.
$mq->connect()
    ->retryWhen(function ($errors) use ($retryDelayMs) {
        // Every connection error is delayed, logged, and then re-emitted,
        // which makes retryWhen() subscribe to connect() again.
        return $errors
            ->delay($retryDelayMs)
            ->doOnNext(function () {
                echo "Disconnected, retry\n";
            });
    })
    ->flatMap(function () use ($mq) {
        // Once connected, open a channel on the client.
        return $mq->channel();
    })
    ->subscribeCallback(
        function (\Bunny\Channel $channel) use ($scheduler, $publishIntervalMs) {
            echo "connected\n";

            // NOTE(review): this inner subscription is never disposed. If a
            // second channel is emitted after a reconnect, a second producer
            // would presumably run next to the first one - confirm, and
            // consider switching to the latest channel instead.
            \Rx\Observable::interval($publishIntervalMs)
                // TODO: ticks that fire while disconnected are dropped here.
                // Add a buffer? Include it in the produce step?
                ->flatMap(function () use ($channel) {
                    // Random payload so each message is easy to tell apart.
                    $data = md5((string) microtime(true));
                    echo "Produce {$data}\n";

                    // Emit the payload once the publish promise resolves.
                    return \Rx\React\Promise::toObservable(
                        $channel->publish($data, [], 'amq.direct', 'test')
                    )->map(function () use ($data) {
                        return $data;
                    });
                })
                ->subscribeCallback(
                    function ($data) {
                        echo " {$data} : OK\n";
                    },
                    function (\Exception $e) {
                        // Report the cause, like the connection error handler does.
                        echo "error during produce : {$e->getMessage()}\n";
                    },
                    null,
                    $scheduler
                );
        },
        function (\Exception $e) {
            echo "disconnected : {$e->getMessage()}\n";
        },
        null,
        $scheduler
    );

// Hand control to the event loop, which drives the connection and the timers.
$loop->run();