Skip to content

Commit

Permalink
🏗️ YFlow rework
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 8, 2024
1 parent ed4953d commit 7d0809e
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch
- YFlow rework

## v1.2.0

Expand Down
1 change: 1 addition & 0 deletions docs/src/content/en/docs/getting-started/ressources.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Video of Y-Combinator : [https://www.youtube.com/watch?v=QSS_ZcO8Q1g](https://ww
- Lambda calculus language explanation : [https://tgdwyer.github.io/lambdacalculus](https://tgdwyer.github.io/lambdacalculus)
- Combinator : [https://github.com/loophp/combinator](https://github.com/loophp/combinator)
- Lambda-php : [https://github.com/igorw/lambda-php](https://github.com/igorw/lambda-php)
- Deriving the y combinator in 7 easy steps : [https://gist.github.com/igstan/388351](https://gist.github.com/igstan/388351)

## Messaging approach with East oriented code from [Frédéric Hardy](https://twitter.com/mageekguy)

Expand Down
10 changes: 10 additions & 0 deletions examples/Data/YFlowData.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Data;

class YFlowData
{
public function __construct(public int $id, public ?int $number, public ?int $result = null) {}
}
203 changes: 203 additions & 0 deletions examples/yflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
<?php

declare(strict_types=1);

require __DIR__ . '/../vendor/autoload.php';

use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Data\YFlowData;
use Flow\Flow\Flow;
use Flow\Flow\YFlow;
use Flow\Ip;

$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
};
printf("Use %s\n", $driver::class);

function factorial(int $n): int
{
return ($n <= 1) ? 1 : $n * factorial($n - 1);
}

function Ywrap(callable $f, callable $wrapperFunc): Closure
{
$U = static fn ($x) => $x($x);

return $U(static fn ($x) => $f($wrapperFunc(static fn ($y) => $U($x)($y))));
}

function memoWrapperGenerator(callable $f): Closure
{
static $cache = [];

return static function ($y) use ($f, &$cache) {
if (!isset($cache[$y])) {
$cache[$y] = $f($y);
}

return $cache[$y];
};
}

function Ymemo(callable $f): Closure
{
return Ywrap($f, 'memoWrapperGenerator');
}

function factorialGen(callable $func): Closure
{
return static function (int $n) use ($func): int {
return ($n <= 1) ? 1 : $n * $func($n - 1);
};
}

function factorialYMemo(int $n): int
{
return Ymemo('factorialGen')($n);
}

/*
use Amp\Promise;
use Amp\Deferred;
function Ywrap(callable $f, callable $wrapperFunc) {
$U = fn($x) => $x($x);
return $U(fn($x) => $f($wrapperFunc(fn($y) => Promise\wait($U($x)($y)))));
}
function asyncWrapper(callable $f) {
return function($y) use ($f) {
$deferred = new Deferred();
$deferred->resolve($f($y)); // Resolve immediately
return $deferred->promise();
};
}
function Ymemo($f) {
return Ywrap($f, 'asyncWrapper');
}
function factorialGen(callable $func) {
return function (int $n) use ($func) {
$deferred = new Deferred();
$result = ($n <= 1) ? 1 : $n * Promise\wait($func($n - 1));
$deferred->resolve($result); // Resolve immediately
return $deferred->promise();
};
}
function factorialYMemo(int $n) {
return Promise\wait(Ymemo('factorialGen')($n));
}
// Usage
Amp\Loop::run(function() {
$result = factorialYMemo(5);
echo $result; // Expected: 120
});
*/

/*
use Amp\Promise;
class Flow {
// ... other combinators ...
public static function Y($f) {
return (function ($x) use ($f) {
return $f(function ($y) use ($x) {
return Promise\wait($x($x)($y));
});
})(function ($x) {
return $f(function ($y) use ($x) {
return Promise\wait($x($x)($y));
});
});
}
// ... rest of the class ...
}
use Amp\Loop;
Loop::run(function() {
$factorial = Flow::Y(function ($f) {
return function ($x) use ($f) {
return $x == 0 ? 1 : $x * $f($x - 1);
};
});
echo $factorial(5); // Outputs: 120
});
*/

// factorialYMemo(6) . ' ' . factorialYMemo(5);

$factorialJob = static function (YFlowData $data): YFlowData {
printf("*.. #%d - Job 1 : Calculating factorial(%d)\n", $data->id, $data->number);

// raw factorial calculation
$result = factorial($data->number);

printf("*.. #%d - Job 1 : Result for factorial(%d) = %d\n", $data->id, $data->number, $result);

return new YFlowData($data->id, $data->number);
};

$factorialYJobBefore = static function (YFlowData $data): YFlowData {
printf(".*. #%d - Job 2 : Calculating factorial(%d)\n", $data->id, $data->number);

return new YFlowData($data->id, $data->number, $data->number);
};

$factorialYJob = static function ($factorial) {
return static function (YFlowData $data) use ($factorial): YFlowData {
return new YFlowData(
$data->id,
$data->number,
($data->result <= 1) ? 1 : $data->result * $factorial(new YFlowData($data->id, $data->number, $data->result - 1))->result
);
};
};

$factorialYJobAfter = static function (YFlowData $data): YFlowData {
printf(".*. #%d - Job 2 : Result for factorial(%d) = %d\n", $data->id, $data->number, $data->result);

return new YFlowData(5, 5);
};

$factorialYMemoJob = static function (YFlowData $data): YFlowData {
printf("..* #%d - Job 3 : Calculating factorialYMemo(%d)\n", $data->id, $data->number);

$result = factorialYMemo($data->number);

printf("..* #%d - Job 3 : Result for factorialYMemo(%d) = %d\n", $data->id, $data->number, $result);

return new YFlowData($data->id, $data->number);
};

$flow = Flow::do(static function () use ($factorialJob, $factorialYJobBefore, $factorialYJob, $factorialYJobAfter, $factorialYMemoJob) {
yield [$factorialJob];
yield [$factorialYJobBefore];
yield new YFlow($factorialYJob);
yield [$factorialYJobAfter];
yield [$factorialYMemoJob];
}, ['driver' => $driver]);

$ipPool = new SplObjectStorage();

for ($i = 5; $i <= 5; $i++) {
$ip = new Ip(new YFlowData($i, $i));
$flow($ip);
}

$flow->await();
2 changes: 1 addition & 1 deletion src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public function await(): void
* "driver"?: DriverInterface
* } $config
*/
private static function flowUnwrap($flow, ?array $config = null): self
private static function flowUnwrap($flow, ?array $config = null): FlowInterface
{
if ($flow instanceof Closure) {
return new self(...[...['job' => $flow], ...($config ?? [])]);
Expand Down
16 changes: 11 additions & 5 deletions src/Flow/YFlow.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@
* @template T1
* @template T2
*
* @extends FlowDecorator<T1,T2>
* @extends Flow<T1,T2>
*/
class YFlow extends FlowDecorator
class YFlow extends Flow
{
/**
* @param null|Closure(ExceptionInterface): void $errorJob
* @param null|IpStrategyInterface<T1> $ipStrategy
* @param null|DriverInterface<T1,T2> $driver
*/
public function __construct(Closure $job, ?Closure $errorJob = null, ?IpStrategyInterface $ipStrategy = null, ?EventDispatcherInterface $dispatcher = null, ?AsyncHandlerInterface $asyncHandler = null, ?DriverInterface $driver = null)
{
public function __construct(
Closure $job,
?Closure $errorJob = null,
?IpStrategyInterface $ipStrategy = null,
?EventDispatcherInterface $dispatcher = null,
?AsyncHandlerInterface $asyncHandler = null,
?DriverInterface $driver = null
) {
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

parent::__construct(new Flow($Y($job), $errorJob, $ipStrategy, $dispatcher, $asyncHandler, $driver));
parent::__construct($Y($job), $errorJob, $ipStrategy, $dispatcher, $asyncHandler, $driver);
}
}
35 changes: 14 additions & 21 deletions tests/Flow/YFlowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,16 @@ class YFlowTest extends TestCase
*/
public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy, Closure $job, int $resultNumber): void
{
self::assertTrue(true);

/*$ip = new Ip(new ArrayObject(['number' => 6]));
$ip = new Ip(new ArrayObject(['number' => 6]));
$errorJob = static function () {};
$yFlow = new YFlow($job, $errorJob, $ipStrategy, $driver);
($yFlow)($ip, static function (Ip $ip) use ($driver, $resultNumber) {
$driver->stop();
self::assertSame(ArrayObject::class, $ip->data::class);
self::assertSame($resultNumber, $ip->data['number']);
});
$driver->start();*/
$yFlow = (new YFlow($job, $errorJob, $ipStrategy, null, null, $driver))
->fn(static function (ArrayObject $data) use ($resultNumber) {
self::assertSame($resultNumber, $data['number']);
})
;
$yFlow($ip);

$yFlow->await();
}

/**
Expand All @@ -49,15 +46,11 @@ public function testJob(DriverInterface $driver, IpStrategyInterface $ipStrategy
public static function provideJobCases(): iterable
{
return self::matrix(static fn () => [
'job' => [static function (callable $function): Closure {
return static function (ArrayObject $data) use ($function) {
if ($data['number'] > 1) {
$calcData = new ArrayObject(['number' => $data['number'] - 1]);
$function($calcData);
$data['number'] *= $calcData['number'];
} else {
$data['number'] = 1;
}
'job' => [static function (callable $factorial): Closure {
return static function (ArrayObject $data) use ($factorial) {
return new ArrayObject([
'number' => ($data['number'] <= 1) ? 1 : $data['number'] * $factorial(new ArrayObject(['number' => $data['number'] - 1]))['number'],
]);
};
}, 720],
]);
Expand Down

0 comments on commit 7d0809e

Please sign in to comment.