From 6e35db5702c8f65b4c259f573152bfe595fe3625 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Mon, 12 Aug 2024 23:34:17 +0200 Subject: [PATCH] :construction: Update FiberDriver for YFlow --- examples/ampYFlow.php | 4 +-- src/Driver/FiberDriver.php | 55 +++++++++++++++++++++++--------------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/examples/ampYFlow.php b/examples/ampYFlow.php index 9173299..c8efe17 100644 --- a/examples/ampYFlow.php +++ b/examples/ampYFlow.php @@ -5,7 +5,7 @@ require __DIR__ . '/../vendor/autoload.php'; use Flow\AsyncHandler\DeferAsyncHandler; -use Flow\Driver\ReactDriver; +use Flow\Driver\FiberDriver; use Flow\Examples\Data\YFlowData; use Flow\Flow\Flow; use Flow\Ip; @@ -47,7 +47,7 @@ }); }; -$driver = new ReactDriver(); +$driver = new FiberDriver(); $flow = (new Flow($factorialYJobDeferBefore, null, null, null, null, $driver)) ->fn(new Flow($asyncFactorialDefer, null, null, null, new DeferAsyncHandler(), $driver)) diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 17846d5..01045ce 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -39,32 +39,43 @@ public function async(Closure $callback): Closure return static function () {}; } + public function defer(Closure $callback): mixed + { + return null; + } + public function await(array &$stream): void { - $async = static function ($ip, $fnFlows, $index, $isTick) use (&$fiberDatas) { - $fiber = new Fiber($fnFlows[$index]['job']); + $async = static function ($ip, $index, $isTick) use (&$fiberDatas) { + return static function (Closure $job) use (&$fiberDatas, $ip, $index, $isTick) { + return static function (mixed $data) use (&$fiberDatas, $ip, $index, $isTick, $job) { + $fiber = new Fiber($job); - $exception = null; + $exception = null; - try { - if ($ip->data === null) { - $fiber->start(); - } else { - $fiber->start($ip->data); - } - } catch (Throwable $fiberException) { - $exception = $fiberException; - } + try { + if ($data === null) { + $fiber->start(); + } else { + $fiber->start($data); + } + } catch (Throwable $fiberException) { + $exception = $fiberException; + } - $fiberDatas[] = [ - 'index' => $index, - 'fiber' => $fiber, - 'exception' => $exception, - 'ip' => $ip, - 'isTick' => $isTick, - ]; + $fiberDatas[] = [ + 'fiber' => $fiber, + 'exception' => $exception, + 'ip' => $ip, + 'index' => $index, + 'isTick' => $isTick, + ]; + }; + }; }; + $defer = static function (Closure $job) {}; + $tick = 0; $fiberDatas = []; while ($stream['ips'] > 0 or count($this->ticks) > 0) { @@ -75,7 +86,7 @@ public function await(array &$stream): void if ($tick % $interval === 0) { $ip = new Ip(); $stream['ips']++; - $async($ip, [['job' => $callback]], 0, true); + $async($ip, 0, true)($callback)($ip->data); } } @@ -84,7 +95,9 @@ public function await(array &$stream): void foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, false), Event::ASYNC); + $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async, $nextIp, $index) { + return $async($nextIp, $index, false)($job); + }, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function () {}), Event::ASYNC); } } } while ($nextIp !== null);