Skip to content

Commit

Permalink
✅ Fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 17, 2024
1 parent dd1710c commit e85bbc4
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 58 deletions.
22 changes: 10 additions & 12 deletions examples/Transport/Receiver/CollectionReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,42 @@

namespace Flow\Examples\Transport\Receiver;

use Flow\EnvelopeTrait;
use SplObjectStorage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

class CollectionReceiver implements ReceiverInterface
{
use EnvelopeTrait;

/**
* @var array<mixed, ReceiverInterface>
* @var SplObjectStorage<Envelope, ReceiverInterface>
*/
private array $envelopePool;
private SplObjectStorage $envelopePool;

/**
* @param iterable<ReceiverInterface> $receivers
*/
public function __construct(private iterable $receivers) {}
public function __construct(private iterable $receivers)
{
$this->envelopePool = new SplObjectStorage();
}

public function get(): iterable
{
foreach ($this->receivers as $receiver) {
foreach ($receiver->get() as $envelope) {
$id = $this->getEnvelopeId($envelope);
$this->envelopePool[$id] = $receiver;
$this->envelopePool[$envelope] = $receiver;
yield $envelope;
}
}
}

public function ack(Envelope $envelope): void
{
$id = $this->getEnvelopeId($envelope);
$this->envelopePool[$id]->ack($envelope);
$this->envelopePool[$envelope]->ack($envelope);
}

public function reject(Envelope $envelope): void
{
$id = $this->getEnvelopeId($envelope);
$this->envelopePool[$id]->reject($envelope);
$this->envelopePool[$envelope]->reject($envelope);
}
}
3 changes: 0 additions & 3 deletions examples/Transport/Sender/CollectionSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

namespace Flow\Examples\Transport\Sender;

use Flow\EnvelopeTrait;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

class CollectionSender implements SenderInterface
{
use EnvelopeTrait;

/**
* @param iterable<SenderInterface> $senders
*/
Expand Down
2 changes: 1 addition & 1 deletion examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use Flow\IpStrategy\MaxIpStrategy;
use Symfony\Component\Messenger\Envelope;

$driver = match (random_int(1, 1)) {
$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
Expand Down
1 change: 0 additions & 1 deletion src/Flow/TransportFlow.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Flow\DriverInterface;
use Flow\FlowInterface;
use Flow\Ip;
use SplObjectStorage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
Expand Down
30 changes: 0 additions & 30 deletions tests/EnvelopeTraitTest.php

This file was deleted.

33 changes: 22 additions & 11 deletions tests/Examples/Transport/Receiver/CollectionReceiverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Flow\Test\Examples\Transport\Receiver;

use ArrayObject;
use Flow\EnvelopeTrait;
use Flow\Examples\Transport\Receiver\CollectionReceiver;
use PHPUnit\Framework\TestCase;
use SplObjectStorage;
Expand All @@ -18,8 +17,6 @@

class CollectionReceiverTest extends TestCase
{
use EnvelopeTrait;

/**
* @param array<ReceiverInterface> $receivers
* @param SplObjectStorage<ReceiverInterface, array<Envelope>> $expectedReceiversIps
Expand All @@ -35,11 +32,11 @@ public function testGet($receivers, $expectedReceiversIps): void
foreach ($expectedReceiversIps as $receiver) {
$expectedIps = $expectedReceiversIps[$receiver];
foreach ($expectedIps as $envelope) {
$expectedIpIds[] = $this->getEnvelopeId($envelope);
$expectedIpIds[] = $this->getTransportMessageId($envelope);
}
}
$envelopeIds = array_map(function (Envelope $envelope) {
return $this->getEnvelopeId($envelope);
return $this->getTransportMessageId($envelope);
}, iterator_to_array($envelopes));

$this->assertArraySimilar($expectedIpIds, $envelopeIds);
Expand All @@ -59,12 +56,12 @@ public function testAckAndReject($receivers, $expectedReceiversIps, $expectedAck

$envelopes = $collectionReceiver->get();
foreach ($envelopes as $envelope) {
$envelopeId = $this->getEnvelopeId($envelope);
$envelopeId = $this->getTransportMessageId($envelope);
$expectedReceiver = null;
foreach ($expectedReceiversIps as $receiver) {
$expectedIps = $expectedReceiversIps[$receiver];
foreach ($expectedIps as $expectedIp) {
$expectedIpId = $this->getEnvelopeId($expectedIp);
$expectedIpId = $this->getTransportMessageId($expectedIp);
if ($envelopeId === $expectedIpId) {
$expectedReceiver = $receiver;

Expand Down Expand Up @@ -104,8 +101,6 @@ public static function receiverProvider(): iterable
}

$receiver = new class($envelopes, $expectedAckIpsReceivers, $expectedRejectIpsReceivers) implements ReceiverInterface {
use EnvelopeTrait;

/**
* @param array<Envelope> $envelopes
* @param array<mixed, ReceiverInterface> $expectedAckIpsReceivers
Expand Down Expand Up @@ -138,15 +133,23 @@ public function getExpectedRejectIpsReceivers()

public function ack(Envelope $envelope): void
{
$id = $this->getEnvelopeId($envelope);
$id = $this->getTransportMessageId($envelope);
$this->expectedAckIpsReceivers[$id] = $this;
}

public function reject(Envelope $envelope): void
{
$id = $this->getEnvelopeId($envelope);
$id = $this->getTransportMessageId($envelope);
$this->expectedRejectIpsReceivers[$id] = $this;
}

private function getTransportMessageId(Envelope $envelope): string
{
/** @var null|TransportMessageIdStamp $stamp */
$stamp = $envelope->last(TransportMessageIdStamp::class);

return $stamp ? $stamp->getId() : '';
}
};
$expectedReceiversIps->offsetSet($receiver, $envelopes);
$receivers[] = $receiver;
Expand Down Expand Up @@ -178,4 +181,12 @@ protected function assertArraySimilar(array $expected, array $array): void
}
}
}

private function getTransportMessageId(Envelope $envelope): string
{
/** @var null|TransportMessageIdStamp $stamp */
$stamp = $envelope->last(TransportMessageIdStamp::class);

return $stamp ? $stamp->getId() : '';
}
}

0 comments on commit e85bbc4

Please sign in to comment.