Skip to content

Commit

Permalink
✨ Add event system for processing IpStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 26, 2023
1 parent 5363bae commit 8e57c14
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
- array : constructor arguments for Flow instanciation
- array (view as shape) : configuration for Flow instanciation
- FlowInterface : the FlowInterface instance itself
- array : map of all possible above choices
- Add event system for processing IpStrategy

## v1.1.4

Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"php"
],
"require": {
"php": ">=8.2"
"php": ">=8.2",
"symfony/event-dispatcher": "^6.3"
},
"require-dev": {
"amphp/amp": "^3.0",
Expand Down
35 changes: 35 additions & 0 deletions src/Event/PopEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Flow\Event;

use Flow\Ip;
use Symfony\Contracts\EventDispatcher\Event;

/**
* @template T
*/
final class PopEvent extends Event
{
/**
* @param Ip<T> $ip
*/
private Ip $ip;

Check failure on line 18 in src/Event/PopEvent.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

Property Flow\Event\PopEvent::$ip with generic class Flow\Ip does not specify its types: T

/**
* @param Ip<T> $ip
*/
public function __construct(Ip $ip)
{
$this->ip = $ip;
}

/**
* @return Ip<T>
*/
public function getIp(): Ip
{
return $this->ip;
}
}
35 changes: 35 additions & 0 deletions src/Event/PullEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Flow\Event;

use Flow\Ip;
use Symfony\Contracts\EventDispatcher\Event;

/**
* @template T
*/
final class PullEvent extends Event
{
/**
* @param null|Ip<T> $ip
*/
private ?Ip $ip = null;

Check failure on line 18 in src/Event/PullEvent.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

Property Flow\Event\PullEvent::$ip with generic class Flow\Ip does not specify its types: T

/**
* @return null|Ip<T>
*/
public function getIp(): ?Ip
{
return $this->ip;
}

/**
* @return null|Ip<T>
*/
public function setIp(?Ip $ip)

Check failure on line 31 in src/Event/PullEvent.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

Method Flow\Event\PullEvent::setIp() has parameter $ip with generic class Flow\Ip but does not specify its types: T
{
$this->ip = $ip;

Check failure on line 33 in src/Event/PullEvent.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

Method Flow\Event\PullEvent::setIp() should return Flow\Ip<T>|null but return statement is missing.
}
}
35 changes: 35 additions & 0 deletions src/Event/PushEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Flow\Event;

use Flow\Ip;
use Symfony\Contracts\EventDispatcher\Event;

/**
* @template T
*/
final class PushEvent extends Event
{
/**
* @param Ip<T> $ip
*/
private Ip $ip;

Check failure on line 18 in src/Event/PushEvent.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

Property Flow\Event\PushEvent::$ip with generic class Flow\Ip does not specify its types: T

/**
* @param Ip<T> $ip
*/
public function __construct(Ip $ip)
{
$this->ip = $ip;
}

/**
* @return Ip<T>
*/
public function getIp(): Ip
{
return $this->ip;
}
}
19 changes: 15 additions & 4 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@
use Closure;
use Flow\Driver\FiberDriver;
use Flow\DriverInterface;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\LogicException;
use Flow\ExceptionInterface;
use Flow\FlowInterface;
use Flow\Ip;
use Flow\IpStrategy\LinearIpStrategy;
use Flow\IpStrategyEvent;
use Flow\IpStrategyInterface;
use Generator;
use SplObjectStorage;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;

use function array_key_exists;
use function count;
Expand Down Expand Up @@ -48,6 +54,8 @@ class Flow implements FlowInterface
*/
private DriverInterface $driver;

private EventDispatcherInterface $dispatcher;

/**
* @var SplObjectStorage<Ip<T1>, null|Closure(Ip<T1>): void>
*/
Expand All @@ -68,19 +76,22 @@ public function __construct(
Closure|array $jobs,
Closure|array $errorJobs = null,
IpStrategyInterface $ipStrategy = null,
DriverInterface $driver = null
DriverInterface $driver = null,
EventDispatcherInterface $dispatcher = null,
) {
$this->jobs = is_array($jobs) ? $jobs : [$jobs];
$this->errorJobs = $errorJobs ? (is_array($errorJobs) ? $errorJobs : [$errorJobs]) : [];
$this->ipStrategy = $ipStrategy ?? new LinearIpStrategy();
$this->driver = $driver ?? new FiberDriver();
$this->dispatcher = $dispatcher ?? new EventDispatcher();
$this->dispatcher->addSubscriber($this->ipStrategy);
$this->callbacks = new SplObjectStorage();
}

public function __invoke(Ip $ip, Closure $callback = null): void
{
$this->callbacks->offsetSet($ip, $callback);
$this->ipStrategy->push($ip);
$this->dispatcher->dispatch(new PushEvent($ip), IpStrategyEvent::PUSH);
$this->nextIpJob();
}

Expand Down Expand Up @@ -128,7 +139,7 @@ public function fn(array|Closure|FlowInterface $flow): FlowInterface

private function nextIpJob(): void
{
$ip = $this->ipStrategy->pop();
$ip = $this->dispatcher->dispatch(new PullEvent(), IpStrategyEvent::PULL)->getIp();
if (!$ip) {
return;
}
Expand All @@ -142,7 +153,7 @@ private function nextIpJob(): void
$count--;
if ($count === 0 || $value instanceof ExceptionInterface) {
$count = 0;
$this->ipStrategy->done($ip);
$this->dispatcher->dispatch(new PopEvent($ip), IpStrategyEvent::POP);
$this->nextIpJob();

if ($value instanceof ExceptionInterface) {
Expand Down
27 changes: 16 additions & 11 deletions src/IpStrategy/LinearIpStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

namespace Flow\IpStrategy;

use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Flow\IpStrategyInterface;

/**
* @template T
*
* @implements IpStrategyInterface<T>
* @implements IpStrategyInterface
*/
class LinearIpStrategy implements IpStrategyInterface

Check failure on line 19 in src/IpStrategy/LinearIpStrategy.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

PHPDoc tag @implements has invalid value (IpStrategyInterface): Unexpected token "\n ", expected '<' at offset 56
{
Expand All @@ -19,26 +23,27 @@ class LinearIpStrategy implements IpStrategyInterface
*/
private array $ips = [];

/**
* @param Ip<T> $ip
*/
public function push(Ip $ip): void
public static function getSubscribedEvents(): array
{
$this->ips[] = $ip;
return [
IpStrategyEvent::PUSH => 'push',
IpStrategyEvent::PULL => 'pull',
];
}

/**
* @return null|Ip<T>
* @param PushEvent<T> $event
*/
public function pop(): ?Ip
public function push(PushEvent $event): void
{
return array_shift($this->ips);
$this->ips[] = $event->getIp();
}

/**
* @param Ip<T> $ip
* @param PullEvent<T> $event
*/
public function done(Ip $ip): void
public function pull(PullEvent $event): void
{
$event->setIp(array_shift($this->ips));
}
}
33 changes: 23 additions & 10 deletions src/IpStrategy/MaxIpStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

namespace Flow\IpStrategy;

use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Flow\IpStrategyInterface;

/**
* @template T
*
* @implements IpStrategyInterface<T>
* @implements IpStrategyInterface
*/
class MaxIpStrategy implements IpStrategyInterface

Check failure on line 19 in src/IpStrategy/MaxIpStrategy.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.2)

PHPDoc tag @implements has invalid value (IpStrategyInterface): Unexpected token "\n ", expected '<' at offset 56
{
Expand All @@ -29,21 +33,30 @@ public function __construct(private int $max = 1, IpStrategyInterface $ipStrateg
$this->ipStrategy = $ipStrategy ?? new LinearIpStrategy();
}

public static function getSubscribedEvents()
{
return [
IpStrategyEvent::PUSH => 'push',
IpStrategyEvent::PULL => 'pull',
IpStrategyEvent::POP => 'pop',
];
}

/**
* @param Ip<T> $ip
* @param PushEvent<T> $even
*/
public function push(Ip $ip): void
public function push(PushEvent $event): void
{
$this->ipStrategy->push($ip);
$this->ipStrategy->push($event->getIp());
}

/**
* @return null|Ip<T>
* @return PullEvent<T>
*/
public function pop(): ?Ip
public function pull(PullEvent $event): ?Ip
{
if ($this->processing < $this->max) {
$ip = $this->ipStrategy->pop();
$ip = $this->ipStrategy->pull();
if ($ip) {
$this->processing++;
}
Expand All @@ -55,11 +68,11 @@ public function pop(): ?Ip
}

/**
* @param Ip<T> $ip
* @param PopEvent<T> $event
*/
public function done(Ip $ip): void
public function done(PopEvent $event): void
{
$this->ipStrategy->done($ip);
$this->ipStrategy->done($event->getIp());
$this->processing--;
}
}
25 changes: 14 additions & 11 deletions src/IpStrategy/StackIpStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

namespace Flow\IpStrategy;

use Flow\Event\PullEvent;
use Flow\Ip;
use Flow\IpStrategyEvent;
use Flow\IpStrategyInterface;

/**
* @template T
*
* @implements IpStrategyInterface<T>
* @implements IpStrategyInterface
*/
class StackIpStrategy implements IpStrategyInterface
{
Expand All @@ -19,26 +21,27 @@ class StackIpStrategy implements IpStrategyInterface
*/
private array $ips = [];

/**
* @param Ip<T> $ip
*/
public function push(Ip $ip): void
public static function getSubscribedEvents()
{
$this->ips[] = $ip;
return [
IpStrategyEvent::PUSH => 'push',
IpStrategyEvent::PULL => 'pull',
];
}

/**
* @return null|Ip<T> $ip
* @param PushEvent<T> $even
*/
public function pop(): ?Ip
public function push(Ip $ip): void
{
return array_pop($this->ips);
$this->ips[] = $ip;
}

/**
* @param Ip<T> $ip
* @return PullEvent<T> $event
*/
public function done(Ip $ip): void
public function pull(PullEvent $event): void
{
$event->setIp(array_pop($this->ips));
}
}
Loading

0 comments on commit 8e57c14

Please sign in to comment.