Skip to content

Commit

Permalink
✨ Add count event
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 3, 2024
1 parent 3cd42cf commit b8f656a
Show file tree
Hide file tree
Showing 21 changed files with 239 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v1.2.x

- Add event Event::POOL occurs when Flow need to count IPs to process.

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
Expand Down
2 changes: 1 addition & 1 deletion examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
$job1 = static function (DataA $dataA) use ($driver): DataB {
printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b);

// simulating calculating some "light" operation from 0.1 to 1 seconds
// simulating calculating some "light" operation from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$d = $dataA->a + $dataA->b;
Expand Down
26 changes: 25 additions & 1 deletion src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Flow\IpPool;

/**
* @template T
Expand All @@ -15,10 +17,21 @@
*/
final class AsyncHandler implements AsyncHandlerInterface
{
/**
* @var IpPool<T>
*/
private IpPool $ipPool;

public function __construct()
{
$this->ipPool = new IpPool();
}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

Expand All @@ -27,7 +40,18 @@ public function async(AsyncEvent $event): void
$ip = $event->getIp();
$async = $event->getAsync();
$asyncJob = $async($event->getJob());

$popIp = $this->ipPool->addIp($ip);

$next = $asyncJob($ip->data);
$next($event->getCallback());
$next(static function ($data) use ($event, $popIp) {
$event->getCallback()($data);
$popIp();
});
}

public function pool(PoolEvent $event): void
{
$event->addIps($this->ipPool->getIps());
}
}
18 changes: 12 additions & 6 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

/**
* @template T1
* @template T2
* @template T
*
* @implements AsyncHandlerInterface<T1>
* @implements AsyncHandlerInterface<T>
*/
final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

/**
* @var AsyncHandlerInterface<T2>
* @var AsyncHandlerInterface<T>
*/
private AsyncHandlerInterface $asyncHandler;

/**
* @param null|AsyncHandlerInterface<T2> $asyncHandler
* @param null|AsyncHandlerInterface<T> $asyncHandler
*/
public function __construct(
private int $batchSize = 10,
Expand All @@ -41,6 +41,7 @@ public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

Expand All @@ -53,12 +54,17 @@ public function async(AsyncEvent $event): void
$this->handle($event, $ack);
}

public function pool(PoolEvent $event): void
{
$this->asyncHandler->pool($event);
}

/**
* PHPStan should normaly pass for method.unused
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @param array{0: AsyncEvent<T1>, 1: Acknowledger}[] $jobs
* @param array{0: AsyncEvent<T>, 1: Acknowledger}[] $jobs
*
* @phpstan-ignore method.unused
*/
Expand Down
23 changes: 22 additions & 1 deletion src/AsyncHandler/DeferAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Flow\IpPool;

/**
* @template T
Expand All @@ -15,22 +17,41 @@
*/
final class DeferAsyncHandler implements AsyncHandlerInterface
{
/**
* @var IpPool<T>
*/
private IpPool $ipPool;

public function __construct()
{
$this->ipPool = new IpPool();
}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

public function async(AsyncEvent $event): void
{
$ip = $event->getIp();
$job = $event->getJob();

$popIp = $this->ipPool->addIp($ip);
$next = $job([$ip->data, $event->getDefer()]);
$next(static function ($result) use ($event) {
$next(static function ($result) use ($event, $popIp) {
[$data] = $result;
$callback = $event->getCallback();
$callback($data);
$popIp();
});
}

public function pool(PoolEvent $event): void
{
$event->addIps($this->ipPool->getIps());
}
}
6 changes: 6 additions & 0 deletions src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow;

use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
Expand All @@ -16,4 +17,9 @@ interface AsyncHandlerInterface extends EventSubscriberInterface
* @param AsyncEvent<T> $event
*/
public function async(AsyncEvent $event): void;

/**
* @param PoolEvent<T> $event
*/
public function pool(PoolEvent $event): void;
}
6 changes: 3 additions & 3 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class AmpDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

public function __construct(?Driver $driver = null)
Expand Down Expand Up @@ -121,18 +123,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
EventLoop::defer($loop);
} else {
EventLoop::getDriver()->stop();
Expand Down
27 changes: 27 additions & 0 deletions src/Driver/DriverTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Flow\Driver;

use Flow\Event;
use Flow\Event\PoolEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

use function count;

trait DriverTrait
{
/**
* @param array<EventDispatcherInterface> $dispatchers
*/
public function countIps(array $dispatchers): int
{
$count = 0;
foreach ($dispatchers as $dispatcher) {
$count += count($dispatcher->dispatch(new PoolEvent(), Event::POOL)->getIps());
}

return $count;
}
}
9 changes: 5 additions & 4 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class FiberDriver implements DriverInterface
{
use DriverTrait;

/**
* @var array<mixed>
*/
Expand Down Expand Up @@ -121,7 +123,7 @@ public function await(array &$stream): void

$tick = 0;
$fiberDatas = [];
while ($stream['ips'] > 0 or count($this->ticks) > 0) {
do {
foreach ($this->ticks as [
'interval' => $interval,
'callback' => $callback,
Expand All @@ -136,6 +138,7 @@ public function await(array &$stream): void
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();

if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
Expand All @@ -146,12 +149,10 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
Expand All @@ -168,7 +169,7 @@ public function await(array &$stream): void
}

$tick++;
}
} while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0);
}

public function delay(float $seconds): void
Expand Down
6 changes: 3 additions & 3 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class ReactDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

private LoopInterface $eventLoop;
Expand Down Expand Up @@ -114,18 +116,16 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
$this->eventLoop->futureTick($loop);
} else {
$this->eventLoop->stop();
Expand Down
8 changes: 4 additions & 4 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class SpatieDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

private Pool $pool;
Expand Down Expand Up @@ -89,7 +91,7 @@ public function await(array &$stream): void
};

$nextIp = null;
while ($stream['ips'] > 0 or $this->ticks > 0) {
do {
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
Expand All @@ -99,17 +101,15 @@ public function await(array &$stream): void
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
}
} while ($nextIp !== null);
}
} while ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0);
}

public function delay(float $seconds): void
Expand Down
Loading

0 comments on commit b8f656a

Please sign in to comment.