Skip to content

Commit

Permalink
✨ Add Batch IP
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 6, 2024
1 parent 2a2136c commit 63cd11d
Show file tree
Hide file tree
Showing 29 changed files with 2,738 additions and 1,752 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v1.2.x

- Add Batch IP from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch

## v1.2.0

- Add event system for processing IpStrategy
Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
],
"require": {
"php": ">=8.3",
"symfony/event-dispatcher": "^7.0"
"symfony/event-dispatcher": "^7.0",
"symfony/messenger": "^7.0"
},
"require-dev": {
"amphp/amp": "^3.0",
"openswoole/ide-helper": "^22.1.5",
"react/async": "^4.2",
"spatie/async": "^1.6",
"symfony/doctrine-messenger": "^7.0",
"symfony/messenger": "^7.0",
"symfony/orm-pack": "^2.4"
},
"suggest": {
Expand Down
26 changes: 26 additions & 0 deletions src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;

use function call_user_func_array;

final class AsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$event->setReturn(call_user_func_array($event->getAsync(), $event->getArgs()));
}
}
68 changes: 68 additions & 0 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

use function call_user_func_array;

final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

public function __construct(
private int $batchSize = 10,
) {}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$ack = new Acknowledger(get_debug_type($this), static function (?Throwable $e = null, $event = null) {
$event->setReturn(call_user_func_array($event->getAsync(), $event->getArgs()));
});

$this->handle($event, $ack);
}

/**
* PHPStan should normaly pass for method.unused
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs
*
* @phpstan-ignore method.unused
*/
private function process(array $jobs): void
{
foreach ($jobs as [$event, $ack]) {
$ack->ack($event);
}
}

/**
* PHPStan should normaly pass for method.unused
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @phpstan-ignore method.unused
*/
private function getBatchSize(): int
{
return $this->batchSize;
}
}
9 changes: 9 additions & 0 deletions src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;

interface AsyncHandlerInterface extends EventSubscriberInterface {}
13 changes: 7 additions & 6 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use RuntimeException as NativeRuntimeException;
Expand All @@ -34,7 +35,7 @@ class AmpDriver implements DriverInterface

public function __construct(?Driver $driver = null)
{
if (!function_exists('Amp\\async')) {
if (!function_exists('Amp\async')) {
throw new NativeRuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}

Expand Down Expand Up @@ -72,19 +73,19 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index)
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index), Event::ASYNC)->getReturn()
->map(static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
})
;
Expand Down
13 changes: 7 additions & 6 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
use Closure;
use Fiber;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Throwable;

use function array_key_exists;
Expand Down Expand Up @@ -81,9 +82,9 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$fiberDatas[] = $async($nextIp, $stream['fnFlows'], $index, false);
$fiberDatas[] = $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, false), Event::ASYNC)->getReturn();
}
}
} while ($nextIp !== null);
Expand All @@ -102,15 +103,15 @@ public function await(array &$stream): void
if ($fiberData['isTick'] === false and array_key_exists($fiberData['index'] + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$fiberDatas[] = $async($ip, $stream['fnFlows'], $fiberData['index'] + 1, false);
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), Event::PUSH);
$fiberDatas[] = $stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new AsyncEvent($async, $ip, $stream['fnFlows'], $fiberData['index'] + 1, false), Event::ASYNC)->getReturn();
}
} elseif (array_key_exists($fiberData['index'], $stream['fnFlows']) and $stream['fnFlows'][$fiberData['index']]['errorJob'] !== null) {
$stream['fnFlows'][$fiberData['index']]['errorJob'](
new RuntimeException($fiberData['exception']->getMessage(), $fiberData['exception']->getCode(), $fiberData['exception'])
);
}
$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), IpStrategyEvent::POP);
$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), Event::POP);
$stream['ips']--;
unset($fiberDatas[$i]);
}
Expand Down
13 changes: 7 additions & 6 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use RuntimeException as NativeRuntimeException;
Expand All @@ -36,7 +37,7 @@ class ReactDriver implements DriverInterface

public function __construct(?LoopInterface $eventLoop = null)
{
if (!function_exists('React\\Async\\async')) {
if (!function_exists('React\Async\async')) {
throw new NativeRuntimeException('ReactPHP is not loaded. Suggest install it with composer require react/event-loop');
}

Expand Down Expand Up @@ -72,19 +73,19 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index)
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index), Event::ASYNC)->getReturn()
->then(static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
})
;
Expand Down
15 changes: 8 additions & 7 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use RuntimeException as NativeRuntimeException;
use Spatie\Async\Pool;
use Throwable;
Expand All @@ -34,7 +35,7 @@ class SpatieDriver implements DriverInterface

public function __construct()
{
if (!class_exists('Spatie\\Async\\Pool')) {
if (!class_exists('Spatie\Async\Pool')) {
throw new NativeRuntimeException('Spatie Async is not loaded. Suggest install it with composer require spatie/async');
}

Expand Down Expand Up @@ -75,20 +76,20 @@ public function await(array &$stream): void
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
});
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand Down
13 changes: 7 additions & 6 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
use Closure;
use co;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use OpenSwoole\Timer;
use RuntimeException as NativeRuntimeException;
use Throwable;
Expand Down Expand Up @@ -70,20 +71,20 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
});
}), Event::ASYNC);
}
}
co::sleep(1);
Expand Down
Loading

0 comments on commit 63cd11d

Please sign in to comment.