Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add count event #56

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.2.x

- Add event Event::POOL occurs when Flow needs to count IPs to process.
- Add `Flow\IpPool` for managing pools of Ips.
- Update `Flow\Event\PullEvent` to pull multiple Ips instead one.

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
Expand Down
2 changes: 1 addition & 1 deletion examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
$job1 = static function (DataA $dataA) use ($driver): DataB {
printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b);

// simulating calculating some "light" operation from 0.1 to 1 seconds
// simulating calculating some "light" operation from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$d = $dataA->a + $dataA->b;
Expand Down
25 changes: 24 additions & 1 deletion src/AsyncHandler/AsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Flow\IpPool;

/**
* @template T
Expand All @@ -15,10 +17,21 @@
*/
final class AsyncHandler implements AsyncHandlerInterface
{
/**
* @var IpPool<T>
*/
private IpPool $ipPool;

public function __construct()
{
$this->ipPool = new IpPool();
}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

Expand All @@ -27,7 +40,17 @@ public function async(AsyncEvent $event): void
$ip = $event->getIp();
$async = $event->getAsync();
$asyncJob = $async($event->getJob());

$popIp = $this->ipPool->addIp($ip);
$next = $asyncJob($ip->data);
$next($event->getCallback());
$next(static function ($data) use ($event, $popIp) {
$event->getCallback()($data);
$popIp();
});
}

public function pool(PoolEvent $event): void
{
$event->addIps($this->ipPool->getIps());
}
}
18 changes: 12 additions & 6 deletions src/AsyncHandler/BatchAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Throwable;

/**
* @template T1
* @template T2
* @template T
*
* @implements AsyncHandlerInterface<T1>
* @implements AsyncHandlerInterface<T>
*/
final class BatchAsyncHandler implements BatchHandlerInterface, AsyncHandlerInterface
{
use BatchHandlerTrait;

/**
* @var AsyncHandlerInterface<T2>
* @var AsyncHandlerInterface<T>
*/
private AsyncHandlerInterface $asyncHandler;

/**
* @param null|AsyncHandlerInterface<T2> $asyncHandler
* @param null|AsyncHandlerInterface<T> $asyncHandler
*/
public function __construct(
private int $batchSize = 10,
Expand All @@ -41,6 +41,7 @@ public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

Expand All @@ -53,12 +54,17 @@ public function async(AsyncEvent $event): void
$this->handle($event, $ack);
}

public function pool(PoolEvent $event): void
{
$this->asyncHandler->pool($event);
}

/**
* PHPStan should normaly pass for method.unused
* https://github.com/phpstan/phpstan/issues/6039
* https://phpstan.org/r/8f7de023-9888-4dcb-b12c-e2fcf9547b6c.
*
* @param array{0: AsyncEvent<T1>, 1: Acknowledger}[] $jobs
* @param array{0: AsyncEvent<T>, 1: Acknowledger}[] $jobs
*
* @phpstan-ignore method.unused
*/
Expand Down
23 changes: 22 additions & 1 deletion src/AsyncHandler/DeferAsyncHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Flow\AsyncHandlerInterface;
use Flow\Event;
use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Flow\IpPool;

/**
* @template T
Expand All @@ -15,22 +17,41 @@
*/
final class DeferAsyncHandler implements AsyncHandlerInterface
{
/**
* @var IpPool<T>
*/
private IpPool $ipPool;

public function __construct()
{
$this->ipPool = new IpPool();
}

public static function getSubscribedEvents()
{
return [
Event::ASYNC => 'async',
Event::POOL => 'pool',
];
}

public function async(AsyncEvent $event): void
{
$ip = $event->getIp();
$job = $event->getJob();

$popIp = $this->ipPool->addIp($ip);
$next = $job([$ip->data, $event->getDefer()]);
$next(static function ($result) use ($event) {
$next(static function ($result) use ($event, $popIp) {
[$data] = $result;
$callback = $event->getCallback();
$callback($data);
$popIp();
});
}

public function pool(PoolEvent $event): void
{
$event->addIps($this->ipPool->getIps());
}
}
6 changes: 6 additions & 0 deletions src/AsyncHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow;

use Flow\Event\AsyncEvent;
use Flow\Event\PoolEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
Expand All @@ -16,4 +17,9 @@ interface AsyncHandlerInterface extends EventSubscriberInterface
* @param AsyncEvent<T> $event
*/
public function async(AsyncEvent $event): void;

/**
* @param PoolEvent<T> $event
*/
public function pool(PoolEvent $event): void;
}
41 changes: 19 additions & 22 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
class AmpDriver implements DriverInterface
{
use DriverTrait;

private int $ticks = 0;

public function __construct(?Driver $driver = null)
Expand Down Expand Up @@ -109,30 +111,25 @@ public function await(array &$stream): void
};

$loop = function () use (&$loop, &$stream, $async, $defer) {
$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$job = $stream['fnFlows'][$index]['job'];

$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps();
foreach ($nextIps as $nextIp) {
$job = $stream['fnFlows'][$index]['job'];

$stream['dispatchers'][$index]->dispatch(new AsyncEvent($async, $defer, $job, $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) && $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
}), Event::ASYNC);
}
} while ($nextIp !== null);
}

if ($stream['ips'] > 0 or $this->ticks > 0) {
if ($this->countIps($stream['dispatchers']) > 0 or $this->ticks > 0) {
EventLoop::defer($loop);
} else {
EventLoop::getDriver()->stop();
Expand Down
27 changes: 27 additions & 0 deletions src/Driver/DriverTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Flow\Driver;

use Flow\Event;
use Flow\Event\PoolEvent;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

use function count;

trait DriverTrait
{
/**
* @param array<EventDispatcherInterface> $dispatchers
*/
public function countIps(array $dispatchers): int
{
$count = 0;
foreach ($dispatchers as $dispatcher) {
$count += count($dispatcher->dispatch(new PoolEvent(), Event::POOL)->getIps());
}

return $count;
}
}
47 changes: 22 additions & 25 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
class FiberDriver implements DriverInterface
{
use DriverTrait;

/**
* @var array<mixed>
*/
Expand Down Expand Up @@ -121,7 +123,7 @@ public function await(array &$stream): void

$tick = 0;
$fiberDatas = [];
while ($stream['ips'] > 0 or count($this->ticks) > 0) {
do {
foreach ($this->ticks as [
'interval' => $interval,
'callback' => $callback,
Expand All @@ -132,30 +134,25 @@ public function await(array &$stream): void
}
}

$nextIp = null;
do {
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['ips']++;
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
$stream['ips']--;
}), Event::ASYNC);
}
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIps = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIps();
foreach ($nextIps as $nextIp) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
$stream['fnFlows'][$index]['errorJob']($data);
} elseif (array_key_exists($index + 1, $stream['fnFlows'])) {
$ip = new Ip($data);
$stream['dispatchers'][$index + 1]->dispatch(new PushEvent($ip), Event::PUSH);
}

$stream['dispatchers'][$index]->dispatch(new PopEvent($nextIp), Event::POP);
}), Event::ASYNC);
}
} while ($nextIp !== null);
}

foreach ($fiberDatas as $i => $fiberData) { // @phpstan-ignore-line see https://github.com/phpstan/phpstan/issues/11468
if (!$fiberData['fiber']->isTerminated() and $fiberData['fiber']->isSuspended()) {
Expand All @@ -168,7 +165,7 @@ public function await(array &$stream): void
}

$tick++;
}
} while ($this->countIps($stream['dispatchers']) > 0 or count($this->ticks) > 0);
}

public function delay(float $seconds): void
Expand Down
Loading
Loading