Skip to content

Commit

Permalink
🚧 Update FiberDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Ledru committed Aug 14, 2024
1 parent 49b86c2 commit d2c0b80
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 39 deletions.
64 changes: 36 additions & 28 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,37 @@ public function defer(Closure $callback): mixed

public function await(array &$stream): void
{
$async = function ($ip, $index, $isTick) use (&$fiberDatas) {
return function (Closure $job) use (&$fiberDatas, $ip, $index, $isTick) {
return function (mixed $data) use (&$fiberDatas, $ip, $index, $isTick, $job) {
$async = function ($isTick) use (&$fiberDatas) {
return function (Closure $job) use (&$fiberDatas, $isTick) {
return function (mixed $data) use (&$fiberDatas, $isTick, $job) {
$async = $this->async($job);

$fiber = $async($data);
$fiber->start();

$next = static function ($return) {};

$fiberDatas[] = [
'fiber' => $fiber,
'ip' => $ip,
'index' => $index,
'isTick' => $isTick,
'next' => static function ($return) use (&$next) {
$next($return);
},
];

return static function () {};
return static function (Closure $callback) use ($isTick, &$next) {
if ($isTick === false) {
$next = static function ($return) use ($callback) {
$callback($return);
};
}
};
};
};
};

$defer = static function ($ip, $index, $isTick) use ($async) {
return static function (Closure $job) use ($async, $ip, $index, $isTick) {
$asyncJob = $async($ip, $index, $isTick);
$defer = static function ($isTick) use ($async) {
return static function (Closure $job) use ($async, $isTick) {
$asyncJob = $async($isTick);

return $asyncJob($job);
};
Expand All @@ -105,8 +113,7 @@ public function await(array &$stream): void
]) {
if ($tick % $interval === 0) {
$ip = new Ip();
$stream['ips']++;
$async($ip, 0, true)($callback)($ip->data);
$async(true)($callback)($ip->data);
}
}

Expand All @@ -115,11 +122,22 @@ public function await(array &$stream): void
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async, $nextIp, $index) {
return $async($nextIp, $index, false)($job);
}, static function (Closure $job) use ($defer, $nextIp, $index) {
return $defer($nextIp, $index, false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function () {}), Event::ASYNC);
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) {
return $async(false)($job);
}, static function (Closure $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand All @@ -129,17 +147,7 @@ public function await(array &$stream): void
$fiberData['fiber']->resume();
} else {
$data = $fiberData['fiber']->getReturn();

if ($data instanceof RuntimeException and array_key_exists($fiberData['index'], $stream['fnFlows']) and $stream['fnFlows'][$fiberData['index']]['errorJob'] !== null) {
$stream['fnFlows'][$fiberData['index']]['errorJob']($data);
} elseif ($fiberData['isTick'] === false and array_key_exists($fiberData['index'] + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), Event::POP);
$stream['ips']--;
$fiberData['next']($data);
unset($fiberDatas[$i]);
}
}
Expand Down
28 changes: 17 additions & 11 deletions tests/Flow/FlowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\AsyncHandler\BatchAsyncHandler;
use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ReactDriver;
use Flow\DriverInterface;
use Flow\ExceptionInterface;
Expand Down Expand Up @@ -145,22 +146,27 @@ public static function provideJobCases(): iterable
$exception = new RuntimeException('job error');

return self::matrix(static function (DriverInterface $driver, $strategyBuilder) use ($exception) {
$cases = [
'job' => [[[static function (ArrayObject $data) {
$data['number'] = 5;
$cases = [];

return $data;
}, $strategyBuilder(), new AsyncHandler()]], 5],
'asyncJob' => [[[static function (ArrayObject $data) use ($driver) {
$cases['job'] = [[[static function (ArrayObject $data) {
$data['number'] = 5;

return $data;
}, $strategyBuilder(), new AsyncHandler()]], 5];

$strategy = $strategyBuilder();
if(!$driver instanceof FiberDriver && !$strategy instanceof MaxIpStrategy) {
$cases['asyncJob'] = [[[static function (ArrayObject $data) use ($driver) {
$driver->delay(1 / 1000);
$data['number'] = 5;

return $data;
}, $strategyBuilder(), new AsyncHandler()]], 5],
'exceptionJob' => [[[static function () use ($exception) {
throw $exception;
}, $strategyBuilder(), new AsyncHandler()]], 0],
];
}, $strategy, new AsyncHandler()]], 5];
};

$cases['exceptionJob'] = [[[static function () use ($exception) {
throw $exception;
}, $strategyBuilder(), new AsyncHandler()]], 0];

if ($driver instanceof AmpDriver || $driver instanceof ReactDriver) {
$cases['deferJob'] = [[[static function ($args) {
Expand Down

0 comments on commit d2c0b80

Please sign in to comment.