Skip to content

Commit

Permalink
✨ Add count event
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 authored and Mathieu Ledru committed Sep 3, 2024
1 parent 3cd42cf commit f180d6d
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v1.2.x

- Add event Event::POOL occurs when Flow need to count IPs to process.

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
Expand Down
26 changes: 26 additions & 0 deletions src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Flow\Ip;

/**
* @template T
Expand All @@ -15,19 +17,43 @@
*/
final class AsyncHandler implements AsyncHandlerInterface
{
/**
* @var array<Ip<T>>
*/
private array $ips = [];

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

public function async(AsyncEvent $event): void
{
$wrapPoolIp = function (Ip $ip) {
$this->ips[] = $ip;

return function () use ($ip) {
$this->ips = array_filter($this->ips, function ($iteratorIp) use ($ip) {
return $iteratorIp !== $ip;
});
};
};

$ip = $event->getIp();
$async = $event->getAsync();
$asyncJob = $async($event->getJob());
$next = $asyncJob($ip->data);
$next($event->getCallback());
}

/**
* @param PoolEvent<T> $event
*/
public function pool(PoolEvent $event): void
{
$event->setIps($this->ips);
}
}
10 changes: 10 additions & 0 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;
use Flow\Event\PoolEvent;

/**
* @template T1
Expand Down Expand Up @@ -41,6 +42,7 @@ public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

Expand Down Expand Up @@ -80,4 +82,12 @@ private function getBatchSize(): int
{
return $this->batchSize;
}

/**
* @param PoolEvent<T> $event
*/
public function pool(PoolEvent $event): void

Check failure on line 89 in src/AsyncHandler/BatchAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Parameter $event of method Flow\AsyncHandler\BatchAsyncHandler::pool() has invalid type Flow\AsyncHandler\T.
{
$event->setIps($this->ips);

Check failure on line 91 in src/AsyncHandler/BatchAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Access to an undefined property Flow\AsyncHandler\BatchAsyncHandler<T1, T2>::$ips.
}
}
10 changes: 10 additions & 0 deletions src/AsyncHandler/DeferAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;

/**
* @template T
Expand All @@ -19,6 +20,7 @@ public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

Expand All @@ -33,4 +35,12 @@ public function async(AsyncEvent $event): void
$callback($data);
});
}

/**
* @param PoolEvent<T> $event
*/
public function pool(PoolEvent $event): void
{
$event->setIps($this->ips);

Check failure on line 44 in src/AsyncHandler/DeferAsyncHandler.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Access to an undefined property Flow\AsyncHandler\DeferAsyncHandler<T>::$ips.
}
}
6 changes: 3 additions & 3 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class AmpDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

public function __construct(?Driver $driver = null)
Expand Down Expand Up @@ -121,18 +123,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
EventLoop::defer($loop);
} else {
EventLoop::getDriver()->stop();
Expand Down
27 changes: 27 additions & 0 deletions src/Driver/DriverTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Flow\Driver;

use Flow\Event;
use Flow\Event\PoolEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

use function count;

trait DriverTrait
{
/**
* @param array<EventDispatcherInterface> $dispatchers
*/
public function countIps(array $dispatchers): int
{
$count = 0;
foreach ($dispatchers as $dispatcher) {
$count += count($dispatcher->dispatch(new PoolEvent(), Event::POOL)->getIps());
}

return $count;
}
}
8 changes: 4 additions & 4 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class FiberDriver implements DriverInterface
{
use DriverTrait;

/**
* @var array<mixed>
*/
Expand Down Expand Up @@ -121,7 +123,7 @@ public function await(array &$stream): void

$tick = 0;
$fiberDatas = [];
while ($stream['ips'] > 0 or count($this->ticks) > 0) {
do {
foreach ($this->ticks as [
'interval' => $interval,
'callback' => $callback,
Expand All @@ -146,12 +148,10 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
Expand All @@ -168,7 +168,7 @@ public function await(array &$stream): void
}

$tick++;
}
} while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0);
}

public function delay(float $seconds): void
Expand Down
6 changes: 3 additions & 3 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class ReactDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

private LoopInterface $eventLoop;
Expand Down Expand Up @@ -114,18 +116,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
$this->eventLoop->futureTick($loop);
} else {
$this->eventLoop->stop();
Expand Down
8 changes: 4 additions & 4 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class SpatieDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

private Pool $pool;
Expand Down Expand Up @@ -89,7 +91,7 @@ public function await(array &$stream): void
};

$nextIp = null;
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
Expand All @@ -99,17 +101,15 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
}
} while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0);
}

public function delay(float $seconds): void
Expand Down
8 changes: 4 additions & 4 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class SwooleDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

public function __construct()
Expand Down Expand Up @@ -85,7 +87,7 @@ public function await(array &$stream): void
};

co::run(function () use (&$stream, $async, $defer) {
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
Expand All @@ -96,18 +98,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
co::sleep(1);
} while ($nextIp !== null);
}
} while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0);
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function async(Closure|JobInterface $callback): Closure;
public function defer(Closure $callback): mixed;

/**
* @param array{'ips': int, 'fnFlows': array<mixed>, 'dispatchers': array<mixed>} $stream
* @param array{'fnFlows': array<mixed>, 'dispatchers': array<mixed>} $stream
*/
public function await(array &$stream): void;

Expand Down
9 changes: 9 additions & 0 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,13 @@ final class Event
* @Event("Flow\Event\PopEvent")
*/
public const POP = 'pop';

/**
* The POOL event occurs when Flow need to get the pool IPs to process.
*
* This event allows you to get the pool IPs to process.
*
* @Event("Flow\Event\PoolEvent")
*/
public const POOL = 'pool';
}
35 changes: 35 additions & 0 deletions src/Event/PoolEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Flow\Event;

use Flow\Ip;
use Symfony\Contracts\EventDispatcher\Event;

/**
* @template T
*/
final class PoolEvent extends Event
{
/**
* @var array<Ip<T>>
*/
private array $ips;

/**
* @return array<Ip<T>>
*/
public function getIps(): array
{
return $this->ips;
}

/**
* @param array<Ip<T>> $ips
*/
public function setIps(array $ips): void
{
$this->ips = $ips;
}
}
2 changes: 0 additions & 2 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class Flow implements FlowInterface
* @var array<mixed>
*/
private array $stream = [
'ips' => 0,
'fnFlows' => [],
'dispatchers' => [],
];
Expand Down Expand Up @@ -89,7 +88,6 @@ public function __construct(

public function __invoke(Ip $ip): void
{
$this->stream['ips']++;
$this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH);
}

Expand Down
Loading

0 comments on commit f180d6d

Please sign in to comment.