diff --git a/examples/ampYFlowDriver.php b/examples/ampYFlowDriver.php index 754ceea..679d0c7 100644 --- a/examples/ampYFlowDriver.php +++ b/examples/ampYFlowDriver.php @@ -4,24 +4,26 @@ require __DIR__ . '/../vendor/autoload.php'; -use Amp\DeferredFuture; -use Amp\Future; -use Flow\AsyncHandler\AsyncHandler; -use Flow\AsyncHandler\YAsyncHandler; +use Flow\AsyncHandler\DeferAsyncHandler; use Flow\Driver\AmpDriver; -use Flow\Event; use Flow\Examples\Data\YFlowData; use Flow\Flow\Flow; use Flow\Ip; -use Revolt\EventLoop; // Define the Y-Combinator $U = static fn (Closure $f) => $f($f); $Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); -$asyncFactorial = $Y(static function ($factorial) { - return static function ($args) use ($factorial): Future{ +$factorialYJobDeferBefore = static function (YFlowData $data) { + printf("* #%d - Job : Calculating factorial(%d)\n", $data->id, $data->number); + + return new YFlowData($data->id, $data->number, $data->number); +}; + +$asyncFactorialDefer = $Y(static function ($factorial) { + return static function ($args) use ($factorial) { [$data, $defer] = $args; + return $defer(static function ($complete, $async) use ($data, $defer, $factorial) { if ($data->result <= 1) { $complete([new YFlowData($data->id, $data->number, 1), $defer]); @@ -35,8 +37,9 @@ }; }); -$factorialYJobAfter = static function ($args): Future { +$factorialYJobDeferAfter = static function ($args) { [$data, $defer] = $args; + return $defer(static function ($complete) use ($data, $defer) { printf("* #%d - Job : Result for factorial(%d) = %d\n", $data->id, $data->number, $data->result); @@ -46,8 +49,9 @@ $driver = new AmpDriver(); -$flow = (new Flow($asyncFactorial, null, null, null, new YAsyncHandler(), $driver)) - ->fn(new Flow($factorialYJobAfter, null, null, null, new AsyncHandler(), $driver)) +$flow = (new Flow($factorialYJobDeferBefore, null, null, null, null, $driver)) + ->fn(new Flow($asyncFactorialDefer, null, null, null, new DeferAsyncHandler(), $driver)) + ->fn(new Flow($factorialYJobDeferAfter, null, null, null, new DeferAsyncHandler(), $driver)) ; $ip = new Ip(new YFlowData(5, 5, 5)); diff --git a/src/AsyncHandler/AsyncHandler.php b/src/AsyncHandler/AsyncHandler.php index 4e42c4e..8e11d53 100644 --- a/src/AsyncHandler/AsyncHandler.php +++ b/src/AsyncHandler/AsyncHandler.php @@ -8,8 +8,6 @@ use Flow\Event; use Flow\Event\AsyncEvent; -use function call_user_func_array; - final class AsyncHandler implements AsyncHandlerInterface { public static function getSubscribedEvents() @@ -21,10 +19,10 @@ public static function getSubscribedEvents() public function async(AsyncEvent $event): void { - $args = array_merge([$event->getWrapper()], $event->getArgs()); - - call_user_func_array($event->getAsync(), $args); - - // call_user_func_array($event->getAsync(), $event->getArgs())($event->getWrapper()); + $ip = $event->getIp(); + $async = $event->getAsync(); + $asyncJob = $async($event->getJob()); + $next = $asyncJob($ip->data); + $next($event->getCallback()); } } diff --git a/src/AsyncHandler/BatchAsyncHandler.php b/src/AsyncHandler/BatchAsyncHandler.php index 4593c21..136ffe3 100644 --- a/src/AsyncHandler/BatchAsyncHandler.php +++ b/src/AsyncHandler/BatchAsyncHandler.php @@ -12,15 +12,18 @@ use Symfony\Component\Messenger\Handler\BatchHandlerTrait; use Throwable; -use function call_user_func_array; - final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface { use BatchHandlerTrait; + private AsyncHandlerInterface $asyncHandler; + public function __construct( private int $batchSize = 10, - ) {} + ?AsyncHandlerInterface $asyncHandler = null, + ) { + $this->asyncHandler = $asyncHandler ?? new AsyncHandler(); + } public static function getSubscribedEvents() { @@ -31,8 +34,8 @@ public static function getSubscribedEvents() public function async(AsyncEvent $event): void { - $ack = new Acknowledger(get_debug_type($this), static function (?Throwable $e = null, $event = null) { - call_user_func_array($event->getAsync(), $event->getArgs()); + $ack = new Acknowledger(get_debug_type($this), function (?Throwable $e = null, $event = null) { + $this->asyncHandler->async($event); }); $this->handle($event, $ack); diff --git a/src/AsyncHandler/DeferAsyncHandler.php b/src/AsyncHandler/DeferAsyncHandler.php new file mode 100644 index 0000000..8993af0 --- /dev/null +++ b/src/AsyncHandler/DeferAsyncHandler.php @@ -0,0 +1,31 @@ + 'async', + ]; + } + + public function async(AsyncEvent $event): void + { + $ip = $event->getIp(); + $job = $event->getJob(); + $next = $job([$ip->data, $event->getDefer()]); + $next(static function ($result) use ($event) { + [$data] = $result; + $callback = $event->getCallback(); + $callback($data); + }); + } +} diff --git a/src/AsyncHandler/YAsyncHandler.php b/src/AsyncHandler/YAsyncHandler.php deleted file mode 100644 index a1acb16..0000000 --- a/src/AsyncHandler/YAsyncHandler.php +++ /dev/null @@ -1,41 +0,0 @@ - 'async', - ]; - } - - public function async(AsyncEvent $event): void - { - $wrapper = static function ($job) use ($event) { - $U = static fn (Closure $f) => $f($f); - $Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); - - return $Y($event->getWrapper()($job)); - }; - - $args = array_merge([$wrapper], $event->getArgs()); - - call_user_func_array($event->getAsync(), $args); - - /*$U = static fn (Closure $f) => $f($f); - $Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); - - call_user_func_array($event->getAsync(), $event->getArgs())($Y($event->getWrapper()));*/ - } -} diff --git a/src/AsyncHandlerInterface.php b/src/AsyncHandlerInterface.php index 40ce871..2296c38 100644 --- a/src/AsyncHandlerInterface.php +++ b/src/AsyncHandlerInterface.php @@ -4,6 +4,10 @@ namespace Flow; +use Flow\Event\AsyncEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; -interface AsyncHandlerInterface extends EventSubscriberInterface {} +interface AsyncHandlerInterface extends EventSubscriberInterface +{ + public function async(AsyncEvent $event): void; +} diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index c366701..28a35eb 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -53,48 +53,65 @@ public function async(Closure $callback): Closure try { return $callback(...$args, ...($args = [])); } catch (Throwable $exception) { - dd($exception); - return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception); } }, $callback, $args); }; } + public function defer(Closure $callback): mixed + { + $deferred = new DeferredFuture(); + + EventLoop::queue(static function () use ($callback, $deferred) { + try { + $callback(static function ($return) use ($deferred) { + $deferred->complete($return); + }, static function ($callback, $next) { + $callback($next); + }); + } catch (Throwable $exception) { + $deferred->complete(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); + } + }); + + return $deferred->getFuture(); + } + public function await(array &$stream): void { - $loop = function () use (&$loop, &$stream) { + $async = function (Closure $job) { + return function (mixed $data) use ($job) { + $async = $this->async($job); + + if ($data === null) { + $future = $async(); + } else { + $future = $async($data); + } + + return static function ($map) use ($future) { + $future->map($map); + }; + }; + }; + + $defer = function (Closure $job) { + return function ($map) use ($job) { + $future = $this->defer($job); + $future->map($map); + }; + }; + + $loop = function () use (&$loop, &$stream, $async, $defer) { $nextIp = null; do { foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); if ($nextIp !== null) { - $defer = function(Closure $job): Future - { - $deferred = new DeferredFuture(); - - // Queue the operation to be executed in the event loop - EventLoop::queue(static function () use ($job, $deferred) { - try { - $job(static function ($return) use ($deferred) { - $deferred->complete($return); - }, static function (Future $future, $next) { - $future->map($next); - }); - } catch (Throwable $exception) { - dd($exception); - $deferred->complete(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); - } - }); - - return $deferred->getFuture(); - }; - - $async = $stream['fnFlows'][$index]['job']; - $future = $async([$nextIp->data, $defer]); - - $future->map(static function ($args) use (&$stream, $index, $nextIp) { - [$data, $defer] = $args; + $job = $stream['fnFlows'][$index]['job']; + + $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) { if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { @@ -105,7 +122,7 @@ public function await(array &$stream): void $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); $stream['ips']--; - }); + }), Event::ASYNC); } } } while ($nextIp !== null); diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 79f9bd9..dae9a5e 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -19,6 +19,8 @@ interface DriverInterface */ public function async(Closure $callback): Closure; + public function defer(Closure $callback): mixed; + /** * @param array{'ips': int, 'fnFlows': array, 'dispatchers': array} $stream */ diff --git a/src/Event/AsyncEvent.php b/src/Event/AsyncEvent.php index 342c63b..dd68f74 100644 --- a/src/Event/AsyncEvent.php +++ b/src/Event/AsyncEvent.php @@ -5,38 +5,41 @@ namespace Flow\Event; use Closure; +use Flow\Ip; use Symfony\Contracts\EventDispatcher\Event; final class AsyncEvent extends Event { - /** - * @var array - */ - private array $args; - - /** - * @param array $args - */ - public function __construct(private Closure $async, private Closure $wrapper, ...$args) - { - $this->args = $args; - } + public function __construct( + private Closure $async, + private Closure $defer, + private Closure $job, + private Ip $ip, + private Closure $callback + ) {} public function getAsync(): Closure { return $this->async; } - public function getWrapper(): Closure + public function getDefer(): Closure + { + return $this->defer; + } + + public function getJob(): Closure + { + return $this->job; + } + + public function getIp(): Ip { - return $this->wrapper; + return $this->ip; } - /** - * @return array - */ - public function getArgs(): array + public function getCallback(): Closure { - return $this->args; + return $this->callback; } }