Skip to content

Commit

Permalink
💫 Update DriverInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Mar 25, 2024
1 parent a27ed4f commit e7dd0fe
Show file tree
Hide file tree
Showing 41 changed files with 6,119 additions and 1,550 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Changelog

## v1.1.6
## v1.2.0

- Add event system for processing IpStrategy
- Remove `start` and `stop` in favor of `await` for Flow\DriverInterface

## v1.1.5

Expand All @@ -18,7 +19,6 @@
## v1.1.4

- Add generic templating
- Add Flow\Driver\RevoltDriver
- Add Flow\Driver\SpatieDriver
- Add more quality tools from https://github.com/IngeniozIT/php-skeleton

Expand Down
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Flow concept aims to solve

## Installation

PHP 8.2 is the minimal version to use Flow
PHP 8.2 is the minimal version to use Flow
The recommended way to install it through [Composer](http://getcomposer.org) and execute

```bash
Expand All @@ -37,21 +37,25 @@ class D2 {
public function __construct(public int $n2) {}
}

class D3 {
public function __construct(public int $n3) {}
}

$flow = Flow::do(static function() {
yield fn (D1 $data1) => new D2($data1->n1 += 1);
yield fn (D2 $data2) => $data2->n2 * 2;
yield fn (D2 $data2) => new D3($data2->n2 * 2);
});

$ip = new Ip(new D1(4));
$flow($ip, fn ($ip) => printf("my number %d\n", $ip->data->n2)); // display 'my number 10'
$flow($ip, fn ($ip) => printf("my number %d\n", $ip->data->n3)); // display 'my number 10'
```

## Examples

A working script is available in the bundled `examples` directory

- Run Flow : `php examples/flow.php`
- Start Server : `php examples/server.php`
- Start Server : `php examples/server.php`
Start Client(s) : `php examples/client.php`

## Documentation
Expand Down
12 changes: 5 additions & 7 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,23 @@
"php"
],
"require": {
"php": ">=8.2",
"symfony/event-dispatcher": "^6.3"
"php": ">=8.3",
"symfony/event-dispatcher": "^7.0"
},
"require-dev": {
"amphp/amp": "^3.0",
"openswoole/ide-helper": "^22.0.1",
"openswoole/ide-helper": "^22.1.5",
"react/async": "^4.2",
"spatie/async": "^1.6",
"symfony/doctrine-messenger": "^7.0",
"symfony/messenger": "^7.0",
"symfony/orm-pack": "^2.4",
"revolt/event-loop": "^1.0.6",
"spatie/async": "^1.6"
"symfony/orm-pack": "^2.4"
},
"suggest": {
"amphp/amp": "Provide asynchronous with AMP",
"ext-openswoole": "Provide asynchronous with OpenSwoole",
"react/async": "Provide asynchronous with ReactPHP",
"symfony/messenger": "Provide Symfony Messenger support",
"revolt/event-loop": "Provide asynchronous with Revolt",
"spatie/async": "Provide asynchronous with Spatie"
},
"autoload": {
Expand Down
24 changes: 7 additions & 17 deletions docs/src/content/en/docs/getting-started/driver.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ Flow embark multiple drivers.

## Coroutine

Drivers are useful to essentially provide asynchronous programming by using [coroutines](https://en.wikipedia.org/wiki/Coroutine).
Drivers are useful to essentially provide asynchronous programming by using [coroutines](https://en.wikipedia.org/wiki/Coroutine).
Thus, this can be implemented in severals ways in most popular programming languages.

Coroutine are very similar to [threads](https://en.wikipedia.org/wiki/Thread_(computing)) and provide concurrency but not parallelism.
Advantage of using coroutine :
- this can be a preferred usage to thread for [hard-realtime](https://en.wikipedia.org/wiki/Real-time_computing#Hard) context.
- there is no need for synchronisation primitives such as mutexes, semaphore.
- it reduces the usage of system lock for sharing resources.
Coroutine are very similar to [threads](https://en.wikipedia.org/wiki/Thread_(computing)) and provide concurrency but not parallelism.
Advantage of using coroutine :
- this can be a preferred usage to thread for [hard-realtime](https://en.wikipedia.org/wiki/Real-time_computing#Hard) context.
- there is no need for synchronisation primitives such as mutexes, semaphore.
- it reduces the usage of system lock for sharing resources.

## Amp Driver

Expand All @@ -52,16 +52,6 @@ composer require react/async

More documentation can be found [https://reactphp.org](https://reactphp.org)

## Revolt Driver

To use Revolt Driver, you have to require the library with composer

```bash
composer require revolt/event-loop
```

More documentation can be found [https://revolt.run](https://revolt.run)

## Spatie Driver

To use Spatie Driver, you have to require the library with composer
Expand All @@ -84,4 +74,4 @@ More documentation can be found [https://openswoole.com](https://openswoole.com)

## Make your custom driver

You can make your custom driver by implementing `Flow\DriverInterface`
You can make your custom driver by implementing `Flow\DriverInterface`
12 changes: 0 additions & 12 deletions examples/Data.php

This file was deleted.

12 changes: 12 additions & 0 deletions examples/Data/DataA.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Data;

readonly class DataA
{
public function __construct(public int $id, public int $a, public int $b, public int $c)
{
}
}
12 changes: 12 additions & 0 deletions examples/Data/DataB.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Data;

readonly class DataB
{
public function __construct(public int $id, public int $d, public int $e)
{
}
}
12 changes: 12 additions & 0 deletions examples/Data/DataC.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Data;

readonly class DataC
{
public function __construct(public int $id, public int $f)
{
}
}
12 changes: 12 additions & 0 deletions examples/Data/DataD.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Data;

readonly class DataD
{
public function __construct(public int $id)
{
}
}
95 changes: 51 additions & 44 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,97 +7,104 @@
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Data;
use Flow\Examples\Data\DataA;
use Flow\Examples\Data\DataB;
use Flow\Examples\Data\DataC;
use Flow\Examples\Data\DataD;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;

$driver = match (random_int(1, 6)) {
$driver = match (random_int(4, 4)) {
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new RevoltDriver(),
5 => new SpatieDriver(),
6 => new SwooleDriver(),
4 => new SpatieDriver(),
5 => new SwooleDriver(),
};
printf("Use %s\n", $driver::class);
printf("Calculating:\n");
printf("- DataA(a, b, c): Job1((DataA->a + DataA->b))\n");
printf("- DataB(d, e): Job2(DataB->d * DataB->e)\n");
printf("- DataC(f)\n", $driver::class);

$job1 = static function (Data $data) use ($driver): void {
printf("*. #%d - Job 1 : Calculating %d + %d\n", $data->id, $data->number, $data->number);
$job1 = static function (DataA $dataA) use ($driver): DataB {
printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b);

// simulating calculating some "light" operation from 0.1 to 1 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$result = $data->number;
$result += $result;
$d = $dataA->a + $dataA->b;

// simulating 1 chance on 5 to produce an exception from the "light" operation
if (1 === random_int(1, 5)) {
throw new Error('Failure when processing "Job1"');
throw new Error(sprintf('#%d - Failure when processing Job1', $dataA->id));
}

printf("*. #%d - Job 1 : Result for %d + %d = %d and took %.01f seconds\n", $data->id, $data->number, $data->number, $result, $delay);
printf("*. #%d - Job 1 Result for %d + %d = %d and took %.01f seconds\n", $dataA->id, $dataA->a, $dataA->b, $d, $delay);

$data->number = $result;
return new DataB($dataA->id, $d, $dataA->c);
};

$job2 = static function (Data $data) use ($driver): void {
printf(".* #%d - Job 2 : Calculating %d * %d\n", $data->id, $data->number, $data->number);
$job2 = static function (DataB $dataB) use ($driver): DataC {
printf(".* #%d - Job 2 Calculating %d * %d\n", $dataB->id, $dataB->d, $dataB->e);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$result = $data->number;
$result *= $result;
$f = $dataB->d * $dataB->e;

// simulating 1 chance on 5 to produce an exception from the "heavy" operation
if (1 === random_int(1, 5)) {
throw new Error('Failure when processing "Job2"');
throw new Error(sprintf('#%d - Failure when processing Job2', $dataB->id));
}

printf(".* #%d - Job 2 : Result for %d * %d = %d and took %.01f seconds\n", $data->id, $data->number, $data->number, $result, $delay);
printf(".* #%d - Job 2 Result for %d * %d = %d and took %.01f seconds\n", $dataB->id, $dataB->d, $dataB->e, $f, $delay);

$data->number = $result;
return new DataC($dataB->id, $f);
};

$job3 = static function (DataC $dataC): DataD {
printf("** #%d - Job 3 Result is %d\n", $dataC->id, $dataC->f);

return new DataD($dataC->id);
};

/**
* @param Ip<Data> $ip
*/
$errorJob1 = static function (Ip $ip, ExceptionInterface $exception): void {
printf("*. #%d - Error Job : Exception %s\n", $ip->data->id, $exception->getMessage());

$ip->data->number = null;
$errorJob1 = static function (ExceptionInterface $exception): void {
printf("*. %s\n", $exception->getMessage());
};

/**
* @param Ip<Data> $ip
*/
$errorJob2 = static function (Ip $ip, ExceptionInterface $exception): void {
printf(".* #%d - Error Job : Exception %s\n", $ip->data->id, $exception->getMessage());

$ip->data->number = null;
$errorJob2 = static function (ExceptionInterface $exception): void {
printf(".* %s\n", $exception->getMessage());
};

$flow = Flow::do(static function () use ($job1, $job2, $errorJob1, $errorJob2) {
yield [$job1, $errorJob1, new MaxIpStrategy(2)];
yield [$job2, $errorJob2, new MaxIpStrategy(2)];
}, ['driver' => $driver]);

$ipPool = new SplObjectStorage();
echo "begin - synchronous\n";
$asyncTask = static function ($job1, $job2, $job3, $errorJob1, $errorJob2, $driver) {
echo "begin - flow asynchronous\n";

for ($i = 1; $i <= 5; $i++) {
$ip = new Ip(new Data($i, $i));
$ipPool->offsetSet($ip, true);
$flow($ip, static fn ($ip) => $ipPool->offsetUnset($ip));
}
$flow = Flow::do(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
yield [$job1, $errorJob1, new MaxIpStrategy(2)];
yield [$job2, $errorJob2, new MaxIpStrategy(2)];
yield $job3;
}, ['driver' => $driver]);

$driver->tick(1, static function () use ($driver, $ipPool) {
if ($ipPool->count() === 0) {
$driver->stop();
for ($id = 1; $id <= 5; $id++) {
$ip = new Ip(new DataA($id, random_int(1, 10), random_int(1, 10), random_int(1, 5)));
$flow($ip);
}
});
$driver->start();
$flow->await();

echo "ended - flow asynchronous\n";
};
$asyncTask($job1, $job2, $job3, $errorJob1, $errorJob2, $driver);
echo "ended - synchronous\n";
echo 'maths - 4 + 4 = ' . (4 + 4) . "\n";
11 changes: 5 additions & 6 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Transport\DoctrineIpTransport;
Expand All @@ -21,7 +20,6 @@
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new RevoltDriver(),
5 => new SpatieDriver(),
6 => new SwooleDriver(),
};
Expand Down Expand Up @@ -67,10 +65,11 @@
$ip->data->offsetSet('number', null);
};

$flow = (new Flow($addOneJob, $errorJob, new MaxIpStrategy(1), $driver))
->fn(new Flow($multbyTwoJob, $errorJob, new MaxIpStrategy(3), $driver))
->fn(new Flow($minusThreeJob, $errorJob, new MaxIpStrategy(2), $driver))
;
$flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
yield [$addOneJob, $errorJob, new MaxIpStrategy(1)];
yield [$multbyTwoJob, $errorJob, new MaxIpStrategy(3)];
yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)];
}, ['driver' => $driver]);

$connection = DriverManager::getConnection(['url' => 'mysql://flow:[email protected]:3306/flow?serverVersion=8.1']);
$transport = new DoctrineIpTransport($connection);
Expand Down
Loading

0 comments on commit e7dd0fe

Please sign in to comment.