Skip to content

Commit

Permalink
🚑 Symfony insight fix minor
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Ledru committed Oct 17, 2024
1 parent aef7839 commit 72efcb3
Show file tree
Hide file tree
Showing 10 changed files with 430 additions and 418 deletions.
44 changes: 44 additions & 0 deletions examples/Transport/Client.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Worker;

class Client
{
public function __construct(
private SenderInterface $sender,
private ReceiverInterface $receiver
) {}

/**
* @param ?int $delay The delay in milliseconds
*/
public function call(object $data, ?int $delay = null): void
{
$ip = Envelope::wrap($data, $delay ? [new DelayStamp($delay)] : []);
$this->sender->send($ip);
}

/**
* @param callable[][]|HandlerDescriptor[][] $handlers
*/
public function wait(array $handlers): void
{
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator($handlers)),
]);
$worker = new Worker(['transport' => $this->receiver], $bus);
$worker->run();
}
}
41 changes: 2 additions & 39 deletions examples/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,16 @@
require __DIR__ . '/../vendor/autoload.php';

use Doctrine\DBAL\DriverManager;
use Flow\Examples\Transport\Client;
use Flow\Examples\Transport\DoctrineIpTransport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Worker;

class client
{
public function __construct(
private SenderInterface $sender,
private ReceiverInterface $receiver
) {}

/**
* @param ?int $delay The delay in milliseconds
*/
public function call(object $data, ?int $delay = null): void
{
$ip = Envelope::wrap($data, $delay ? [new DelayStamp($delay)] : []);
$this->sender->send($ip);
}

/**
* @param callable[][]|HandlerDescriptor[][] $handlers
*/
public function wait(array $handlers): void
{
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator($handlers)),
]);
$worker = new Worker(['transport' => $this->receiver], $bus);
$worker->run();
}
}

$connection = DriverManager::getConnection([
'driver' => 'pdo_sqlite',
'path' => __DIR__ . '/flow.sqlite',
]);
$transport = new DoctrineIpTransport($connection, uniqid('transport_', true));

$client = new client($transport, $transport);
$client = new Client($transport, $transport);

$ip = long2ip(random_int(ip2long('10.0.0.0'), ip2long('10.255.255.255')));
for ($i = 0; $i < 3; $i++) {
Expand Down
44 changes: 19 additions & 25 deletions examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,20 @@
};
printf("Use %s\n", $driver::class);

function factorial(int $n): int
{
return ($n <= 1) ? 1 : $n * factorial($n - 1);
}
$factorial = static function (int $n) use (&$factorial): int {
return ($n <= 1) ? 1 : $n * $factorial($n - 1);
};

/**
* @return JobInterface<mixed, mixed>
*/
function Ywrap(callable $func, callable $wrapperFunc): JobInterface
{
$Ywrap = static function (callable $func, callable $wrapperFunc): JobInterface {
$wrappedFunc = static fn ($recurse) => $wrapperFunc(static fn (...$args) => $func($recurse)(...$args));

return new YJob($wrappedFunc);
}
};

function memoWrapperGenerator(callable $f): Closure
{
$memoWrapperGenerator = static function (callable $f): Closure {
static $cache = [];

return static function ($y) use ($f, &$cache) {
Expand All @@ -52,33 +49,30 @@ function memoWrapperGenerator(callable $f): Closure

return $cache[$y];
};
}
};

/**
* @return JobInterface<mixed, mixed>
*/
function Ymemo(callable $f): JobInterface
{
return Ywrap($f, 'memoWrapperGenerator');
}
$Ymemo = static function (callable $f) use ($Ywrap, $memoWrapperGenerator): JobInterface {
return $Ywrap($f, $memoWrapperGenerator);
};

function factorialGen(callable $func): Closure
{
$factorialGen = static function (callable $func): Closure {
return static function (int $n) use ($func): int {
return ($n <= 1) ? 1 : $n * $func($n - 1);
};
}
};

function factorialYMemo(int $n): int
{
return Ymemo('factorialGen')($n);
}
$factorialYMemo = static function (int $n) use ($Ymemo, $factorialGen): int {
return $Ymemo($factorialGen)($n);
};

$factorialJob = static function (YFlowData $data): YFlowData {
$factorialJob = static function (YFlowData $data) use ($factorial): YFlowData {
printf("*.... #%d - Job 1 : Calculating factorial(%d)\n", $data->id, $data->number);

// raw factorial calculation
$result = factorial($data->number);
$result = $factorial($data->number);

printf("*.... #%d - Job 1 : Result for factorial(%d) = %d\n", $data->id, $data->number, $result);

Expand Down Expand Up @@ -107,11 +101,11 @@ function factorialYMemo(int $n): int
return new YFlowData($data->id, $data->number);
};

$factorialYMemoJob = static function (YFlowData $data) use ($driver): YFlowData {
$factorialYMemoJob = static function (YFlowData $data) use ($driver, $factorialYMemo): YFlowData {
printf("..*.. #%d - Job 3 : Calculating factorialYMemo(%d)\n", $data->id, $data->number);

$driver->delay(3);
$result = factorialYMemo($data->number);
$result = $factorialYMemo($data->number);

printf("..*.. #%d - Job 3 : Result for factorialYMemo(%d) = %d\n", $data->id, $data->number, $result);

Expand Down
4 changes: 2 additions & 2 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public function await(array &$stream): void
$job = $stream['fnFlows'][$index]['job'];

$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
Expand All @@ -129,7 +129,7 @@ public function await(array &$stream): void
}
}

if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 || $this->ticks > 0) {
EventLoop::defer($loop);
} else {
EventLoop::getDriver()->stop();
Expand Down
3 changes: 1 addition & 2 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public function await(array &$stream): void
if ($isTick === false) {
$next($return);
}
// Fiber::suspend();
}, static function ($fn, $next) {
$fn($next);
});
Expand Down Expand Up @@ -152,7 +151,7 @@ public function await(array &$stream): void
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
Expand Down
4 changes: 2 additions & 2 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function await(array &$stream): void
$job = $stream['fnFlows'][$index]['job'];

$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
Expand All @@ -122,7 +122,7 @@ public function await(array &$stream): void
}
}

if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 || $this->ticks > 0) {
$this->eventLoop->futureTick($loop);
} else {
$this->eventLoop->stop();
Expand Down
2 changes: 1 addition & 1 deletion src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public function await(array &$stream): void
$nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps();
foreach ($nextIps as $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
Expand Down
2 changes: 1 addition & 1 deletion src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public function await(array &$stream): void
$nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps();
foreach ($nextIps as $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
Expand Down
4 changes: 2 additions & 2 deletions tools/phpstan/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
"require-dev": {
"amphp/amp": "^3.0",
"openswoole/ide-helper": "^22.1.5",
"phpstan/phpstan": "^1.11",
"phpunit/phpunit": "^10.3",
"phpstan/phpstan": "^1.12",
"phpunit/phpunit": "^11.4",
"react/async": "^4.3",
"spatie/async": "^1.6",
"symfony/doctrine-messenger": "^7.0",
Expand Down
Loading

0 comments on commit 72efcb3

Please sign in to comment.