diff --git a/CHANGELOG.md b/CHANGELOG.md index 378b8e1..35ee188 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Add `Flow\IpPool` for managing pools of Ips. - Update `Flow\Event\PullEvent` to pull multiple Ips instead one. - Move `Flow::do` to `FlowFactory::create` +- Add `Flow\Driver\ParallelDriver` ## v1.2.2 diff --git a/examples/flow.php b/examples/flow.php index d49ee94..6fae3db 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -6,6 +6,7 @@ use Flow\Driver\AmpDriver; use Flow\Driver\FiberDriver; +use Flow\Driver\ParallelDriver; use Flow\Driver\ReactDriver; use Flow\Driver\SpatieDriver; use Flow\Driver\SwooleDriver; @@ -25,6 +26,7 @@ 3 => new ReactDriver(), 4 => new SwooleDriver(), // 5 => new SpatieDriver(), + // 6 => new ParallelDriver(), }; printf("Use %s\n", $driver::class); printf("Calculating:\n"); diff --git a/examples/yflow.php b/examples/yflow.php index ef53b70..47bb785 100644 --- a/examples/yflow.php +++ b/examples/yflow.php @@ -7,6 +7,7 @@ use Flow\AsyncHandler\DeferAsyncHandler; use Flow\Driver\AmpDriver; use Flow\Driver\FiberDriver; +use Flow\Driver\ParallelDriver; use Flow\Driver\ReactDriver; use Flow\Driver\SpatieDriver; use Flow\Driver\SwooleDriver; @@ -23,7 +24,9 @@ 3 => new FiberDriver(), 4 => new SwooleDriver(), // 5 => new SpatieDriver(), + // 6 => new ParallelDriver(), }; + printf("Use %s\n", $driver::class); $factorial = static function (int $n) use (&$factorial): int { diff --git a/src/Driver/ParallelDriver.php b/src/Driver/ParallelDriver.php new file mode 100644 index 0000000..9174a25 --- /dev/null +++ b/src/Driver/ParallelDriver.php @@ -0,0 +1,205 @@ + + */ +class ParallelDriver implements DriverInterface +{ + use DriverTrait; + + /** + * @var array + */ + private array $ticks = []; + + public function __construct() + { + if (!class_exists(Runtime::class)) { + throw new NativeRuntimeException('Parallel extension is not loaded. Suggest install it with pecl install parallel'); + } + } + + public function async(Closure|JobInterface $callback): Closure + { + return static function (...$args) use ($callback) { + $runtime = new Runtime('vendor/autoload.php'); + + return $runtime->run(static function () use ($callback, $args) { + try { + return $callback(...$args); + } catch (Throwable $exception) { + return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); + } + }); + }; + } + + public function defer(Closure $callback): mixed + { + $runtime = new Runtime('vendor/autoload.php'); + + return $runtime->run(static function () use ($callback) { + try { + $result = null; + $callback( + static function ($value) use (&$result) { + $result = $value; + }, + static function ($fn, $next) { + $fn($next); + } + ); + + return $result; + } catch (Throwable $exception) { + return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); + } + }); + } + + public function await(array &$stream): void + { + $async = function ($isTick) use (&$parallelDatas) { + return function (Closure|JobInterface $job) use (&$parallelDatas, $isTick) { + return function (mixed $data) use (&$parallelDatas, $isTick, $job) { + $async = $this->async($job); + + $parallel = $async($data); + + $next = static function ($return) {}; + + $parallelDatas[] = [ + 'parallel' => $parallel, + 'next' => static function ($return) use (&$next) { + $next($return); + }, + ]; + + return static function (Closure $callback) use ($isTick, &$next) { + if ($isTick === false) { + $next = static function ($return) use ($callback) { + $callback($return); + }; + } + }; + }; + }; + }; + + $defer = static function ($isTick) use (&$parallelDatas) { + return static function (Closure|JobInterface $job) use ($isTick, &$parallelDatas) { + return static function (Closure $next) use ($isTick, $job, &$parallelDatas) { + $parallel = new Runtime('vendor/autoload.php'); + $parallel->run(static function () use ($isTick, $job, $next) { + try { + $job(static function ($return) use ($isTick, $next) { + if ($isTick === false) { + $next($return); + } + }, static function ($fn, $next) { + $fn($next); + }); + } catch (Throwable $exception) { + return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); + } + }); + + $parallelDatas[] = [ + 'parallel' => $parallel, + 'next' => static function ($return) {}, /*function ($return) use ($isTick, $next) { + if ($isTick === false) { + $next($return); + } + },*/ + ]; + }; + }; + }; + + $tick = 0; + $parallelDatas = []; + do { + foreach ($this->ticks as [ + 'interval' => $interval, + 'callback' => $callback, + ]) { + if ($tick % $interval === 0) { + $ip = new Ip(); + $async(true)($callback)($ip->data); + } + } + + foreach ($stream['dispatchers'] as $index => $dispatcher) { + $nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($nextIps as $nextIp) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) { + return $async(false)($job); + }, static function (Closure|JobInterface $job) use ($defer) { + return $defer(false)($job); + }, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + }), Event::ASYNC); + } + } + + foreach ($parallelDatas as $i => $parallelData) { // @phpstan-ignore-line see https://github.com/phpstan/phpstan/issues/11468 + if ($parallelData['parallel']->done()) { + $data = $parallelData['parallel']->value(); + $parallelData['next']($data); + unset($parallelDatas[$i]); + } + } + + $tick++; + } while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0); + } + + public function delay(float $seconds): void + { + sleep((int) $seconds); + } + + public function tick($interval, Closure $callback): Closure + { + $i = count($this->ticks) - 1; + $this->ticks[$i] = [ + 'interval' => $interval, + 'callback' => $callback, + ]; + + return function () use ($i) { + unset($this->ticks[$i]); + }; + } +} diff --git a/tools/dev/Dockerfile b/tools/dev/Dockerfile index 3421511..3a588d5 100644 --- a/tools/dev/Dockerfile +++ b/tools/dev/Dockerfile @@ -1,6 +1,7 @@ # syntax=docker/dockerfile:1.6.0 -FROM php:8.3-fpm-alpine as php +#FROM php:8.3-fpm-alpine as php +FROM php:8.3-zts-alpine as php RUN apk add --update make curl @@ -18,8 +19,10 @@ RUN set -eux; \ pcntl \ pcov \ zip \ + parallel \ ; WORKDIR /flow -CMD ["php-fpm"] +#CMD ["php-fpm"] +CMD ["tail", "-f", "/dev/null"] diff --git a/tools/dev/docker-compose.yml b/tools/dev/docker-compose.yml index 3aad6ef..66bbead 100644 --- a/tools/dev/docker-compose.yml +++ b/tools/dev/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3' services: php: image: flow-php