Skip to content

Commit

Permalink
Merge pull request #40 from darkwood-fr/driver
Browse files Browse the repository at this point in the history
💄 Cleanup DriverInterface
  • Loading branch information
matyo91 authored Sep 3, 2023
2 parents d184869 + 1f996df commit 5953a4b
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 48 deletions.
5 changes: 4 additions & 1 deletion docs/src/content/en/docs/getting-started/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ toc: true
- Upgrade to Symfony 6.3
- Refactor docs structure
- Refactor tooling from https://github.com/jolicode/castor
- Update `tick` and Remove `start` and `stop` from Flow\DriverInterface
- Rework on Flow\DriverInterface
- Update `async` that now $onResolve get called with async $callback result or Flow\Exception as first argument
- Update `tick` that now return a Closure to cleanup tick interval
- Remove `start` and `stop`
- Add Fiber Driver from https://github.com/jolicode/castor/blob/main/src/functions.php

### 1.1.2
Expand Down
19 changes: 10 additions & 9 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Closure;
use Flow\DriverInterface;
use Flow\Exception;
use Revolt\EventLoop;
use RuntimeException;
use Throwable;
Expand All @@ -24,23 +25,23 @@ public function __construct()
}
}

public function async(Closure $callback, Closure $onResolved = null): Closure
public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolved): void {
EventLoop::queue(function (Closure $callback, array $args, Closure $onResolved = null) {
return function (...$args) use ($callback, $onResolve): void {
EventLoop::queue(function (Closure $callback, array $args, Closure $onResolve = null) {
try {
$callback(...$args, ...($args = []));
if ($onResolved) {
$onResolved(null);
$return = $callback(...$args, ...($args = []));
if ($onResolve) {
$onResolve($return);
}
} catch (Throwable $exception) {
if ($onResolved) {
$onResolved($exception);
if ($onResolve) {
$onResolve(new Exception($exception->getMessage(), $exception->getCode(), $exception));
}
} finally {
$this->pop();
}
}, $callback, $args, $onResolved);
}, $callback, $args, $onResolve);
$this->push();
};
}
Expand Down
25 changes: 13 additions & 12 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Closure;
use Fiber;
use Flow\DriverInterface;
use Flow\Exception;
use Throwable;

class FiberDriver implements DriverInterface
Expand All @@ -20,21 +21,21 @@ class FiberDriver implements DriverInterface

private bool $isLooping = false;

public function async(Closure $callback, Closure $onResolved = null): Closure
public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolved): void {
return function (...$args) use ($callback, $onResolve): void {
$fiber = new Fiber($callback);

$fiberData = [
'fiber' => $fiber,
'onResolved' => $onResolved,
'error' => null,
'onResolve' => $onResolve,
'exception' => null,
];

try {
$fiber->start(...$args);
} catch (Throwable $e) {
$fiberData['error'] = $e;
} catch (Throwable $exception) {
$fiberData['exception'] = $exception;
}

$this->fibers[] = $fiberData;
Expand Down Expand Up @@ -75,15 +76,15 @@ private function loop(): void
if (!$fiber['fiber']->isTerminated() && $fiber['fiber']->isSuspended()) {
try {
$fiber['fiber']->resume();
} catch (Throwable $e) {
$this->fibers[$i]['error'] = $e;
} catch (Throwable $exception) {
$this->fibers[$i]['exception'] = $exception;
}
} else {
if ($fiber['onResolved']) {
if ($fiber['error'] === null) {
$fiber['onResolved'](null);
if ($fiber['onResolve']) {
if ($fiber['exception'] === null) {
$fiber['onResolve']($fiber['fiber']->getReturn());
} else {
$fiber['onResolved']($fiber['error']);
$fiber['onResolve'](new Exception($fiber['exception']->getMessage(), $fiber['exception']->getCode(), $fiber['exception']));
}
}
unset($this->fibers[$i]);
Expand Down
19 changes: 10 additions & 9 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Closure;
use Flow\DriverInterface;
use Flow\Exception;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use RuntimeException;
Expand All @@ -30,18 +31,18 @@ public function __construct(LoopInterface $eventLoop = null)
$this->eventLoop = $eventLoop ?? Loop::get();
}

public function async(Closure $callback, Closure $onResolved = null): Closure
public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolved): void {
async(function () use ($callback, $onResolved, $args) {
return function (...$args) use ($callback, $onResolve): void {
async(function () use ($callback, $onResolve, $args) {
try {
$callback(...$args, ...($args = []));
if ($onResolved) {
$onResolved(null);
$return = $callback(...$args, ...($args = []));
if ($onResolve) {
$onResolve($return);
}
} catch (Throwable $e) {
if ($onResolved) {
$onResolved($e);
} catch (Throwable $exception) {
if ($onResolve) {
$onResolve(new Exception($exception->getMessage(), $exception->getCode(), $exception));
}
} finally {
$this->pop();
Expand Down
21 changes: 11 additions & 10 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Closure;
use Flow\DriverInterface;
use Flow\Exception;
use OpenSwoole\Coroutine;
use OpenSwoole\Timer;
use RuntimeException;
Expand All @@ -22,22 +23,22 @@ public function __construct()
}
}

public function async(Closure $callback, Closure $onResolved = null): Closure
public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolved): void {
Coroutine::run(static function () use ($callback, $onResolved, $args) {
Coroutine::create(static function (Closure $callback, array $args, Closure $onResolved = null) {
return function (...$args) use ($callback, $onResolve): void {
Coroutine::run(static function () use ($callback, $onResolve, $args) {
Coroutine::create(static function (Closure $callback, array $args, Closure $onResolve = null) {
try {
$callback(...$args, ...($args = []));
if ($onResolved) {
$onResolved(null);
if ($onResolve) {
$onResolve(null);
}
} catch (Throwable $e) {
if ($onResolved) {
$onResolved($e);
} catch (Throwable $exception) {
if ($onResolve) {
$onResolve(new Exception($exception->getMessage(), $exception->getCode(), $exception));
}
}
}, $callback, $args, $onResolved);
}, $callback, $args, $onResolve);
});
};
}
Expand Down
4 changes: 3 additions & 1 deletion src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
interface DriverInterface
{
/**
* @param Closure $onResolve called on resolved and first argument is $callback return or Flow\Exception on Exception
*
* @return Closure when called, this start async $callback
*/
public function async(Closure $callback, Closure $onResolved = null): Closure;
public function async(Closure $callback, Closure $onResolve = null): Closure;

public function delay(float $seconds): void;

Expand Down
11 changes: 11 additions & 0 deletions src/Exception.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow;

use RuntimeException;

class Exception extends RuntimeException
{
}
12 changes: 6 additions & 6 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
use Closure;
use Flow\Driver\ReactDriver;
use Flow\DriverInterface;
use Flow\Exception;
use Flow\FlowInterface;
use Flow\Ip;
use Flow\IpStrategy\LinearIpStrategy;
use Flow\IpStrategyInterface;
use SplObjectStorage;
use Throwable;

use function count;
use function is_array;
Expand Down Expand Up @@ -86,18 +86,18 @@ private function nextIpJob(): void

$count = count($this->jobs);
foreach ($this->jobs as $i => $job) {
$this->driver->async($job, function (Throwable $exception = null) use ($ip, &$count, $i, $callback) {
$this->driver->async($job, function (mixed $value) use ($ip, &$count, $i, $callback) {
$count--;
if ($count === 0 || $exception !== null) {
if ($count === 0 || $value instanceof Exception) {
$count = 0;
$this->ipStrategy->done($ip);
$this->nextIpJob();

if ($exception) {
if ($value instanceof Exception) {
if (isset($this->errorJobs[$i])) {
$this->errorJobs[$i]($ip->data, $exception);
$this->errorJobs[$i]($ip->data, $value->getPrevious());
} else {
throw $exception;
throw $value->getPrevious();
}
}

Expand Down

0 comments on commit 5953a4b

Please sign in to comment.