From 8e57c14efb1b5331198ef97c51301c1eb4e9e1a5 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Fri, 8 Sep 2023 13:30:01 +0200 Subject: [PATCH] :sparkles: Add event system for processing IpStrategy --- CHANGELOG.md | 2 ++ composer.json | 3 ++- src/Event/PopEvent.php | 35 +++++++++++++++++++++++++++++ src/Event/PullEvent.php | 35 +++++++++++++++++++++++++++++ src/Event/PushEvent.php | 35 +++++++++++++++++++++++++++++ src/Flow/Flow.php | 19 ++++++++++++---- src/IpStrategy/LinearIpStrategy.php | 27 +++++++++++++--------- src/IpStrategy/MaxIpStrategy.php | 33 ++++++++++++++++++--------- src/IpStrategy/StackIpStrategy.php | 25 ++++++++++++--------- src/IpStrategyEvent.php | 35 +++++++++++++++++++++++++++++ src/IpStrategyInterface.php | 21 +++-------------- 11 files changed, 215 insertions(+), 55 deletions(-) create mode 100644 src/Event/PopEvent.php create mode 100644 src/Event/PullEvent.php create mode 100644 src/Event/PushEvent.php create mode 100644 src/IpStrategyEvent.php diff --git a/CHANGELOG.md b/CHANGELOG.md index fb344aed..35b53eb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ - array : constructor arguments for Flow instanciation - array (view as shape) : configuration for Flow instanciation - FlowInterface : the FlowInterface instance itself + - array : map of all possible above choices +- Add event system for processing IpStrategy ## v1.1.4 diff --git a/composer.json b/composer.json index e4dbe47f..1e7670f6 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,8 @@ "php" ], "require": { - "php": ">=8.2" + "php": ">=8.2", + "symfony/event-dispatcher": "^6.3" }, "require-dev": { "amphp/amp": "^3.0", diff --git a/src/Event/PopEvent.php b/src/Event/PopEvent.php new file mode 100644 index 00000000..8d993659 --- /dev/null +++ b/src/Event/PopEvent.php @@ -0,0 +1,35 @@ + $ip + */ + private Ip $ip; + + /** + * @param Ip $ip + */ + public function __construct(Ip $ip) + { + $this->ip = $ip; + } + + /** + * @return Ip + */ + public function getIp(): Ip + { + return $this->ip; + } +} diff --git a/src/Event/PullEvent.php b/src/Event/PullEvent.php new file mode 100644 index 00000000..04b1c831 --- /dev/null +++ b/src/Event/PullEvent.php @@ -0,0 +1,35 @@ + $ip + */ + private ?Ip $ip = null; + + /** + * @return null|Ip + */ + public function getIp(): ?Ip + { + return $this->ip; + } + + /** + * @return null|Ip + */ + public function setIp(?Ip $ip) + { + $this->ip = $ip; + } +} diff --git a/src/Event/PushEvent.php b/src/Event/PushEvent.php new file mode 100644 index 00000000..6f0f1b69 --- /dev/null +++ b/src/Event/PushEvent.php @@ -0,0 +1,35 @@ + $ip + */ + private Ip $ip; + + /** + * @param Ip $ip + */ + public function __construct(Ip $ip) + { + $this->ip = $ip; + } + + /** + * @return Ip + */ + public function getIp(): Ip + { + return $this->ip; + } +} diff --git a/src/Flow/Flow.php b/src/Flow/Flow.php index 8a21f963..c9bbb905 100644 --- a/src/Flow/Flow.php +++ b/src/Flow/Flow.php @@ -7,14 +7,20 @@ use Closure; use Flow\Driver\FiberDriver; use Flow\DriverInterface; +use Flow\Event\PopEvent; +use Flow\Event\PullEvent; +use Flow\Event\PushEvent; use Flow\Exception\LogicException; use Flow\ExceptionInterface; use Flow\FlowInterface; use Flow\Ip; use Flow\IpStrategy\LinearIpStrategy; +use Flow\IpStrategyEvent; use Flow\IpStrategyInterface; use Generator; use SplObjectStorage; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\EventDispatcher\EventDispatcher; use function array_key_exists; use function count; @@ -48,6 +54,8 @@ class Flow implements FlowInterface */ private DriverInterface $driver; + private EventDispatcherInterface $dispatcher; + /** * @var SplObjectStorage, null|Closure(Ip): void> */ @@ -68,19 +76,22 @@ public function __construct( Closure|array $jobs, Closure|array $errorJobs = null, IpStrategyInterface $ipStrategy = null, - DriverInterface $driver = null + DriverInterface $driver = null, + EventDispatcherInterface $dispatcher = null, ) { $this->jobs = is_array($jobs) ? $jobs : [$jobs]; $this->errorJobs = $errorJobs ? (is_array($errorJobs) ? $errorJobs : [$errorJobs]) : []; $this->ipStrategy = $ipStrategy ?? new LinearIpStrategy(); $this->driver = $driver ?? new FiberDriver(); + $this->dispatcher = $dispatcher ?? new EventDispatcher(); + $this->dispatcher->addSubscriber($this->ipStrategy); $this->callbacks = new SplObjectStorage(); } public function __invoke(Ip $ip, Closure $callback = null): void { $this->callbacks->offsetSet($ip, $callback); - $this->ipStrategy->push($ip); + $this->dispatcher->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH); $this->nextIpJob(); } @@ -128,7 +139,7 @@ public function fn(array|Closure|FlowInterface $flow): FlowInterface private function nextIpJob(): void { - $ip = $this->ipStrategy->pop(); + $ip = $this->dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp(); if (!$ip) { return; } @@ -142,7 +153,7 @@ private function nextIpJob(): void $count--; if ($count === 0 || $value instanceof ExceptionInterface) { $count = 0; - $this->ipStrategy->done($ip); + $this->dispatcher->dispatch(new PopEvent($ip), IpStrategyEvent::POP); $this->nextIpJob(); if ($value instanceof ExceptionInterface) { diff --git a/src/IpStrategy/LinearIpStrategy.php b/src/IpStrategy/LinearIpStrategy.php index d923a07e..ef033816 100644 --- a/src/IpStrategy/LinearIpStrategy.php +++ b/src/IpStrategy/LinearIpStrategy.php @@ -4,13 +4,17 @@ namespace Flow\IpStrategy; +use Flow\Event\PopEvent; +use Flow\Event\PullEvent; +use Flow\Event\PushEvent; use Flow\Ip; +use Flow\IpStrategyEvent; use Flow\IpStrategyInterface; /** * @template T * - * @implements IpStrategyInterface + * @implements IpStrategyInterface */ class LinearIpStrategy implements IpStrategyInterface { @@ -19,26 +23,27 @@ class LinearIpStrategy implements IpStrategyInterface */ private array $ips = []; - /** - * @param Ip $ip - */ - public function push(Ip $ip): void + public static function getSubscribedEvents(): array { - $this->ips[] = $ip; + return [ + IpStrategyEvent::PUSH => 'push', + IpStrategyEvent::PULL => 'pull', + ]; } /** - * @return null|Ip + * @param PushEvent $event */ - public function pop(): ?Ip + public function push(PushEvent $event): void { - return array_shift($this->ips); + $this->ips[] = $event->getIp(); } /** - * @param Ip $ip + * @param PullEvent $event */ - public function done(Ip $ip): void + public function pull(PullEvent $event): void { + $event->setIp(array_shift($this->ips)); } } diff --git a/src/IpStrategy/MaxIpStrategy.php b/src/IpStrategy/MaxIpStrategy.php index 1c065292..d955ffd6 100644 --- a/src/IpStrategy/MaxIpStrategy.php +++ b/src/IpStrategy/MaxIpStrategy.php @@ -4,13 +4,17 @@ namespace Flow\IpStrategy; +use Flow\Event\PopEvent; +use Flow\Event\PullEvent; +use Flow\Event\PushEvent; use Flow\Ip; +use Flow\IpStrategyEvent; use Flow\IpStrategyInterface; /** * @template T * - * @implements IpStrategyInterface + * @implements IpStrategyInterface */ class MaxIpStrategy implements IpStrategyInterface { @@ -29,21 +33,30 @@ public function __construct(private int $max = 1, IpStrategyInterface $ipStrateg $this->ipStrategy = $ipStrategy ?? new LinearIpStrategy(); } + public static function getSubscribedEvents() + { + return [ + IpStrategyEvent::PUSH => 'push', + IpStrategyEvent::PULL => 'pull', + IpStrategyEvent::POP => 'pop', + ]; + } + /** - * @param Ip $ip + * @param PushEvent $even */ - public function push(Ip $ip): void + public function push(PushEvent $event): void { - $this->ipStrategy->push($ip); + $this->ipStrategy->push($event->getIp()); } /** - * @return null|Ip + * @return PullEvent */ - public function pop(): ?Ip + public function pull(PullEvent $event): ?Ip { if ($this->processing < $this->max) { - $ip = $this->ipStrategy->pop(); + $ip = $this->ipStrategy->pull(); if ($ip) { $this->processing++; } @@ -55,11 +68,11 @@ public function pop(): ?Ip } /** - * @param Ip $ip + * @param PopEvent $event */ - public function done(Ip $ip): void + public function done(PopEvent $event): void { - $this->ipStrategy->done($ip); + $this->ipStrategy->done($event->getIp()); $this->processing--; } } diff --git a/src/IpStrategy/StackIpStrategy.php b/src/IpStrategy/StackIpStrategy.php index ce68100e..cb86d4ac 100644 --- a/src/IpStrategy/StackIpStrategy.php +++ b/src/IpStrategy/StackIpStrategy.php @@ -4,13 +4,15 @@ namespace Flow\IpStrategy; +use Flow\Event\PullEvent; use Flow\Ip; +use Flow\IpStrategyEvent; use Flow\IpStrategyInterface; /** * @template T * - * @implements IpStrategyInterface + * @implements IpStrategyInterface */ class StackIpStrategy implements IpStrategyInterface { @@ -19,26 +21,27 @@ class StackIpStrategy implements IpStrategyInterface */ private array $ips = []; - /** - * @param Ip $ip - */ - public function push(Ip $ip): void + public static function getSubscribedEvents() { - $this->ips[] = $ip; + return [ + IpStrategyEvent::PUSH => 'push', + IpStrategyEvent::PULL => 'pull', + ]; } /** - * @return null|Ip $ip + * @param PushEvent $even */ - public function pop(): ?Ip + public function push(Ip $ip): void { - return array_pop($this->ips); + $this->ips[] = $ip; } /** - * @param Ip $ip + * @return PullEvent $event */ - public function done(Ip $ip): void + public function pull(PullEvent $event): void { + $event->setIp(array_pop($this->ips)); } } diff --git a/src/IpStrategyEvent.php b/src/IpStrategyEvent.php new file mode 100644 index 00000000..a59ad596 --- /dev/null +++ b/src/IpStrategyEvent.php @@ -0,0 +1,35 @@ + $ip - */ - public function push(Ip $ip): void; - - /** - * @return null|Ip - */ - public function pop(): ?Ip; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; - /** - * @param Ip $ip - */ - public function done(Ip $ip): void; +interface IpStrategyInterface extends EventSubscriberInterface +{ }