diff --git a/examples/Transport/Receiver/CollectionReceiver.php b/examples/Transport/Receiver/CollectionReceiver.php index f2b7b5e6..84410b5d 100644 --- a/examples/Transport/Receiver/CollectionReceiver.php +++ b/examples/Transport/Receiver/CollectionReceiver.php @@ -4,30 +4,30 @@ namespace Flow\Examples\Transport\Receiver; -use Flow\EnvelopeTrait; +use SplObjectStorage; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; class CollectionReceiver implements ReceiverInterface { - use EnvelopeTrait; - /** - * @var array + * @var SplObjectStorage */ - private array $envelopePool; + private SplObjectStorage $envelopePool; /** * @param iterable $receivers */ - public function __construct(private iterable $receivers) {} + public function __construct(private iterable $receivers) + { + $this->envelopePool = new SplObjectStorage(); + } public function get(): iterable { foreach ($this->receivers as $receiver) { foreach ($receiver->get() as $envelope) { - $id = $this->getEnvelopeId($envelope); - $this->envelopePool[$id] = $receiver; + $this->envelopePool[$envelope] = $receiver; yield $envelope; } } @@ -35,13 +35,11 @@ public function get(): iterable public function ack(Envelope $envelope): void { - $id = $this->getEnvelopeId($envelope); - $this->envelopePool[$id]->ack($envelope); + $this->envelopePool[$envelope]->ack($envelope); } public function reject(Envelope $envelope): void { - $id = $this->getEnvelopeId($envelope); - $this->envelopePool[$id]->reject($envelope); + $this->envelopePool[$envelope]->reject($envelope); } } diff --git a/examples/Transport/Sender/CollectionSender.php b/examples/Transport/Sender/CollectionSender.php index b4382c6a..fcd9c52d 100644 --- a/examples/Transport/Sender/CollectionSender.php +++ b/examples/Transport/Sender/CollectionSender.php @@ -4,14 +4,11 @@ namespace Flow\Examples\Transport\Sender; -use Flow\EnvelopeTrait; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; class CollectionSender implements SenderInterface { - use EnvelopeTrait; - /** * @param iterable $senders */ diff --git a/examples/server.php b/examples/server.php index 7f091463..3c3375f0 100644 --- a/examples/server.php +++ b/examples/server.php @@ -18,7 +18,7 @@ use Flow\IpStrategy\MaxIpStrategy; use Symfony\Component\Messenger\Envelope; -$driver = match (random_int(1, 1)) { +$driver = match (random_int(1, 4)) { 1 => new AmpDriver(), 2 => new FiberDriver(), 3 => new ReactDriver(), diff --git a/src/Flow/TransportFlow.php b/src/Flow/TransportFlow.php index b06955b3..8ab5a48e 100644 --- a/src/Flow/TransportFlow.php +++ b/src/Flow/TransportFlow.php @@ -10,7 +10,6 @@ use Flow\DriverInterface; use Flow\FlowInterface; use Flow\Ip; -use SplObjectStorage; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; diff --git a/tests/EnvelopeTraitTest.php b/tests/EnvelopeTraitTest.php deleted file mode 100644 index 1b6b9a05..00000000 --- a/tests/EnvelopeTraitTest.php +++ /dev/null @@ -1,30 +0,0 @@ -getEnvelopeId($ip)); - } - - public function testGetIpIdWithoutId(): void - { - $envelope = new Envelope(new stdClass()); - $this->expectException(RuntimeException::class); - $this->getEnvelopeId($envelope); - } -} diff --git a/tests/Examples/Transport/Receiver/CollectionReceiverTest.php b/tests/Examples/Transport/Receiver/CollectionReceiverTest.php index 05e50589..23111da7 100644 --- a/tests/Examples/Transport/Receiver/CollectionReceiverTest.php +++ b/tests/Examples/Transport/Receiver/CollectionReceiverTest.php @@ -5,7 +5,6 @@ namespace Flow\Test\Examples\Transport\Receiver; use ArrayObject; -use Flow\EnvelopeTrait; use Flow\Examples\Transport\Receiver\CollectionReceiver; use PHPUnit\Framework\TestCase; use SplObjectStorage; @@ -18,8 +17,6 @@ class CollectionReceiverTest extends TestCase { - use EnvelopeTrait; - /** * @param array $receivers * @param SplObjectStorage> $expectedReceiversIps @@ -35,11 +32,11 @@ public function testGet($receivers, $expectedReceiversIps): void foreach ($expectedReceiversIps as $receiver) { $expectedIps = $expectedReceiversIps[$receiver]; foreach ($expectedIps as $envelope) { - $expectedIpIds[] = $this->getEnvelopeId($envelope); + $expectedIpIds[] = $this->getTransportMessageId($envelope); } } $envelopeIds = array_map(function (Envelope $envelope) { - return $this->getEnvelopeId($envelope); + return $this->getTransportMessageId($envelope); }, iterator_to_array($envelopes)); $this->assertArraySimilar($expectedIpIds, $envelopeIds); @@ -59,12 +56,12 @@ public function testAckAndReject($receivers, $expectedReceiversIps, $expectedAck $envelopes = $collectionReceiver->get(); foreach ($envelopes as $envelope) { - $envelopeId = $this->getEnvelopeId($envelope); + $envelopeId = $this->getTransportMessageId($envelope); $expectedReceiver = null; foreach ($expectedReceiversIps as $receiver) { $expectedIps = $expectedReceiversIps[$receiver]; foreach ($expectedIps as $expectedIp) { - $expectedIpId = $this->getEnvelopeId($expectedIp); + $expectedIpId = $this->getTransportMessageId($expectedIp); if ($envelopeId === $expectedIpId) { $expectedReceiver = $receiver; @@ -104,8 +101,6 @@ public static function receiverProvider(): iterable } $receiver = new class($envelopes, $expectedAckIpsReceivers, $expectedRejectIpsReceivers) implements ReceiverInterface { - use EnvelopeTrait; - /** * @param array $envelopes * @param array $expectedAckIpsReceivers @@ -138,15 +133,23 @@ public function getExpectedRejectIpsReceivers() public function ack(Envelope $envelope): void { - $id = $this->getEnvelopeId($envelope); + $id = $this->getTransportMessageId($envelope); $this->expectedAckIpsReceivers[$id] = $this; } public function reject(Envelope $envelope): void { - $id = $this->getEnvelopeId($envelope); + $id = $this->getTransportMessageId($envelope); $this->expectedRejectIpsReceivers[$id] = $this; } + + private function getTransportMessageId(Envelope $envelope): string + { + /** @var null|TransportMessageIdStamp $stamp */ + $stamp = $envelope->last(TransportMessageIdStamp::class); + + return $stamp ? $stamp->getId() : ''; + } }; $expectedReceiversIps->offsetSet($receiver, $envelopes); $receivers[] = $receiver; @@ -178,4 +181,12 @@ protected function assertArraySimilar(array $expected, array $array): void } } } + + private function getTransportMessageId(Envelope $envelope): string + { + /** @var null|TransportMessageIdStamp $stamp */ + $stamp = $envelope->last(TransportMessageIdStamp::class); + + return $stamp ? $stamp->getId() : ''; + } }