Skip to content

Commit

Permalink
📝 Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 12, 2024
1 parent 214a658 commit cb634e8
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 13 deletions.
14 changes: 9 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

## v1.2.x

- Add Batch IP from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch
- YFlow rework
- Add new Interface Flow\AsyncHandlerInterface to control the Event::SYNC step when processing an IP
- Add Flow\AsyncHandler\AsyncHandler the default handler when Event::SYNC is dispatched
- Add Batch IP with Flow\AsyncHandler\BatchAsyncHandler from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch
- Add Flow\AsyncHandler\DeferAsyncHandler to gain granular control on the async Event::SYNC step event
- Flow\Flow\YFlow rework
- Add more exemples in `examples/yflow.php` to play with Y-Combinators
- Update DX for Flow\DriverInterface : add `defer` to gain much granular control on asynchronous callbacks

## v1.2.0
Expand Down
6 changes: 6 additions & 0 deletions docs/src/content/en/docs/getting-started/async-handler.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ This is the default one. Ip is async processed immediately.

This async process Ip as batch capability : the handler will wait for a certain amount of async messages ($batchSize) to be processed before pushing them.

## DeferAsyncHandler

This async process Ip to offer defer capability : the handler will pass [$data, $defer] as entry for the job. In that case, the job can fine control the async process. $defer is a callable that embark two callbacks
- an complete callback to store result
- an async callback to go to the next async call.

## Make your Async Handler

You can make your custom Ip strategy by implementing `Flow\AsyncHandlerInterface`
2 changes: 1 addition & 1 deletion examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;

$driver = match (random_int(1, 4)) {
$driver = match (random_int(4, 4)) {
1 => new AmpDriver(),

Check failure on line 22 in examples/flow.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Match arm comparison between 4 and 1 is always false.
2 => new FiberDriver(),

Check failure on line 23 in examples/flow.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Match arm comparison between 4 and 2 is always false.
3 => new ReactDriver(),

Check failure on line 24 in examples/flow.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Match arm comparison between 4 and 3 is always false.
Expand Down
30 changes: 23 additions & 7 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,32 @@ public function async(Closure $callback): Closure
};
}

public function defer(Closure $callback): mixed
{
return null;
}

public function await(array &$stream): void
{
$async = function ($ip, $fnFlows, $index, $onResolve) {
$async = $this->async($fnFlows[$index]['job']);
$async = function (Closure $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

$next = fn($value) => $value;
if ($data === null) {
$async($next)();
} else {
$async($next)($data);
}

if ($ip->data === null) {
return $async($onResolve)();
}
return static function($onResolve) use ($next) {
$next($onResolve);
};
};
};

return $async($onResolve)($ip->data);
$defer = function (Closure $job) use ($async) {
return $async($job);
};

$nextIp = null;
Expand All @@ -78,7 +94,7 @@ public function await(array &$stream): void
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
Expand Down
5 changes: 5 additions & 0 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public function async(Closure $callback): Closure
};
}

public function defer(Closure $callback): mixed
{
return null;
}

public function await(array &$stream): void
{
$async = function ($ip, $fnFlows, $index, $onResolve) {
Expand Down

0 comments on commit cb634e8

Please sign in to comment.