From 67b2ebd76c32580b7f1e72a28044d4d8b80eff49 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Sat, 17 Aug 2024 21:55:17 +0200 Subject: [PATCH] :sparkles: Add Symfony Bridge --- CHANGELOG.md | 3 ++ .../content/en/docs/getting-started/job.md | 20 +++++++++++++ examples/Job/GenericJob.php | 29 ++++++++++++++++++ examples/flow.php | 5 ++-- src/Attribute/AsJob.php | 16 ++++++++++ src/Driver/AmpDriver.php | 9 +++--- src/Driver/FiberDriver.php | 11 +++---- src/Driver/ReactDriver.php | 7 +++-- src/Driver/SpatieDriver.php | 7 +++-- src/Driver/SwooleDriver.php | 7 +++-- src/DriverInterface.php | 6 ++-- src/Event/AsyncEvent.php | 15 ++++++---- src/Flow/Flow.php | 21 ++++++------- src/Flow/FlowDecorator.php | 3 +- src/Flow/YFlow.php | 13 ++++---- src/FlowInterface.php | 30 +++++++++---------- src/JobInterface.php | 19 ++++++++++++ tests/AsyncHandler/DeferAsyncHandlerTest.php | 2 +- tests/Flow/FlowTest.php | 9 +++++- tests/Job/GenericJob.php | 29 ++++++++++++++++++ 20 files changed, 199 insertions(+), 62 deletions(-) create mode 100644 docs/src/content/en/docs/getting-started/job.md create mode 100644 examples/Job/GenericJob.php create mode 100644 src/Attribute/AsJob.php create mode 100644 src/JobInterface.php create mode 100644 tests/Job/GenericJob.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b5f140c..d947c8ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ - Flow\Flow\YFlow rework - Add more exemples in `examples/yflow.php` to play with Y-Combinators - Update DX for Flow\DriverInterface : add `defer` to gain much granular control on asynchronous callbacks +- Add Symfony Bridge + - Flow can now use `Flow\JobInterface` as job input + - new `Flow\Attribute\AsJob` attribute allows cast job on function or class and embed it's name and description ## v1.2.0 diff --git a/docs/src/content/en/docs/getting-started/job.md b/docs/src/content/en/docs/getting-started/job.md new file mode 100644 index 00000000..181037ec --- /dev/null +++ b/docs/src/content/en/docs/getting-started/job.md @@ -0,0 +1,20 @@ +--- +title: "Job" +description: "Job." +lead: "Job." +date: 2020-10-13T15:21:01+02:00 +lastmod: 2020-10-13T15:21:01+02:00 +draft: false +images: [] +menu: + docs: + parent: "getting-started" +weight: 10 +toc: true +--- + +# Job + +## Make your own Job + +You can make your custom Job by implementing `Flow\JobInterface`. diff --git a/examples/Job/GenericJob.php b/examples/Job/GenericJob.php new file mode 100644 index 00000000..93a812b4 --- /dev/null +++ b/examples/Job/GenericJob.php @@ -0,0 +1,29 @@ + + */ +class GenericJob implements JobInterface +{ + /** + * @param Closure(TArgs): TReturn $job + */ + public function __construct(private Closure $job) {} + + public function __invoke($data): mixed + { + $job = $this->job; + + return $job($data); + } +} diff --git a/examples/flow.php b/examples/flow.php index 9faac36b..8f9ecd48 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -9,6 +9,7 @@ use Flow\Driver\ReactDriver; use Flow\Driver\SpatieDriver; use Flow\Driver\SwooleDriver; +use Flow\Examples\Job\GenericJob; use Flow\Examples\Model\DataA; use Flow\Examples\Model\DataB; use Flow\Examples\Model\DataC; @@ -49,7 +50,7 @@ return new DataB($dataA->id, $d, $dataA->c); }; -$job2 = static function (DataB $dataB) use ($driver): DataC { +$job2 = new GenericJob(static function (DataB $dataB) use ($driver): DataC { printf(".* #%d - Job 2 Calculating %d * %d\n", $dataB->id, $dataB->d, $dataB->e); // simulating calculating some "heavy" operation from from 1 to 3 seconds @@ -65,7 +66,7 @@ printf(".* #%d - Job 2 Result for %d * %d = %d and took %.01f seconds\n", $dataB->id, $dataB->d, $dataB->e, $f, $delay); return new DataC($dataB->id, $f); -}; +}); $job3 = static function (DataC $dataC): DataD { printf("** #%d - Job 3 Result is %d\n", $dataC->id, $dataC->f); diff --git a/src/Attribute/AsJob.php b/src/Attribute/AsJob.php new file mode 100644 index 00000000..6e99695c --- /dev/null +++ b/src/Attribute/AsJob.php @@ -0,0 +1,16 @@ + */ - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { - return async(static function (Closure $callback, array $args) { + return async(static function (Closure|JobInterface $callback, array $args) { try { return $callback(...$args, ...($args = [])); } catch (Throwable $exception) { @@ -86,7 +87,7 @@ public function defer(Closure $callback): Future public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -99,7 +100,7 @@ public function await(array &$stream): void }; }; - $defer = function (Closure $job) { + $defer = function (Closure|JobInterface $job) { return function (Closure $map) use ($job) { /** @var Closure(TReturn): mixed $map */ $future = $this->defer($job); diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 53d7065d..451297fe 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -16,6 +16,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use Throwable; use function array_key_exists; @@ -34,7 +35,7 @@ class FiberDriver implements DriverInterface */ private array $ticks = []; - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { return new Fiber(static function () use ($callback, $args) { @@ -69,7 +70,7 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { $async = function ($isTick) use (&$fiberDatas) { - return function (Closure $job) use (&$fiberDatas, $isTick) { + return function (Closure|JobInterface $job) use (&$fiberDatas, $isTick) { return function (mixed $data) use (&$fiberDatas, $isTick, $job) { $async = $this->async($job); @@ -97,7 +98,7 @@ public function await(array &$stream): void }; $defer = static function ($isTick) { - return static function (Closure $job) use ($isTick) { + return static function (Closure|JobInterface $job) use ($isTick) { return static function (Closure $next) use ($isTick, $job) { $fiber = new Fiber(static function () use ($isTick, $job, $next) { try { @@ -136,9 +137,9 @@ public function await(array &$stream): void foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) { return $async(false)($job); - }, static function (Closure $job) use ($defer) { + }, static function (Closure|JobInterface $job) use ($defer) { return $defer(false)($job); }, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) { diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 60453b0c..0d016563 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -13,6 +13,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Promise\Deferred; @@ -46,7 +47,7 @@ public function __construct(?LoopInterface $eventLoop = null) $this->eventLoop = $eventLoop ?? Loop::get(); } - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { return async(static function () use ($callback, $args) { @@ -81,7 +82,7 @@ public function defer(Closure $callback): Promise public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -93,7 +94,7 @@ public function await(array &$stream): void }; }; - $defer = function (Closure $job) { + $defer = function (Closure|JobInterface $job) { return function ($then) use ($job) { $promise = $this->defer($job); $promise->then($then); diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index c6e8c3b4..b008e5d2 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -15,6 +15,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use RuntimeException as NativeRuntimeException; use Spatie\Async\Pool; use Throwable; @@ -45,7 +46,7 @@ public function __construct() } } - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return function (...$args) use ($callback) { return function ($onResolve) use ($callback, $args) { @@ -67,7 +68,7 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -75,7 +76,7 @@ public function await(array &$stream): void }; }; - $defer = function (Closure $job) { + $defer = function (Closure|JobInterface $job) { return function (Closure $onResolve) use ($job) { $this->pool->add(static function () use ($job, $onResolve) { return $job($onResolve, static function ($fn, $next) { diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index d129eaa4..413c99dd 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -14,6 +14,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use OpenSwoole\Timer; use RuntimeException as NativeRuntimeException; use Throwable; @@ -38,7 +39,7 @@ public function __construct() } } - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { return static function ($onResolve) use ($callback, $args) { @@ -61,7 +62,7 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -69,7 +70,7 @@ public function await(array &$stream): void }; }; - $defer = static function (Closure $job) { + $defer = static function (Closure|JobInterface $job) { return static function (Closure $onResolve) use ($job) { go(static function () use ($job, $onResolve) { try { diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 55298e29..69a588b7 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -13,11 +13,11 @@ interface DriverInterface { /** - * #return Closure(TArgs): void when called this start async $callback. + * #return JobInterface|Closure(TArgs): void when called this start async $callback. * - * @param Closure(TArgs): TReturn $callback + * @param Closure(TArgs): TReturn|JobInterface $callback */ - public function async(Closure $callback): Closure; + public function async(Closure|JobInterface $callback): Closure; /** * This allow more granular control on async diff --git a/src/Event/AsyncEvent.php b/src/Event/AsyncEvent.php index 83046438..ddc5f0e7 100644 --- a/src/Event/AsyncEvent.php +++ b/src/Event/AsyncEvent.php @@ -6,20 +6,22 @@ use Closure; use Flow\Ip; +use Flow\JobInterface; use Symfony\Contracts\EventDispatcher\Event; /** - * @template T + * @template T1 */ final class AsyncEvent extends Event { /** - * @param Ip $ip + * @param Closure|JobInterface $job + * @param Ip $ip */ public function __construct( private Closure $async, private Closure $defer, - private Closure $job, + private Closure|JobInterface $job, private Ip $ip, private Closure $callback ) {} @@ -34,13 +36,16 @@ public function getDefer(): Closure return $this->defer; } - public function getJob(): Closure + /** + * @return Closure|JobInterface + */ + public function getJob(): Closure|JobInterface { return $this->job; } /** - * @return Ip + * @return Ip */ public function getIp(): Ip { diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 903625ce..9630eed1 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -17,6 +17,7 @@ use Flow\Ip; use Flow\IpStrategy\LinearIpStrategy; use Flow\IpStrategyInterface; +use Flow\JobInterface; use Generator; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; @@ -42,12 +43,12 @@ class Flow implements FlowInterface ]; /** - * @var Closure(T1): T2 + * @var Closure(T1): T2|JobInterface */ private $job; /** - * @var null|Closure(ExceptionInterface): void + * @var null|Closure(ExceptionInterface): void|JobInterface */ private $errorJob; @@ -59,15 +60,15 @@ class Flow implements FlowInterface private DriverInterface $driver; /** - * @param Closure(T1): T2 $job - * @param Closure(ExceptionInterface): void $errorJob - * @param null|IpStrategyInterface $ipStrategy - * @param null|AsyncHandlerInterface $asyncHandler - * @param null|DriverInterface $driver + * @param Closure(T1): T2|JobInterface $job + * @param null|Closure(ExceptionInterface): void|JobInterface $errorJob + * @param null|IpStrategyInterface $ipStrategy + * @param null|AsyncHandlerInterface $asyncHandler + * @param null|DriverInterface $driver */ public function __construct( - Closure $job, - ?Closure $errorJob = null, + Closure|JobInterface $job, + null|Closure|JobInterface $errorJob = null, ?IpStrategyInterface $ipStrategy = null, ?EventDispatcherInterface $dispatcher = null, ?AsyncHandlerInterface $asyncHandler = null, @@ -121,7 +122,7 @@ public static function do(callable $callable, ?array $config = null): FlowInterf return self::flowUnwrap($generator, $config); } - public function fn(array|Closure|FlowInterface $flow): FlowInterface + public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterface { $flow = self::flowUnwrap($flow, ['driver' => $this->driver]); diff --git a/src/Flow/FlowDecorator.php b/src/Flow/FlowDecorator.php index 3e0ae04d..f91e7bb3 100644 --- a/src/Flow/FlowDecorator.php +++ b/src/Flow/FlowDecorator.php @@ -7,6 +7,7 @@ use Closure; use Flow\FlowInterface; use Flow\Ip; +use Flow\JobInterface; /** * @template T1 @@ -26,7 +27,7 @@ public function __invoke(Ip $ip): void ($this->flow)($ip); } - public function fn(array|Closure|FlowInterface $flow): FlowInterface + public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterface { return $this->flow->fn($flow); } diff --git a/src/Flow/YFlow.php b/src/Flow/YFlow.php index dd2eff77..069167bb 100644 --- a/src/Flow/YFlow.php +++ b/src/Flow/YFlow.php @@ -9,6 +9,7 @@ use Flow\DriverInterface; use Flow\ExceptionInterface; use Flow\IpStrategyInterface; +use Flow\JobInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; /** @@ -20,14 +21,14 @@ class YFlow extends Flow { /** - * @param null|Closure(ExceptionInterface): void $errorJob - * @param null|IpStrategyInterface $ipStrategy - * @param null|AsyncHandlerInterface $asyncHandler - * @param null|DriverInterface $driver + * @param null|Closure(ExceptionInterface): void|JobInterface $errorJob + * @param null|IpStrategyInterface $ipStrategy + * @param null|AsyncHandlerInterface $asyncHandler + * @param null|DriverInterface $driver */ public function __construct( - Closure $job, - ?Closure $errorJob = null, + Closure|JobInterface $job, + null|Closure|JobInterface $errorJob = null, ?IpStrategyInterface $ipStrategy = null, ?EventDispatcherInterface $dispatcher = null, ?AsyncHandlerInterface $asyncHandler = null, diff --git a/src/FlowInterface.php b/src/FlowInterface.php index 51225df6..2a0243a0 100644 --- a/src/FlowInterface.php +++ b/src/FlowInterface.php @@ -20,22 +20,22 @@ public function __invoke(Ip $ip): void; /** * @template T2 * - * @param array|Closure|FlowInterface $flow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance - * #param ?array{ - * 0: Closure, - * 1?: Closure, - * 2?: IpStrategyInterface, - * 3?: DriverInterface - * }|array{ - * "job"?: Closure, - * "errorJob"?: Closure, - * "ipStrategy"?: IpStrategyInterface, - * "driver"?: DriverInterface - * }|Closure|FlowInterface $config + * @param array|Closure(T1): T2|FlowInterface|JobInterface $flow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance + * #param ?array{ + * 0: Closure, + * 1?: Closure, + * 2?: IpStrategyInterface, + * 3?: DriverInterface + * }|array{ + * "job"?: JobInterface|Closure, + * "errorJob"?: JobInterface|Closure, + * "ipStrategy"?: IpStrategyInterface, + * "driver"?: DriverInterface + * }|Closure|FlowInterface $config * * @return FlowInterface */ - public function fn(array|Closure|self $flow): self; + public function fn(array|Closure|self|JobInterface $flow): self; /** * Do-notation a.k.a. for-comprehension. @@ -68,8 +68,8 @@ public function fn(array|Closure|self $flow): self; * 4?: AsyncHandlerInterface, * 5?: DriverInterface * }|array{ - * "jobs"?: Closure|array, - * "errorJobs"?: Closure|array, + * "jobs"?: JobInterface|Closure|array, + * "errorJobs"?: JobInterface|Closure|array, * "ipStrategy"?: IpStrategyInterface, * "dispatcher"?: EventDispatcherInterface, * "asyncHandler"?: AsyncHandlerInterface, diff --git a/src/JobInterface.php b/src/JobInterface.php new file mode 100644 index 00000000..b3e57412 --- /dev/null +++ b/src/JobInterface.php @@ -0,0 +1,19 @@ + $x, static fn ($x) => $x, - static function ($args) use (&$result) { + static function (array $args) use (&$result) { [[$n1, $n2], $defer] = $args; $result = $n1 + $n2; diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index 9cda89e3..fd7d9f0d 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -16,6 +16,7 @@ use Flow\Flow\Flow; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; +use Flow\Test\Job\GenericJob; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -148,12 +149,18 @@ public static function provideJobCases(): iterable return self::matrix(static function (DriverInterface $driver, $strategyBuilder) use ($exception) { $cases = []; - $cases['job'] = [[[static function (ArrayObject $data) { + $cases['closureJob'] = [[[static function (ArrayObject $data) { $data['number'] = 5; return $data; }, $strategyBuilder(), new AsyncHandler()]], 5]; + $cases['classJob'] = [[[new GenericJob(static function (ArrayObject $data) { + $data['number'] = 6; + + return $data; + }), $strategyBuilder(), new AsyncHandler()]], 6]; + $strategy = $strategyBuilder(); if (!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) { $cases['asyncJob'] = [[[static function (ArrayObject $data) use ($driver) { diff --git a/tests/Job/GenericJob.php b/tests/Job/GenericJob.php new file mode 100644 index 00000000..5b47c8b4 --- /dev/null +++ b/tests/Job/GenericJob.php @@ -0,0 +1,29 @@ + + */ +class GenericJob implements JobInterface +{ + /** + * @param Closure(TArgs): TReturn $job + */ + public function __construct(private Closure $job) {} + + public function __invoke($data): mixed + { + $job = $this->job; + + return $job($data); + } +}