Skip to content

Commit

Permalink
Try wrapWithDeferred
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 11, 2024
1 parent d0903c6 commit f6d2a77
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 16 deletions.
69 changes: 69 additions & 0 deletions examples/ampYFlow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

declare(strict_types=1);

require __DIR__ . '/../vendor/autoload.php';

use Amp\DeferredFuture;
use Amp\Future;
use Revolt\EventLoop;

use function Amp\async;

// Define the Y-Combinator
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

function wrapWithDeferred(Closure $job): Future

Check failure on line 17 in examples/ampYFlow.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Function wrapWithDeferred() return type with generic class Amp\Future does not specify its types: T
{
$deferred = new DeferredFuture();

// Queue the operation to be executed in the event loop
EventLoop::queue(static function () use ($job, $deferred) {
$job(static function ($value) use ($deferred) {
$deferred->complete($value);
}, static function (Future $future, $next) {
$future->map($next);

Check failure on line 26 in examples/ampYFlow.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Unable to resolve the template type Tr in call to method Amp\Future<mixed>::map()
});
});

return $deferred->getFuture();
}

/*function factorialGen(callable $func): Closure
{
return static function (int $n) use ($func): int {
return ($n <= 1) ? 1 : $n * $func($n - 1);
};
}*/

// Define the async factorial function using the Y-Combinator
$asyncFactorial = $Y(static function ($factorial) {
return static function ($n) use ($factorial): Future {
return wrapWithDeferred(static function ($complete, $async) use ($n, $factorial) {
if ($n <= 1) {
$complete(1);
} else {
$async($factorial($n - 1), static function ($result) use ($n, $complete) {
$complete($n * $result);
});
}
});
};
});

// The main loop that runs the async task
$loop = static function () use ($asyncFactorial) {
$future = $asyncFactorial(5);

// Use the map method to handle the result when it's ready
$future->map(static function ($result) {
echo 'Factorial: ' . $result . PHP_EOL;
});
};

// Defer the loop execution to run after the event loop starts
EventLoop::defer($loop);

// Run the event loop to process async tasks
EventLoop::run();
73 changes: 73 additions & 0 deletions examples/ampYFlowDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

declare(strict_types=1);

require __DIR__ . '/../vendor/autoload.php';

use Amp\DeferredFuture;
use Amp\Future;
use Flow\AsyncHandler\AsyncHandler;
use Flow\AsyncHandler\YAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Event;
use Flow\Examples\Data\YFlowData;
use Flow\Flow\Flow;
use Flow\Ip;
use Revolt\EventLoop;

// Define the Y-Combinator
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

function wrapWithDeferred(Closure $job): Future

