Skip to content

Commit

Permalink
✨ Add Batch IP
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 6, 2024
1 parent 2a2136c commit 4050906
Show file tree
Hide file tree
Showing 30 changed files with 2,846 additions and 1,788 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v1.2.x

- Add Batch IP from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch

## v1.2.0

- Add event system for processing IpStrategy
Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
],
"require": {
"php": ">=8.3",
"symfony/event-dispatcher": "^7.0"
"symfony/event-dispatcher": "^7.0",
"symfony/messenger": "^7.0"
},
"require-dev": {
"amphp/amp": "^3.0",
"openswoole/ide-helper": "^22.1.5",
"react/async": "^4.2",
"spatie/async": "^1.6",
"symfony/doctrine-messenger": "^7.0",
"symfony/messenger": "^7.0",
"symfony/orm-pack": "^2.4"
},
"suggest": {
Expand Down
30 changes: 30 additions & 0 deletions docs/src/content/en/docs/getting-started/async-handler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
title: "Async Handler"
description: "Async Handler."
lead: "Async Handler."
date: 2020-10-13T15:21:01+02:00
lastmod: 2020-10-13T15:21:01+02:00
draft: false
images: []
menu:
docs:
parent: "getting-started"
weight: 35
toc: true
---

# Async Handler

When processing Flow at async step, you can choose a handler that will process asynchronously the Ip.

## AsyncHandler

This is the default one. Ip is async processed immediately.

## BatchAsyncHandler

This async process Ip as batch capability : the handler will wait for a certain amount of async messages ($batchSize) to be processed before pushing them.

## Make your Async Handler

You can make your custom Ip strategy by implementing `Flow\AsyncHandlerInterface`
26 changes: 26 additions & 0 deletions src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;

use function call_user_func_array;

final class AsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
call_user_func_array($event->getAsync(), $event->getArgs());
}
}
68 changes: 68 additions & 0 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

use function call_user_func_array;

final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

public function __construct(
private int $batchSize = 10,
) {}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$ack = new Acknowledger(get_debug_type($this), static function (?Throwable $e = null, $event = null) {
call_user_func_array($event->getAsync(), $event->getArgs());
});

$this->handle($event, $ack);
}

/**
* PHPStan should normaly pass for method.unused
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs
*
* @phpstan-ignore method.unused
*/
private function process(array $jobs): void
{
foreach ($jobs as [$event, $ack]) {
$ack->ack($event);
}
}

/**
* PHPStan should normaly pass for method.unused
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @phpstan-ignore method.unused
*/
private function getBatchSize(): int
{
return $this->batchSize;
}
}
9 changes: 9 additions & 0 deletions src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;

interface AsyncHandlerInterface extends EventSubscriberInterface {}
41 changes: 21 additions & 20 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use RuntimeException as NativeRuntimeException;
Expand All @@ -34,7 +35,7 @@ class AmpDriver implements DriverInterface

public function __construct(?Driver $driver = null)
{
if (!function_exists('Amp\\async')) {
if (!function_exists('Amp\async')) {
throw new NativeRuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}

Expand All @@ -58,36 +59,36 @@ public function async(Closure $callback): Closure

public function await(array &$stream): void
{
$async = function ($ip, $fnFlows, $index) {
$async = function ($ip, $fnFlows, $index, $map) {
$async = $this->async($fnFlows[$index]['job']);

if ($ip->data === null) {
return $async();
$future = $async();
} else {
$future = $async($ip->data);
}

return $async($ip->data);
$future->map($map);
};

$loop = function () use (&$loop, &$stream, $async) {
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index)
->map(static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['ips']--;
})
;
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand Down
18 changes: 9 additions & 9 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
use Closure;
use Fiber;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Throwable;

use function array_key_exists;
Expand All @@ -40,7 +41,7 @@ public function async(Closure $callback): Closure

public function await(array &$stream): void
{
$async = static function ($ip, $fnFlows, $index, $isTick) {
$async = static function ($ip, $fnFlows, $index, $isTick) use (&$fiberDatas) {
$fiber = new Fiber($fnFlows[$index]['job']);

$exception = null;
Expand All @@ -55,7 +56,7 @@ public function await(array &$stream): void
$exception = $fiberException;
}

return [
$fiberDatas[] = [
'index' => $index,
'fiber' => $fiber,
'exception' => $exception,
Expand All @@ -74,16 +75,16 @@ public function await(array &$stream): void
if ($tick % $interval === 0) {
$ip = new Ip();
$stream['ips']++;
$fiberDatas[] = $async($ip, [['job' => $callback]], 0, true);
$async($ip, [['job' => $callback]], 0, true);
}
}

$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$fiberDatas[] = $async($nextIp, $stream['fnFlows'], $index, false);
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, false), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand All @@ -102,15 +103,14 @@ public function await(array &$stream): void
if ($fiberData['isTick'] === false and array_key_exists($fiberData['index'] + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$fiberDatas[] = $async($ip, $stream['fnFlows'], $fiberData['index'] + 1, false);
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}
} elseif (array_key_exists($fiberData['index'], $stream['fnFlows']) and $stream['fnFlows'][$fiberData['index']]['errorJob'] !== null) {
$stream['fnFlows'][$fiberData['index']]['errorJob'](
new RuntimeException($fiberData['exception']->getMessage(), $fiberData['exception']->getCode(), $fiberData['exception'])
);
}
$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), IpStrategyEvent::POP);
$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), Event::POP);
$stream['ips']--;
unset($fiberDatas[$i]);
}
Expand Down
41 changes: 21 additions & 20 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use RuntimeException as NativeRuntimeException;
Expand All @@ -36,7 +37,7 @@ class ReactDriver implements DriverInterface

public function __construct(?LoopInterface $eventLoop = null)
{
if (!function_exists('React\\Async\\async')) {
if (!function_exists('React\Async\async')) {
throw new NativeRuntimeException('ReactPHP is not loaded. Suggest install it with composer require react/event-loop');
}

Expand All @@ -58,36 +59,36 @@ public function async(Closure $callback): Closure

public function await(array &$stream): void
{
$async = function ($ip, $fnFlows, $index) {
$async = function ($ip, $fnFlows, $index, $then) {
$async = $this->async($fnFlows[$index]['job']);

if ($ip->data === null) {
return $async();
$promise = $async();
} else {
$promise = $async($ip->data);
}

return $async($ip->data);
$promise->then($then);
};

$loop = function () use (&$loop, &$stream, $async) {
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index)
->then(static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['ips']--;
})
;
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand Down
Loading

0 comments on commit 4050906

Please sign in to comment.