diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b17280..dde900f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.2.x + +- Add event Event::POOL occurs when Flow need to count IPs to process. + ## v1.2.2 - Flow can now use `Flow\JobInterface` as job input diff --git a/src/AsyncHandler/AsyncHandler.php b/src/AsyncHandler/AsyncHandler.php index c81c5cc..c876a1b 100644 --- a/src/AsyncHandler/AsyncHandler.php +++ b/src/AsyncHandler/AsyncHandler.php @@ -7,6 +7,8 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; +use Flow\IpPool; /** * @template T @@ -15,10 +17,18 @@ */ final class AsyncHandler implements AsyncHandlerInterface { + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -27,7 +37,17 @@ public function async(AsyncEvent $event): void $ip = $event->getIp(); $async = $event->getAsync(); $asyncJob = $async($event->getJob()); + + $popIp = $this->ipPool->addIp($ip); $next = $asyncJob($ip->data); - $next($event->getCallback()); + $next(static function ($result) use ($event, $popIp) { + $event->getCallback()($result); + $popIp(); + }); + } + + public function pool(PoolEvent $event): void + { + $event->setIps($this->ipPool->getIps()); } } diff --git a/src/AsyncHandler/BatchAsyncHandler.php b/src/AsyncHandler/BatchAsyncHandler.php index 5462508..43da785 100644 --- a/src/AsyncHandler/BatchAsyncHandler.php +++ b/src/AsyncHandler/BatchAsyncHandler.php @@ -7,6 +7,7 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\BatchHandlerInterface; use Symfony\Component\Messenger\Handler\BatchHandlerTrait; @@ -41,6 +42,7 @@ public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -53,6 +55,11 @@ public function async(AsyncEvent $event): void $this->handle($event, $ack); } + public function pool(PoolEvent $event): void + { + $this->asyncHandler->pool($event); + } + /** * PHPStan should normaly pass for method.unused * https://github.com/phpstan/phpstan/issues/6039 diff --git a/src/AsyncHandler/DeferAsyncHandler.php b/src/AsyncHandler/DeferAsyncHandler.php index b36727a..523fa7b 100644 --- a/src/AsyncHandler/DeferAsyncHandler.php +++ b/src/AsyncHandler/DeferAsyncHandler.php @@ -7,6 +7,10 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; +use Flow\IpPool; + + // Add this import /** * @template T @@ -15,10 +19,18 @@ */ final class DeferAsyncHandler implements AsyncHandlerInterface { + private IpPool $ipPool; + + public function __construct(IpPool $ipPool) + { + $this->ipPool = $ipPool; + } + public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -26,11 +38,19 @@ public function async(AsyncEvent $event): void { $ip = $event->getIp(); $job = $event->getJob(); + + $popIp = $this->ipPool->addIp($ip); $next = $job([$ip->data, $event->getDefer()]); - $next(static function ($result) use ($event) { + $next(static function ($result) use ($event, $popIp) { [$data] = $result; $callback = $event->getCallback(); $callback($data); + $popIp(); }); } + + public function pool(PoolEvent $event): void + { + $event->setIps($this->ipPool->getIps()); + } } diff --git a/src/AsyncHandlerInterface.php b/src/AsyncHandlerInterface.php index c66ce8d..01a6a1f 100644 --- a/src/AsyncHandlerInterface.php +++ b/src/AsyncHandlerInterface.php @@ -5,6 +5,7 @@ namespace Flow; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; /** @@ -16,4 +17,9 @@ interface AsyncHandlerInterface extends EventSubscriberInterface * @param AsyncEvent $event */ public function async(AsyncEvent $event): void; + + /** + * @param PoolEvent $event + */ + public function pool(PoolEvent $event): void; } diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index c4b95b4..89d732b 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -34,6 +34,8 @@ */ class AmpDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct(?Driver $driver = null) @@ -121,18 +123,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { EventLoop::defer($loop); } else { EventLoop::getDriver()->stop(); diff --git a/src/Driver/DriverTrait.php b/src/Driver/DriverTrait.php new file mode 100644 index 0000000..ee4c223 --- /dev/null +++ b/src/Driver/DriverTrait.php @@ -0,0 +1,27 @@ + $dispatchers + */ + public function countIps(array $dispatchers): int + { + $count = 0; + foreach ($dispatchers as $dispatcher) { + $count += count($dispatcher->dispatch(new PoolEvent(), Event::POOL)->getIps()); + } + + return $count; + } +} diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 451297f..b13c552 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -30,6 +30,8 @@ */ class FiberDriver implements DriverInterface { + use DriverTrait; + /** * @var array */ @@ -121,7 +123,7 @@ public function await(array &$stream): void $tick = 0; $fiberDatas = []; - while ($stream['ips'] > 0 or count($this->ticks) > 0) { + do { foreach ($this->ticks as [ 'interval' => $interval, 'callback' => $callback, @@ -146,12 +148,10 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } @@ -168,7 +168,7 @@ public function await(array &$stream): void } $tick++; - } + } while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0); } public function delay(float $seconds): void diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 0d01656..2225079 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -34,6 +34,8 @@ */ class ReactDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private LoopInterface $eventLoop; @@ -114,18 +116,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { $this->eventLoop->futureTick($loop); } else { $this->eventLoop->stop(); diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index b008e5d..80a4503 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -30,6 +30,8 @@ */ class SpatieDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private Pool $pool; @@ -89,7 +91,7 @@ public function await(array &$stream): void }; $nextIp = null; - while ($stream['ips'] > 0 or $this->ticks > 0) { + do { do { foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); @@ -99,17 +101,15 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); } public function delay(float $seconds): void diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index 413c99d..2c6e9e9 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -30,6 +30,8 @@ */ class SwooleDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct() @@ -85,7 +87,7 @@ public function await(array &$stream): void }; co::run(function () use (&$stream, $async, $defer) { - while ($stream['ips'] > 0 or $this->ticks > 0) { + do { $nextIp = null; do { foreach ($stream['dispatchers'] as $index => $dispatcher) { @@ -96,18 +98,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } co::sleep(1); } while ($nextIp !== null); - } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); }); } diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 69a588b..c33d16a 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -30,7 +30,7 @@ public function async(Closure|JobInterface $callback): Closure; public function defer(Closure $callback): mixed; /** - * @param array{'ips': int, 'fnFlows': array, 'dispatchers': array} $stream + * @param array{'fnFlows': array, 'dispatchers': array} $stream */ public function await(array &$stream): void; diff --git a/src/Event.php b/src/Event.php index c8a8a02..fa1515f 100644 --- a/src/Event.php +++ b/src/Event.php @@ -41,4 +41,13 @@ final class Event * @Event("Flow\Event\PopEvent") */ public const POP = 'pop'; + + /** + * The POOL event occurs when Flow need to get the pool IPs to process. + * + * This event allows you to get the pool IPs to process. + * + * @Event("Flow\Event\PoolEvent") + */ + public const POOL = 'pool'; } diff --git a/src/Event/PoolEvent.php b/src/Event/PoolEvent.php new file mode 100644 index 0000000..3c1d4eb --- /dev/null +++ b/src/Event/PoolEvent.php @@ -0,0 +1,35 @@ +> + */ + private array $ips; + + /** + * @return array> + */ + public function getIps(): array + { + return $this->ips; + } + + /** + * @param array> $ips + */ + public function setIps(array $ips): void + { + $this->ips = $ips; + } +} diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 511bd24..a5f588f 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -37,7 +37,6 @@ class Flow implements FlowInterface * @var array */ private array $stream = [ - 'ips' => 0, 'fnFlows' => [], 'dispatchers' => [], ]; @@ -89,7 +88,6 @@ public function __construct( public function __invoke(Ip $ip): void { - $this->stream['ips']++; $this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH); } diff --git a/src/IpPool.php b/src/IpPool.php new file mode 100644 index 0000000..c4f1d54 --- /dev/null +++ b/src/IpPool.php @@ -0,0 +1,38 @@ +> + */ + private array $ips = []; + + /** + * @return callable A function that removes the added IP from the pool when called + */ + public function addIp(Ip $ip): callable + { + $this->ips[] = $ip; + + return function () use ($ip) { + $this->ips = array_filter($this->ips, static function ($iteratorIp) use ($ip) { + return $iteratorIp !== $ip; + }); + }; + } + + /** + * @return array> + */ + public function getIps(): array + { + return $this->ips; + } +} diff --git a/src/IpStrategy/LinearIpStrategy.php b/src/IpStrategy/LinearIpStrategy.php index 775df2b..cce2846 100644 --- a/src/IpStrategy/LinearIpStrategy.php +++ b/src/IpStrategy/LinearIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; use Flow\Ip; @@ -27,6 +28,7 @@ public static function getSubscribedEvents(): array return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::POOL => 'pool', ]; } @@ -45,4 +47,9 @@ public function pull(PullEvent $event): void { $event->setIp(array_shift($this->ips)); } + + public function pool(PoolEvent $event): void + { + $event->setIps($this->ips); + } } diff --git a/src/IpStrategy/MaxIpStrategy.php b/src/IpStrategy/MaxIpStrategy.php index e323839..e990259 100644 --- a/src/IpStrategy/MaxIpStrategy.php +++ b/src/IpStrategy/MaxIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PopEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; @@ -47,6 +48,7 @@ public static function getSubscribedEvents() Event::PUSH => 'push', Event::PULL => 'pull', Event::POP => 'pop', + Event::POOL => 'pool', ]; } @@ -67,9 +69,8 @@ public function pull(PullEvent $event): void $ip = $this->dispatcher->dispatch($event, Event::PULL)->getIp(); if ($ip) { $this->processing++; + $event->setIp($ip); } - - $event->setIp($ip); } } @@ -81,4 +82,9 @@ public function pop(PopEvent $event): void $this->dispatcher->dispatch($event, Event::POP); $this->processing--; } + + public function pool(PoolEvent $event): void + { + $event->setIps($this->dispatcher->dispatch($event, Event::POOL)->getIps()); + } } diff --git a/src/IpStrategy/StackIpStrategy.php b/src/IpStrategy/StackIpStrategy.php index af2c506..ce377d9 100644 --- a/src/IpStrategy/StackIpStrategy.php +++ b/src/IpStrategy/StackIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; use Flow\Ip; @@ -27,6 +28,7 @@ public static function getSubscribedEvents() return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::POOL => 'pool', ]; } @@ -45,4 +47,9 @@ public function pull(PullEvent $event): void { $event->setIp(array_pop($this->ips)); } + + public function pool(PoolEvent $event): void + { + $event->setIps($this->ips); + } } diff --git a/src/IpStrategyInterface.php b/src/IpStrategyInterface.php index 1953c28..13e106f 100644 --- a/src/IpStrategyInterface.php +++ b/src/IpStrategyInterface.php @@ -4,9 +4,16 @@ namespace Flow; +use Flow\Event\PoolEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; /** * @template T */ -interface IpStrategyInterface extends EventSubscriberInterface {} +interface IpStrategyInterface extends EventSubscriberInterface +{ + /** + * @param PoolEvent $event + */ + public function pool(PoolEvent $event): void; +}