From 82873714aff9e41d49d2f67bfe568c85173867ac Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Wed, 14 Aug 2024 17:38:00 +0200 Subject: [PATCH] :white_check_mark: YFlow with FiberDriver pass --- examples/yflow.php | 4 ++-- src/Driver/FiberDriver.php | 22 ++++++++++++++++++---- tests/Flow/FlowTest.php | 4 ++-- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/examples/yflow.php b/examples/yflow.php index 4e48a05..ac95430 100644 --- a/examples/yflow.php +++ b/examples/yflow.php @@ -15,10 +15,10 @@ use Flow\Flow\YFlow; use Flow\Ip; -$driver = match (random_int(1, 2)) { +$driver = match (random_int(1, 3)) { 1 => new AmpDriver(), 2 => new ReactDriver(), - // 3 => new FiberDriver(), + 3 => new FiberDriver(), // 4 => new SwooleDriver(), // 5 => new SpatieDriver(), }; diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 900e25a..fceb17c 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -96,11 +96,25 @@ public function await(array &$stream): void }; }; - $defer = static function ($isTick) use ($async) { - return static function (Closure $job) use ($async, $isTick) { - $asyncJob = $async($isTick); + $defer = static function ($isTick) { + return static function (Closure $job) use ($isTick) { + return static function (Closure $next) use ($isTick, $job) { + $fiber = new Fiber(static function () use ($isTick, $job, $next) { + try { + $job(static function ($return) use ($isTick, $next) { + if ($isTick === false) { + $next($return); + } + }, static function ($fn, $next) { + $fn($next); + }); + } catch (Throwable $exception) { + Fiber::suspend(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); + } + }); - return $asyncJob($job); + $fiber->start(); + }; }; }; diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index d0b3082..9cda89e 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -155,14 +155,14 @@ public static function provideJobCases(): iterable }, $strategyBuilder(), new AsyncHandler()]], 5]; $strategy = $strategyBuilder(); - if(!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) { + if (!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) { $cases['asyncJob'] = [[[static function (ArrayObject $data) use ($driver) { $driver->delay(1 / 1000); $data['number'] = 5; return $data; }, $strategy, new AsyncHandler()]], 5]; - }; + } $cases['exceptionJob'] = [[[static function () use ($exception) { throw $exception;