Skip to content

Commit

Permalink
🚧 Update FiberDriver for YFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 12, 2024
1 parent d87d723 commit 6e35db5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
4 changes: 2 additions & 2 deletions examples/ampYFlow.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
require __DIR__ . '/../vendor/autoload.php';

use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\ReactDriver;
use Flow\Driver\FiberDriver;
use Flow\Examples\Data\YFlowData;
use Flow\Flow\Flow;
use Flow\Ip;
Expand Down Expand Up @@ -47,7 +47,7 @@
});
};

$driver = new ReactDriver();
$driver = new FiberDriver();

$flow = (new Flow($factorialYJobDeferBefore, null, null, null, null, $driver))
->fn(new Flow($asyncFactorialDefer, null, null, null, new DeferAsyncHandler(), $driver))
Expand Down
55 changes: 34 additions & 21 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,43 @@ public function async(Closure $callback): Closure
return static function () {};
}

public function defer(Closure $callback): mixed
{
return null;
}

public function await(array &$stream): void
{
$async = static function ($ip, $fnFlows, $index, $isTick) use (&$fiberDatas) {
$fiber = new Fiber($fnFlows[$index]['job']);
$async = static function ($ip, $index, $isTick) use (&$fiberDatas) {
return static function (Closure $job) use (&$fiberDatas, $ip, $index, $isTick) {
return static function (mixed $data) use (&$fiberDatas, $ip, $index, $isTick, $job) {
$fiber = new Fiber($job);

$exception = null;
$exception = null;

try {
if ($ip->data === null) {
$fiber->start();
} else {
$fiber->start($ip->data);
}
} catch (Throwable $fiberException) {
$exception = $fiberException;
}
try {
if ($data === null) {
$fiber->start();
} else {
$fiber->start($data);
}
} catch (Throwable $fiberException) {
$exception = $fiberException;
}

$fiberDatas[] = [
'index' => $index,
'fiber' => $fiber,
'exception' => $exception,
'ip' => $ip,
'isTick' => $isTick,
];
$fiberDatas[] = [
'fiber' => $fiber,
'exception' => $exception,
'ip' => $ip,
'index' => $index,
'isTick' => $isTick,
];
};
};
};

$defer = static function (Closure $job) {};

$tick = 0;
$fiberDatas = [];
while ($stream['ips'] > 0 or count($this->ticks) > 0) {
Expand All @@ -75,7 +86,7 @@ public function await(array &$stream): void
if ($tick % $interval === 0) {
$ip = new Ip();
$stream['ips']++;
$async($ip, [['job' => $callback]], 0, true);
$async($ip, 0, true)($callback)($ip->data);
}
}

Expand All @@ -84,7 +95,9 @@ public function await(array &$stream): void
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, false), Event::ASYNC);
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async, $nextIp, $index) {
return $async($nextIp, $index, false)($job);
}, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function () {}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand Down

0 comments on commit 6e35db5

Please sign in to comment.