diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b17280..dde900f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.2.x + +- Add event Event::POOL occurs when Flow need to count IPs to process. + ## v1.2.2 - Flow can now use `Flow\JobInterface` as job input diff --git a/examples/flow.php b/examples/flow.php index 3a4ec3c..63fb476 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -35,7 +35,7 @@ $job1 = static function (DataA $dataA) use ($driver): DataB { printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b); - // simulating calculating some "light" operation from 0.1 to 1 seconds + // simulating calculating some "light" operation from 1 to 3 seconds $delay = random_int(1, 3); $driver->delay($delay); $d = $dataA->a + $dataA->b; diff --git a/src/AsyncHandler/AsyncHandler.php b/src/AsyncHandler/AsyncHandler.php index c81c5cc..264d391 100644 --- a/src/AsyncHandler/AsyncHandler.php +++ b/src/AsyncHandler/AsyncHandler.php @@ -7,6 +7,8 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; +use Flow\IpPool; /** * @template T @@ -15,10 +17,21 @@ */ final class AsyncHandler implements AsyncHandlerInterface { + /** + * @var IpPool + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -27,7 +40,18 @@ public function async(AsyncEvent $event): void $ip = $event->getIp(); $async = $event->getAsync(); $asyncJob = $async($event->getJob()); + + $popIp = $this->ipPool->addIp($ip); + $next = $asyncJob($ip->data); - $next($event->getCallback()); + $next(static function ($data) use ($event, $popIp) { + $event->getCallback()($data); + $popIp(); + }); + } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); } } diff --git a/src/AsyncHandler/BatchAsyncHandler.php b/src/AsyncHandler/BatchAsyncHandler.php index 5462508..9f67467 100644 --- a/src/AsyncHandler/BatchAsyncHandler.php +++ b/src/AsyncHandler/BatchAsyncHandler.php @@ -7,28 +7,28 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\BatchHandlerInterface; use Symfony\Component\Messenger\Handler\BatchHandlerTrait; use Throwable; /** - * @template T1 - * @template T2 + * @template T * - * @implements AsyncHandlerInterface + * @implements AsyncHandlerInterface */ final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface { use BatchHandlerTrait; /** - * @var AsyncHandlerInterface + * @var AsyncHandlerInterface */ private AsyncHandlerInterface $asyncHandler; /** - * @param null|AsyncHandlerInterface $asyncHandler + * @param null|AsyncHandlerInterface $asyncHandler */ public function __construct( private int $batchSize = 10, @@ -41,6 +41,7 @@ public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -53,12 +54,17 @@ public function async(AsyncEvent $event): void $this->handle($event, $ack); } + public function pool(PoolEvent $event): void + { + $this->asyncHandler->pool($event); + } + /** * PHPStan should normaly pass for method.unused * https://github.com/phpstan/phpstan/issues/6039 * https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c. * - * @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs + * @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs * * @phpstan-ignore method.unused */ diff --git a/src/AsyncHandler/DeferAsyncHandler.php b/src/AsyncHandler/DeferAsyncHandler.php index b36727a..3c0f1d9 100644 --- a/src/AsyncHandler/DeferAsyncHandler.php +++ b/src/AsyncHandler/DeferAsyncHandler.php @@ -7,6 +7,8 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; +use Flow\IpPool; /** * @template T @@ -15,10 +17,21 @@ */ final class DeferAsyncHandler implements AsyncHandlerInterface { + /** + * @var IpPool + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -26,11 +39,19 @@ public function async(AsyncEvent $event): void { $ip = $event->getIp(); $job = $event->getJob(); + + $popIp = $this->ipPool->addIp($ip); $next = $job([$ip->data, $event->getDefer()]); - $next(static function ($result) use ($event) { + $next(static function ($result) use ($event, $popIp) { [$data] = $result; $callback = $event->getCallback(); $callback($data); + $popIp(); }); } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); + } } diff --git a/src/AsyncHandlerInterface.php b/src/AsyncHandlerInterface.php index c66ce8d..01a6a1f 100644 --- a/src/AsyncHandlerInterface.php +++ b/src/AsyncHandlerInterface.php @@ -5,6 +5,7 @@ namespace Flow; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; /** @@ -16,4 +17,9 @@ interface AsyncHandlerInterface extends EventSubscriberInterface * @param AsyncEvent $event */ public function async(AsyncEvent $event): void; + + /** + * @param PoolEvent $event + */ + public function pool(PoolEvent $event): void; } diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index c4b95b4..89d732b 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -34,6 +34,8 @@ */ class AmpDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct(?Driver $driver = null) @@ -121,18 +123,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { EventLoop::defer($loop); } else { EventLoop::getDriver()->stop(); diff --git a/src/Driver/DriverTrait.php b/src/Driver/DriverTrait.php new file mode 100644 index 0000000..ee4c223 --- /dev/null +++ b/src/Driver/DriverTrait.php @@ -0,0 +1,27 @@ + $dispatchers + */ + public function countIps(array $dispatchers): int + { + $count = 0; + foreach ($dispatchers as $dispatcher) { + $count += count($dispatcher->dispatch(new PoolEvent(), Event::POOL)->getIps()); + } + + return $count; + } +} diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 451297f..1706ba0 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -30,6 +30,8 @@ */ class FiberDriver implements DriverInterface { + use DriverTrait; + /** * @var array */ @@ -121,7 +123,7 @@ public function await(array &$stream): void $tick = 0; $fiberDatas = []; - while ($stream['ips'] > 0 or count($this->ticks) > 0) { + do { foreach ($this->ticks as [ 'interval' => $interval, 'callback' => $callback, @@ -136,6 +138,7 @@ public function await(array &$stream): void do { foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); + if ($nextIp !== null) { $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) { return $async(false)($job); @@ -146,12 +149,10 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } @@ -168,7 +169,7 @@ public function await(array &$stream): void } $tick++; - } + } while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0); } public function delay(float $seconds): void diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 0d01656..2225079 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -34,6 +34,8 @@ */ class ReactDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private LoopInterface $eventLoop; @@ -114,18 +116,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { $this->eventLoop->futureTick($loop); } else { $this->eventLoop->stop(); diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index b008e5d..80a4503 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -30,6 +30,8 @@ */ class SpatieDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private Pool $pool; @@ -89,7 +91,7 @@ public function await(array &$stream): void }; $nextIp = null; - while ($stream['ips'] > 0 or $this->ticks > 0) { + do { do { foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); @@ -99,17 +101,15 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); } public function delay(float $seconds): void diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index 413c99d..2c6e9e9 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -30,6 +30,8 @@ */ class SwooleDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct() @@ -85,7 +87,7 @@ public function await(array &$stream): void }; co::run(function () use (&$stream, $async, $defer) { - while ($stream['ips'] > 0 or $this->ticks > 0) { + do { $nextIp = null; do { foreach ($stream['dispatchers'] as $index => $dispatcher) { @@ -96,18 +98,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } co::sleep(1); } while ($nextIp !== null); - } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); }); } diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 69a588b..c33d16a 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -30,7 +30,7 @@ public function async(Closure|JobInterface $callback): Closure; public function defer(Closure $callback): mixed; /** - * @param array{'ips': int, 'fnFlows': array, 'dispatchers': array} $stream + * @param array{'fnFlows': array, 'dispatchers': array} $stream */ public function await(array &$stream): void; diff --git a/src/Event.php b/src/Event.php index c8a8a02..fa1515f 100644 --- a/src/Event.php +++ b/src/Event.php @@ -41,4 +41,13 @@ final class Event * @Event("Flow\Event\PopEvent") */ public const POP = 'pop'; + + /** + * The POOL event occurs when Flow need to get the pool IPs to process. + * + * This event allows you to get the pool IPs to process. + * + * @Event("Flow\Event\PoolEvent") + */ + public const POOL = 'pool'; } diff --git a/src/Event/PoolEvent.php b/src/Event/PoolEvent.php new file mode 100644 index 0000000..bedf786 --- /dev/null +++ b/src/Event/PoolEvent.php @@ -0,0 +1,43 @@ + + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + + /** + * @param array> $ips + */ + public function addIps(array $ips): void + { + foreach ($ips as $ip) { + $this->ipPool->addIp($ip); + } + } + + /** + * @return array> + */ + public function getIps(): array + { + return $this->ipPool->getIps(); + } +} diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 511bd24..a5f588f 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -37,7 +37,6 @@ class Flow implements FlowInterface * @var array */ private array $stream = [ - 'ips' => 0, 'fnFlows' => [], 'dispatchers' => [], ]; @@ -89,7 +88,6 @@ public function __construct( public function __invoke(Ip $ip): void { - $this->stream['ips']++; $this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH); } diff --git a/src/IpPool.php b/src/IpPool.php new file mode 100644 index 0000000..db052a0 --- /dev/null +++ b/src/IpPool.php @@ -0,0 +1,40 @@ +> + */ + private array $ips = []; + + /** + * @param Ip $ip + * + * @return callable A function that removes the added IP from the pool when called + */ + public function addIp(Ip $ip): callable + { + $this->ips[] = $ip; + + return function () use ($ip) { + $this->ips = array_filter($this->ips, static function ($iteratorIp) use ($ip) { + return $iteratorIp !== $ip; + }); + }; + } + + /** + * @return array> + */ + public function getIps(): array + { + return $this->ips; + } +} diff --git a/src/IpStrategy/LinearIpStrategy.php b/src/IpStrategy/LinearIpStrategy.php index 775df2b..f965e5a 100644 --- a/src/IpStrategy/LinearIpStrategy.php +++ b/src/IpStrategy/LinearIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; use Flow\Ip; @@ -27,6 +28,7 @@ public static function getSubscribedEvents(): array return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::POOL => 'pool', ]; } @@ -45,4 +47,9 @@ public function pull(PullEvent $event): void { $event->setIp(array_shift($this->ips)); } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ips); + } } diff --git a/src/IpStrategy/MaxIpStrategy.php b/src/IpStrategy/MaxIpStrategy.php index e323839..1eb9a4a 100644 --- a/src/IpStrategy/MaxIpStrategy.php +++ b/src/IpStrategy/MaxIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PopEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; @@ -47,6 +48,7 @@ public static function getSubscribedEvents() Event::PUSH => 'push', Event::PULL => 'pull', Event::POP => 'pop', + Event::POOL => 'pool', ]; } @@ -67,9 +69,8 @@ public function pull(PullEvent $event): void $ip = $this->dispatcher->dispatch($event, Event::PULL)->getIp(); if ($ip) { $this->processing++; + $event->setIp($ip); } - - $event->setIp($ip); } } @@ -81,4 +82,9 @@ public function pop(PopEvent $event): void $this->dispatcher->dispatch($event, Event::POP); $this->processing--; } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->dispatcher->dispatch($event, Event::POOL)->getIps()); + } } diff --git a/src/IpStrategy/StackIpStrategy.php b/src/IpStrategy/StackIpStrategy.php index af2c506..09c2357 100644 --- a/src/IpStrategy/StackIpStrategy.php +++ b/src/IpStrategy/StackIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; use Flow\Ip; @@ -27,6 +28,7 @@ public static function getSubscribedEvents() return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::POOL => 'pool', ]; } @@ -45,4 +47,9 @@ public function pull(PullEvent $event): void { $event->setIp(array_pop($this->ips)); } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ips); + } } diff --git a/src/IpStrategyInterface.php b/src/IpStrategyInterface.php index 1953c28..13e106f 100644 --- a/src/IpStrategyInterface.php +++ b/src/IpStrategyInterface.php @@ -4,9 +4,16 @@ namespace Flow; +use Flow\Event\PoolEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; /** * @template T */ -interface IpStrategyInterface extends EventSubscriberInterface {} +interface IpStrategyInterface extends EventSubscriberInterface +{ + /** + * @param PoolEvent $event + */ + public function pool(PoolEvent $event): void; +}