Skip to content

Commit

Permalink
✨ Add Batch IP
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 6, 2024
1 parent 2a2136c commit e4a0031
Show file tree
Hide file tree
Showing 29 changed files with 2,722 additions and 1,752 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v1.2.1

- Add Batch IP from
- https://speakerdeck.com/alli83/symfony-messenger-et-ses-messages-a-la-queleuleu-dot-dot-dot-et-sil-etait-temps-de-grouper
- https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1
- https://github.com/wazum/symfony-messenger-batch

## v1.2.0

- Add event system for processing IpStrategy
Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
],
"require": {
"php": ">=8.3",
"symfony/event-dispatcher": "^7.0"
"symfony/event-dispatcher": "^7.0",
"symfony/messenger": "^7.0"
},
"require-dev": {
"amphp/amp": "^3.0",
"openswoole/ide-helper": "^22.1.5",
"react/async": "^4.2",
"spatie/async": "^1.6",
"symfony/doctrine-messenger": "^7.0",
"symfony/messenger": "^7.0",
"symfony/orm-pack": "^2.4"
},
"suggest": {
Expand Down
26 changes: 26 additions & 0 deletions src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;

use function call_user_func_array;

final class AsyncHandler implements AsyncHandlerInterface
{
public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$event->setReturn(call_user_func_array($event->getAsync(), $event->getArgs()));
}
}
52 changes: 52 additions & 0 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

declare(strict_types=1);

namespace Flow\AsyncHandler;

use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

use function call_user_func_array;

final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

public function __construct(

Check failure on line 21 in src/AsyncHandler/BatchAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Method Flow\AsyncHandler\BatchAsyncHandler::__construct() has parameter $batchSize with no type specified.
private $batchSize = 10,
) {}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
];
}

public function async(AsyncEvent $event): void
{
$ack = new Acknowledger(get_debug_type($this), static function (?Throwable $e = null, $event = null) {
$event->setReturn(call_user_func_array($event->getAsync(), $event->getArgs()));
});

$this->handle($event, $ack);
}

private function process(array $jobs): void

Check failure on line 41 in src/AsyncHandler/BatchAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Method Flow\AsyncHandler\BatchAsyncHandler::process() has parameter $jobs with no value type specified in iterable type array.

Check failure on line 41 in src/AsyncHandler/BatchAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Method Flow\AsyncHandler\BatchAsyncHandler::process() is unused.
{
foreach ($jobs as [$event, $ack]) {
$ack->ack($event);
}
}

private function getBatchSize(): int

Check failure on line 48 in src/AsyncHandler/BatchAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Method Flow\AsyncHandler\BatchAsyncHandler::getBatchSize() is unused.
{
return $this->batchSize;
}
}
9 changes: 9 additions & 0 deletions src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;

interface AsyncHandlerInterface extends EventSubscriberInterface {}
13 changes: 7 additions & 6 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use RuntimeException as NativeRuntimeException;
Expand All @@ -34,7 +35,7 @@ class AmpDriver implements DriverInterface

public function __construct(?Driver $driver = null)
{
if (!function_exists('Amp\\async')) {
if (!function_exists('Amp\async')) {
throw new NativeRuntimeException('Amp is not loaded. Suggest install it with composer require amphp/amp');
}

Expand Down Expand Up @@ -72,19 +73,19 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index)
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index), Event::ASYNC)->getReturn()
->map(static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
})
;
Expand Down
13 changes: 7 additions & 6 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
use Closure;
use Fiber;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Throwable;

