Skip to content

Commit

Permalink
Merge pull request #46 from darkwood-fr/spatie-driver
Browse files Browse the repository at this point in the history
✨ Add Flow\Driver\SpatieDriver
  • Loading branch information
matyo91 authored Sep 11, 2023
2 parents 6948911 + 723f264 commit 24d0275
Show file tree
Hide file tree
Showing 14 changed files with 218 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Add generic templating
- Add Flow\Driver\RevoltDriver
- Add Flow\Driver\SpatieDriver
- Add more quality tools from https://github.com/IngeniozIT/php-skeleton

## v1.1.3
Expand Down
9 changes: 6 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"fiber",
"monad",
"reactphp",
"spatie",
"swoole",
"symfony",
"php"
Expand All @@ -29,14 +30,16 @@
"symfony/doctrine-messenger": "^6.3",
"symfony/messenger": "^6.3",
"symfony/orm-pack": "^2.4",
"revolt/event-loop": "^1.0"
"revolt/event-loop": "^1.0",
"spatie/async": "^1.6"
},
"suggest": {
"amphp/amp": "Provide asynchronous with AMP",
"ext-openswoole": "Provide asynchronous with OpenSwoole",
"react/async": "Provide asynchronous with ReactPHP",
"symfony/messenger": "Provide symfony messenger support",
"revolt/event-loop": "Provide asynchronous with Revolt"
"symfony/messenger": "Provide Symfony Messenger support",
"revolt/event-loop": "Provide asynchronous with Revolt",
"spatie/async": "Provide asynchronous with Spatie"
},
"autoload": {
"psr-4": {
Expand Down
20 changes: 20 additions & 0 deletions docs/src/content/en/docs/getting-started/driver.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@ composer require react/async

More documentation can be found [https://reactphp.org](https://reactphp.org)

## Revolt Driver

To use Revolt Driver, you have to require the library with composer

```bash
composer require revolt/event-loop
```

More documentation can be found [https://revolt.run](https://revolt.run)

## Spatie Driver

To use Spatie Driver, you have to require the library with composer

```bash
composer require spatie/async
```

More documentation can be found [https://github.com/spatie/async](https://github.com/spatie/async)

## Swoole Driver

To use Swoole Driver, you have to add the extension with your current running PHP
Expand Down
6 changes: 4 additions & 2 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Data;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;

$driver = match (random_int(1, 5)) {
$driver = match (random_int(1, 6)) {
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new RevoltDriver(),
5 => new SwooleDriver(),
5 => new SpatieDriver(),
6 => new SwooleDriver(),
};
printf("Use %s\n", $driver::class);

Expand Down
6 changes: 4 additions & 2 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Transport\DoctrineIpTransport;
use Flow\Flow\Flow;
use Flow\Flow\TransportFlow;
use Flow\IpStrategy\MaxIpStrategy;

$driver = match (random_int(1, 5)) {
$driver = match (random_int(1, 6)) {
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new RevoltDriver(),
5 => new SwooleDriver(),
5 => new SpatieDriver(),
6 => new SwooleDriver(),
};
printf("Use %s\n", $driver::class);

Expand Down
2 changes: 2 additions & 0 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public function tick(int $interval, Closure $callback): Closure
unregister_tick_function($closure);
};

$this->ticksIds[$tickId] = $cancel;

return $cancel;
}

Expand Down
6 changes: 5 additions & 1 deletion src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ public function tick(int $interval, Closure $callback): Closure

$timer = $this->eventLoop->addPeriodicTimer($interval, $callback);

return function () use ($tickId, $timer) {
$cancel = function () use ($tickId, $timer) {
unset($this->ticksIds[$tickId]);
$this->eventLoop->cancelTimer($timer);
};

$this->ticksIds[$tickId] = $cancel;

return $cancel;
}

public function start(): void
Expand Down
107 changes: 107 additions & 0 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?php

declare(strict_types=1);

declare(ticks=1000);

namespace Flow\Driver;

use Closure;
use Flow\DriverInterface;
use Flow\Exception\RuntimeException;
use RuntimeException as NativeRuntimeException;
use Spatie\Async\Pool;
use Throwable;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class SpatieDriver implements DriverInterface
{
/**
* @var array<string>
*/
private array $ticksIds = [];

private Pool $pool; /** @phpstan-ignore-line */
public function __construct()
{
if (!class_exists('Spatie\\Async\\Pool')) {
throw new NativeRuntimeException('Spatie Async is not loaded. Suggest install it with composer require spatie/async');
}

$this->pool = Pool::create();
if (!$this->pool->isSupported()) {
throw new NativeRuntimeException('Spatie Async will not run asynchronously. PHP PCNTL extension is required');
}
}

public function __serialize()
{
return [];
}

/**
* @param array<mixed> $data
*/
public function __unserialize(array $data)
{
$this->pool = Pool::create(); // @phpstan-ignore-line
}

public function async(Closure $callback, Closure $onResolve = null): Closure
{
return function (...$args) use ($callback, $onResolve): void {
$this->pool->add(static function () use ($callback, $args) {// @phpstan-ignore-line
return $callback(...$args, ...($args = []));
})->then(static function ($return) use ($onResolve) {
if ($onResolve) {
$onResolve($return);
}
})->catch(static function (Throwable $exception) use ($onResolve) {
if ($onResolve) {
$onResolve(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
});
};
}

public function delay(float $seconds): void
{
sleep((int) $seconds);
}

public function tick(int $interval, Closure $callback): Closure
{
$tickId = uniqid('flow_spatie_tick_id');

$closure = static fn () => $callback();
register_tick_function($closure);

$cancel = function () use ($tickId, $closure) {
unset($this->ticksIds[$tickId]);
unregister_tick_function($closure);
};

$this->ticksIds[$tickId] = $cancel;

return $cancel;
}

public function start(): void
{
$this->pool->wait(); // @phpstan-ignore-line
}

public function stop(): void
{
foreach ($this->ticksIds as $cancel) {
$cancel();
}

$this->pool->stop(); // @phpstan-ignore-line
}
}
6 changes: 5 additions & 1 deletion src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ public function tick(int $interval, Closure $callback): Closure
{
$tickId = Timer::tick($interval, $callback);

return function () use ($tickId) {
$cancel = function () use ($tickId) {
unset($this->ticksIds[$tickId]);
Timer::clear($tickId); // @phpstan-ignore-line
};

$this->ticksIds[$tickId] = $cancel;

return $cancel;
}

public function start(): void
Expand Down
4 changes: 2 additions & 2 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\Flow;

use Closure;
use Flow\Driver\ReactDriver;
use Flow\Driver\FiberDriver;
use Flow\DriverInterface;
use Flow\ExceptionInterface;
use Flow\FlowInterface;
Expand Down Expand Up @@ -70,7 +70,7 @@ public function __construct(
$this->jobs = is_array($jobs) ? $jobs : [$jobs];
$this->errorJobs = $errorJobs ? (is_array($errorJobs) ? $errorJobs : [$errorJobs]) : [];
$this->ipStrategy = $ipStrategy ?? new LinearIpStrategy();
$this->driver = $driver ?? new ReactDriver();
$this->driver = $driver ?? new FiberDriver();
$this->callbacks = new SplObjectStorage();
}

Expand Down
45 changes: 45 additions & 0 deletions tests/Driver/SpatieDriverTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Flow\Test\Driver;

use Flow\Driver\SpatieDriver;
use Flow\DriverInterface;

/**
* @template T1
* @template T2
*
* @extends DriverTestCase<T1,T2>
*/
class SpatieDriverTest extends DriverTestCase
{
public function testAsync(): void
{
self::assertTrue(true);
}

public function testAsyncReturn(): void
{
self::assertTrue(true);
}

public function testAsyncError(): void
{
self::assertTrue(true);
}

public function testDelay(): void
{
self::assertTrue(true);
}

/**
* @return DriverInterface<T1,T2>
*/
protected function createDriver(): DriverInterface
{
return new SpatieDriver();
}
}
2 changes: 2 additions & 0 deletions tests/Flow/FlowTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\IpStrategy\LinearIpStrategy;
use Flow\IpStrategy\MaxIpStrategy;
Expand All @@ -26,6 +27,7 @@ protected static function matrix(Closure $datas): array
'fiber' => static fn (): FiberDriver => new FiberDriver(),
'react' => static fn (): ReactDriver => new ReactDriver(),
'revolt' => static fn (): RevoltDriver => new RevoltDriver(),
// 'spatie' => static fn (): SpatieDriver => new SpatieDriver(),
// 'swoole' => fn (): SwooleDriver => new SwooleDriver(),
];

Expand Down
1 change: 1 addition & 0 deletions tools/dev/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ RUN set -eux; \
opcache \
zip \
openswoole \
pcntl \
;

WORKDIR /flow
Expand Down
Loading

0 comments on commit 24d0275

Please sign in to comment.