-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRabbitMQ.php
119 lines (108 loc) · 3.28 KB
/
RabbitMQ.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<?php
namespace TargonIndustries\rabbitmq;
use AMQPChannel;
use AMQPChannelException;
use AMQPConnection;
use AMQPConnectionException;
use AMQPExchange;
use AMQPExchangeException;
use AMQPQueue;
use AMQPQueueException;
use TargonIndustries\rabbitmq\tasks\RabbitMqTask;
require_once DOCROOT_LIB_RABBITMQ . 'tasks/RabbitMqTask.php';
class RabbitMQ
{
private static ?AMQPConnection $connection = null;
private static ?AMQPChannel $channel = null;
/**
* @throws AMQPConnectionException
*/
public static function getConnection(): ?AMQPConnection
{
if (self::$connection == null) {
self::$connection = new AMQPConnection([
'host' => getenv('RABBITMQ_HOST'),
'port' => 5672,
'vhost' => '/',
'login' => getenv('RABBITMQ_USER'),
'password' => getenv('RABBITMQ_PASS')
]);
self::$connection->connect();
}
return self::$connection;
}
/**
* @throws AMQPConnectionException
*/
public static function getChannel(): ?AMQPChannel
{
if (self::$channel == null) {
self::$channel = new AMQPChannel(self::getConnection());
}
return self::$channel;
}
/**
* @throws AMQPQueueException
* @throws AMQPChannelException
* @throws AMQPConnectionException
*/
public static function getQueue(string $name): AMQPQueue
{
$queue = new AMQPQueue(self::getChannel());
$queue->setName($name);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
return $queue;
}
/**
* @throws AMQPExchangeException
* @throws AMQPChannelException
* @throws AMQPConnectionException
*/
public static function getExchange(string $name): AMQPExchange
{
$exchange = new AMQPExchange(self::getChannel());
$exchange->setName($name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
return $exchange;
}
/**
* @throws AMQPQueueException
* @throws AMQPExchangeException
* @throws AMQPChannelException
* @throws AMQPConnectionException
*/
public static function bindQueueToExchange(string $queueName, string $exchangeName, string $routingKey): void
{
$queue = self::getQueue($queueName);
$exchange = self::getExchange($exchangeName);
$queue->bind($exchangeName, $routingKey);
}
/**
* @throws AMQPExchangeException
* @throws AMQPChannelException
* @throws AMQPConnectionException
*/
public static function sendTask(string $queueName, string $exchangeName, string $routingKey, RabbitMqTask $task): void
{
$exchange = self::getExchange($exchangeName);
$exchange->publish($task->serialize(), $routingKey);
}
/**
* @throws AMQPQueueException
* @throws AMQPChannelException
* @throws AMQPConnectionException
*/
public static function getTask(string $queueName)
{
$queue = self::getQueue($queueName);
$message = $queue->get();
if ($message) {
$queue->ack($message->getDeliveryTag());
return unserialize($message->getBody());
}
return null;
}
}