Skip to content

Commit

Permalink
✨ Add ParallelDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Ledru authored and matyo91 committed Nov 17, 2024
1 parent e6c5794 commit c3585b9
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Add `Flow\IpPool` for managing pools of Ips.
- Update `Flow\Event\PullEvent` to pull multiple Ips instead one.
- Move `Flow::do` to `FlowFactory::create`
- Add `Flow\Driver\ParallelDriver`

## v1.2.2

Expand Down
2 changes: 2 additions & 0 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ParallelDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
Expand All @@ -25,6 +26,7 @@
3 => new ReactDriver(),
4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
// 6 => new ParallelDriver(),
};
printf("Use %s\n", $driver::class);
printf("Calculating:\n");
Expand Down
3 changes: 3 additions & 0 deletions examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ParallelDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
Expand All @@ -23,7 +24,9 @@
3 => new FiberDriver(),
4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
// 6 => new ParallelDriver(),
};

printf("Use %s\n", $driver::class);

$factorial = static function (int $n) use (&$factorial): int {
Expand Down
205 changes: 205 additions & 0 deletions src/Driver/ParallelDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
<?php

declare(strict_types=1);

namespace Flow\Driver;

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use parallel\Runtime;
use RuntimeException as NativeRuntimeException;
use Throwable;

use function array_key_exists;
use function count;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class ParallelDriver implements DriverInterface
{
use DriverTrait;

/**
* @var array<mixed>
*/
private array $ticks = [];

public function __construct()
{
if (!class_exists(Runtime::class)) {
throw new NativeRuntimeException('Parallel extension is not loaded. Suggest install it with pecl install parallel');
}
}

public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
$runtime = new Runtime('vendor/autoload.php');

return $runtime->run(static function () use ($callback, $args) {
try {
return $callback(...$args);
} catch (Throwable $exception) {
return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
}
});
};
}

public function defer(Closure $callback): mixed
{
$runtime = new Runtime('vendor/autoload.php');

return $runtime->run(static function () use ($callback) {
try {
$result = null;
$callback(
static function ($value) use (&$result) {
$result = $value;
},
static function ($fn, $next) {
$fn($next);
}
);

return $result;
} catch (Throwable $exception) {
return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
}
});
}

public function await(array &$stream): void
{
$async = function ($isTick) use (&$parallelDatas) {
return function (Closure|JobInterface $job) use (&$parallelDatas, $isTick) {
return function (mixed $data) use (&$parallelDatas, $isTick, $job) {
$async = $this->async($job);

$parallel = $async($data);

$next = static function ($return) {};

$parallelDatas[] = [
'parallel' => $parallel,
'next' => static function ($return) use (&$next) {
$next($return);
},
];

return static function (Closure $callback) use ($isTick, &$next) {
if ($isTick === false) {
$next = static function ($return) use ($callback) {
$callback($return);
};
}
};
};
};
};

$defer = static function ($isTick) use (&$parallelDatas) {
return static function (Closure|JobInterface $job) use ($isTick, &$parallelDatas) {
return static function (Closure $next) use ($isTick, $job, &$parallelDatas) {
$parallel = new Runtime('vendor/autoload.php');
$parallel->run(static function () use ($isTick, $job, $next) {
try {
$job(static function ($return) use ($isTick, $next) {
if ($isTick === false) {
$next($return);
}
}, static function ($fn, $next) {
$fn($next);
});
} catch (Throwable $exception) {
return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
}
});

$parallelDatas[] = [
'parallel' => $parallel,
'next' => static function ($return) {}, /*function ($return) use ($isTick, $next) {
if ($isTick === false) {
$next($return);
}
},*/
];
};
};
};

$tick = 0;
$parallelDatas = [];
do {
foreach ($this->ticks as [
'interval' => $interval,
'callback' => $callback,
]) {
if ($tick % $interval === 0) {
$ip = new Ip();
$async(true)($callback)($ip->data);
}
}

foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps();
foreach ($nextIps as $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException && array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
}), Event::ASYNC);
}
}

foreach ($parallelDatas as $i => $parallelData) { // @phpstan-ignore-line see https://github.com/phpstan/phpstan/issues/11468
if ($parallelData['parallel']->done()) {
$data = $parallelData['parallel']->value();
$parallelData['next']($data);
unset($parallelDatas[$i]);
}
}

$tick++;
} while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0);
}

public function delay(float $seconds): void
{
sleep((int) $seconds);
}

public function tick($interval, Closure $callback): Closure
{
$i = count($this->ticks) - 1;
$this->ticks[$i] = [
'interval' => $interval,
'callback' => $callback,
];

return function () use ($i) {
unset($this->ticks[$i]);
};
}
}
7 changes: 5 additions & 2 deletions tools/dev/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# syntax=docker/dockerfile:1.6.0

FROM php:8.3-fpm-alpine as php
#FROM php:8.3-fpm-alpine as php
FROM php:8.3-zts-alpine as php

RUN apk add --update make curl

Expand All @@ -18,8 +19,10 @@ RUN set -eux; \
pcntl \
pcov \
zip \
parallel \
;

WORKDIR /flow

CMD ["php-fpm"]
#CMD ["php-fpm"]
CMD ["tail", "-f", "/dev/null"]
1 change: 0 additions & 1 deletion tools/dev/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
php:
image: flow-php
Expand Down

0 comments on commit c3585b9

Please sign in to comment.