Skip to content

Commit

Permalink
✅ YFlow with FiberDriver pass
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Ledru committed Aug 14, 2024
1 parent d2c0b80 commit 8287371
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
4 changes: 2 additions & 2 deletions examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use Flow\Flow\YFlow;
use Flow\Ip;

$driver = match (random_int(1, 2)) {
$driver = match (random_int(1, 3)) {
1 => new AmpDriver(),
2 => new ReactDriver(),
// 3 => new FiberDriver(),
3 => new FiberDriver(),
// 4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
};
Expand Down
22 changes: 18 additions & 4 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,25 @@ public function await(array &$stream): void
};
};

$defer = static function ($isTick) use ($async) {
return static function (Closure $job) use ($async, $isTick) {
$asyncJob = $async($isTick);
$defer = static function ($isTick) {
return static function (Closure $job) use ($isTick) {
return static function (Closure $next) use ($isTick, $job) {
$fiber = new Fiber(static function () use ($isTick, $job, $next) {
try {
$job(static function ($return) use ($isTick, $next) {
if ($isTick === false) {
$next($return);
}
}, static function ($fn, $next) {
$fn($next);
});
} catch (Throwable $exception) {
Fiber::suspend(new RuntimeException($exception->getMessage(), $exception->getCode(), $exception));
}
});

return $asyncJob($job);
$fiber->start();
};
};
};

Expand Down
4 changes: 2 additions & 2 deletions tests/Flow/FlowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ public static function provideJobCases(): iterable
}, $strategyBuilder(), new AsyncHandler()]], 5];

$strategy = $strategyBuilder();
if(!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) {
if (!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) {
$cases['asyncJob'] = [[[static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 5;

return $data;
}, $strategy, new AsyncHandler()]], 5];
};
}

$cases['exceptionJob'] = [[[static function () use ($exception) {
throw $exception;
Expand Down

0 comments on commit 8287371

Please sign in to comment.