Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add generics templating #34

Merged
merged 1 commit into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v1.1.x

- Add generic templating
- Add Flow\Driver\RevoltDriver
- Add more quality tools from https://github.com/IngeniozIT/php-skeleton

Expand Down Expand Up @@ -90,4 +91,4 @@

## v1.0.0

- Initial release
- Initial release
12 changes: 12 additions & 0 deletions examples/Data.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Flow\Examples;

class Data
{
public function __construct(public int $id, public ?int $number)
{
}
}
42 changes: 25 additions & 17 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Flow\Driver\ReactDriver;
use Flow\Driver\RevoltDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Data;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
Expand All @@ -22,54 +24,60 @@
};
printf("Use %s\n", $driver::class);

$job1 = static function (object $data) use ($driver): void {
printf("*. #%d - Job 1 : Calculating %d + %d\n", $data['id'], $data['number'], $data['number']);
$job1 = static function (Data $data) use ($driver): void {
printf("*. #%d - Job 1 : Calculating %d + %d\n", $data->id, $data->number, $data->number);

// simulating calculating some "light" operation from 0.1 to 1 seconds
$delay = random_int(1, 10) / 10;
$driver->delay($delay);
$result = $data['number'];
$result = $data->number;
$result += $result;

// simulating 1 chance on 5 to produce an exception from the "light" operation
if (1 === random_int(1, 5)) {
throw new Error('Failure when processing "Job1"');
}

printf("*. #%d - Job 1 : Result for %d + %d = %d and took %.01f seconds\n", $data['id'], $data['number'], $data['number'], $result, $delay);
printf("*. #%d - Job 1 : Result for %d + %d = %d and took %.01f seconds\n", $data->id, $data->number, $data->number, $result, $delay);

$data['number'] = $result;
$data->number = $result;
};

$job2 = static function (object $data) use ($driver): void {
printf(".* #%d - Job 2 : Calculating %d * %d\n", $data['id'], $data['number'], $data['number']);
$job2 = static function (Data $data) use ($driver): void {
printf(".* #%d - Job 2 : Calculating %d * %d\n", $data->id, $data->number, $data->number);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$result = $data['number'];
$result = $data->number;
$result *= $result;

// simulating 1 chance on 5 to produce an exception from the "heavy" operation
if (1 === random_int(1, 5)) {
throw new Error('Failure when processing "Job2"');
}

printf(".* #%d - Job 2 : Result for %d * %d = %d and took %.01f seconds\n", $data['id'], $data['number'], $data['number'], $result, $delay);
printf(".* #%d - Job 2 : Result for %d * %d = %d and took %.01f seconds\n", $data->id, $data->number, $data->number, $result, $delay);

$data['number'] = $result;
$data->number = $result;
};

$errorJob1 = static function (object $data, Throwable $exception): void {
printf("*. #%d - Error Job : Exception %s\n", $data['id'], $exception->getMessage());
/**
* @param Ip<Data> $ip
*/
$errorJob1 = static function (Ip $ip, ExceptionInterface $exception): void {
printf("*. #%d - Error Job : Exception %s\n", $ip->data->id, $exception->getMessage());

$data['number'] = null;
$ip->data->number = null;
};

$errorJob2 = static function (object $data, Throwable $exception): void {
printf(".* #%d - Error Job : Exception %s\n", $data['id'], $exception->getMessage());
/**
* @param Ip<Data> $ip
*/
$errorJob2 = static function (Ip $ip, ExceptionInterface $exception): void {
printf(".* #%d - Error Job : Exception %s\n", $ip->data->id, $exception->getMessage());

$data['number'] = null;
$ip->data->number = null;
};

$flow = (new Flow($job1, $errorJob1, new MaxIpStrategy(2), $driver))
Expand All @@ -79,6 +87,6 @@
$ipPool = new SplObjectStorage();

for ($i = 1; $i <= 5; $i++) {
$ip = new Ip(new ArrayObject(['id' => $i, 'number' => $i]));
$ip = new Ip(new Data($i, $i));
$flow($ip, static fn ($ip) => $ipPool->offsetUnset($ip));
}
6 changes: 6 additions & 0 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
use function Amp\delay;
use function function_exists;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class AmpDriver implements DriverInterface
{
public function __construct()
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
use Flow\Exception\RuntimeException;
use Throwable;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class FiberDriver implements DriverInterface
{
/**
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
use function React\Async\async;
use function React\Async\delay;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class ReactDriver implements DriverInterface
{
private LoopInterface $eventLoop;
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/RevoltDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
use RuntimeException as NativeRuntimeException;
use Throwable;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class RevoltDriver implements DriverInterface
{
private int $counter = 0;
Expand Down
6 changes: 6 additions & 0 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

use function extension_loaded;

/**
* @template TArgs
* @template TReturn
*
* @implements DriverInterface<TArgs,TReturn>
*/
class SwooleDriver implements DriverInterface
{
public function __construct()
Expand Down
13 changes: 10 additions & 3 deletions src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,26 @@

use Closure;

/**
* @template TArgs TArgs is supposed to be list of generic templating arguments https://github.com/phpstan/phpstan/issues/6873
* @template TReturn
*/
interface DriverInterface
{
/**
* @param Closure $onResolve called on resolved and first argument is $callback return or Flow\Exception on Exception
* #return Closure(TArgs): void when called this start async $callback.
*
* @return Closure when called, this start async $callback
* @param Closure(TArgs): TReturn $callback
* @param null|Closure(ExceptionInterface|TReturn): void $onResolve
*/
public function async(Closure $callback, Closure $onResolve = null): Closure;

public function delay(float $seconds): void;

/**
* @return Closure when called, this cleanup tick interval
* @param Closure(): void $callback
*
* @return Closure(): void when called, this cleanup tick interval
*/
public function tick(int $interval, Closure $callback): Closure;
}
1 change: 1 addition & 0 deletions src/Exception/RuntimeException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\Exception;

use Flow\ExceptionInterface;
use RuntimeException as NativeRuntimeException;

class RuntimeException extends NativeRuntimeException implements ExceptionInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace Flow\Exception;
namespace Flow;

use Throwable;

Expand Down
47 changes: 36 additions & 11 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Closure;
use Flow\Driver\ReactDriver;
use Flow\DriverInterface;
use Flow\Exception\RuntimeException;
use Flow\ExceptionInterface;
use Flow\FlowInterface;
use Flow\Ip;
use Flow\IpStrategy\LinearIpStrategy;
Expand All @@ -17,31 +17,49 @@
use function count;
use function is_array;

/**
* @template T1
* @template T2
*
* @implements FlowInterface<T1>
*/
class Flow implements FlowInterface
{
/**
* @var array<Closure>
* @var array<Closure(T1): T2>
*/
private array $jobs;

/**
* @var array<Closure>
* @var array<Closure(Ip<T1>, ExceptionInterface): void>
*/
private array $errorJobs;

/**
* @var IpStrategyInterface<T1>
*/
private IpStrategyInterface $ipStrategy;

/**
* @var DriverInterface<T1,T2>
*/
private DriverInterface $driver;

/**
* @var SplObjectStorage<Ip, mixed>
* @var SplObjectStorage<Ip<T1>, null|Closure(Ip<T1>): void>
*/
private SplObjectStorage $callbacks;

/**
* @var null|FlowInterface<T2>
*/
private ?FlowInterface $fnFlow = null;

/**
* @param array<Closure>|Closure $jobs
* @param array<Closure>|Closure $errorJobs
* @param array<Closure(T1): T2>|Closure(T1): T2 $jobs
* @param array<Closure(Ip<T1>, ExceptionInterface): void>|Closure(Ip<T1>, ExceptionInterface): void $errorJobs
* @param null|IpStrategyInterface<T1> $ipStrategy
* @param null|DriverInterface<T1,T2> $driver
*/
public function __construct(
Closure|array $jobs,
Expand All @@ -63,6 +81,11 @@ public function __invoke(Ip $ip, Closure $callback = null): void
$this->nextIpJob();
}

/**
* @param FlowInterface<T2> $flow
*
* @return FlowInterface<T1>
*/
public function fn(FlowInterface $flow): FlowInterface
{
if ($this->fnFlow) {
Expand All @@ -86,19 +109,21 @@ private function nextIpJob(): void

$count = count($this->jobs);
foreach ($this->jobs as $i => $job) {
$this->driver->async($job, function (mixed $value) use ($ip, &$count, $i, $callback) {
$this->driver->async($job, function ($value) use ($ip, &$count, $i, $callback) {
$count--;
if ($count === 0 || $value instanceof RuntimeException) {
if ($count === 0 || $value instanceof ExceptionInterface) {
$count = 0;
$this->ipStrategy->done($ip);
$this->nextIpJob();

if ($value instanceof RuntimeException) {
if ($value instanceof ExceptionInterface) {
if (isset($this->errorJobs[$i])) {
$this->errorJobs[$i]($ip->data, $value->getPrevious());
$this->errorJobs[$i]($ip, $value);
} else {
throw $value->getPrevious();
throw $value;
}

return;
}

if ($this->fnFlow) {
Expand Down
17 changes: 17 additions & 0 deletions src/Flow/FlowDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,34 @@
use Flow\FlowInterface;
use Flow\Ip;

/**
* @template T1
* @template T2
*
* @implements FlowInterface<T1>
*/
abstract class FlowDecorator implements FlowInterface
{
/**
* @param FlowInterface<T1> $flow
*/
public function __construct(private FlowInterface $flow)
{
}

/**
* @param Ip<T1> $ip
*/
public function __invoke(Ip $ip, Closure $callback = null): void
{
($this->flow)($ip, $callback);
}

/**
* @param FlowInterface<T2> $flow
*
* @return FlowInterface<T1>
*/
public function fn(FlowInterface $flow): FlowInterface
{
return $this->flow->fn($flow);
Expand Down
Loading