Skip to content

Commit

Permalink
✅ YFlow with AmpDriver pass
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 12, 2024
1 parent c07b269 commit 61d94f5
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 114 deletions.
26 changes: 15 additions & 11 deletions examples/ampYFlowDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@

require __DIR__ . '/../vendor/autoload.php';

use Amp\DeferredFuture;
use Amp\Future;
use Flow\AsyncHandler\AsyncHandler;
use Flow\AsyncHandler\YAsyncHandler;
use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Event;
use Flow\Examples\Data\YFlowData;
use Flow\Flow\Flow;
use Flow\Ip;
use Revolt\EventLoop;

// Define the Y-Combinator
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

$asyncFactorial = $Y(static function ($factorial) {
return static function ($args) use ($factorial): Future{
$factorialYJobDeferBefore = static function (YFlowData $data) {
printf("* #%d - Job : Calculating factorial(%d)\n", $data->id, $data->number);

return new YFlowData($data->id, $data->number, $data->number);
};

$asyncFactorialDefer = $Y(static function ($factorial) {
return static function ($args) use ($factorial) {
[$data, $defer] = $args;

return $defer(static function ($complete, $async) use ($data, $defer, $factorial) {
if ($data->result <= 1) {
$complete([new YFlowData($data->id, $data->number, 1), $defer]);
Expand All @@ -35,8 +37,9 @@
};
});

$factorialYJobAfter = static function ($args): Future {
$factorialYJobDeferAfter = static function ($args) {
[$data, $defer] = $args;

return $defer(static function ($complete) use ($data, $defer) {
printf("* #%d - Job : Result for factorial(%d) = %d\n", $data->id, $data->number, $data->result);

Expand All @@ -46,8 +49,9 @@

$driver = new AmpDriver();

$flow = (new Flow($asyncFactorial, null, null, null, new YAsyncHandler(), $driver))
->fn(new Flow($factorialYJobAfter, null, null, null, new AsyncHandler(), $driver))
$flow = (new Flow($factorialYJobDeferBefore, null, null, null, null, $driver))
->fn(new Flow($asyncFactorialDefer, null, null, null, new DeferAsyncHandler(), $driver))
->fn(new Flow($factorialYJobDeferAfter, null, null, null, new DeferAsyncHandler(), $driver))
;

$ip = new Ip(new YFlowData(5, 5, 5));
Expand Down
12 changes: 5 additions & 7 deletions src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
use Flow\Event;
use Flow\Event\AsyncEvent;

use function call_user_func_array;

final class AsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
Expand All @@ -21,10 +19,10 @@ public static function getSubscribedEvents()

public function async(AsyncEvent $event): void
{
$args = array_merge([$event->getWrapper()], $event->getArgs());

call_user_func_array($event->getAsync(), $args);

// call_user_func_array($event->getAsync(), $event->getArgs())($event->getWrapper());
$ip = $event->getIp();
$async = $event->getAsync();
$asyncJob = $async($event->getJob());
$next = $asyncJob($ip->data);
$next($event->getCallback());
}
}
13 changes: 8 additions & 5 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

use function call_user_func_array;

final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

private AsyncHandlerInterface $asyncHandler;

public function __construct(
private int $batchSize = 10,
) {}
?AsyncHandlerInterface $asyncHandler = null,
) {
$this->asyncHandler = $asyncHandler ?? new AsyncHandler();
}

public static function getSubscribedEvents()
{
Expand All @@ -31,8 +34,8 @@ public static function getSubscribedEvents()

public function async(AsyncEvent $event): void
{
$ack = new Acknowledger(get_debug_type($this), static function (?Throwable $e = null, $event = null) {
call_user_func_array($event->getAsync(), $event->getArgs());
$ack = new Acknowledger(get_debug_type($this), function (?Throwable $e = null, $event = null) {
$this->asyncHandler->async($event);
});

$this->handle($event, $ack);
Expand Down
31 changes: 31 additions & 0 deletions src/AsyncHandler/DeferAsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;

final class DeferAsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$ip = $event->getIp();
$job = $event->getJob();
$next = $job([$ip->data, $event->getDefer()]);
$next(static function ($result) use ($event) {
[$data] = $result;
$callback = $event->getCallback();
$callback($data);
});
}
}
41 changes: 0 additions & 41 deletions src/AsyncHandler/YAsyncHandler.php

This file was deleted.

6 changes: 5 additions & 1 deletion src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

namespace Flow;

use Flow\Event\AsyncEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

interface AsyncHandlerInterface extends EventSubscriberInterface {}
interface AsyncHandlerInterface extends EventSubscriberInterface
{
public function async(AsyncEvent $event): void;
}
77 changes: 47 additions & 30 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,48 +53,65 @@ public function async(Closure $callback): Closure
try {
return $callback(...$args, ...($args = []));
} catch (Throwable $exception) {
dd($exception);

return new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
}
}, $callback, $args);
};
}

public function defer(Closure $callback): mixed
{
$deferred = new DeferredFuture();

EventLoop::queue(static function () use ($callback, $deferred) {
try {
$callback(static function ($return) use ($deferred) {
$deferred->complete($return);
}, static function ($callback, $next) {
$callback($next);
});
} catch (Throwable $exception) {
$deferred->complete(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
});

return $deferred->getFuture();
}

public function await(array &$stream): void
{
$loop = function () use (&$loop, &$stream) {
$async = function (Closure $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

if ($data === null) {
$future = $async();
} else {
$future = $async($data);
}

return static function ($map) use ($future) {
$future->map($map);
};
};
};

$defer = function (Closure $job) {
return function ($map) use ($job) {
$future = $this->defer($job);
$future->map($map);
};
};

$loop = function () use (&$loop, &$stream, $async, $defer) {
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$defer = function(Closure $job): Future
{
$deferred = new DeferredFuture();

// Queue the operation to be executed in the event loop
EventLoop::queue(static function () use ($job, $deferred) {
try {
$job(static function ($return) use ($deferred) {
$deferred->complete($return);
}, static function (Future $future, $next) {
$future->map($next);
});
} catch (Throwable $exception) {
dd($exception);
$deferred->complete(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
});

return $deferred->getFuture();
};

$async = $stream['fnFlows'][$index]['job'];
$future = $async([$nextIp->data, $defer]);

$future->map(static function ($args) use (&$stream, $index, $nextIp) {
[$data, $defer] = $args;
$job = $stream['fnFlows'][$index]['job'];

$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
Expand All @@ -105,7 +122,7 @@ public function await(array &$stream): void

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
});
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand Down
2 changes: 2 additions & 0 deletions src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ interface DriverInterface
*/
public function async(Closure $callback): Closure;

public function defer(Closure $callback): mixed;

/**
* @param array{'ips': int, 'fnFlows': array<mixed>, 'dispatchers': array<mixed>} $stream
*/
Expand Down
41 changes: 22 additions & 19 deletions src/Event/AsyncEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,41 @@
namespace Flow\Event;

use Closure;
use Flow\Ip;
use Symfony\Contracts\EventDispatcher\Event;

final class AsyncEvent extends Event
{
/**
* @var array<mixed>
*/
private array $args;

/**
* @param array<mixed> $args
*/
public function __construct(private Closure $async, private Closure $wrapper, ...$args)
{
$this->args = $args;
}
public function __construct(
private Closure $async,
private Closure $defer,
private Closure $job,
private Ip $ip,
private Closure $callback
) {}

public function getAsync(): Closure
{
return $this->async;
}

public function getWrapper(): Closure
public function getDefer(): Closure
{
return $this->defer;
}

public function getJob(): Closure
{
return $this->job;
}

public function getIp(): Ip
{
return $this->wrapper;
return $this->ip;
}

/**
* @return array<mixed>
*/
public function getArgs(): array
public function getCallback(): Closure
{
return $this->args;
return $this->callback;
}
}

0 comments on commit 61d94f5

Please sign in to comment.