diff --git a/examples/client.php b/examples/client.php index b29851b..31a6e2b 100644 --- a/examples/client.php +++ b/examples/client.php @@ -45,7 +45,10 @@ public function wait(array $handlers): void } } -$connection = DriverManager::getConnection(['url' => 'mysql://flow:flow@127.0.0.1:3306/flow?serverVersion=8.1']); +$connection = DriverManager::getConnection([ + 'driver' => 'pdo_sqlite', + 'path' => __DIR__ . '/flow.sqlite', +]); $transport = new DoctrineIpTransport($connection, uniqid('transport_', true)); $client = new client($transport, $transport); diff --git a/examples/server.php b/examples/server.php index 75f4591..7f09146 100644 --- a/examples/server.php +++ b/examples/server.php @@ -11,12 +11,14 @@ use Flow\Driver\SpatieDriver; use Flow\Driver\SwooleDriver; use Flow\Examples\Transport\DoctrineIpTransport; +use Flow\ExceptionInterface; use Flow\Flow\Flow; use Flow\Flow\TransportFlow; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; +use Symfony\Component\Messenger\Envelope; -$driver = match (random_int(1, 4)) { +$driver = match (random_int(1, 1)) { 1 => new AmpDriver(), 2 => new FiberDriver(), 3 => new ReactDriver(), @@ -25,17 +27,21 @@ }; printf("Use %s\n", $driver::class); -$addOneJob = static function (object $data) use ($driver): void { - printf("Client %s #%d : Calculating %d + 1\n", $data['client'], $data['id'], $data['number']); +$addOneJob = static function (Envelope $envelope) use ($driver): Envelope { + $message = $envelope->getMessage(); + printf("Client %s #%d : Calculating %d + 1\n", $message['client'], $message['id'], $message['number']); // simulating calculating some "light" operation from 0.1 to 1 seconds $delay = random_int(1, 10) / 10; $driver->delay($delay); - $data['number']++; + $message['number']++; + + return $envelope; }; -$multbyTwoJob = static function (object $data) use ($driver): void { - printf("Client %s #%d : Calculating %d * 2\n", $data['client'], $data['id'], $data['number']); +$multbyTwoJob = static function (Envelope $envelope) use ($driver): Envelope { + $message = $envelope->getMessage(); + printf("Client %s #%d : Calculating %d * 2\n", $message['client'], $message['id'], $message['number']); // simulating calculating some "heavy" operation from from 1 to 3 seconds $delay = random_int(1, 3); @@ -43,26 +49,29 @@ // simulating 1 chance on 3 to produce an exception from the "heavy" operation if (1 === random_int(1, 3)) { - throw new Error('Failure when processing "Mult by two"'); + throw new Error(sprintf('Client %s #%d : Exception Failure when processing "Mult by two"', $message['client'], $message['id'])); } - $data['number'] *= 2; + $message['number'] *= 2; + + return $envelope; }; -$minusThreeJob = static function (object $data): void { - printf("Client %s #%d : Calculating %d - 3\n", $data['client'], $data['id'], $data['number']); +$minusThreeJob = static function (Envelope $envelope): Envelope { + $message = $envelope->getMessage(); + printf("Client %s #%d : Calculating %d - 3\n", $message['client'], $message['id'], $message['number']); // simulating calculating some "instant" operation - $data['number'] -= 3; + $message['number'] -= 3; + + return $envelope; }; /** * @param Ip $ip */ -$errorJob = static function (Ip $ip, Throwable $exception): void { - printf("Client %s #%d : Exception %s\n", $ip->data['client'], $ip->data['id'], $exception->getMessage()); - - $ip->data->offsetSet('number', null); +$errorJob = static function (ExceptionInterface $exception): void { + printf("%s\n", $exception->getMessage()); }; $flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) { @@ -71,7 +80,10 @@ yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)]; }, ['driver' => $driver]); -$connection = DriverManager::getConnection(['url' => 'mysql://flow:flow@127.0.0.1:3306/flow?serverVersion=8.1']); +$connection = DriverManager::getConnection([ + 'driver' => 'pdo_sqlite', + 'path' => __DIR__ . '/flow.sqlite', +]); $transport = new DoctrineIpTransport($connection); $transportFlow = new TransportFlow( @@ -81,3 +93,4 @@ $driver ); $transportFlow->pull(1); +$flow->await(); diff --git a/src/EnvelopeTrait.php b/src/EnvelopeTrait.php deleted file mode 100644 index a9a1490..0000000 --- a/src/EnvelopeTrait.php +++ /dev/null @@ -1,24 +0,0 @@ -last(TransportMessageIdStamp::class); - - if (null === $stamp || null === $stamp->getId()) { - throw new RuntimeException('Transport does not define id for envelope'); - } - - return $stamp->getId(); - } -} diff --git a/src/Flow/TransportFlow.php b/src/Flow/TransportFlow.php index b3fa299..b06955b 100644 --- a/src/Flow/TransportFlow.php +++ b/src/Flow/TransportFlow.php @@ -8,7 +8,6 @@ use Exception; use Flow\Driver\FiberDriver; use Flow\DriverInterface; -use Flow\EnvelopeTrait; use Flow\FlowInterface; use Flow\Ip; use SplObjectStorage; @@ -24,18 +23,6 @@ */ class TransportFlow extends FlowDecorator { - use EnvelopeTrait; - - /** - * @var SplObjectStorage, Envelope> - */ - private SplObjectStorage $envelopePool; - - /** - * @var array - */ - private array $envelopeIds; - /** * @var DriverInterface */ @@ -51,8 +38,21 @@ public function __construct( ?DriverInterface $driver = null ) { parent::__construct($flow); + $this->fn(function (Envelope $envelope) { + try { + $this->consumer->send(Envelope::wrap($envelope->getMessage(), array_reduce($envelope->all(), static function (array $all, array $stamps) { + foreach ($stamps as $stamp) { + $all[] = $stamp; + } + + return $all; + }, []))); + $this->producer->ack($envelope); + } catch (Exception $e) { + $this->producer->reject($envelope); + } + }); - $this->envelopePool = new SplObjectStorage(); $this->driver = $driver ?? new FiberDriver(); } @@ -73,36 +73,7 @@ public function pull(int $interval): Closure private function emit(Envelope $envelope): void { - $id = $this->getEnvelopeId($envelope); - if (!isset($this->envelopeIds[$id])) { - $this->envelopeIds[$id] = $envelope; - $ip = new Ip($envelope->getMessage()); - $this->envelopePool->offsetSet($ip, $envelope); - - try { - $this->flow->fn(function ($ip) { - $envelope = $this->envelopePool->offsetGet($ip); - $this->consumer->send(Envelope::wrap($ip->data, array_reduce($envelope->all(), static function (array $all, array $stamps) { - foreach ($stamps as $stamp) { - $all[] = $stamp; - } - - return $all; - }, []))); - $this->producer->ack($envelope); - - $this->envelopePool->offsetUnset($ip); - $id = $this->getEnvelopeId($envelope); - unset($this->envelopeIds[$id]); - }); - ($this->flow)($ip); - } catch (Exception $e) { - $envelope = $this->envelopePool->offsetGet($ip); - $this->producer->reject($envelope); - $this->envelopePool->offsetUnset($ip); - $id = $this->getEnvelopeId($envelope); - unset($this->envelopeIds[$id]); - } - } + $ip = new Ip($envelope); + ($this->flow)($ip); } }