-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathAmqpConnectionFactory.php
111 lines (90 loc) · 3.66 KB
/
AmqpConnectionFactory.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
<?php
declare(strict_types=1);
namespace Enqueue\AmqpBunny;
use Enqueue\AmqpTools\ConnectionConfig;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
use Interop\Queue\Context;
class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
{
use DelayStrategyAwareTrait;
/**
* @var ConnectionConfig
*/
private $config;
/**
* @var BunnyClient
*/
private $client;
/**
* @see ConnectionConfig for possible config formats and values
*
* @param array|string|null $config
*/
public function __construct($config = 'amqp:')
{
$this->config = (new ConnectionConfig($config))
->addSupportedScheme('amqp+bunny')
->addDefaultOption('tcp_nodelay', null)
->parse()
;
if (in_array('rabbitmq', $this->config->getSchemeExtensions(), true)) {
$this->setDelayStrategy(new RabbitMqDlxDelayStrategy());
}
}
/**
* @return AmqpContext
*/
public function createContext(): Context
{
if ($this->config->isLazy()) {
$context = new AmqpContext(function () {
$channel = $this->establishConnection()->channel();
$channel->qos($this->config->getQosPrefetchSize(), $this->config->getQosPrefetchCount(), $this->config->isQosGlobal());
return $channel;
}, $this->config->getConfig());
$context->setDelayStrategy($this->delayStrategy);
return $context;
}
$context = new AmqpContext($this->establishConnection()->channel(), $this->config->getConfig());
$context->setDelayStrategy($this->delayStrategy);
$context->setQos($this->config->getQosPrefetchSize(), $this->config->getQosPrefetchCount(), $this->config->isQosGlobal());
return $context;
}
public function getConfig(): ConnectionConfig
{
return $this->config;
}
private function establishConnection(): BunnyClient
{
if ($this->config->isSslOn()) {
throw new \LogicException('The bunny library does not support SSL connections');
}
if (false == $this->client) {
$bunnyConfig = [];
$bunnyConfig['host'] = $this->config->getHost();
$bunnyConfig['port'] = $this->config->getPort();
$bunnyConfig['vhost'] = $this->config->getVHost();
$bunnyConfig['user'] = $this->config->getUser();
$bunnyConfig['password'] = $this->config->getPass();
$bunnyConfig['read_write_timeout'] = min($this->config->getReadTimeout(), $this->config->getWriteTimeout());
$bunnyConfig['timeout'] = $this->config->getConnectionTimeout();
// @see https://github.com/php-enqueue/enqueue-dev/issues/229
// $bunnyConfig['persistent'] = $this->config->isPersisted();
// if ($this->config->isPersisted()) {
// $bunnyConfig['path'] = 'enqueue';//$this->config->getOption('path', $this->config->getOption('vhost'));
// }
if ($this->config->getHeartbeat()) {
$bunnyConfig['heartbeat'] = $this->config->getHeartbeat();
}
if (null !== $this->config->getOption('tcp_nodelay')) {
$bunnyConfig['tcp_nodelay'] = $this->config->getOption('tcp_nodelay');
}
$this->client = new BunnyClient($bunnyConfig);
$this->client->connect();
}
return $this->client;
}
}