-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSnsQsConsumer.php
115 lines (90 loc) · 2.96 KB
/
SnsQsConsumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
<?php
declare(strict_types=1);
namespace Enqueue\SnsQs;
use Enqueue\Sqs\SqsConsumer;
use Enqueue\Sqs\SqsMessage;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Message;
use Interop\Queue\Queue;
/**
 * Consumer that wraps an SqsConsumer and converts every received SqsMessage
 * into an SnsQsMessage, unwrapping the SNS notification envelope when the
 * SQS body contains one.
 */
class SnsQsConsumer implements Consumer
{
    /**
     * Context used to create the SnsQsMessage instances returned to callers.
     *
     * @var SnsQsContext
     */
    private $context;

    /**
     * Underlying SQS consumer that receive/acknowledge/reject calls are delegated to.
     *
     * @var SqsConsumer
     */
    private $consumer;

    /**
     * Queue reported by getQueue().
     *
     * @var SnsQsQueue
     */
    private $queue;

    public function __construct(SnsQsContext $context, SqsConsumer $consumer, SnsQsQueue $queue)
    {
        $this->context = $context;
        $this->consumer = $consumer;
        $this->queue = $queue;
    }

    /**
     * Returns the queue this consumer was constructed with.
     */
    public function getQueue(): Queue
    {
        return $this->queue;
    }

    /**
     * Delegates to the wrapped consumer's receive() and converts the result.
     *
     * @param int $timeout passed through unchanged to the wrapped consumer
     *
     * @return Message|null the converted message, or null when the wrapped consumer returned none
     */
    public function receive(int $timeout = 0): ?Message
    {
        $sqsMessage = $this->consumer->receive($timeout);

        return $sqsMessage ? $this->convertMessage($sqsMessage) : null;
    }

    /**
     * Delegates to the wrapped consumer's receiveNoWait() and converts the result.
     *
     * @return Message|null the converted message, or null when the wrapped consumer returned none
     */
    public function receiveNoWait(): ?Message
    {
        $sqsMessage = $this->consumer->receiveNoWait();

        return $sqsMessage ? $this->convertMessage($sqsMessage) : null;
    }

    /**
     * Acknowledges the SQS message wrapped by the given message.
     *
     * @param SnsQsMessage $message
     *
     * @throws InvalidMessageException when $message is not an SnsQsMessage
     */
    public function acknowledge(Message $message): void
    {
        InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);

        $this->consumer->acknowledge($message->getSqsMessage());
    }

    /**
     * Rejects the SQS message wrapped by the given message.
     *
     * @param SnsQsMessage $message
     * @param bool         $requeue passed through unchanged to the wrapped consumer
     *
     * @throws InvalidMessageException when $message is not an SnsQsMessage
     */
    public function reject(Message $message, bool $requeue = false): void
    {
        InvalidMessageException::assertMessageInstanceOf($message, SnsQsMessage::class);

        $this->consumer->reject($message->getSqsMessage(), $requeue);
    }

    /**
     * Builds an SnsQsMessage from a received SqsMessage.
     *
     * When the SQS body is a JSON object carrying "TopicArn" and
     * "Type" = "Notification", it is treated as an SNS envelope: the body is
     * taken from "Message" and headers/properties from the JSON stored in the
     * "Headers" message attribute. Otherwise body and properties are copied
     * from the SQS message as-is.
     */
    private function convertMessage(SqsMessage $sqsMessage): SnsQsMessage
    {
        $message = $this->context->createMessage();
        $message->setRedelivered($sqsMessage->isRedelivered());
        $message->setSqsMessage($sqsMessage);
        // Default to the SQS headers; they are overridden below only when the
        // SNS envelope carries a usable "Headers" attribute.
        $message->setHeaders($sqsMessage->getHeaders());

        $body = $sqsMessage->getBody();

        // Cheap pre-check: only attempt to decode bodies that look like a JSON object.
        if (isset($body[0]) && '{' === $body[0]) {
            $data = json_decode($body, true);

            if (is_array($data) && isset($data['TopicArn'], $data['Type']) && 'Notification' === $data['Type']) {
                // SNS message conversion
                if (isset($data['Message'])) {
                    $message->setBody((string) $data['Message']);
                }

                // The attribute value is expected to be a JSON list of
                // [headers, properties]. Anything missing or malformed is
                // ignored instead of raising a TypeError under strict_types.
                $rawHeaders = $data['MessageAttributes']['Headers']['Value'] ?? null;

                if (is_string($rawHeaders)) {
                    $headersData = json_decode($rawHeaders, true);

                    if (is_array($headersData)) {
                        if (isset($headersData[0]) && is_array($headersData[0])) {
                            $message->setHeaders($headersData[0]);
                        }

                        if (isset($headersData[1]) && is_array($headersData[1])) {
                            $message->setProperties($headersData[1]);
                        }
                    }
                }

                return $message;
            }
        }

        // Not an SNS envelope: pass the SQS payload through untouched.
        $message->setBody($body);
        $message->setProperties($sqsMessage->getProperties());

        return $message;
    }
}