diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bbce56..b2c017e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v1.2.x + +- Flow can now use `Flow\JobInterface` as job input +- Add Symfony Bridge + - new `Flow\Attribute\AsJob` attribute allows cast job on function or class and embed it's name and description + ## v1.2.1 - Add new Interface Flow\AsyncHandlerInterface to control the Event::SYNC step when processing an IP diff --git a/docs/src/content/en/docs/getting-started/job.md b/docs/src/content/en/docs/getting-started/job.md new file mode 100644 index 0000000..181037e --- /dev/null +++ b/docs/src/content/en/docs/getting-started/job.md @@ -0,0 +1,20 @@ +--- +title: "Job" +description: "Job." +lead: "Job." +date: 2020-10-13T15:21:01+02:00 +lastmod: 2020-10-13T15:21:01+02:00 +draft: false +images: [] +menu: + docs: + parent: "getting-started" +weight: 10 +toc: true +--- + +# Job + +## Make your own Job + +You can make your custom Job by implementing `Flow\JobInterface`. diff --git a/docs/src/content/en/docs/getting-started/ressources.md b/docs/src/content/en/docs/getting-started/ressources.md index 0e47a65..2494a02 100644 --- a/docs/src/content/en/docs/getting-started/ressources.md +++ b/docs/src/content/en/docs/getting-started/ressources.md @@ -93,6 +93,7 @@ Video of Y-Combinator : [https://www.youtube.com/watch?v=QSS_ZcO8Q1g](https://ww - Combinator : [https://github.com/loophp/combinator](https://github.com/loophp/combinator) - Lambda-php : [https://github.com/igorw/lambda-php](https://github.com/igorw/lambda-php) - Deriving the y combinator in 7 easy steps : [https://gist.github.com/igstan/388351](https://gist.github.com/igstan/388351) +- Y combinator real life application: recursive memoization in clojure : [https://blog.klipse.tech/lambda/2016/08/10/y-combinator-app.html](https://blog.klipse.tech/lambda/2016/08/10/y-combinator-app.html) ## Messaging approach with East oriented code from [Frédéric Hardy](https://twitter.com/mageekguy) diff --git a/examples/flow.php b/examples/flow.php index 9faac36..3a4ec3c 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -17,6 +17,7 @@ use Flow\Flow\Flow; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; +use Flow\Job\ClosureJob; $driver = match (random_int(1, 4)) { 1 => new AmpDriver(), @@ -49,7 +50,7 @@ return new DataB($dataA->id, $d, $dataA->c); }; -$job2 = static function (DataB $dataB) use ($driver): DataC { +$job2 = new ClosureJob(static function (DataB $dataB) use ($driver): DataC { printf(".* #%d - Job 2 Calculating %d * %d\n", $dataB->id, $dataB->d, $dataB->e); // simulating calculating some "heavy" operation from from 1 to 3 seconds @@ -65,7 +66,7 @@ printf(".* #%d - Job 2 Result for %d * %d = %d and took %.01f seconds\n", $dataB->id, $dataB->d, $dataB->e, $f, $delay); return new DataC($dataB->id, $f); -}; +}); $job3 = static function (DataC $dataC): DataD { printf("** #%d - Job 3 Result is %d\n", $dataC->id, $dataC->f); diff --git a/examples/yflow.php b/examples/yflow.php index aeac8b1..7d845d6 100644 --- a/examples/yflow.php +++ b/examples/yflow.php @@ -14,6 +14,8 @@ use Flow\Flow\Flow; use Flow\Flow\YFlow; use Flow\Ip; +use Flow\Job\YJob; +use Flow\JobInterface; $driver = match (random_int(1, 4)) { 1 => new AmpDriver(), @@ -29,12 +31,14 @@ function factorial(int $n): int return ($n <= 1) ? 1 : $n * factorial($n - 1); } -function Ywrap(callable $func, callable $wrapperFunc): Closure +/** + * @return JobInterface + */ +function Ywrap(callable $func, callable $wrapperFunc): JobInterface { - $U = static fn ($f) => $f($f); - $Y = static fn (callable $f, callable $g) => $U(static fn (Closure $x) => $f($g(static fn ($y) => $U($x)($y)))); + $wrappedFunc = static fn ($recurse) => $wrapperFunc(static fn (...$args) => $func($recurse)(...$args)); - return $Y($func, $wrapperFunc); + return new YJob($wrappedFunc); } function memoWrapperGenerator(callable $f): Closure @@ -50,7 +54,10 @@ function memoWrapperGenerator(callable $f): Closure }; } -function Ymemo(callable $f): Closure +/** + * @return JobInterface + */ +function Ymemo(callable $f): JobInterface { return Ywrap($f, 'memoWrapperGenerator'); } @@ -110,17 +117,13 @@ function factorialYMemo(int $n): int return new YFlowData($data->id, $data->number); }; -// Define the Y-Combinator -$U = static fn (Closure $f) => $f($f); -$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); - $factorialYJobDeferBefore = static function (YFlowData $data) { printf("...* #%d - Job 4 : Calculating factorialYJobDefer(%d)\n", $data->id, $data->number); return new YFlowData($data->id, $data->number, $data->number); }; -$factorialYJobDefer = $Y(static function ($factorial) { +$factorialYJobDefer = new YJob(static function ($factorial) { return static function ($args) use ($factorial) { [$data, $defer] = $args; diff --git a/src/Attribute/AsJob.php b/src/Attribute/AsJob.php new file mode 100644 index 0000000..a6ce7c8 --- /dev/null +++ b/src/Attribute/AsJob.php @@ -0,0 +1,16 @@ + */ - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { - return async(static function (Closure $callback, array $args) { + return async(static function (Closure|JobInterface $callback, array $args) { try { return $callback(...$args, ...($args = [])); } catch (Throwable $exception) { @@ -86,7 +87,7 @@ public function defer(Closure $callback): Future public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -99,7 +100,7 @@ public function await(array &$stream): void }; }; - $defer = function (Closure $job) { + $defer = function (Closure|JobInterface $job) { return function (Closure $map) use ($job) { /** @var Closure(TReturn): mixed $map */ $future = $this->defer($job); diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 53d7065..451297f 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -16,6 +16,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use Throwable; use function array_key_exists; @@ -34,7 +35,7 @@ class FiberDriver implements DriverInterface */ private array $ticks = []; - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { return new Fiber(static function () use ($callback, $args) { @@ -69,7 +70,7 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { $async = function ($isTick) use (&$fiberDatas) { - return function (Closure $job) use (&$fiberDatas, $isTick) { + return function (Closure|JobInterface $job) use (&$fiberDatas, $isTick) { return function (mixed $data) use (&$fiberDatas, $isTick, $job) { $async = $this->async($job); @@ -97,7 +98,7 @@ public function await(array &$stream): void }; $defer = static function ($isTick) { - return static function (Closure $job) use ($isTick) { + return static function (Closure|JobInterface $job) use ($isTick) { return static function (Closure $next) use ($isTick, $job) { $fiber = new Fiber(static function () use ($isTick, $job, $next) { try { @@ -136,9 +137,9 @@ public function await(array &$stream): void foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) { return $async(false)($job); - }, static function (Closure $job) use ($defer) { + }, static function (Closure|JobInterface $job) use ($defer) { return $defer(false)($job); }, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) { diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 60453b0..0d01656 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -13,6 +13,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Promise\Deferred; @@ -46,7 +47,7 @@ public function __construct(?LoopInterface $eventLoop = null) $this->eventLoop = $eventLoop ?? Loop::get(); } - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { return async(static function () use ($callback, $args) { @@ -81,7 +82,7 @@ public function defer(Closure $callback): Promise public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -93,7 +94,7 @@ public function await(array &$stream): void }; }; - $defer = function (Closure $job) { + $defer = function (Closure|JobInterface $job) { return function ($then) use ($job) { $promise = $this->defer($job); $promise->then($then); diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index c6e8c3b..b008e5d 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -15,6 +15,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use RuntimeException as NativeRuntimeException; use Spatie\Async\Pool; use Throwable; @@ -45,7 +46,7 @@ public function __construct() } } - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return function (...$args) use ($callback) { return function ($onResolve) use ($callback, $args) { @@ -67,7 +68,7 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -75,7 +76,7 @@ public function await(array &$stream): void }; }; - $defer = function (Closure $job) { + $defer = function (Closure|JobInterface $job) { return function (Closure $onResolve) use ($job) { $this->pool->add(static function () use ($job, $onResolve) { return $job($onResolve, static function ($fn, $next) { diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index d129eaa..413c99d 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -14,6 +14,7 @@ use Flow\Event\PushEvent; use Flow\Exception\RuntimeException; use Flow\Ip; +use Flow\JobInterface; use OpenSwoole\Timer; use RuntimeException as NativeRuntimeException; use Throwable; @@ -38,7 +39,7 @@ public function __construct() } } - public function async(Closure $callback): Closure + public function async(Closure|JobInterface $callback): Closure { return static function (...$args) use ($callback) { return static function ($onResolve) use ($callback, $args) { @@ -61,7 +62,7 @@ public function defer(Closure $callback): mixed public function await(array &$stream): void { - $async = function (Closure $job) { + $async = function (Closure|JobInterface $job) { return function (mixed $data) use ($job) { $async = $this->async($job); @@ -69,7 +70,7 @@ public function await(array &$stream): void }; }; - $defer = static function (Closure $job) { + $defer = static function (Closure|JobInterface $job) { return static function (Closure $onResolve) use ($job) { go(static function () use ($job, $onResolve) { try { diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 55298e2..69a588b 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -13,11 +13,11 @@ interface DriverInterface { /** - * #return Closure(TArgs): void when called this start async $callback. + * #return JobInterface|Closure(TArgs): void when called this start async $callback. * - * @param Closure(TArgs): TReturn $callback + * @param Closure(TArgs): TReturn|JobInterface $callback */ - public function async(Closure $callback): Closure; + public function async(Closure|JobInterface $callback): Closure; /** * This allow more granular control on async diff --git a/src/Event/AsyncEvent.php b/src/Event/AsyncEvent.php index 8304643..ddc5f0e 100644 --- a/src/Event/AsyncEvent.php +++ b/src/Event/AsyncEvent.php @@ -6,20 +6,22 @@ use Closure; use Flow\Ip; +use Flow\JobInterface; use Symfony\Contracts\EventDispatcher\Event; /** - * @template T + * @template T1 */ final class AsyncEvent extends Event { /** - * @param Ip $ip + * @param Closure|JobInterface $job + * @param Ip $ip */ public function __construct( private Closure $async, private Closure $defer, - private Closure $job, + private Closure|JobInterface $job, private Ip $ip, private Closure $callback ) {} @@ -34,13 +36,16 @@ public function getDefer(): Closure return $this->defer; } - public function getJob(): Closure + /** + * @return Closure|JobInterface + */ + public function getJob(): Closure|JobInterface { return $this->job; } /** - * @return Ip + * @return Ip */ public function getIp(): Ip { diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 903625c..511bd24 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -17,6 +17,7 @@ use Flow\Ip; use Flow\IpStrategy\LinearIpStrategy; use Flow\IpStrategyInterface; +use Flow\JobInterface; use Generator; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; @@ -42,12 +43,12 @@ class Flow implements FlowInterface ]; /** - * @var Closure(T1): T2 + * @var Closure(T1): T2|JobInterface */ private $job; /** - * @var null|Closure(ExceptionInterface): void + * @var null|Closure(ExceptionInterface): void|JobInterface */ private $errorJob; @@ -59,15 +60,15 @@ class Flow implements FlowInterface private DriverInterface $driver; /** - * @param Closure(T1): T2 $job - * @param Closure(ExceptionInterface): void $errorJob - * @param null|IpStrategyInterface $ipStrategy - * @param null|AsyncHandlerInterface $asyncHandler - * @param null|DriverInterface $driver + * @param Closure(T1): T2|JobInterface $job + * @param null|Closure(ExceptionInterface): void|JobInterface $errorJob + * @param null|IpStrategyInterface $ipStrategy + * @param null|AsyncHandlerInterface $asyncHandler + * @param null|DriverInterface $driver */ public function __construct( - Closure $job, - ?Closure $errorJob = null, + Closure|JobInterface $job, + null|Closure|JobInterface $errorJob = null, ?IpStrategyInterface $ipStrategy = null, ?EventDispatcherInterface $dispatcher = null, ?AsyncHandlerInterface $asyncHandler = null, @@ -121,7 +122,7 @@ public static function do(callable $callable, ?array $config = null): FlowInterf return self::flowUnwrap($generator, $config); } - public function fn(array|Closure|FlowInterface $flow): FlowInterface + public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterface { $flow = self::flowUnwrap($flow, ['driver' => $this->driver]); @@ -161,7 +162,7 @@ public function await(): void */ private static function flowUnwrap($flow, ?array $config = null): FlowInterface { - if ($flow instanceof Closure) { + if ($flow instanceof Closure || $flow instanceof JobInterface) { return new self(...[...['job' => $flow], ...($config ?? [])]); } if (is_array($flow)) { diff --git a/src/Flow/FlowDecorator.php b/src/Flow/FlowDecorator.php index 3e0ae04..f91e7bb 100644 --- a/src/Flow/FlowDecorator.php +++ b/src/Flow/FlowDecorator.php @@ -7,6 +7,7 @@ use Closure; use Flow\FlowInterface; use Flow\Ip; +use Flow\JobInterface; /** * @template T1 @@ -26,7 +27,7 @@ public function __invoke(Ip $ip): void ($this->flow)($ip); } - public function fn(array|Closure|FlowInterface $flow): FlowInterface + public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterface { return $this->flow->fn($flow); } diff --git a/src/Flow/YFlow.php b/src/Flow/YFlow.php index dd2eff7..4cb0305 100644 --- a/src/Flow/YFlow.php +++ b/src/Flow/YFlow.php @@ -9,6 +9,8 @@ use Flow\DriverInterface; use Flow\ExceptionInterface; use Flow\IpStrategyInterface; +use Flow\Job\YJob; +use Flow\JobInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; /** @@ -20,22 +22,19 @@ class YFlow extends Flow { /** - * @param null|Closure(ExceptionInterface): void $errorJob - * @param null|IpStrategyInterface $ipStrategy - * @param null|AsyncHandlerInterface $asyncHandler - * @param null|DriverInterface $driver + * @param null|Closure(ExceptionInterface): void|JobInterface $errorJob + * @param null|IpStrategyInterface $ipStrategy + * @param null|AsyncHandlerInterface $asyncHandler + * @param null|DriverInterface $driver */ public function __construct( - Closure $job, - ?Closure $errorJob = null, + Closure|JobInterface $job, + null|Closure|JobInterface $errorJob = null, ?IpStrategyInterface $ipStrategy = null, ?EventDispatcherInterface $dispatcher = null, ?AsyncHandlerInterface $asyncHandler = null, ?DriverInterface $driver = null ) { - $U = static fn (Closure $f) => $f($f); - $Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); - - parent::__construct($Y($job), $errorJob, $ipStrategy, $dispatcher, $asyncHandler, $driver); + parent::__construct(new YJob($job), $errorJob, $ipStrategy, $dispatcher, $asyncHandler, $driver); } } diff --git a/src/FlowInterface.php b/src/FlowInterface.php index 51225df..6f27a20 100644 --- a/src/FlowInterface.php +++ b/src/FlowInterface.php @@ -20,22 +20,22 @@ public function __invoke(Ip $ip): void; /** * @template T2 * - * @param array|Closure|FlowInterface $flow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance - * #param ?array{ - * 0: Closure, - * 1?: Closure, - * 2?: IpStrategyInterface, - * 3?: DriverInterface - * }|array{ - * "job"?: Closure, - * "errorJob"?: Closure, - * "ipStrategy"?: IpStrategyInterface, - * "driver"?: DriverInterface - * }|Closure|FlowInterface $config + * @param array|Closure(T1): T2|FlowInterface|JobInterface $flow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance + * #param ?array{ + * 0: Closure, + * 1?: Closure, + * 2?: IpStrategyInterface, + * 3?: DriverInterface + * }|array{ + * "job"?: JobInterface|Closure, + * "errorJob"?: JobInterface|Closure, + * "ipStrategy"?: IpStrategyInterface, + * "driver"?: DriverInterface + * }|Closure|FlowInterface $config * * @return FlowInterface */ - public function fn(array|Closure|self $flow): self; + public function fn(array|Closure|JobInterface|self $flow): self; /** * Do-notation a.k.a. for-comprehension. @@ -68,8 +68,8 @@ public function fn(array|Closure|self $flow): self; * 4?: AsyncHandlerInterface, * 5?: DriverInterface * }|array{ - * "jobs"?: Closure|array, - * "errorJobs"?: Closure|array, + * "jobs"?: JobInterface|Closure|array, + * "errorJobs"?: JobInterface|Closure|array, * "ipStrategy"?: IpStrategyInterface, * "dispatcher"?: EventDispatcherInterface, * "asyncHandler"?: AsyncHandlerInterface, diff --git a/src/Job/ClosureJob.php b/src/Job/ClosureJob.php new file mode 100644 index 0000000..1dd100f --- /dev/null +++ b/src/Job/ClosureJob.php @@ -0,0 +1,29 @@ + + */ +class ClosureJob implements JobInterface +{ + /** + * @param Closure(TArgs): TReturn|JobInterface $job + */ + public function __construct(private Closure|JobInterface $job) {} + + public function __invoke($data): mixed + { + $job = $this->job; + + return $job($data); + } +} diff --git a/src/Job/YJob.php b/src/Job/YJob.php new file mode 100644 index 0000000..117466e --- /dev/null +++ b/src/Job/YJob.php @@ -0,0 +1,31 @@ + + */ +class YJob implements JobInterface +{ + /** + * @param Closure(mixed): mixed|JobInterface $job + */ + public function __construct(private Closure|JobInterface $job) {} + + public function __invoke($data): mixed + { + $U = static fn (Closure $f) => $f($f); + $Y = static fn (Closure|JobInterface $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); + $job = $this->job; + + return $Y($job)($data); + } +} diff --git a/src/JobInterface.php b/src/JobInterface.php new file mode 100644 index 0000000..b3e5741 --- /dev/null +++ b/src/JobInterface.php @@ -0,0 +1,19 @@ + $x, static fn ($x) => $x, - static function ($args) use (&$result) { + static function (array $args) use (&$result) { [[$n1, $n2], $defer] = $args; $result = $n1 + $n2; diff --git a/tests/Flow/FlowTest.php b/tests/Flow/FlowTest.php index 9cda89e..e3c0149 100644 --- a/tests/Flow/FlowTest.php +++ b/tests/Flow/FlowTest.php @@ -16,6 +16,7 @@ use Flow\Flow\Flow; use Flow\Ip; use Flow\IpStrategy\MaxIpStrategy; +use Flow\Job\ClosureJob; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -148,12 +149,18 @@ public static function provideJobCases(): iterable return self::matrix(static function (DriverInterface $driver, $strategyBuilder) use ($exception) { $cases = []; - $cases['job'] = [[[static function (ArrayObject $data) { + $cases['closureJob'] = [[[static function (ArrayObject $data) { $data['number'] = 5; return $data; }, $strategyBuilder(), new AsyncHandler()]], 5]; + $cases['classJob'] = [[[new ClosureJob(static function (ArrayObject $data) { + $data['number'] = 6; + + return $data; + }), $strategyBuilder(), new AsyncHandler()]], 6]; + $strategy = $strategyBuilder(); if (!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) { $cases['asyncJob'] = [[[static function (ArrayObject $data) use ($driver) { diff --git a/tests/Job/ClosureJobTest.php b/tests/Job/ClosureJobTest.php new file mode 100644 index 0000000..f0bd4f6 --- /dev/null +++ b/tests/Job/ClosureJobTest.php @@ -0,0 +1,19 @@ +