use function array_key_exists;
Expand Down Expand Up @@ -81,9 +82,9 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$fiberDatas[] = $async($nextIp, $stream['fnFlows'], $index, false);
$fiberDatas[] = $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, false), Event::ASYNC)->getReturn();
}
}
} while ($nextIp !== null);
Expand All @@ -102,15 +103,15 @@ public function await(array &$stream): void
if ($fiberData['isTick'] === false and array_key_exists($fiberData['index'] + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$fiberDatas[] = $async($ip, $stream['fnFlows'], $fiberData['index'] + 1, false);
$stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new PushEvent($ip), Event::PUSH);
$fiberDatas[] = $stream['dispatchers'][$fiberData['index'] + 1]->dispatch(new AsyncEvent($async, $ip, $stream['fnFlows'], $fiberData['index'] + 1, false), Event::ASYNC)->getReturn();
}
} elseif (array_key_exists($fiberData['index'], $stream['fnFlows']) and $stream['fnFlows'][$fiberData['index']]['errorJob'] !== null) {
$stream['fnFlows'][$fiberData['index']]['errorJob'](
new RuntimeException($fiberData['exception']->getMessage(), $fiberData['exception']->getCode(), $fiberData['exception'])
);
}
$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), IpStrategyEvent::POP);
$stream['dispatchers'][$fiberData['index']]->dispatch(new PopEvent($fiberData['ip']), Event::POP);
$stream['ips']--;
unset($fiberDatas[$i]);
}
Expand Down
13 changes: 7 additions & 6 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use RuntimeException as NativeRuntimeException;
Expand All @@ -36,7 +37,7 @@ class ReactDriver implements DriverInterface

public function __construct(?LoopInterface $eventLoop = null)
{
if (!function_exists('React\\Async\\async')) {
if (!function_exists('React\Async\async')) {
throw new NativeRuntimeException('ReactPHP is not loaded. Suggest install it with composer require react/event-loop');
}

Expand Down Expand Up @@ -72,19 +73,19 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index)
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index), Event::ASYNC)->getReturn()
->then(static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
})
;
Expand Down
15 changes: 8 additions & 7 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@

use Closure;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use RuntimeException as NativeRuntimeException;
use Spatie\Async\Pool;
use Throwable;
Expand All @@ -34,7 +35,7 @@ class SpatieDriver implements DriverInterface

public function __construct()
{
if (!class_exists('Spatie\\Async\\Pool')) {
if (!class_exists('Spatie\Async\Pool')) {
throw new NativeRuntimeException('Spatie Async is not loaded. Suggest install it with composer require spatie/async');
}

Expand Down Expand Up @@ -75,20 +76,20 @@ public function await(array &$stream): void
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
});
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
Expand Down
13 changes: 7 additions & 6 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
use Closure;
use co;
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\IpStrategyEvent;
use OpenSwoole\Timer;
use RuntimeException as NativeRuntimeException;
use Throwable;
Expand Down Expand Up @@ -70,20 +71,20 @@ public function await(array &$stream): void
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$async($nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $nextIp, $stream['fnFlows'], $index, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), IpStrategyEvent::POP);
$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
});
}), Event::ASYNC);
}
}
co::sleep(1);
Expand Down
17 changes: 13 additions & 4 deletions src/IpStrategyEvent.php → src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow;

final class IpStrategyEvent
final class Event
{
/**
* The PUSH event occurs at the very beginning of Flow dispatching before any async process execution.
Expand All @@ -13,7 +13,16 @@ final class IpStrategyEvent
*
* @Event("Flow\Event\PushEvent")
*/
public const PUSH = 'ip_strategy.push';
public const PUSH = 'push';

/**
* The ASYNC event occurs when Flow dispatch async process execution.
*
* This event allows you to process async for an IP in the Flow execution.
*
* @Event("Flow\Event\AsyncEvent")
*/
public const ASYNC = 'async';

/**
* The PULL event occurs when Flow need a next IP to async process.
Expand All @@ -22,7 +31,7 @@ final class IpStrategyEvent
*
* @Event("Flow\Event\PullEvent")
*/
public const PULL = 'ip_strategy.pull';
public const PULL = 'pull';

/**
* The POP event occurs when Flow finish async process of an IP.
Expand All @@ -31,5 +40,5 @@ final class IpStrategyEvent
*
* @Event("Flow\Event\PopEvent")
*/
public const POP = 'ip_strategy.pop';
public const POP = 'pop';
}
Loading

0 comments on commit e4a0031

Please sign in to comment.