Skip to content

Commit

Permalink
♻️ Rollback start and stop to DriverInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 10, 2023
1 parent 8bc3f2d commit 6948911
Show file tree
Hide file tree
Showing 17 changed files with 184 additions and 114 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
- Update DX for Flow\DriverInterface
- Update `async` that now $onResolve get called with async $callback result or Flow\Exception as first argument
- Update `tick` that now return a Closure to cleanup tick interval
- Remove `start` and `stop`
- Add Flow\Driver\FiberDriver from https://github.com/jolicode/castor/blob/main/src/functions.php
- Upgrade to Symfony 6.3 and PHPUnit 10.3
- Refactor docs structure
Expand Down
14 changes: 11 additions & 3 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

$driver = match (random_int(1, 5)) {
1 => new AmpDriver(),
2 => new ReactDriver(),
3 => new FiberDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new RevoltDriver(),
5 => new SwooleDriver(),
};
Expand All @@ -28,7 +28,7 @@
printf("*. #%d - Job 1 : Calculating %d + %d\n", $data->id, $data->number, $data->number);

// simulating calculating some "light" operation from 0.1 to 1 seconds
$delay = random_int(1, 10) / 10;
$delay = random_int(1, 3);
$driver->delay($delay);
$result = $data->number;
$result += $result;
Expand Down Expand Up @@ -88,5 +88,13 @@

for ($i = 1; $i <= 5; $i++) {
$ip = new Ip(new Data($i, $i));
$ipPool->offsetSet($ip, true);
$flow($ip, static fn ($ip) => $ipPool->offsetUnset($ip));
}

$driver->tick(1, static function () use ($driver, $ipPool) {
if ($ipPool->count() === 0) {
$driver->stop();
}
});
$driver->start();
4 changes: 2 additions & 2 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

$driver = match (random_int(1, 5)) {
1 => new AmpDriver(),
2 => new ReactDriver(),
3 => new FiberDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new RevoltDriver(),
5 => new SwooleDriver(),
};
Expand Down
30 changes: 28 additions & 2 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
*/
class AmpDriver implements DriverInterface
{
/**
* @var array<string>
*/
private array $ticksIds = [];

public function __construct()
{
if (!function_exists('Amp\\async')) {
Expand All @@ -44,7 +49,7 @@ public function async(Closure $callback, Closure $onResolve = null): Closure
$onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
}
}, $callback, $args, $onResolve)->await();
}, $callback, $args, $onResolve);
};
}

Expand All @@ -57,8 +62,29 @@ public function tick(int $interval, Closure $callback): Closure
{
$tickId = EventLoop::repeat($interval / 1000, $callback);

return static function () use ($tickId) {
$cancel = function () use ($tickId) {
unset($this->ticksIds[$tickId]);
EventLoop::cancel($tickId);
};

$this->ticksIds[$tickId] = $cancel;

return $cancel;
}

public function start(): void
{
if (!EventLoop::getDriver()->isRunning()) {
EventLoop::run();
}
}

public function stop(): void
{
foreach ($this->ticksIds as $cancel) {
$cancel();
}

EventLoop::getDriver()->stop();
}
}
73 changes: 44 additions & 29 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ class FiberDriver implements DriverInterface
*/
private array $fibers = [];

/**
* @var array<string>
*/
private array $ticksIds = [];

private bool $isLooping = false;

public function async(Closure $callback, Closure $onResolve = null): Closure
Expand All @@ -45,7 +50,6 @@ public function async(Closure $callback, Closure $onResolve = null): Closure
}

$this->fibers[] = $fiberData;
$this->loop();
};
}

Expand All @@ -59,50 +63,61 @@ public function delay(float $seconds): void
*/
public function tick(int $interval, Closure $callback): Closure
{
$closure = static fn () => $callback();
$tickId = uniqid('flow_fiber_tick_id');

$closure = static fn () => $callback();
register_tick_function($closure);

return static fn () => unregister_tick_function($closure);
$cancel = function () use ($tickId, $closure) {
unset($this->ticksIds[$tickId]);
unregister_tick_function($closure);
};

return $cancel;
}

