Skip to content

Commit

Permalink
✨ Add generics templating
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 10, 2023
1 parent da6d3f8 commit b3e525d
Show file tree
Hide file tree
Showing 31 changed files with 332 additions and 42 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v1.1.x

- Add generics templating
- Add Flow\Driver\RevoltDriver
- Add more quality tools from https://github.com/IngeniozIT/php-skeleton

Expand Down Expand Up @@ -90,4 +91,4 @@

## v1.0.0

- Initial release
- Initial release
12 changes: 12 additions & 0 deletions examples/Data.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Flow\Examples;

class Data
{
public function __construct(public int $id, public ?int $number)
{
}
}
42 changes: 25 additions & 17 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Data;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
Expand All @@ -22,54 +24,60 @@
};
printf("Use %s\n", $driver::class);

$job1 = static function (object $data) use ($driver): void {
printf("*. #%d - Job 1 : Calculating %d + %d\n", $data['id'], $data['number'], $data['number']);
$job1 = static function (Data $data) use ($driver): void {
printf("*. #%d - Job 1 : Calculating %d + %d\n", $data->id, $data->number, $data->number);

// simulating calculating some "light" operation from 0.1 to 1 seconds
$delay = random_int(1, 10) / 10;
$driver->delay($delay);
$result = $data['number'];
$result = $data->number;
$result += $result;

// simulating 1 chance on 5 to produce an exception from the "light" operation
if (1 === random_int(1, 5)) {
throw new Error('Failure when processing "Job1"');
}

printf("*. #%d - Job 1 : Result for %d + %d = %d and took %.01f seconds\n", $data['id'], $data['number'], $data['number'], $result, $delay);
printf("*. #%d - Job 1 : Result for %d + %d = %d and took %.01f seconds\n", $data->id, $data->number, $data->number, $result, $delay);

$data['number'] = $result;
$data->number = $result;
};

$job2 = static function (object $data) use ($driver): void {
printf(".* #%d - Job 2 : Calculating %d * %d\n", $data['id'], $data['number'], $data['number']);
$job2 = static function (Data $data) use ($driver): void {
printf(".* #%d - Job 2 : Calculating %d * %d\n", $data->id, $data->number, $data->number);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$result = $data['number'];
$result = $data->number;
$result *= $result;

// simulating 1 chance on 5 to produce an exception from the "heavy" operation
if (1 === random_int(1, 5)) {
throw new Error('Failure when processing "Job2"');
}

printf(".* #%d - Job 2 : Result for %d * %d = %d and took %.01f seconds\n", $data['id'], $data['number'], $data['number'], $result, $delay);
printf(".* #%d - Job 2 : Result for %d * %d = %d and took %.01f seconds\n", $data->id, $data->number, $data->number, $result, $delay);

$data['number'] = $result;
$data->number = $result;
};

$errorJob1 = static function (object $data, Throwable $exception): void {
printf("*. #%d - Error Job : Exception %s\n", $data['id'], $exception->getMessage());
/**
* @param Ip<Data> $ip
*/
$errorJob1 = static function (Ip $ip, ExceptionInterface $exception): void {
printf("*. #%d - Error Job : Exception %s\n", $ip->data->id, $exception->getMessage());

$data['number'] = null;
$ip->data->number = null;
};

$errorJob2 = static function (object $data, Throwable $exception): void {
printf(".* #%d - Error Job : Exception %s\n", $data['id'], $exception->getMessage());
/**
* @param Ip<Data> $ip
*/
$errorJob2 = static function (Ip $ip, ExceptionInterface $exception): void {
printf(".* #%d - Error Job : Exception %s\n", $ip->data->id, $exception->getMessage());

$data['number'] = null;
$ip->data->number = null;
};

$flow = (new Flow($job1, $errorJob1, new MaxIpStrategy(2), $driver))
Expand All @@ -79,6 +87,6 @@
$ipPool = new SplObjectStorage();

for ($i = 1; $i <= 5; $i++) {
$ip = new Ip(new ArrayObject(['id' => $i, 'number' => $i]));
$ip = new Ip(new Data($i, $i));
$flow($ip, static fn ($ip) => $ipPool->offsetUnset($ip));
}
6 changes: 6 additions & 0 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
use function Amp\delay;
use function function_exists;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class AmpDriver implements DriverInterface
{
public function __construct()
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
use Flow\Exception\RuntimeException;
use Throwable;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class FiberDriver implements DriverInterface
{
/**
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
use function React\Async\async;
use function React\Async\delay;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class ReactDriver implements DriverInterface
{
private LoopInterface $eventLoop;
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/RevoltDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
use RuntimeException as NativeRuntimeException;
use Throwable;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class RevoltDriver implements DriverInterface
{
private int $counter = 0;
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

use function extension_loaded;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class SwooleDriver implements DriverInterface
{
public function __construct()
Expand Down
13 changes: 10 additions & 3 deletions src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,26 @@

use Closure;

/**
* @template TArgs TArgs is supposed to be list of generic templating arguments https://github.com/phpstan/phpstan/issues/6873
* @template TReturn
*/
interface DriverInterface
{
/**
* @param Closure $onResolve called on resolved and first argument is $callback return or Flow\Exception on Exception
* #return Closure(TArgs): void when called this start async $callback.
*
* @return Closure when called, this start async $callback
* @param Closure(TArgs): TReturn $callback
* @param null|Closure(ExceptionInterface|TReturn): void $onResolve
*/
public function async(Closure $callback, Closure $onResolve = null): Closure;

public function delay(float $seconds): void;

/**
* @return Closure when called, this cleanup tick interval
* @param Closure(): void $callback
*
* @return Closure(): void when called, this cleanup tick interval
*/
public function tick(int $interval, Closure $callback): Closure;
}
1 change: 1 addition & 0 deletions src/Exception/RuntimeException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\Exception;

use Flow\ExceptionInterface;
use RuntimeException as NativeRuntimeException;

class RuntimeException extends NativeRuntimeException implements ExceptionInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace Flow\Exception;
namespace Flow;

use Throwable;

Expand Down
47 changes: 36 additions & 11 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Closure;
use Flow\Driver\ReactDriver;
use Flow\DriverInterface;
use Flow\Exception\RuntimeException;
use Flow\ExceptionInterface;
use Flow\FlowInterface;
use Flow\Ip;
use Flow\IpStrategy\LinearIpStrategy;
Expand All @@ -17,31 +17,49 @@
use function count;
use function is_array;

/**
* @template T1
* @template T2
*
* @implements FlowInterface<T1>
*/
class Flow implements FlowInterface
{
/**
* @var array<Closure>
* @var array<Closure(T1): T2>
*/
private array $jobs;

/**
* @var array<Closure>
* @var array<Closure(Ip<T1>, ExceptionInterface): void>
*/
private array $errorJobs;

/**
* @var IpStrategyInterface<T1>
*/
private IpStrategyInterface $ipStrategy;

/**
* @var DriverInterface<T1,T2>
*/
private DriverInterface $driver;

/**
* @var SplObjectStorage<Ip, mixed>
* @var SplObjectStorage<Ip<T1>, null|Closure(Ip<T1>): void>
*/
private SplObjectStorage $callbacks;

/**
* @var null|FlowInterface<T2>
*/
private ?FlowInterface $fnFlow = null;

/**
* @param array<Closure>|Closure $jobs
* @param array<Closure>|Closure $errorJobs
* @param array<Closure(T1): T2>|Closure(T1): T2 $jobs
* @param array<Closure(Ip<T1>, ExceptionInterface): void>|Closure(Ip<T1>, ExceptionInterface): void $errorJobs
* @param null|IpStrategyInterface<T1> $ipStrategy
* @param null|DriverInterface<T1,T2> $driver
*/
public function __construct(
Closure|array $jobs,
Expand All @@ -63,6 +81,11 @@ public function __invoke(Ip $ip, Closure $callback = null): void
$this->nextIpJob();
}

/**
* @param FlowInterface<T2> $flow
*
* @return FlowInterface<T1>
*/
public function fn(FlowInterface $flow): FlowInterface
{
if ($this->fnFlow) {
Expand All @@ -86,19 +109,21 @@ private function nextIpJob(): void

$count = count($this->jobs);
foreach ($this->jobs as $i => $job) {
$this->driver->async($job, function (mixed $value) use ($ip, &$count, $i, $callback) {
$this->driver->async($job, function ($value) use ($ip, &$count, $i, $callback) {
$count--;
if ($count === 0 || $value instanceof RuntimeException) {
if ($count === 0 || $value instanceof ExceptionInterface) {
$count = 0;
$this->ipStrategy->done($ip);
$this->nextIpJob();

if ($value instanceof RuntimeException) {
if ($value instanceof ExceptionInterface) {
if (isset($this->errorJobs[$i])) {
$this->errorJobs[$i]($ip->data, $value->getPrevious());
$this->errorJobs[$i]($ip, $value);
} else {
throw $value->getPrevious();
throw $value;
}

return;
}

if ($this->fnFlow) {
Expand Down
17 changes: 17 additions & 0 deletions src/Flow/FlowDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,34 @@
use Flow\FlowInterface;
use Flow\Ip;

/**
* @template T1
* @template T2
*
* @implements FlowInterface<T1>
*/
abstract class FlowDecorator implements FlowInterface
{
/**
* @param FlowInterface<T1> $flow
*/
public function __construct(private FlowInterface $flow)
{
}

/**
* @param Ip<T1> $ip
*/
public function __invoke(Ip $ip, Closure $callback = null): void
{
($this->flow)($ip, $callback);
}

/**
* @param FlowInterface<T2> $flow
*
* @return FlowInterface<T1>
*/
public function fn(FlowInterface $flow): FlowInterface
{
return $this->flow->fn($flow);
Expand Down
Loading

0 comments on commit b3e525d

Please sign in to comment.