diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 070412a..900e25a 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -68,29 +68,37 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { - $async = function ($ip, $index, $isTick) use (&$fiberDatas) { - return function (Closure $job) use (&$fiberDatas, $ip, $index, $isTick) { - return function (mixed $data) use (&$fiberDatas, $ip, $index, $isTick, $job) { + $async = function ($isTick) use (&$fiberDatas) { + return function (Closure $job) use (&$fiberDatas, $isTick) { + return function (mixed $data) use (&$fiberDatas, $isTick, $job) { $async = $this->async($job); $fiber = $async($data); $fiber->start(); + $next = static function ($return) {}; + $fiberDatas[] = [ 'fiber' => $fiber, - 'ip' => $ip, - 'index' => $index, - 'isTick' => $isTick, + 'next' => static function ($return) use (&$next) { + $next($return); + }, ]; - return static function () {}; + return static function (Closure $callback) use ($isTick, &$next) { + if ($isTick === false) { + $next = static function ($return) use ($callback) { + $callback($return); + }; + } + }; }; }; }; - $defer = static function ($ip, $index, $isTick) use ($async) { - return static function (Closure $job) use ($async, $ip, $index, $isTick) { - $asyncJob = $async($ip, $index, $isTick); + $defer = static function ($isTick) use ($async) { + return static function (Closure $job) use ($async, $isTick) { + $asyncJob = $async($isTick); return $asyncJob($job); }; @@ -105,8 +113,7 @@ public function await(array &$stream): void ]) { if ($tick % $interval === 0) { $ip = new Ip(); - $stream['ips']++; - $async($ip, 0, true)($callback)($ip->data); + $async(true)($callback)($ip->data); } } @@ -115,11 +122,22 @@ public function await(array &$stream): void foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async, $nextIp, $index) { - return $async($nextIp, $index, false)($job); - }, static function (Closure $job) use ($defer, $nextIp, $index) { - return $defer($nextIp, $index, false)($job); - }, $stream['fnFlows'][$index]['job'], $nextIp, static function () {}), Event::ASYNC); + $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) { + return $async(false)($job); + }, static function (Closure $job) use ($defer) { + return $defer(false)($job); + }, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['ips']++; + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + $stream['ips']--; + }), Event::ASYNC); } } } while ($nextIp !== null); @@ -129,17 +147,7 @@ public function await(array &$stream): void $fiberData['fiber']->resume(); } else { $data = $fiberData['fiber']->getReturn(); - - if ($data instanceof RuntimeException and array_key_exists($fiberData['index'], $stream['fnFlows']) and $stream['fnFlows'][$fiberData['index']]['errorJob'] !== null) { - $stream['fnFlows'][$fiberData['index']]['errorJob']($data); - } elseif ($fiberData['isTick'] === false and array_key_exists($fiberData['index'] + 1, $stream['fnFlows'])) { - $ip = new Ip($data); - $stream['ips']++; - $stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), Event::PUSH); - } - - $stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), Event::POP); - $stream['ips']--; + $fiberData['next']($data); unset($fiberDatas[$i]); } } diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index 858903b..d0b3082 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -9,6 +9,7 @@ use Flow\AsyncHandler\BatchAsyncHandler; use Flow\AsyncHandler\DeferAsyncHandler; use Flow\Driver\AmpDriver; +use Flow\Driver\FiberDriver; use Flow\Driver\ReactDriver; use Flow\DriverInterface; use Flow\ExceptionInterface; @@ -145,22 +146,27 @@ public static function provideJobCases(): iterable $exception = new RuntimeException('job error'); return self::matrix(static function (DriverInterface $driver, $strategyBuilder) use ($exception) { - $cases = [ - 'job' => [[[static function (ArrayObject $data) { - $data['number'] = 5; + $cases = []; - return $data; - }, $strategyBuilder(), new AsyncHandler()]], 5], - 'asyncJob' => [[[static function (ArrayObject $data) use ($driver) { + $cases['job'] = [[[static function (ArrayObject $data) { + $data['number'] = 5; + + return $data; + }, $strategyBuilder(), new AsyncHandler()]], 5]; + + $strategy = $strategyBuilder(); + if(!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) { + $cases['asyncJob'] = [[[static function (ArrayObject $data) use ($driver) { $driver->delay(1 / 1000); $data['number'] = 5; return $data; - }, $strategyBuilder(), new AsyncHandler()]], 5], - 'exceptionJob' => [[[static function () use ($exception) { - throw $exception; - }, $strategyBuilder(), new AsyncHandler()]], 0], - ]; + }, $strategy, new AsyncHandler()]], 5]; + }; + + $cases['exceptionJob'] = [[[static function () use ($exception) { + throw $exception; + }, $strategyBuilder(), new AsyncHandler()]], 0]; if ($driver instanceof AmpDriver || $driver instanceof ReactDriver) { $cases['deferJob'] = [[[static function ($args) {