Skip to content

Commit

Permalink
✨ Add count event
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 3, 2024
1 parent 3cd42cf commit 54f5eea
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v1.2.x

- Add event Event::COUNT occurs when Flow need to count IPs to process.

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
Expand Down
6 changes: 3 additions & 3 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class AmpDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

public function __construct(?Driver $driver = null)
Expand Down Expand Up @@ -121,18 +123,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
EventLoop::defer($loop);
} else {
EventLoop::getDriver()->stop();
Expand Down
25 changes: 25 additions & 0 deletions src/Driver/DriverTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Flow\Driver;

use Flow\Event;
use Flow\Event\CountEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

trait DriverTrait
{
/**
* @param array<EventDispatcherInterface> $dispatchers
*/
public function countIps(array $dispatchers): int
{
$count = 0;
foreach ($dispatchers as $dispatcher) {
$count += $dispatcher->dispatch(new CountEvent(), Event::COUNT)->getCount();
}

return $count;
}
}
8 changes: 4 additions & 4 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class FiberDriver implements DriverInterface
{
use DriverTrait;

/**
* @var array<mixed>
*/
Expand Down Expand Up @@ -121,7 +123,7 @@ public function await(array &$stream): void

$tick = 0;
$fiberDatas = [];
while ($stream['ips'] > 0 or count($this->ticks) > 0) {
do {
foreach ($this->ticks as [
'interval' => $interval,
'callback' => $callback,
Expand All @@ -146,12 +148,10 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
Expand All @@ -168,7 +168,7 @@ public function await(array &$stream): void
}

$tick++;
}
} while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0);
}

public function delay(float $seconds): void
Expand Down
6 changes: 3 additions & 3 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class ReactDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

private LoopInterface $eventLoop;
Expand Down Expand Up @@ -114,18 +116,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
$this->eventLoop->futureTick($loop);
} else {
$this->eventLoop->stop();
Expand Down
8 changes: 4 additions & 4 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class SpatieDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

private Pool $pool;
Expand Down Expand Up @@ -89,7 +91,7 @@ public function await(array &$stream): void
};

$nextIp = null;
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
Expand All @@ -99,17 +101,15 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
}
} while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0);
}

public function delay(float $seconds): void
Expand Down
8 changes: 4 additions & 4 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class SwooleDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

public function __construct()
Expand Down Expand Up @@ -85,7 +87,7 @@ public function await(array &$stream): void
};

co::run(function () use (&$stream, $async, $defer) {
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
Expand All @@ -96,18 +98,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
co::sleep(1);
} while ($nextIp !== null);
}
} while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0);
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function async(Closure|JobInterface $callback): Closure;
public function defer(Closure $callback): mixed;

/**
* @param array{'ips': int, 'fnFlows': array<mixed>, 'dispatchers': array<mixed>} $stream
* @param array{'fnFlows': array<mixed>, 'dispatchers': array<mixed>} $stream
*/
public function await(array &$stream): void;

Expand Down
9 changes: 9 additions & 0 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ final class Event
*/
public const PUSH = 'push';

/**
* The COUNT event occurs when Flow need to count IPs to process.
*
* This event loop the event loop.
*
* @Event("Flow\Event\CountEvent")
*/
public const COUNT = 'count';

/**
* The ASYNC event occurs when Flow dispatch async process execution.
*
Expand Down
22 changes: 22 additions & 0 deletions src/Event/CountEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Flow\Event;

use Symfony\Contracts\EventDispatcher\Event;

final class CountEvent extends Event
{
private int $count = 0;

public function getCount(): int
{
return $this->count;
}

public function setCount(int $count): void
{
$this->count = $count;
}
}
2 changes: 0 additions & 2 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class Flow implements FlowInterface
* @var array<mixed>
*/
private array $stream = [
'ips' => 0,
'fnFlows' => [],
'dispatchers' => [],
];
Expand Down Expand Up @@ -89,7 +88,6 @@ public function __construct(

public function __invoke(Ip $ip): void
{
$this->stream['ips']++;
$this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH);
}

Expand Down
9 changes: 9 additions & 0 deletions src/IpStrategy/LinearIpStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
namespace Flow\IpStrategy;

use Flow\Event;
use Flow\Event\CountEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Ip;
use Flow\IpStrategyInterface;

use function count;

/**
* @template T
*
Expand All @@ -27,6 +30,7 @@ public static function getSubscribedEvents(): array
return [
Event::PUSH => 'push',
Event::PULL => 'pull',
Event::COUNT => 'count',
];
}

Expand All @@ -45,4 +49,9 @@ public function pull(PullEvent $event): void
{
$event->setIp(array_shift($this->ips));
}

public function count(CountEvent $event): void
{
$event->setCount(count($this->ips));
}
}
10 changes: 8 additions & 2 deletions src/IpStrategy/MaxIpStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\IpStrategy;

use Flow\Event;
use Flow\Event\CountEvent;
use Flow\Event\PopEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
Expand Down Expand Up @@ -47,6 +48,7 @@ public static function getSubscribedEvents()
Event::PUSH => 'push',
Event::PULL => 'pull',
Event::POP => 'pop',
Event::COUNT => 'count',
];
}

Expand All @@ -67,9 +69,8 @@ public function pull(PullEvent $event): void
$ip = $this->dispatcher->dispatch($event, Event::PULL)->getIp();
if ($ip) {
$this->processing++;
$event->setIp($ip);
}

$event->setIp($ip);
}
}

Expand All @@ -81,4 +82,9 @@ public function pop(PopEvent $event): void
$this->dispatcher->dispatch($event, Event::POP);
$this->processing--;
}

public function count(CountEvent $event): void
{
$event->setCount($this->processing);
}
}
9 changes: 9 additions & 0 deletions src/IpStrategy/StackIpStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
namespace Flow\IpStrategy;

use Flow\Event;
use Flow\Event\CountEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Ip;
use Flow\IpStrategyInterface;

use function count;

/**
* @template T
*
Expand All @@ -27,6 +30,7 @@ public static function getSubscribedEvents()
return [
Event::PUSH => 'push',
Event::PULL => 'pull',
Event::COUNT => 'count',
];
}

Expand All @@ -45,4 +49,9 @@ public function pull(PullEvent $event): void
{
$event->setIp(array_pop($this->ips));
}

public function count(CountEvent $event): void
{
$event->setCount(count($this->ips));
}
}

0 comments on commit 54f5eea

Please sign in to comment.