From 1f996df9f24cb709e7ddbb2a5c7d58edb432c46f Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Sun, 3 Sep 2023 23:30:30 +0200 Subject: [PATCH] :lipstick: Cleanup DriverInterface --- .../en/docs/getting-started/changelog.md | 5 +++- src/Driver/AmpDriver.php | 19 +++++++------- src/Driver/FiberDriver.php | 25 ++++++++++--------- src/Driver/ReactDriver.php | 19 +++++++------- src/Driver/SwooleDriver.php | 21 ++++++++-------- src/DriverInterface.php | 4 ++- src/Exception.php | 11 ++++++++ src/Flow/Flow.php | 12 ++++----- 8 files changed, 68 insertions(+), 48 deletions(-) create mode 100644 src/Exception.php diff --git a/docs/src/content/en/docs/getting-started/changelog.md b/docs/src/content/en/docs/getting-started/changelog.md index f75edb26..2350218d 100644 --- a/docs/src/content/en/docs/getting-started/changelog.md +++ b/docs/src/content/en/docs/getting-started/changelog.md @@ -18,7 +18,10 @@ toc: true - Upgrade to Symfony 6.3 - Refactor docs structure - Refactor tooling from https://github.com/jolicode/castor -- Update `tick` and Remove `start` and `stop` from Flow\DriverInterface +- Rework on Flow\DriverInterface + - Update `async` that now $onResolve get called with async $callback result or Flow\Exception as first argument + - Update `tick` that now return a Closure to cleanup tick interval + - Remove `start` and `stop` - Add Fiber Driver from https://github.com/jolicode/castor/blob/main/src/functions.php ### 1.1.2 diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index f8ba642d..a1325cc5 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -6,6 +6,7 @@ use Closure; use Flow\DriverInterface; +use Flow\Exception; use Revolt\EventLoop; use RuntimeException; use Throwable; @@ -24,23 +25,23 @@ public function __construct() } } - public function async(Closure $callback, Closure $onResolved = null): Closure + public function async(Closure $callback, Closure $onResolve = null): Closure { - return function (...$args) use ($callback, $onResolved): void { - EventLoop::queue(function (Closure $callback, array $args, Closure $onResolved = null) { + return function (...$args) use ($callback, $onResolve): void { + EventLoop::queue(function (Closure $callback, array $args, Closure $onResolve = null) { try { - $callback(...$args, ...($args = [])); - if ($onResolved) { - $onResolved(null); + $return = $callback(...$args, ...($args = [])); + if ($onResolve) { + $onResolve($return); } } catch (Throwable $exception) { - if ($onResolved) { - $onResolved($exception); + if ($onResolve) { + $onResolve(new Exception($exception->getMessage(), $exception->getCode(), $exception)); } } finally { $this->pop(); } - }, $callback, $args, $onResolved); + }, $callback, $args, $onResolve); $this->push(); }; } diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 65b1e413..a8f5dbf9 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -9,6 +9,7 @@ use Closure; use Fiber; use Flow\DriverInterface; +use Flow\Exception; use Throwable; class FiberDriver implements DriverInterface @@ -20,21 +21,21 @@ class FiberDriver implements DriverInterface private bool $isLooping = false; - public function async(Closure $callback, Closure $onResolved = null): Closure + public function async(Closure $callback, Closure $onResolve = null): Closure { - return function (...$args) use ($callback, $onResolved): void { + return function (...$args) use ($callback, $onResolve): void { $fiber = new Fiber($callback); $fiberData = [ 'fiber' => $fiber, - 'onResolved' => $onResolved, - 'error' => null, + 'onResolve' => $onResolve, + 'exception' => null, ]; try { $fiber->start(...$args); - } catch (Throwable $e) { - $fiberData['error'] = $e; + } catch (Throwable $exception) { + $fiberData['exception'] = $exception; } $this->fibers[] = $fiberData; @@ -75,15 +76,15 @@ private function loop(): void if (!$fiber['fiber']->isTerminated() && $fiber['fiber']->isSuspended()) { try { $fiber['fiber']->resume(); - } catch (Throwable $e) { - $this->fibers[$i]['error'] = $e; + } catch (Throwable $exception) { + $this->fibers[$i]['exception'] = $exception; } } else { - if ($fiber['onResolved']) { - if ($fiber['error'] === null) { - $fiber['onResolved'](null); + if ($fiber['onResolve']) { + if ($fiber['exception'] === null) { + $fiber['onResolve']($fiber['fiber']->getReturn()); } else { - $fiber['onResolved']($fiber['error']); + $fiber['onResolve'](new Exception($fiber['exception']->getMessage(), $fiber['exception']->getCode(), $fiber['exception'])); } } unset($this->fibers[$i]); diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 987e00dd..5101c56b 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -6,6 +6,7 @@ use Closure; use Flow\DriverInterface; +use Flow\Exception; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use RuntimeException; @@ -30,18 +31,18 @@ public function __construct(LoopInterface $eventLoop = null) $this->eventLoop = $eventLoop ?? Loop::get(); } - public function async(Closure $callback, Closure $onResolved = null): Closure + public function async(Closure $callback, Closure $onResolve = null): Closure { - return function (...$args) use ($callback, $onResolved): void { - async(function () use ($callback, $onResolved, $args) { + return function (...$args) use ($callback, $onResolve): void { + async(function () use ($callback, $onResolve, $args) { try { - $callback(...$args, ...($args = [])); - if ($onResolved) { - $onResolved(null); + $return = $callback(...$args, ...($args = [])); + if ($onResolve) { + $onResolve($return); } - } catch (Throwable $e) { - if ($onResolved) { - $onResolved($e); + } catch (Throwable $exception) { + if ($onResolve) { + $onResolve(new Exception($exception->getMessage(), $exception->getCode(), $exception)); } } finally { $this->pop(); diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index 9f35d439..263e823e 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -6,6 +6,7 @@ use Closure; use Flow\DriverInterface; +use Flow\Exception; use OpenSwoole\Coroutine; use OpenSwoole\Timer; use RuntimeException; @@ -22,22 +23,22 @@ public function __construct() } } - public function async(Closure $callback, Closure $onResolved = null): Closure + public function async(Closure $callback, Closure $onResolve = null): Closure { - return function (...$args) use ($callback, $onResolved): void { - Coroutine::run(static function () use ($callback, $onResolved, $args) { - Coroutine::create(static function (Closure $callback, array $args, Closure $onResolved = null) { + return function (...$args) use ($callback, $onResolve): void { + Coroutine::run(static function () use ($callback, $onResolve, $args) { + Coroutine::create(static function (Closure $callback, array $args, Closure $onResolve = null) { try { $callback(...$args, ...($args = [])); - if ($onResolved) { - $onResolved(null); + if ($onResolve) { + $onResolve(null); } - } catch (Throwable $e) { - if ($onResolved) { - $onResolved($e); + } catch (Throwable $exception) { + if ($onResolve) { + $onResolve(new Exception($exception->getMessage(), $exception->getCode(), $exception)); } } - }, $callback, $args, $onResolved); + }, $callback, $args, $onResolve); }); }; } diff --git a/src/DriverInterface.php b/src/DriverInterface.php index be14adf4..cec2ccc4 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -9,9 +9,11 @@ interface DriverInterface { /** + * @param Closure $onResolve called on resolved and first argument is $callback return or Flow\Exception on Exception + * * @return Closure when called, this start async $callback */ - public function async(Closure $callback, Closure $onResolved = null): Closure; + public function async(Closure $callback, Closure $onResolve = null): Closure; public function delay(float $seconds): void; diff --git a/src/Exception.php b/src/Exception.php new file mode 100644 index 00000000..2548883c --- /dev/null +++ b/src/Exception.php @@ -0,0 +1,11 @@ +jobs); foreach ($this->jobs as $i => $job) { - $this->driver->async($job, function (Throwable $exception = null) use ($ip, &$count, $i, $callback) { + $this->driver->async($job, function (mixed $value) use ($ip, &$count, $i, $callback) { $count--; - if ($count === 0 || $exception !== null) { + if ($count === 0 || $value instanceof Exception) { $count = 0; $this->ipStrategy->done($ip); $this->nextIpJob(); - if ($exception) { + if ($value instanceof Exception) { if (isset($this->errorJobs[$i])) { - $this->errorJobs[$i]($ip->data, $exception); + $this->errorJobs[$i]($ip->data, $value->getPrevious()); } else { - throw $exception; + throw $value->getPrevious(); } }