diff --git a/CHANGELOG.md b/CHANGELOG.md index 109165bd..67f89e09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,6 @@ - Update DX for Flow\DriverInterface - Update `async` that now $onResolve get called with async $callback result or Flow\Exception as first argument - Update `tick` that now return a Closure to cleanup tick interval - - Remove `start` and `stop` - Add Flow\Driver\FiberDriver from https://github.com/jolicode/castor/blob/main/src/functions.php - Upgrade to Symfony 6.3 and PHPUnit 10.3 - Refactor docs structure diff --git a/examples/flow.php b/examples/flow.php index ce04fc6e..ecc00365 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -17,8 +17,8 @@ $driver = match (random_int(1, 5)) { 1 => new AmpDriver(), - 2 => new ReactDriver(), - 3 => new FiberDriver(), + 2 => new FiberDriver(), + 3 => new ReactDriver(), 4 => new RevoltDriver(), 5 => new SwooleDriver(), }; @@ -28,7 +28,7 @@ printf("*. #%d - Job 1 : Calculating %d + %d\n", $data->id, $data->number, $data->number); // simulating calculating some "light" operation from 0.1 to 1 seconds - $delay = random_int(1, 10) / 10; + $delay = random_int(1, 3); $driver->delay($delay); $result = $data->number; $result += $result; @@ -88,5 +88,13 @@ for ($i = 1; $i <= 5; $i++) { $ip = new Ip(new Data($i, $i)); + $ipPool->offsetSet($ip, true); $flow($ip, static fn ($ip) => $ipPool->offsetUnset($ip)); } + +$driver->tick(1, static function () use ($driver, $ipPool) { + if ($ipPool->count() === 0) { + $driver->stop(); + } +}); +$driver->start(); diff --git a/examples/server.php b/examples/server.php index 4ab3c466..199dab0f 100644 --- a/examples/server.php +++ b/examples/server.php @@ -17,8 +17,8 @@ $driver = match (random_int(1, 5)) { 1 => new AmpDriver(), - 2 => new ReactDriver(), - 3 => new FiberDriver(), + 2 => new FiberDriver(), + 3 => new ReactDriver(), 4 => new RevoltDriver(), 5 => new SwooleDriver(), }; diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index d3a89590..770a88d0 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -23,6 +23,11 @@ */ class AmpDriver implements DriverInterface { + /** + * @var array + */ + private array $ticksIds = []; + public function __construct() { if (!function_exists('Amp\\async')) { @@ -44,7 +49,7 @@ public function async(Closure $callback, Closure $onResolve = null): Closure $onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); } } - }, $callback, $args, $onResolve)->await(); + }, $callback, $args, $onResolve); }; } @@ -57,8 +62,29 @@ public function tick(int $interval, Closure $callback): Closure { $tickId = EventLoop::repeat($interval / 1000, $callback); - return static function () use ($tickId) { + $cancel = function () use ($tickId) { + unset($this->ticksIds[$tickId]); EventLoop::cancel($tickId); }; + + $this->ticksIds[$tickId] = $cancel; + + return $cancel; + } + + public function start(): void + { + if (!EventLoop::getDriver()->isRunning()) { + EventLoop::run(); + } + } + + public function stop(): void + { + foreach ($this->ticksIds as $cancel) { + $cancel(); + } + + EventLoop::getDriver()->stop(); } } diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 9ae415bc..adb6916d 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -25,6 +25,11 @@ class FiberDriver implements DriverInterface */ private array $fibers = []; + /** + * @var array + */ + private array $ticksIds = []; + private bool $isLooping = false; public function async(Closure $callback, Closure $onResolve = null): Closure @@ -45,7 +50,6 @@ public function async(Closure $callback, Closure $onResolve = null): Closure } $this->fibers[] = $fiberData; - $this->loop(); }; } @@ -59,50 +63,61 @@ public function delay(float $seconds): void */ public function tick(int $interval, Closure $callback): Closure { - $closure = static fn () => $callback(); + $tickId = uniqid('flow_fiber_tick_id'); + $closure = static fn () => $callback(); register_tick_function($closure); - return static fn () => unregister_tick_function($closure); + $cancel = function () use ($tickId, $closure) { + unset($this->ticksIds[$tickId]); + unregister_tick_function($closure); + }; + + return $cancel; } - private function loop(): void + public function start(): void { - if ($this->isLooping === false) { - $this->isLooping = true; + $this->isLooping = true; - $isRunning = true; + $isRunning = true; - while ($isRunning) { - $isRunning = false; + while ($this->isLooping || $isRunning) { /** @phpstan-ignore-line */ + $isRunning = false; - foreach ($this->fibers as $i => $fiber) { - $isRunning = $isRunning || !$fiber['fiber']->isTerminated(); + foreach ($this->fibers as $i => $fiber) { + $isRunning = $isRunning || !$fiber['fiber']->isTerminated(); - if (!$fiber['fiber']->isTerminated() and $fiber['fiber']->isSuspended()) { - try { - $fiber['fiber']->resume(); - } catch (Throwable $exception) { - $this->fibers[$i]['exception'] = $exception; - } - } else { - if ($fiber['onResolve']) { - if ($fiber['exception'] === null) { - $fiber['onResolve']($fiber['fiber']->getReturn()); - } else { - $fiber['onResolve'](new RuntimeException($fiber['exception']->getMessage(), $fiber['exception']->getCode(), $fiber['exception'])); - } + if (!$fiber['fiber']->isTerminated() and $fiber['fiber']->isSuspended()) { + try { + $fiber['fiber']->resume(); + } catch (Throwable $exception) { + $this->fibers[$i]['exception'] = $exception; + } + } else { + if ($fiber['onResolve']) { + if ($fiber['exception'] === null) { + $fiber['onResolve']($fiber['fiber']->getReturn()); + } else { + $fiber['onResolve'](new RuntimeException($fiber['exception']->getMessage(), $fiber['exception']->getCode(), $fiber['exception'])); } - unset($this->fibers[$i]); } + unset($this->fibers[$i]); } + } - if (Fiber::getCurrent()) { - Fiber::suspend(); - usleep(1_000); - } + if (Fiber::getCurrent()) { + Fiber::suspend(); + usleep(1_000); } } + } + + public function stop(): void + { + foreach ($this->ticksIds as $cancel) { + $cancel(); + } $this->isLooping = false; } diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index ee97c655..50082db1 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -26,7 +26,10 @@ class ReactDriver implements DriverInterface { private LoopInterface $eventLoop; - private int $counter = 0; + /** + * @var array + */ + private array $ticksIds = []; public function __construct(LoopInterface $eventLoop = null) { @@ -39,8 +42,8 @@ public function __construct(LoopInterface $eventLoop = null) public function async(Closure $callback, Closure $onResolve = null): Closure { - return function (...$args) use ($callback, $onResolve): void { - async(function () use ($callback, $onResolve, $args) { + return static function (...$args) use ($callback, $onResolve): void { + async(static function () use ($callback, $onResolve, $args) { try { $return = $callback(...$args, ...($args = [])); if ($onResolve) { @@ -50,11 +53,8 @@ public function async(Closure $callback, Closure $onResolve = null): Closure if ($onResolve) { $onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); } - } finally { - $this->pop(); } })(); - $this->push(); }; } @@ -65,28 +65,27 @@ public function delay(float $seconds): void public function tick(int $interval, Closure $callback): Closure { - $tickId = $this->eventLoop->addPeriodicTimer($interval, $callback); - $this->push(); + $tickId = uniqid('flow_react_tick_id'); - return function () use ($tickId) { - $this->eventLoop->cancelTimer($tickId); - $this->pop(); + $timer = $this->eventLoop->addPeriodicTimer($interval, $callback); + + return function () use ($tickId, $timer) { + unset($this->ticksIds[$tickId]); + $this->eventLoop->cancelTimer($timer); }; } - private function push(): void + public function start(): void { - if ($this->counter === 0) { - $this->eventLoop->run(); - } - $this->counter++; + $this->eventLoop->run(); } - private function pop(): void + public function stop(): void { - $this->counter--; - if ($this->counter === 0) { - $this->eventLoop->stop(); + foreach ($this->ticksIds as $cancel) { + $cancel(); } + + $this->eventLoop->stop(); } } diff --git a/src/Driver/RevoltDriver.php b/src/Driver/RevoltDriver.php index 01cde1a1..267f742a 100644 --- a/src/Driver/RevoltDriver.php +++ b/src/Driver/RevoltDriver.php @@ -20,7 +20,10 @@ */ class RevoltDriver implements DriverInterface { - private int $counter = 0; + /** + * @var array + */ + private array $ticksIds = []; public function __construct(Driver $driver = null) { @@ -35,8 +38,8 @@ public function __construct(Driver $driver = null) public function async(Closure $callback, Closure $onResolve = null): Closure { - return function (...$args) use ($callback, $onResolve): void { - EventLoop::queue(function (Closure $callback, array $args, Closure $onResolve = null) { + return static function (...$args) use ($callback, $onResolve): void { + EventLoop::queue(static function (Closure $callback, array $args, Closure $onResolve = null) { try { $return = $callback(...$args, ...($args = [])); if ($onResolve) { @@ -46,11 +49,8 @@ public function async(Closure $callback, Closure $onResolve = null): Closure if ($onResolve) { $onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); } - } finally { - $this->pop(); } }, $callback, $args, $onResolve); - $this->push(); }; } @@ -69,27 +69,26 @@ public function delay(float $seconds): void public function tick(int $interval, Closure $callback): Closure { $tickId = EventLoop::repeat($interval / 1000, $callback); - $this->push(); - return function () use ($tickId) { + $cancel = function () use ($tickId) { + unset($this->ticksIds[$tickId]); EventLoop::cancel($tickId); - $this->pop(); }; + + $this->ticksIds[$tickId] = $cancel; + + return $cancel; } - private function push(): void + public function start(): void { - if (/* $this->counter === 0 || */ !EventLoop::getDriver()->isRunning()) { + if (!EventLoop::getDriver()->isRunning()) { EventLoop::run(); } - $this->counter++; } - private function pop(): void + public function stop(): void { - $this->counter--; - if ($this->counter === 0) { - EventLoop::getDriver()->stop(); - } + EventLoop::getDriver()->stop(); } } diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index 5efbac97..55e14c21 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -22,6 +22,11 @@ */ class SwooleDriver implements DriverInterface { + /** + * @var array + */ + private array $ticksIds = []; + public function __construct() { if (!extension_loaded('openswoole')) { @@ -59,8 +64,20 @@ public function tick(int $interval, Closure $callback): Closure { $tickId = Timer::tick($interval, $callback); - return static function () use ($tickId) { + return function () use ($tickId) { + unset($this->ticksIds[$tickId]); Timer::clear($tickId); // @phpstan-ignore-line }; } + + public function start(): void + { + } + + public function stop(): void + { + foreach ($this->ticksIds as $cancel) { + $cancel(); + } + } } diff --git a/src/DriverInterface.php b/src/DriverInterface.php index c0cd0537..c6c58344 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -28,4 +28,8 @@ public function delay(float $seconds): void; * @return Closure(): void when called, this cleanup tick interval */ public function tick(int $interval, Closure $callback): Closure; + + public function start(): void; + + public function stop(): void; } diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 85c870ca..566eb533 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -122,8 +122,6 @@ private function nextIpJob(): void } else { throw $value; } - - return; } if ($this->fnFlow) { diff --git a/tests/Driver/AmpDriverTest.php b/tests/Driver/AmpDriverTest.php index cb098f80..011fc708 100644 --- a/tests/Driver/AmpDriverTest.php +++ b/tests/Driver/AmpDriverTest.php @@ -6,7 +6,6 @@ use Flow\Driver\AmpDriver; use Flow\DriverInterface; -use Revolt\EventLoop; /** * @template T1 @@ -16,16 +15,6 @@ */ class AmpDriverTest extends DriverTestCase { - protected function setUp(): void - { - EventLoop::getDriver()->run(); - } - - protected function tearDown(): void - { - EventLoop::getDriver()->stop(); - } - /** * @return DriverInterface */ diff --git a/tests/Driver/DriverTestCase.php b/tests/Driver/DriverTestCase.php index 61d0ef0a..87c49cce 100644 --- a/tests/Driver/DriverTestCase.php +++ b/tests/Driver/DriverTestCase.php @@ -18,9 +18,11 @@ public function testAsync(): void { $driver = $this->createDriver(); $driver->async(static function () { - }, function (mixed $value) { + }, function (mixed $value) use ($driver) { + $driver->stop(); $this->assertNull($value); })(); + $driver->start(); } public function testAsyncReturn(): void @@ -28,9 +30,11 @@ public function testAsyncReturn(): void $driver = $this->createDriver(); $driver->async(static function () { return 2; - }, function (mixed $value) { + }, function (mixed $value) use ($driver) { + $driver->stop(); $this->assertSame(2, $value); })(); + $driver->start(); } public function testAsyncError(): void @@ -38,9 +42,11 @@ public function testAsyncError(): void $driver = $this->createDriver(); $driver->async(static function () { throw new Exception(); - }, function (mixed $value) { + }, function (mixed $value) use ($driver) { + $driver->stop(); $this->assertInstanceOf(Exception::class, $value); })(); + $driver->start(); } public function testDelay(): void @@ -48,9 +54,11 @@ public function testDelay(): void $driver = $this->createDriver(); $driver->async(static function () use ($driver) { $driver->delay(1 / 1000); - }, function (mixed $value) { + }, function (mixed $value) use ($driver) { + $driver->stop(); $this->assertNull($value); })(); + $driver->start(); } public function testTick(): void @@ -64,9 +72,12 @@ public function testTick(): void }); $driver->async(function () use ($driver, &$i) { $driver->delay(3 / 1000); + $driver->stop(); $this->assertGreaterThan(3, $i); - })();*/ + })(); + + $driver->start();*/ } /** diff --git a/tests/Driver/RevoltDriverTest.php b/tests/Driver/RevoltDriverTest.php index 7064c089..04c46034 100644 --- a/tests/Driver/RevoltDriverTest.php +++ b/tests/Driver/RevoltDriverTest.php @@ -6,7 +6,6 @@ use Flow\Driver\RevoltDriver; use Flow\DriverInterface; -use Revolt\EventLoop; /** * @template T1 @@ -16,16 +15,6 @@ */ class RevoltDriverTest extends DriverTestCase { - protected function setUp(): void - { - EventLoop::getDriver()->run(); - } - - protected function tearDown(): void - { - EventLoop::getDriver()->stop(); - } - /** * @return DriverInterface */ diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index 3f8cca77..71ed71f5 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -35,10 +35,14 @@ public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy array_map(static fn ($job) => new Flow($job, static function () {}, $ipStrategy, $driver), $jobs), static fn ($flow, $flowIt) => $flow ? $flow->fn($flowIt) : $flowIt ); - ($flow)($ip, static function (Ip $ip) use ($resultNumber) { + ($flow)($ip, static function (Ip $ip) use ($driver, $resultNumber) { + $driver->stop(); + self::assertSame(ArrayObject::class, $ip->data::class); self::assertSame($resultNumber, $ip->data['number']); }); + + $driver->start(); } /** @@ -63,9 +67,11 @@ public function testJobs(DriverInterface $driver): void $ips = new ArrayObject(); - $callback = function (Ip $ip) use ($ips, $ip1, $ip2) { + $callback = function (Ip $ip) use ($driver, $ips, $ip1, $ip2) { $ips->append($ip); if ($ips->count() === 2) { + $driver->stop(); + $this->assertSame($ip1, $ips->offsetGet(0)); $this->assertSame($ip2, $ips->offsetGet(1)); self::assertSame(6, $ip1->data['n1']); @@ -77,6 +83,8 @@ public function testJobs(DriverInterface $driver): void ($flow)($ip1, $callback); ($flow)($ip2, $callback); + + $driver->start(); } /** @@ -101,9 +109,9 @@ public static function jobProvider(): iterable $driver->delay(1 / 1000); $data['number'] *= 2; }], 10], - /*'exceptionJob' => [[static function () use ($exception) { + 'exceptionJob' => [[static function () use ($exception) { throw $exception; - }], 0],*/ + }], 0], ]); } } diff --git a/tests/Flow/FlowTrait.php b/tests/Flow/FlowTrait.php index 55488f7f..292abd5b 100644 --- a/tests/Flow/FlowTrait.php +++ b/tests/Flow/FlowTrait.php @@ -22,10 +22,10 @@ trait FlowTrait protected static function matrix(Closure $datas): array { $drivers = [ - // 'amp' => fn (): AmpDriver => new AmpDriver(), - // 'fiber' => fn (): FiberDriver => new FiberDriver(), + 'amp' => static fn (): AmpDriver => new AmpDriver(), + 'fiber' => static fn (): FiberDriver => new FiberDriver(), 'react' => static fn (): ReactDriver => new ReactDriver(), - // 'revolt' => static fn (): RevoltDriver => new RevoltDriver(), + 'revolt' => static fn (): RevoltDriver => new RevoltDriver(), // 'swoole' => fn (): SwooleDriver => new SwooleDriver(), ]; diff --git a/tests/Flow/TransportFlowTest.php b/tests/Flow/TransportFlowTest.php index d38c2f19..20c0511a 100644 --- a/tests/Flow/TransportFlowTest.php +++ b/tests/Flow/TransportFlowTest.php @@ -42,16 +42,20 @@ public function testJobs(DriverInterface $driver, IpStrategyInterface $ipStrateg new TransportFlow($flow, $transport1, $transport2, $driver); - $driver->tick(1, static function () use ($transport2, $resultNumber) { + $driver->tick(1, static function () use ($driver, $transport2, $resultNumber) { $ips = $transport2->get(); foreach ($ips as $ip) { $data = $ip->getMessage(); self::assertEquals($resultNumber, $data['number']); } + + $driver->stop(); }); $envelope = new Envelope(new ArrayObject(['number' => 0])); - $transport1->send($envelope);*/ + $transport1->send($envelope); + + $driver->start();*/ } /** diff --git a/tests/Flow/YFlowTest.php b/tests/Flow/YFlowTest.php index 8a200664..453da13c 100644 --- a/tests/Flow/YFlowTest.php +++ b/tests/Flow/YFlowTest.php @@ -31,10 +31,14 @@ public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy $ip = new Ip(new ArrayObject(['number' => 6])); $errorJob = static function () {}; $yFlow = new YFlow($job, $errorJob, $ipStrategy, $driver); - ($yFlow)($ip, static function (Ip $ip) use ($resultNumber) { + ($yFlow)($ip, static function (Ip $ip) use ($driver, $resultNumber) { + $driver->stop(); + self::assertSame(ArrayObject::class, $ip->data::class); self::assertSame($resultNumber, $ip->data['number']); }); + + $driver->start(); } /**