-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathGearmanContext.php
129 lines (106 loc) · 3.08 KB
/
GearmanContext.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
<?php
declare(strict_types=1);
namespace Enqueue\Gearman;
use Interop\Queue\Consumer;
use Interop\Queue\Context;
use Interop\Queue\Destination;
use Interop\Queue\Exception\InvalidDestinationException;
use Interop\Queue\Exception\PurgeQueueNotSupportedException;
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
use Interop\Queue\Message;
use Interop\Queue\Producer;
use Interop\Queue\Queue;
use Interop\Queue\SubscriptionConsumer;
use Interop\Queue\Topic;
class GearmanContext implements Context
{
/**
* @var \GearmanClient
*/
private $client;
/**
* @var GearmanConsumer[]
*/
private $consumers;
/**
* @var array
*/
private $config;
public function __construct(array $config)
{
$this->config = $config;
}
/**
* @return GearmanMessage
*/
public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
{
return new GearmanMessage($body, $properties, $headers);
}
/**
* @return GearmanDestination
*/
public function createTopic(string $topicName): Topic
{
return new GearmanDestination($topicName);
}
/**
* @return GearmanDestination
*/
public function createQueue(string $queueName): Queue
{
return new GearmanDestination($queueName);
}
public function createTemporaryQueue(): Queue
{
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
}
/**
* @return GearmanProducer
*/
public function createProducer(): Producer
{
return new GearmanProducer($this->getClient());
}
/**
* @param GearmanDestination $destination
*
* @return GearmanConsumer
*/
public function createConsumer(Destination $destination): Consumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, GearmanDestination::class);
$this->consumers[] = $consumer = new GearmanConsumer($this, $destination);
return $consumer;
}
public function close(): void
{
$this->getClient()->clearCallbacks();
foreach ($this->consumers as $consumer) {
$consumer->getWorker()->unregisterAll();
}
}
public function createSubscriptionConsumer(): SubscriptionConsumer
{
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
}
public function purgeQueue(Queue $queue): void
{
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
}
public function getClient(): \GearmanClient
{
if (false == $this->client) {
$this->client = new \GearmanClient();
$this->client->addServer($this->config['host'], $this->config['port']);
}
return $this->client;
}
public function createWorker(): \GearmanWorker
{
$worker = new \GearmanWorker();
$worker->addServer($this->config['host'], $this->config['port']);
return $worker;
}
}