From 54f5eeac8d3bfaf7c8b4c2ab18796d6e186a2006 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Tue, 3 Sep 2024 02:04:55 +0200 Subject: [PATCH] :sparkles: Add count event --- CHANGELOG.md | 4 ++++ src/Driver/AmpDriver.php | 6 +++--- src/Driver/DriverTrait.php | 25 +++++++++++++++++++++++++ src/Driver/FiberDriver.php | 8 ++++---- src/Driver/ReactDriver.php | 6 +++--- src/Driver/SpatieDriver.php | 8 ++++---- src/Driver/SwooleDriver.php | 8 ++++---- src/DriverInterface.php | 2 +- src/Event.php | 9 +++++++++ src/Event/CountEvent.php | 22 ++++++++++++++++++++++ src/Flow/Flow.php | 2 -- src/IpStrategy/LinearIpStrategy.php | 9 +++++++++ src/IpStrategy/MaxIpStrategy.php | 10 ++++++++-- src/IpStrategy/StackIpStrategy.php | 9 +++++++++ 14 files changed, 105 insertions(+), 23 deletions(-) create mode 100644 src/Driver/DriverTrait.php create mode 100644 src/Event/CountEvent.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b17280..f54be40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.2.x + +- Add event Event::COUNT occurs when Flow need to count IPs to process. + ## v1.2.2 - Flow can now use `Flow\JobInterface` as job input diff --git a/src/Driver/AmpDriver.php b/src/Driver/AmpDriver.php index c4b95b4..89d732b 100644 --- a/src/Driver/AmpDriver.php +++ b/src/Driver/AmpDriver.php @@ -34,6 +34,8 @@ */ class AmpDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct(?Driver $driver = null) @@ -121,18 +123,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { EventLoop::defer($loop); } else { EventLoop::getDriver()->stop(); diff --git a/src/Driver/DriverTrait.php b/src/Driver/DriverTrait.php new file mode 100644 index 0000000..bc9d364 --- /dev/null +++ b/src/Driver/DriverTrait.php @@ -0,0 +1,25 @@ + $dispatchers + */ + public function countIps(array $dispatchers): int + { + $count = 0; + foreach ($dispatchers as $dispatcher) { + $count += $dispatcher->dispatch(new CountEvent(), Event::COUNT)->getCount(); + } + + return $count; + } +} diff --git a/src/Driver/FiberDriver.php b/src/Driver/FiberDriver.php index 451297f..b13c552 100644 --- a/src/Driver/FiberDriver.php +++ b/src/Driver/FiberDriver.php @@ -30,6 +30,8 @@ */ class FiberDriver implements DriverInterface { + use DriverTrait; + /** * @var array */ @@ -121,7 +123,7 @@ public function await(array &$stream): void $tick = 0; $fiberDatas = []; - while ($stream['ips'] > 0 or count($this->ticks) > 0) { + do { foreach ($this->ticks as [ 'interval' => $interval, 'callback' => $callback, @@ -146,12 +148,10 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } @@ -168,7 +168,7 @@ public function await(array &$stream): void } $tick++; - } + } while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0); } public function delay(float $seconds): void diff --git a/src/Driver/ReactDriver.php b/src/Driver/ReactDriver.php index 0d01656..2225079 100644 --- a/src/Driver/ReactDriver.php +++ b/src/Driver/ReactDriver.php @@ -34,6 +34,8 @@ */ class ReactDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private LoopInterface $eventLoop; @@ -114,18 +116,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - if ($stream['ips'] > 0 or $this->ticks > 0) { + if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) { $this->eventLoop->futureTick($loop); } else { $this->eventLoop->stop(); diff --git a/src/Driver/SpatieDriver.php b/src/Driver/SpatieDriver.php index b008e5d..80a4503 100644 --- a/src/Driver/SpatieDriver.php +++ b/src/Driver/SpatieDriver.php @@ -30,6 +30,8 @@ */ class SpatieDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; private Pool $pool; @@ -89,7 +91,7 @@ public function await(array &$stream): void }; $nextIp = null; - while ($stream['ips'] > 0 or $this->ticks > 0) { + do { do { foreach ($stream['dispatchers'] as $index => $dispatcher) { $nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp(); @@ -99,17 +101,15 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } } while ($nextIp !== null); - } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); } public function delay(float $seconds): void diff --git a/src/Driver/SwooleDriver.php b/src/Driver/SwooleDriver.php index 413c99d..2c6e9e9 100644 --- a/src/Driver/SwooleDriver.php +++ b/src/Driver/SwooleDriver.php @@ -30,6 +30,8 @@ */ class SwooleDriver implements DriverInterface { + use DriverTrait; + private int $ticks = 0; public function __construct() @@ -85,7 +87,7 @@ public function await(array &$stream): void }; co::run(function () use (&$stream, $async, $defer) { - while ($stream['ips'] > 0 or $this->ticks > 0) { + do { $nextIp = null; do { foreach ($stream['dispatchers'] as $index => $dispatcher) { @@ -96,18 +98,16 @@ public function await(array &$stream): void $stream['fnFlows'][$index]['errorJob']($data); } elseif (array_key_exists($index + 1, $stream['fnFlows'])) { $ip = new Ip($data); - $stream['ips']++; $stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH); } $stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP); - $stream['ips']--; }), Event::ASYNC); } } co::sleep(1); } while ($nextIp !== null); - } + } while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0); }); } diff --git a/src/DriverInterface.php b/src/DriverInterface.php index 69a588b..c33d16a 100644 --- a/src/DriverInterface.php +++ b/src/DriverInterface.php @@ -30,7 +30,7 @@ public function async(Closure|JobInterface $callback): Closure; public function defer(Closure $callback): mixed; /** - * @param array{'ips': int, 'fnFlows': array, 'dispatchers': array} $stream + * @param array{'fnFlows': array, 'dispatchers': array} $stream */ public function await(array &$stream): void; diff --git a/src/Event.php b/src/Event.php index c8a8a02..34c0781 100644 --- a/src/Event.php +++ b/src/Event.php @@ -15,6 +15,15 @@ final class Event */ public const PUSH = 'push'; + /** + * The COUNT event occurs when Flow need to count IPs to process. + * + * This event loop the event loop. + * + * @Event("Flow\Event\CountEvent") + */ + public const COUNT = 'count'; + /** * The ASYNC event occurs when Flow dispatch async process execution. * diff --git a/src/Event/CountEvent.php b/src/Event/CountEvent.php new file mode 100644 index 0000000..c6f842d --- /dev/null +++ b/src/Event/CountEvent.php @@ -0,0 +1,22 @@ +count; + } + + public function setCount(int $count): void + { + $this->count = $count; + } +} diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 511bd24..a5f588f 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -37,7 +37,6 @@ class Flow implements FlowInterface * @var array */ private array $stream = [ - 'ips' => 0, 'fnFlows' => [], 'dispatchers' => [], ]; @@ -89,7 +88,6 @@ public function __construct( public function __invoke(Ip $ip): void { - $this->stream['ips']++; $this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH); } diff --git a/src/IpStrategy/LinearIpStrategy.php b/src/IpStrategy/LinearIpStrategy.php index 775df2b..5880a7f 100644 --- a/src/IpStrategy/LinearIpStrategy.php +++ b/src/IpStrategy/LinearIpStrategy.php @@ -5,11 +5,14 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\CountEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; use Flow\Ip; use Flow\IpStrategyInterface; +use function count; + /** * @template T * @@ -27,6 +30,7 @@ public static function getSubscribedEvents(): array return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::COUNT => 'count', ]; } @@ -45,4 +49,9 @@ public function pull(PullEvent $event): void { $event->setIp(array_shift($this->ips)); } + + public function count(CountEvent $event): void + { + $event->setCount(count($this->ips)); + } } diff --git a/src/IpStrategy/MaxIpStrategy.php b/src/IpStrategy/MaxIpStrategy.php index e323839..e8d9eb6 100644 --- a/src/IpStrategy/MaxIpStrategy.php +++ b/src/IpStrategy/MaxIpStrategy.php @@ -5,6 +5,7 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\CountEvent; use Flow\Event\PopEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; @@ -47,6 +48,7 @@ public static function getSubscribedEvents() Event::PUSH => 'push', Event::PULL => 'pull', Event::POP => 'pop', + Event::COUNT => 'count', ]; } @@ -67,9 +69,8 @@ public function pull(PullEvent $event): void $ip = $this->dispatcher->dispatch($event, Event::PULL)->getIp(); if ($ip) { $this->processing++; + $event->setIp($ip); } - - $event->setIp($ip); } } @@ -81,4 +82,9 @@ public function pop(PopEvent $event): void $this->dispatcher->dispatch($event, Event::POP); $this->processing--; } + + public function count(CountEvent $event): void + { + $event->setCount($this->processing); + } } diff --git a/src/IpStrategy/StackIpStrategy.php b/src/IpStrategy/StackIpStrategy.php index af2c506..3f2a08e 100644 --- a/src/IpStrategy/StackIpStrategy.php +++ b/src/IpStrategy/StackIpStrategy.php @@ -5,11 +5,14 @@ namespace Flow\IpStrategy; use Flow\Event; +use Flow\Event\CountEvent; use Flow\Event\PullEvent; use Flow\Event\PushEvent; use Flow\Ip; use Flow\IpStrategyInterface; +use function count; + /** * @template T * @@ -27,6 +30,7 @@ public static function getSubscribedEvents() return [ Event::PUSH => 'push', Event::PULL => 'pull', + Event::COUNT => 'count', ]; } @@ -45,4 +49,9 @@ public function pull(PullEvent $event): void { $event->setIp(array_pop($this->ips)); } + + public function count(CountEvent $event): void + { + $event->setCount(count($this->ips)); + } }