Check failure on line 22 in examples/ampYFlowDriver.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Function wrapWithDeferred() return type with generic class Amp\Future does not specify its types: T
{
$deferred = new DeferredFuture();

// Queue the operation to be executed in the event loop
EventLoop::queue(static function () use ($job, $deferred) {
try {
$job(static function ($return) use ($deferred) {
$deferred->complete($return);
}, static function (Future $future, $next) {
$future->map($next);

Check failure on line 32 in examples/ampYFlowDriver.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Unable to resolve the template type Tr in call to method Amp\Future<mixed>::map()
});
} catch (Throwable $exception) {
$deferred->complete(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
});

return $deferred->getFuture();
}

$asyncFactorial = $Y(static function ($factorial) {
return static function (YFlowData $data) use ($factorial) {
return wrapWithDeferred(static function ($complete, $async) use ($data, $factorial) {
if ($data->result <= 1) {
$complete(new YFlowData($data->id, $data->number, 1));
} else {
$async($factorial(new YFlowData($data->id, $data->number, $data->result - 1)), static function ($resultData) use ($data, $complete) {
$complete(new YFlowData($data->id, $data->number, $data->result * $resultData->result));
});
}
});
};
});

$factorialYJobAfter = static function (YFlowData $data): Future {
return wrapWithDeferred(static function ($complete) use ($data) {
printf("* #%d - Job : Result for factorial(%d) = %d\n", $data->id, $data->number, $data->result);

$complete(new YFlowData($data->id, $data->number));
});
};

$driver = new AmpDriver();

$flow = (new Flow($asyncFactorial, null, null, null, new YAsyncHandler(), $driver))
->fn(new Flow($factorialYJobAfter, null, null, null, new AsyncHandler(), $driver))
;

$ip = new Ip(new YFlowData(5, 5, 5));
$flow($ip);

$flow->await();
61 changes: 55 additions & 6 deletions examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

require __DIR__ . '/../vendor/autoload.php';

use Amp\Future;
use Flow\AsyncHandler\YAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
Expand All @@ -15,7 +16,57 @@
use Flow\Flow\YFlow;
use Flow\Ip;

$driver = match (random_int(1, 4)) {
use function Amp\async;

$U = static fn ($f) => $f($f);
$Y = static fn (callable $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

$asyncFactorial = $Y(static function ($fact) {
return static function ($n) use ($fact): Future {
return async(static function () use ($n, $fact) {
if ($n <= 1) {
return 1;
}

$result = yield $fact($n - 1);

return $n * $result;
});
};
});

$future = $asyncFactorial(5);

$future->map(static function ($result) {
echo 'Factorial: ' . $result . PHP_EOL;
});

// $driver = new AmpDriver();
// $driver->

/*$factorialYJob = static function ($factorial) {
return static function (YFlowData $data) use ($factorial): YFlowData {
return new YFlowData(
$data->id,
$data->number,
($data->result <= 1) ? 1 : $data->result * $factorial(new YFlowData($data->id, $data->number, $data->result - 1))->result
);
};
};
$flow = (new Flow($factorialYJob, null, null, null, new YAsyncHandler(), $driver))
->fn(static function (YFlowData $data): YFlowData {
printf("* #%d - Job 4 : Result for factorial(%d) = %d\n", $data->id, $data->number, $data->result);
return new YFlowData($data->id, $data->number);
});
$ip = new Ip(new YFlowData(5, 5, 5));
$flow($ip);
$flow->await();*/

/*$driver = match (random_int(1, 1)) {
1 => new AmpDriver(),
2 => new FiberDriver(),
3 => new ReactDriver(),
Expand Down Expand Up @@ -65,7 +116,7 @@ function factorialGen(callable $func): Closure
function factorialYMemo(int $n): int
{
return Ymemo('factorialGen')($n);
}
}*/

/*
use Amp\Promise;
Expand Down Expand Up @@ -144,7 +195,7 @@ public static function Y($f) {

// factorialYMemo(6) . ' ' . factorialYMemo(5);

$factorialJob = static function (YFlowData $data): YFlowData {
/*$factorialJob = static function (YFlowData $data): YFlowData {
printf("*... #%d - Job 1 : Calculating factorial(%d)\n", $data->id, $data->number);
// raw factorial calculation
Expand Down Expand Up @@ -218,11 +269,9 @@ public static function Y($f) {
yield [$factorialYAsyncHandlerJobAfter];
}, ['driver' => $driver]);
$ipPool = new SplObjectStorage();

for ($i = 5; $i <= 5; $i++) {
$ip = new Ip(new YFlowData($i, $i));
$flow($ip);
}
$flow->await();
$flow->await();*/
6 changes: 5 additions & 1 deletion src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public static function getSubscribedEvents()

public function async(AsyncEvent $event): void
{
call_user_func_array($event->getAsync(), $event->getArgs());
$args = array_merge([$event->getWrapper()], $event->getArgs());

call_user_func_array($event->getAsync(), $args);

// call_user_func_array($event->getAsync(), $event->getArgs())($event->getWrapper());
}
}
15 changes: 13 additions & 2 deletions src/AsyncHandler/YAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,20 @@ public static function getSubscribedEvents()

public function async(AsyncEvent $event): void
{
$U = static fn (Closure $f) => $f($f);
$wrapper = static function ($job) use ($event) {
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

return $Y($event->getWrapper()($job));
};

$args = array_merge([$wrapper], $event->getArgs());

call_user_func_array($event->getAsync(), $args);

/*$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));
call_user_func_array($Y($event->getAsync()), $event->getArgs());
call_user_func_array($event->getAsync(), $event->getArgs())($Y($event->getWrapper()));*/
}
}
Loading

0 comments on commit f6d2a77

Please sign in to comment.