From 8e1eff5690bdb58383a340e7fff16c1452d50fd5 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru <mathieu@darkwood.fr> Date: Sun, 27 Oct 2024 13:43:55 +0100 Subject: [PATCH] :recycle: Update Flow:do to FlowFactory:create --- CHANGELOG.md | 1 + README.md | 3 +- examples/flow.php | 3 +- examples/server.php | 3 +- examples/yflow.php | 3 +- src/Flow/Flow.php | 92 +-------------------- src/Flow/FlowDecorator.php | 5 -- src/FlowFactory.php | 158 +++++++++++++++++++++++++++++++++++++ src/FlowInterface.php | 44 ----------- tests/Flow/FlowTest.php | 3 +- 10 files changed, 171 insertions(+), 144 deletions(-) create mode 100644 src/FlowFactory.php diff --git a/CHANGELOG.md b/CHANGELOG.md index f2e8e89..378b8e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - Add event Event::POOL occurs when Flow needs to count IPs to process. - Add `Flow\IpPool` for managing pools of Ips. - Update `Flow\Event\PullEvent` to pull multiple Ips instead one. +- Move `Flow::do` to `FlowFactory::create` ## v1.2.2 diff --git a/README.md b/README.md index 03399db..8d648bc 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ composer require darkwood/flow <?php use Flow\Flow\Flow; +use Flow\FlowFactory; use Flow\Ip; class D1 { @@ -45,7 +46,7 @@ class D4 { public function __construct(public int $n4) {} } -$flow = Flow::do(static function() { +$flow = (new FlowFactory())->create(static function() { yield fn (D1 $data1) => new D2($data1->n1 += 1); yield fn (D2 $data2) => new D3($data2->n2 * 2); yield function(D3 $data3) { diff --git a/examples/flow.php b/examples/flow.php index 63fb476..e0d752a 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -15,6 +15,7 @@ use Flow\Examples\Model\DataD; use Flow\ExceptionInterface; use Flow\Flow\Flow; +use Flow\FlowFactory; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; use Flow\Job\ClosureJob; @@ -92,7 +93,7 @@ $asyncTask = static function ($job1, $job2, $job3, $errorJob1, $errorJob2, $driver) { echo "begin - flow asynchronous\n"; - $flow = Flow::do(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) { + $flow = (new FlowFactory())->create(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) { yield [$job1, $errorJob1, new MaxIpStrategy(2)]; yield [$job2, $errorJob2, new MaxIpStrategy(2)]; yield $job3; diff --git a/examples/server.php b/examples/server.php index 3c3375f..5ff5918 100644 --- a/examples/server.php +++ b/examples/server.php @@ -13,6 +13,7 @@ use Flow\Examples\Transport\DoctrineIpTransport; use Flow\ExceptionInterface; use Flow\Flow\Flow; +use Flow\FlowFactory; use Flow\Flow\TransportFlow; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; @@ -74,7 +75,7 @@ printf("%s\n", $exception->getMessage()); }; -$flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) { +$flow = (new FlowFactory())->create(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) { yield [$addOneJob, $errorJob, new MaxIpStrategy(1)]; yield [$multbyTwoJob, $errorJob, new MaxIpStrategy(3)]; yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)]; diff --git a/examples/yflow.php b/examples/yflow.php index a0d92a2..490f706 100644 --- a/examples/yflow.php +++ b/examples/yflow.php @@ -12,6 +12,7 @@ use Flow\Driver\SwooleDriver; use Flow\Examples\Model\YFlowData; use Flow\Flow\Flow; +use Flow\FlowFactory; use Flow\Flow\YFlow; use Flow\Ip; use Flow\Job\YJob; @@ -194,7 +195,7 @@ }); }; -$flow = Flow::do(static function () use ( +$flow = (new FlowFactory())->create(static function () use ( $factorialJob, $factorialYJobBefore, $factorialYJob, diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index a5f588f..5730fad 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -11,20 +11,16 @@ use Flow\DriverInterface; use Flow\Event; use Flow\Event\PushEvent; -use Flow\Exception\LogicException; use Flow\ExceptionInterface; +use Flow\FlowFactory; use Flow\FlowInterface; use Flow\Ip; use Flow\IpStrategy\LinearIpStrategy; use Flow\IpStrategyInterface; use Flow\JobInterface; -use Generator; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -use function array_key_exists; -use function is_array; - /** * @template T1 * @template T2 @@ -91,38 +87,9 @@ public function __invoke(Ip $ip): void $this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH); } - public static function do(callable $callable, ?array $config = null): FlowInterface - { - /** - * @var Closure|Generator $generator - */ - $generator = $callable(); - - if ($generator instanceof Generator) { - $flows = []; - - while ($generator->valid()) { - $flow = self::flowUnwrap($generator->current(), $config); - - $generator->send($flow); - - $flows[] = $flow; - } - - $return = $generator->getReturn(); - if (!empty($return)) { - $flows[] = self::flowUnwrap($return, $config); - } - - return self::flowMap($flows); - } - - return self::flowUnwrap($generator, $config); - } - public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterface { - $flow = self::flowUnwrap($flow, ['driver' => $this->driver]); + $flow = (new FlowFactory($this->driver))->createFlow($flow); $this->stream['fnFlows'][] = [ 'job' => $flow->job, @@ -137,59 +104,4 @@ public function await(): void { $this->driver->await($this->stream); } - - /** - * @param array<mixed>|Closure|FlowInterface<mixed> $flow - * @param ?array<mixed> $config - * - * @return Flow<mixed, mixed> - * - * #param ?array{ - * 0: Closure, - * 1?: Closure, - * 2?: IpStrategyInterface, - * 3?: EventDispatcherInterface, - * 4?: AsyncHandlerInterface, - * 5?: DriverInterface - * }|array{ - * "ipStrategy"?: IpStrategyInterface, - * "dispatcher"?: EventDispatcherInterface, - * "asyncHandler"?: AsyncHandlerInterface, - * "driver"?: DriverInterface - * } $config - */ - private static function flowUnwrap($flow, ?array $config = null): FlowInterface - { - if ($flow instanceof Closure || $flow instanceof JobInterface) { - return new self(...[...['job' => $flow], ...($config ?? [])]); - } - if (is_array($flow)) { - if (array_key_exists(0, $flow) || array_key_exists('job', $flow)) { - return new self(...[...$flow, ...($config ?? [])]); - } - - return self::flowMap($flow); - } - - return $flow; - } - - /** - * @param array<FlowInterface<mixed>> $flows - * - * @return FlowInterface<mixed> - */ - private static function flowMap(array $flows) - { - $flow = array_shift($flows); - if (null === $flow) { - throw new LogicException('Flow is empty'); - } - - foreach ($flows as $flowIt) { - $flow = $flow->fn($flowIt); - } - - return $flow; - } } diff --git a/src/Flow/FlowDecorator.php b/src/Flow/FlowDecorator.php index f91e7bb..72d335f 100644 --- a/src/Flow/FlowDecorator.php +++ b/src/Flow/FlowDecorator.php @@ -32,11 +32,6 @@ public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterfac return $this->flow->fn($flow); } - public static function do(callable $callable, ?array $config = null): FlowInterface - { - return Flow::do($callable, $config); - } - public function await(): void { $this->flow->await(); diff --git a/src/FlowFactory.php b/src/FlowFactory.php new file mode 100644 index 0000000..6ea1c5d --- /dev/null +++ b/src/FlowFactory.php @@ -0,0 +1,158 @@ +<?php + +declare(strict_types=1); + +namespace Flow; + +use Closure; +use Flow\DriverInterface; +use Flow\Exception\LogicException; +use Flow\Flow\Flow; +use Flow\FlowInterface; +use Flow\JobInterface; +use Generator; + +use function array_key_exists; +use function is_array; + +class FlowFactory +{ + /** + * @param null|DriverInterface<mixed,mixed> $driver + */ + public function __construct( + private ?DriverInterface $driver = null + ) {} + + /** + * Do-notation a.k.a. for-comprehension. + * + * Syntax sugar for sequential {@see FlowInterface::fn()} calls + * + * Syntax "$flow = yield $wrapedFlow" mean: + * 1) $wrapedFlow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance + * 2) $flow is assigned as FlowInterface instance + * 3) optionnaly you can return another wrapedFlow + * + * ```php + * $flow = (new FlowFactory())->create(static function() { + * yield new Flow(fn($a) => $a + 1); + * $flow = yield fn($b) => $b * 2; + * $flow = yield $flow->fn([fn($c) => $c * 4]) + * return [$flow, [fn($d) => $d - 8]]; + * }); + * ``` + * $config if provided will be the fallback array configuration for Flow instanciation + * + * @param callable(): Generator|Closure $callable + * @param ?array<mixed> $config + * + * #param ?array{ + * 0: Closure|array, + * 1?: Closure|array, + * 2?: IpStrategyInterface<mixed>, + * 3?: EventDispatcherInterface, + * 4?: AsyncHandlerInterface, + * 5?: DriverInterface + * }|array{ + * "jobs"?: JobInterface|Closure|array, + * "errorJobs"?: JobInterface|Closure|array, + * "ipStrategy"?: IpStrategyInterface<mixed>, + * "dispatcher"?: EventDispatcherInterface, + * "asyncHandler"?: AsyncHandlerInterface, + * "driver"?: DriverInterface + * } $config + * + * @return FlowInterface<mixed> + */ + public function create(callable $callable, ?array $config = null): FlowInterface + { + /** + * @var Closure|Generator $generator + */ + $generator = $callable(); + + if ($generator instanceof Generator) { + return $this->createFromGenerator($generator, $config); + } + + return $this->createFlow($generator, $config); + } + + /** + * @param array<mixed>|Closure|FlowInterface<mixed> $flow + * @param ?array<mixed> $config + * + * @return Flow<mixed, mixed> + * + * #param ?array{ + * 0: Closure, + * 1?: Closure, + * 2?: IpStrategyInterface, + * 3?: EventDispatcherInterface, + * 4?: AsyncHandlerInterface, + * 5?: DriverInterface + * }|array{ + * "ipStrategy"?: IpStrategyInterface, + * "dispatcher"?: EventDispatcherInterface, + * "asyncHandler"?: AsyncHandlerInterface, + * "driver"?: DriverInterface + * } $config + */ + public function createFlow($flow, ?array $config = null): Flow + { + if ($flow instanceof Closure || $flow instanceof JobInterface) { + return new Flow(...[...['job' => $flow, 'driver' => $this->driver], ...($config ?? [])]); + } + if (is_array($flow)) { + if (array_key_exists(0, $flow) || array_key_exists('job', $flow)) { + return new Flow(...[...$flow, ...['driver' => $this->driver], ...($config ?? [])]); + } + + return $this->createFlowMap($flow); + } + + return $flow; + } + + /** + * @param ?array<mixed> $config + * @return FlowInterface<mixed> + */ + private function createFromGenerator(Generator $generator, ?array $config = null): FlowInterface + { + $flows = []; + + while ($generator->valid()) { + $flow = $this->createFlow($generator->current(), $config); + $generator->send($flow); + $flows[] = $flow; + } + + $return = $generator->getReturn(); + if (!empty($return)) { + $flows[] = $this->createFlow($return, $config); + } + + return $this->createFlowMap($flows); + } + + /** + * @param array<Flow<mixed, mixed>> $flows + * + * @return Flow<mixed, mixed> + */ + private function createFlowMap(array $flows): Flow + { + $flow = array_shift($flows); + if (null === $flow) { + throw new LogicException('Flow is empty'); + } + + foreach ($flows as $flowIt) { + $flow = $flow->fn($flowIt); + } + + return $flow; + } +} diff --git a/src/FlowInterface.php b/src/FlowInterface.php index 6f27a20..f718994 100644 --- a/src/FlowInterface.php +++ b/src/FlowInterface.php @@ -5,7 +5,6 @@ namespace Flow; use Closure; -use Generator; /** * @template T1 @@ -37,49 +36,6 @@ public function __invoke(Ip $ip): void; */ public function fn(array|Closure|JobInterface|self $flow): self; - /** - * Do-notation a.k.a. for-comprehension. - * - * Syntax sugar for sequential {@see FlowInterface::fn()} calls - * - * Syntax "$flow = yield $wrapedFlow" mean: - * 1) $wrapedFlow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance - * 2) $flow is assigned as FlowInterface instance - * 3) optionnaly you can return another wrapedFlow - * - * ```php - * $flow = Flow::do(static function() { - * yield new Flow(fn($a) => $a + 1); - * $flow = yield fn($b) => $b * 2; - * $flow = yield $flow->fn([fn($c) => $c * 4]) - * return [$flow, [fn($d) => $d - 8]]; - * }); - * ``` - * $config if provided will be the fallback array configuration for Flow instanciation - * - * @param callable(): Generator|Closure $callable - * @param ?array<mixed> $config - * - * #param ?array{ - * 0: Closure|array, - * 1?: Closure|array, - * 2?: IpStrategyInterface<mixed>, - * 3?: EventDispatcherInterface, - * 4?: AsyncHandlerInterface, - * 5?: DriverInterface - * }|array{ - * "jobs"?: JobInterface|Closure|array, - * "errorJobs"?: JobInterface|Closure|array, - * "ipStrategy"?: IpStrategyInterface<mixed>, - * "dispatcher"?: EventDispatcherInterface, - * "asyncHandler"?: AsyncHandlerInterface, - * "driver"?: DriverInterface - * } $config - * - * @return FlowInterface<mixed> - */ - public static function do(callable $callable, ?array $config = null): self; - /** * Await asynchonous call for current IPs. * After await, all IPs have been proceed, it continues synchronously. diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index e3c0149..c642c12 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -14,6 +14,7 @@ use Flow\DriverInterface; use Flow\ExceptionInterface; use Flow\Flow\Flow; +use Flow\FlowFactory; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; use Flow\Job\ClosureJob; @@ -124,7 +125,7 @@ static function (ExceptionInterface $exception) use ($cancel) { public function testDo(DriverInterface $driver, callable $callable, ?array $config, int $resultNumber): void { $ip = new Ip(new ArrayObject(['number' => 0])); - $flow = Flow::do($callable, [ + $flow = (new FlowFactory())->create($callable, [ ...['driver' => $driver], ...($config ?? []), ])->fn(static function (ArrayObject $data) use ($resultNumber) {