From c07b2698218b55a4cd1b057e747b5ef8b68c93d6 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Mon, 12 Aug 2024 19:05:48 +0200 Subject: [PATCH] :zap: Found embark wrapWithDeferred --- examples/ampYFlowDriver.php | 39 +++++++++++-------------------------- src/Driver/AmpDriver.php | 5 +++-- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/examples/ampYFlowDriver.php b/examples/ampYFlowDriver.php index 9b70577..754ceea 100644 --- a/examples/ampYFlowDriver.php +++ b/examples/ampYFlowDriver.php @@ -19,45 +19,28 @@ $U = static fn (Closure $f) => $f($f); $Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y))); -function wrapWithDeferred(Closure $job): Future -{ - $deferred = new DeferredFuture(); - - // Queue the operation to be executed in the event loop - EventLoop::queue(static function () use ($job, $deferred) { - try { - $job(static function ($return) use ($deferred) { - $deferred->complete($return); - }, static function (Future $future, $next) { - $future->map($next); - }); - } catch (Throwable $exception) { - $deferred->complete(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception)); - } - }); - - return $deferred->getFuture(); -} - $asyncFactorial = $Y(static function ($factorial) { - return static function (YFlowData $data) use ($factorial) { - return wrapWithDeferred(static function ($complete, $async) use ($data, $factorial) { + return static function ($args) use ($factorial): Future{ + [$data, $defer] = $args; + return $defer(static function ($complete, $async) use ($data, $defer, $factorial) { if ($data->result <= 1) { - $complete(new YFlowData($data->id, $data->number, 1)); + $complete([new YFlowData($data->id, $data->number, 1), $defer]); } else { - $async($factorial(new YFlowData($data->id, $data->number, $data->result - 1)), static function ($resultData) use ($data, $complete) { - $complete(new YFlowData($data->id, $data->number, $data->result * $resultData->result)); + $async($factorial([new YFlowData($data->id, $data->number, $data->result - 1), $defer]), static function ($result) use ($data, $complete) { + [$resultData, $defer] = $result; + $complete([new YFlowData($data->id, $data->number, $data->result * $resultData->result), $defer]); }); } }); }; }); -$factorialYJobAfter = static function (YFlowData $data): Future { - return wrapWithDeferred(static function ($complete) use ($data) { +$factorialYJobAfter = static function ($args): Future { + [$data, $defer] = $args; + return $defer(static function ($complete) use ($data, $defer) { printf("* #%d - Job : Result for factorial(%d) = %d\n", $data->id, $data->number, $data->result); - $complete(new YFlowData($data->id, $data->number)); + $complete([new YFlowData($data->id, $data->number), $defer]); }); }; diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index d39b88d..c366701 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -91,9 +91,10 @@ public function await(array &$stream): void }; $async = $stream['fnFlows'][$index]['job']; - $future = $async($nextIp->data); + $future = $async([$nextIp->data, $defer]); - $future->map(static function ($data) use (&$stream, $index, $nextIp) { + $future->map(static function ($args) use (&$stream, $index, $nextIp) { + [$data, $defer] = $args; if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) {