diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b17280b..f2e8e89a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v1.2.x + +- Add event Event::POOL occurs when Flow needs to count IPs to process. +- Add `Flow\IpPool` for managing pools of Ips. +- Update `Flow\Event\PullEvent` to pull multiple Ips instead one. + ## v1.2.2 - Flow can now use `Flow\JobInterface` as job input diff --git a/examples/flow.php b/examples/flow.php index 3a4ec3cf..63fb476e 100644 --- a/examples/flow.php +++ b/examples/flow.php @@ -35,7 +35,7 @@ $job1 = static function (DataA $dataA) use ($driver): DataB { printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b); - // simulating calculating some "light" operation from 0.1 to 1 seconds + // simulating calculating some "light" operation from 1 to 3 seconds $delay = random_int(1, 3); $driver->delay($delay); $d = $dataA->a + $dataA->b; diff --git a/src/AsyncHandler/AsyncHandler.php b/src/AsyncHandler/AsyncHandler.php index c81c5cc7..941258d7 100644 --- a/src/AsyncHandler/AsyncHandler.php +++ b/src/AsyncHandler/AsyncHandler.php @@ -7,6 +7,8 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; +use Flow\IpPool; /** * @template T @@ -15,10 +17,21 @@ */ final class AsyncHandler implements AsyncHandlerInterface { + /** + * @var IpPool + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -27,7 +40,17 @@ public function async(AsyncEvent $event): void $ip = $event->getIp(); $async = $event->getAsync(); $asyncJob = $async($event->getJob()); + + $popIp = $this->ipPool->addIp($ip); $next = $asyncJob($ip->data); - $next($event->getCallback()); + $next(static function ($data) use ($event, $popIp) { + $event->getCallback()($data); + $popIp(); + }); + } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); } } diff --git a/src/AsyncHandler/BatchAsyncHandler.php b/src/AsyncHandler/BatchAsyncHandler.php index 54625085..9f674677 100644 --- a/src/AsyncHandler/BatchAsyncHandler.php +++ b/src/AsyncHandler/BatchAsyncHandler.php @@ -7,28 +7,28 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\BatchHandlerInterface; use Symfony\Component\Messenger\Handler\BatchHandlerTrait; use Throwable; /** - * @template T1 - * @template T2 + * @template T * - * @implements AsyncHandlerInterface + * @implements AsyncHandlerInterface */ final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface { use BatchHandlerTrait; /** - * @var AsyncHandlerInterface + * @var AsyncHandlerInterface */ private AsyncHandlerInterface $asyncHandler; /** - * @param null|AsyncHandlerInterface $asyncHandler + * @param null|AsyncHandlerInterface $asyncHandler */ public function __construct( private int $batchSize = 10, @@ -41,6 +41,7 @@ public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -53,12 +54,17 @@ public function async(AsyncEvent $event): void $this->handle($event, $ack); } + public function pool(PoolEvent $event): void + { + $this->asyncHandler->pool($event); + } + /** * PHPStan should normaly pass for method.unused * https://github.com/phpstan/phpstan/issues/6039 * https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c. * - * @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs + * @param array{0: AsyncEvent, 1: Acknowledger}[] $jobs * * @phpstan-ignore method.unused */ diff --git a/src/AsyncHandler/DeferAsyncHandler.php b/src/AsyncHandler/DeferAsyncHandler.php index b36727a9..3c0f1d9e 100644 --- a/src/AsyncHandler/DeferAsyncHandler.php +++ b/src/AsyncHandler/DeferAsyncHandler.php @@ -7,6 +7,8 @@ use Flow\AsyncHandlerInterface; use Flow\Event; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; +use Flow\IpPool; /** * @template T @@ -15,10 +17,21 @@ */ final class DeferAsyncHandler implements AsyncHandlerInterface { + /** + * @var IpPool + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + public static function getSubscribedEvents() { return [ Event::ASYNC => 'async', + Event::POOL => 'pool', ]; } @@ -26,11 +39,19 @@ public function async(AsyncEvent $event): void { $ip = $event->getIp(); $job = $event->getJob(); + + $popIp = $this->ipPool->addIp($ip); $next = $job([$ip->data, $event->getDefer()]); - $next(static function ($result) use ($event) { + $next(static function ($result) use ($event, $popIp) { [$data] = $result; $callback = $event->getCallback(); $callback($data); + $popIp(); }); } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); + } } diff --git a/src/AsyncHandlerInterface.php b/src/AsyncHandlerInterface.php index c66ce8df..01a6a1fa 100644 --- a/src/AsyncHandlerInterface.php +++ b/src/AsyncHandlerInterface.php @@ -5,6 +5,7 @@ namespace Flow; use Flow\Event\AsyncEvent; +use Flow\Event\PoolEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; /** @@ -16,4 +17,9 @@ interface AsyncHandlerInterface extends EventSubscriberInterface * @param AsyncEvent $event */ public function async(AsyncEvent $event): void; + + /** + * @param PoolEvent $event + */ + public function pool(PoolEvent $event): void; } diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index c4b95b45..14650814 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -34,6 +34,8 @@ */ class AmpDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct(?Driver $driver = null) @@ -109,30 +111,25 @@ public function await(array &$stream): void }; $loop = function () use (&$loop, &$stream, $async, $defer) { - $nextIp = null; - do { - foreach ($stream['dispatchers'] as $index => $dispatcher) { - $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); - if ($nextIp !== null) { - $job = $stream['fnFlows'][$index]['job']; - - $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) { - if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { - $stream['fnFlows'][$index]['errorJob']($data); - } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { - $ip = new Ip($data); - $stream['ips']++; - $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); - } - - $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; - }), Event::ASYNC); - } + foreach ($stream['dispatchers'] as $index => $dispatcher) { + $nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($nextIps as $nextIp) { + $job = $stream['fnFlows'][$index]['job']; + + $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + }), Event::ASYNC); } - } while ($nextIp !== null); + } - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { EventLoop::defer($loop); } else { EventLoop::getDriver()->stop(); diff --git a/src/Driver/DriverTrait.php b/src/Driver/DriverTrait.php new file mode 100644 index 00000000..ee4c2233 --- /dev/null +++ b/src/Driver/DriverTrait.php @@ -0,0 +1,27 @@ + $dispatchers + */ + public function countIps(array $dispatchers): int + { + $count = 0; + foreach ($dispatchers as $dispatcher) { + $count += count($dispatcher->dispatch(new PoolEvent(), Event::POOL)->getIps()); + } + + return $count; + } +} diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 451297fe..2fdb3cef 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -30,6 +30,8 @@ */ class FiberDriver implements DriverInterface { + use DriverTrait; + /** * @var array */ @@ -121,7 +123,7 @@ public function await(array &$stream): void $tick = 0; $fiberDatas = []; - while ($stream['ips'] > 0 or count($this->ticks) > 0) { + do { foreach ($this->ticks as [ 'interval' => $interval, 'callback' => $callback, @@ -132,30 +134,25 @@ public function await(array &$stream): void } } - $nextIp = null; - do { - foreach ($stream['dispatchers'] as $index => $dispatcher) { - $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); - if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) { - return $async(false)($job); - }, static function (Closure|JobInterface $job) use ($defer) { - return $defer(false)($job); - }, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { - if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) { - $stream['fnFlows'][$index]['errorJob']($data); - } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { - $ip = new Ip($data); - $stream['ips']++; - $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); - } - - $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; - }), Event::ASYNC); - } + foreach ($stream['dispatchers'] as $index => $dispatcher) { + $nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($nextIps as $nextIp) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) { + return $async(false)($job); + }, static function (Closure|JobInterface $job) use ($defer) { + return $defer(false)($job); + }, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + }), Event::ASYNC); } - } while ($nextIp !== null); + } foreach ($fiberDatas as $i => $fiberData) { // @phpstan-ignore-line see https://github.com/phpstan/phpstan/issues/11468 if (!$fiberData['fiber']->isTerminated() and $fiberData['fiber']->isSuspended()) { @@ -168,7 +165,7 @@ public function await(array &$stream): void } $tick++; - } + } while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0); } public function delay(float $seconds): void diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 0d016563..96c21fb4 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -34,6 +34,8 @@ */ class ReactDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private LoopInterface $eventLoop; @@ -102,30 +104,25 @@ public function await(array &$stream): void }; $loop = function () use (&$loop, &$stream, $async, $defer) { - $nextIp = null; - do { - foreach ($stream['dispatchers'] as $index => $dispatcher) { - $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); - if ($nextIp !== null) { - $job = $stream['fnFlows'][$index]['job']; - - $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) { - if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { - $stream['fnFlows'][$index]['errorJob']($data); - } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { - $ip = new Ip($data); - $stream['ips']++; - $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); - } - - $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; - }), Event::ASYNC); - } + foreach ($stream['dispatchers'] as $index => $dispatcher) { + $nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($nextIps as $nextIp) { + $job = $stream['fnFlows'][$index]['job']; + + $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + }), Event::ASYNC); } - } while ($nextIp !== null); + } - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { $this->eventLoop->futureTick($loop); } else { $this->eventLoop->stop(); diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index b008e5d2..75d3c097 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -30,6 +30,8 @@ */ class SpatieDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private Pool $pool; @@ -88,28 +90,23 @@ public function await(array &$stream): void }; }; - $nextIp = null; - while ($stream['ips'] > 0 or $this->ticks > 0) { - do { - foreach ($stream['dispatchers'] as $index => $dispatcher) { - $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); - if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { - if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { - $stream['fnFlows'][$index]['errorJob']($data); - } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { - $ip = new Ip($data); - $stream['ips']++; - $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); - } - - $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; - }), Event::ASYNC); - } + do { + foreach ($stream['dispatchers'] as $index => $dispatcher) { + $nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($nextIps as $nextIp) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + }), Event::ASYNC); } - } while ($nextIp !== null); - } + } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); } public function delay(float $seconds): void diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index 413c99dd..46340479 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -30,6 +30,8 @@ */ class SwooleDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct() @@ -85,29 +87,24 @@ public function await(array &$stream): void }; co::run(function () use (&$stream, $async, $defer) { - while ($stream['ips'] > 0 or $this->ticks > 0) { - $nextIp = null; - do { - foreach ($stream['dispatchers'] as $index => $dispatcher) { - $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); - if ($nextIp !== null) { - $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { - if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { - $stream['fnFlows'][$index]['errorJob']($data); - } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { - $ip = new Ip($data); - $stream['ips']++; - $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); - } - - $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; - }), Event::ASYNC); - } + do { + foreach ($stream['dispatchers'] as $index => $dispatcher) { + $nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($nextIps as $nextIp) { + $stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) { + if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) { + $stream['fnFlows'][$index]['errorJob']($data); + } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { + $ip = new Ip($data); + $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); + } + + $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); + }), Event::ASYNC); } - co::sleep(1); - } while ($nextIp !== null); - } + } + co::sleep(1); + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); }); } diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 69a588b7..c33d16ad 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -30,7 +30,7 @@ public function async(Closure|JobInterface $callback): Closure; public function defer(Closure $callback): mixed; /** - * @param array{'ips': int, 'fnFlows': array, 'dispatchers': array} $stream + * @param array{'fnFlows': array, 'dispatchers': array} $stream */ public function await(array &$stream): void; diff --git a/src/Event.php b/src/Event.php index c8a8a02b..560597e4 100644 --- a/src/Event.php +++ b/src/Event.php @@ -27,7 +27,7 @@ final class Event /** * The PULL event occurs when Flow need a next IP to async process. * - * This event allows you to choose what IP come next from your pushed IPs and will be used for async process execution. + * This event allows you to choose what IPs come next from your pushed IPs and will be used for async process execution. * * @Event("Flow\Event\PullEvent") */ @@ -41,4 +41,13 @@ final class Event * @Event("Flow\Event\PopEvent") */ public const POP = 'pop'; + + /** + * The POOL event occurs when Flow need to get the pool IPs to process. + * + * This event allows you to get the pool IPs to process. + * + * @Event("Flow\Event\PoolEvent") + */ + public const POOL = 'pool'; } diff --git a/src/Event/PoolEvent.php b/src/Event/PoolEvent.php new file mode 100644 index 00000000..bedf7860 --- /dev/null +++ b/src/Event/PoolEvent.php @@ -0,0 +1,43 @@ + + */ + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } + + /** + * @param array> $ips + */ + public function addIps(array $ips): void + { + foreach ($ips as $ip) { + $this->ipPool->addIp($ip); + } + } + + /** + * @return array> + */ + public function getIps(): array + { + return $this->ipPool->getIps(); + } +} diff --git a/src/Event/PullEvent.php b/src/Event/PullEvent.php index f9c429fa..e401542a 100644 --- a/src/Event/PullEvent.php +++ b/src/Event/PullEvent.php @@ -5,6 +5,7 @@ namespace Flow\Event; use Flow\Ip; +use Flow\IpPool; use Symfony\Contracts\EventDispatcher\Event; /** @@ -13,23 +14,28 @@ final class PullEvent extends Event { /** - * @var null|Ip + * @var IpPool */ - private ?Ip $ip = null; + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } /** - * @return null|Ip + * @return Ip[] */ - public function getIp(): ?Ip + public function getIps(): array { - return $this->ip; + return $this->ipPool->getIps(); } /** - * @param null|Ip $ip + * @param Ip $ip */ - public function setIp(?Ip $ip): void + public function addIp(Ip $ip): void { - $this->ip = $ip; + $this->ipPool->addIp($ip); } } diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 511bd24a..a5f588fc 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -37,7 +37,6 @@ class Flow implements FlowInterface * @var array */ private array $stream = [ - 'ips' => 0, 'fnFlows' => [], 'dispatchers' => [], ]; @@ -89,7 +88,6 @@ public function __construct( public function __invoke(Ip $ip): void { - $this->stream['ips']++; $this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH); } diff --git a/src/IpPool.php b/src/IpPool.php new file mode 100644 index 00000000..0ee63c5a --- /dev/null +++ b/src/IpPool.php @@ -0,0 +1,56 @@ +> + */ + private array $ips = []; + + /** + * @param Ip $ip + * + * @return callable A function that removes the added IP from the pool when called + */ + public function addIp(Ip $ip): callable + { + $this->ips[] = $ip; + + return function () use ($ip) { + $this->ips = array_filter($this->ips, static function ($iteratorIp) use ($ip) { + return $iteratorIp !== $ip; + }); + }; + } + + /** + * @return array> + */ + public function getIps(): array + { + return $this->ips; + } + + /** + * @return null|Ip + */ + public function shiftIp(): ?Ip + { + return array_shift($this->ips); + } + + /** + * @return null|Ip + */ + public function popIp(): ?Ip + { + return array_pop($this->ips); + } +} diff --git a/src/IpStrategy/LinearIpStrategy.php b/src/IpStrategy/LinearIpStrategy.php index 775df2b1..133536d6 100644 --- a/src/IpStrategy/LinearIpStrategy.php +++ b/src/IpStrategy/LinearIpStrategy.php @@ -5,9 +5,10 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; -use Flow\Ip; +use Flow\IpPool; use Flow\IpStrategyInterface; /** @@ -18,15 +19,21 @@ class LinearIpStrategy implements IpStrategyInterface { /** - * @var array> + * @var IpPool */ - private array $ips = []; + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } public static function getSubscribedEvents(): array { return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::POOL => 'pool', ]; } @@ -35,7 +42,7 @@ public static function getSubscribedEvents(): array */ public function push(PushEvent $event): void { - $this->ips[] = $event->getIp(); + $this->ipPool->addIp($event->getIp()); } /** @@ -43,6 +50,14 @@ public function push(PushEvent $event): void */ public function pull(PullEvent $event): void { - $event->setIp(array_shift($this->ips)); + $ip = $this->ipPool->shiftIp(); + if ($ip !== null) { + $event->addIp($ip); + } + } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); } } diff --git a/src/IpStrategy/MaxIpStrategy.php b/src/IpStrategy/MaxIpStrategy.php index e3238399..f789a1af 100644 --- a/src/IpStrategy/MaxIpStrategy.php +++ b/src/IpStrategy/MaxIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PopEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; @@ -47,6 +48,7 @@ public static function getSubscribedEvents() Event::PUSH => 'push', Event::PULL => 'pull', Event::POP => 'pop', + Event::POOL => 'pool', ]; } @@ -63,13 +65,14 @@ public function push(PushEvent $event): void */ public function pull(PullEvent $event): void { - if ($this->processing < $this->max) { - $ip = $this->dispatcher->dispatch($event, Event::PULL)->getIp(); - if ($ip) { + $ips = $this->dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps(); + foreach ($ips as $ip) { + if ($this->processing < $this->max) { $this->processing++; + $event->addIp($ip); + } else { + $this->dispatcher->dispatch(new PushEvent($ip), Event::PUSH); } - - $event->setIp($ip); } } @@ -81,4 +84,9 @@ public function pop(PopEvent $event): void $this->dispatcher->dispatch($event, Event::POP); $this->processing--; } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->dispatcher->dispatch($event, Event::POOL)->getIps()); + } } diff --git a/src/IpStrategy/StackIpStrategy.php b/src/IpStrategy/StackIpStrategy.php index af2c506d..132573fe 100644 --- a/src/IpStrategy/StackIpStrategy.php +++ b/src/IpStrategy/StackIpStrategy.php @@ -5,9 +5,10 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\PoolEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; -use Flow\Ip; +use Flow\IpPool; use Flow\IpStrategyInterface; /** @@ -18,15 +19,21 @@ class StackIpStrategy implements IpStrategyInterface { /** - * @var array> + * @var IpPool */ - private array $ips = []; + private IpPool $ipPool; + + public function __construct() + { + $this->ipPool = new IpPool(); + } public static function getSubscribedEvents() { return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::POOL => 'pool', ]; } @@ -35,7 +42,7 @@ public static function getSubscribedEvents() */ public function push(PushEvent $event): void { - $this->ips[] = $event->getIp(); + $this->ipPool->addIp($event->getIp()); } /** @@ -43,6 +50,14 @@ public function push(PushEvent $event): void */ public function pull(PullEvent $event): void { - $event->setIp(array_pop($this->ips)); + $ip = $this->ipPool->popIp(); + if ($ip !== null) { + $event->addIp($ip); + } + } + + public function pool(PoolEvent $event): void + { + $event->addIps($this->ipPool->getIps()); } } diff --git a/src/IpStrategyInterface.php b/src/IpStrategyInterface.php index 1953c285..13e106fe 100644 --- a/src/IpStrategyInterface.php +++ b/src/IpStrategyInterface.php @@ -4,9 +4,16 @@ namespace Flow; +use Flow\Event\PoolEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; /** * @template T */ -interface IpStrategyInterface extends EventSubscriberInterface {} +interface IpStrategyInterface extends EventSubscriberInterface +{ + /** + * @param PoolEvent $event + */ + public function pool(PoolEvent $event): void; +} diff --git a/tests/AsyncHandler/AsyncHandlerTest.php b/tests/AsyncHandler/AsyncHandlerTest.php index a1f6cf78..6bddb264 100644 --- a/tests/AsyncHandler/AsyncHandlerTest.php +++ b/tests/AsyncHandler/AsyncHandlerTest.php @@ -24,8 +24,8 @@ static function ($data) use (&$result) { [$n1, $n2] = $data; $result = $n1 + $n2; - return static function ($callback) { - $callback(); + return static function ($callback) use ($result) { + $callback($result); }; }, new Ip([2, 6]), diff --git a/tests/AsyncHandler/BatchAsyncHandlerTest.php b/tests/AsyncHandler/BatchAsyncHandlerTest.php index bc2bad52..5ec6d01f 100644 --- a/tests/AsyncHandler/BatchAsyncHandlerTest.php +++ b/tests/AsyncHandler/BatchAsyncHandlerTest.php @@ -24,8 +24,8 @@ static function ($data) use (&$result1) { [$n1, $n2] = $data; $result1 = $n1 + $n2; - return static function ($callback) { - $callback(); + return static function ($callback) use ($result1) { + $callback($result1); }; }, new Ip([2, 6]), @@ -41,8 +41,8 @@ static function ($data) use (&$result2) { [$n1, $n2] = $data; $result2 = $n1 + $n2; - return static function ($callback) { - $callback(); + return static function ($callback) use ($result2) { + $callback($result2); }; }, new Ip([6, 10]), diff --git a/tests/IpStrategy/LinearIpStrategyTest.php b/tests/IpStrategy/LinearIpStrategyTest.php index 10e44a07..ea5d6b22 100644 --- a/tests/IpStrategy/LinearIpStrategyTest.php +++ b/tests/IpStrategy/LinearIpStrategyTest.php @@ -23,16 +23,16 @@ public function testStrategy(): void $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - self::assertNotNull($pullEvent->getIp()); - self::assertSame($ip1, $pullEvent->getIp()); + self::assertNotEmpty($pullEvent->getIps()); + self::assertSame($ip1, $pullEvent->getIps()[0]); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - self::assertNotNull($pullEvent->getIp()); - self::assertSame($ip2, $pullEvent->getIp()); + self::assertNotEmpty($pullEvent->getIps()); + self::assertSame($ip2, $pullEvent->getIps()[0]); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - self::assertNull($pullEvent->getIp()); + self::assertEmpty($pullEvent->getIps()); } } diff --git a/tests/IpStrategy/MaxIpStrategyTest.php b/tests/IpStrategy/MaxIpStrategyTest.php index 0ccb61c4..7363be3c 100644 --- a/tests/IpStrategy/MaxIpStrategyTest.php +++ b/tests/IpStrategy/MaxIpStrategyTest.php @@ -27,24 +27,24 @@ public function testStrategy(int $doneIndex): void $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - $ips[] = $pullEvent->getIp(); + $ips[] = $pullEvent->getIps()[0] ?? null; self::assertNotNull($ips[0]); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - $ips[] = $pullEvent->getIp(); + $ips[] = $pullEvent->getIps()[0] ?? null; self::assertNotNull($ips[1]); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - $ips[] = $pullEvent->getIp(); + $ips[] = $pullEvent->getIps()[0] ?? null; self::assertNull($ips[2]); $strategy->pop(new PopEvent($ips[$doneIndex])); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - $ips[] = $pullEvent->getIp(); + $ips[] = $pullEvent->getIps()[0] ?? null; self::assertNotNull($ips[3]); } diff --git a/tests/IpStrategy/StackIpStrategyTest.php b/tests/IpStrategy/StackIpStrategyTest.php index e3b72a60..738e5cdc 100644 --- a/tests/IpStrategy/StackIpStrategyTest.php +++ b/tests/IpStrategy/StackIpStrategyTest.php @@ -23,16 +23,16 @@ public function testStrategy(): void $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - self::assertNotNull($pullEvent->getIp()); - self::assertSame($ip2, $pullEvent->getIp()); + self::assertNotEmpty($pullEvent->getIps()); + self::assertSame($ip2, $pullEvent->getIps()[0]); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - self::assertNotNull($pullEvent->getIp()); - self::assertSame($ip1, $pullEvent->getIp()); + self::assertNotEmpty($pullEvent->getIps()); + self::assertSame($ip1, $pullEvent->getIps()[0]); $pullEvent = new PullEvent(); $strategy->pull($pullEvent); - self::assertNull($pullEvent->getIp()); + self::assertEmpty($pullEvent->getIps()); } }