private function loop(): void
public function start(): void
{
if ($this->isLooping === false) {
$this->isLooping = true;
$this->isLooping = true;

$isRunning = true;
$isRunning = true;

while ($isRunning) {
$isRunning = false;
while ($this->isLooping || $isRunning) { /** @phpstan-ignore-line */
$isRunning = false;

foreach ($this->fibers as $i => $fiber) {
$isRunning = $isRunning || !$fiber['fiber']->isTerminated();
foreach ($this->fibers as $i => $fiber) {
$isRunning = $isRunning || !$fiber['fiber']->isTerminated();

if (!$fiber['fiber']->isTerminated() and $fiber['fiber']->isSuspended()) {
try {
$fiber['fiber']->resume();
} catch (Throwable $exception) {
$this->fibers[$i]['exception'] = $exception;
}
} else {
if ($fiber['onResolve']) {
if ($fiber['exception'] === null) {
$fiber['onResolve']($fiber['fiber']->getReturn());
} else {
$fiber['onResolve'](new RuntimeException($fiber['exception']->getMessage(), $fiber['exception']->getCode(), $fiber['exception']));
}
if (!$fiber['fiber']->isTerminated() and $fiber['fiber']->isSuspended()) {
try {
$fiber['fiber']->resume();
} catch (Throwable $exception) {
$this->fibers[$i]['exception'] = $exception;
}
} else {
if ($fiber['onResolve']) {
if ($fiber['exception'] === null) {
$fiber['onResolve']($fiber['fiber']->getReturn());
} else {
$fiber['onResolve'](new RuntimeException($fiber['exception']->getMessage(), $fiber['exception']->getCode(), $fiber['exception']));
}
unset($this->fibers[$i]);
}
unset($this->fibers[$i]);
}
}

if (Fiber::getCurrent()) {
Fiber::suspend();
usleep(1_000);
}
if (Fiber::getCurrent()) {
Fiber::suspend();
usleep(1_000);
}
}
}

public function stop(): void
{
foreach ($this->ticksIds as $cancel) {
$cancel();
}

$this->isLooping = false;
}
Expand Down
39 changes: 19 additions & 20 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class ReactDriver implements DriverInterface
{
private LoopInterface $eventLoop;

private int $counter = 0;
/**
* @var array<string>
*/
private array $ticksIds = [];

public function __construct(LoopInterface $eventLoop = null)
{
Expand All @@ -39,8 +42,8 @@ public function __construct(LoopInterface $eventLoop = null)

public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolve): void {
async(function () use ($callback, $onResolve, $args) {
return static function (...$args) use ($callback, $onResolve): void {
async(static function () use ($callback, $onResolve, $args) {
try {
$return = $callback(...$args, ...($args = []));
if ($onResolve) {
Expand All @@ -50,11 +53,8 @@ public function async(Closure $callback, Closure $onResolve = null): Closure
if ($onResolve) {
$onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
} finally {
$this->pop();
}
})();
$this->push();
};
}

Expand All @@ -65,28 +65,27 @@ public function delay(float $seconds): void

public function tick(int $interval, Closure $callback): Closure
{
$tickId = $this->eventLoop->addPeriodicTimer($interval, $callback);
$this->push();
$tickId = uniqid('flow_react_tick_id');

return function () use ($tickId) {
$this->eventLoop->cancelTimer($tickId);
$this->pop();
$timer = $this->eventLoop->addPeriodicTimer($interval, $callback);

return function () use ($tickId, $timer) {
unset($this->ticksIds[$tickId]);
$this->eventLoop->cancelTimer($timer);
};
}

private function push(): void
public function start(): void
{
if ($this->counter === 0) {
$this->eventLoop->run();
}
$this->counter++;
$this->eventLoop->run();
}

private function pop(): void
public function stop(): void
{
$this->counter--;
if ($this->counter === 0) {
$this->eventLoop->stop();
foreach ($this->ticksIds as $cancel) {
$cancel();
}

$this->eventLoop->stop();
}
}
33 changes: 16 additions & 17 deletions src/Driver/RevoltDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
*/
class RevoltDriver implements DriverInterface
{
private int $counter = 0;
/**
* @var array<string>
*/
private array $ticksIds = [];

public function __construct(Driver $driver = null)
{
Expand All @@ -35,8 +38,8 @@ public function __construct(Driver $driver = null)

public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolve): void {
EventLoop::queue(function (Closure $callback, array $args, Closure $onResolve = null) {
return static function (...$args) use ($callback, $onResolve): void {
EventLoop::queue(static function (Closure $callback, array $args, Closure $onResolve = null) {
try {
$return = $callback(...$args, ...($args = []));
if ($onResolve) {
Expand All @@ -46,11 +49,8 @@ public function async(Closure $callback, Closure $onResolve = null): Closure
if ($onResolve) {
$onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
} finally {
$this->pop();
}
}, $callback, $args, $onResolve);
$this->push();
};
}

Expand All @@ -69,27 +69,26 @@ public function delay(float $seconds): void
public function tick(int $interval, Closure $callback): Closure
{
$tickId = EventLoop::repeat($interval / 1000, $callback);
$this->push();

return function () use ($tickId) {
$cancel = function () use ($tickId) {
unset($this->ticksIds[$tickId]);
EventLoop::cancel($tickId);
$this->pop();
};

$this->ticksIds[$tickId] = $cancel;

return $cancel;
}

private function push(): void
public function start(): void
{
if (/* $this->counter === 0 || */ !EventLoop::getDriver()->isRunning()) {
if (!EventLoop::getDriver()->isRunning()) {
EventLoop::run();
}
$this->counter++;
}

private function pop(): void
public function stop(): void
{
$this->counter--;
if ($this->counter === 0) {
EventLoop::getDriver()->stop();
}
EventLoop::getDriver()->stop();
}
}
19 changes: 18 additions & 1 deletion src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
*/
class SwooleDriver implements DriverInterface
{
/**
* @var array<int, callable(): void>
*/
private array $ticksIds = [];

public function __construct()
{
if (!extension_loaded('openswoole')) {
Expand Down Expand Up @@ -59,8 +64,20 @@ public function tick(int $interval, Closure $callback): Closure
{
$tickId = Timer::tick($interval, $callback);

return static function () use ($tickId) {
return function () use ($tickId) {
unset($this->ticksIds[$tickId]);
Timer::clear($tickId); // @phpstan-ignore-line
};
}

public function start(): void
{
}

public function stop(): void
{
foreach ($this->ticksIds as $cancel) {
$cancel();
}
}
}
Loading

0 comments on commit 6948911

Please sign in to comment.