Skip to content

Commit

Permalink
♻️ Update Flow:do to FlowFactory:create
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Oct 27, 2024
1 parent 72efcb3 commit 8e1eff5
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 144 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Add event Event::POOL occurs when Flow needs to count IPs to process.
- Add `Flow\IpPool` for managing pools of Ips.
- Update `Flow\Event\PullEvent` to pull multiple Ips instead one.
- Move `Flow::do` to `FlowFactory::create`

## v1.2.2

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ composer require darkwood/flow
<?php

use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Ip;

class D1 {
Expand All @@ -45,7 +46,7 @@ class D4 {
public function __construct(public int $n4) {}
}

$flow = Flow::do(static function() {
$flow = (new FlowFactory())->create(static function() {
yield fn (D1 $data1) => new D2($data1->n1 += 1);
yield fn (D2 $data2) => new D3($data2->n2 * 2);
yield function(D3 $data3) {
Expand Down
3 changes: 2 additions & 1 deletion examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Examples\Model\DataD;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Flow\Job\ClosureJob;
Expand Down Expand Up @@ -92,7 +93,7 @@
$asyncTask = static function ($job1, $job2, $job3, $errorJob1, $errorJob2, $driver) {
echo "begin - flow asynchronous\n";

$flow = Flow::do(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
$flow = (new FlowFactory())->create(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
yield [$job1, $errorJob1, new MaxIpStrategy(2)];
yield [$job2, $errorJob2, new MaxIpStrategy(2)];
yield $job3;
Expand Down
3 changes: 2 additions & 1 deletion examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Flow\Examples\Transport\DoctrineIpTransport;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Flow\TransportFlow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
Expand Down Expand Up @@ -74,7 +75,7 @@
printf("%s\n", $exception->getMessage());
};

$flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
$flow = (new FlowFactory())->create(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
yield [$addOneJob, $errorJob, new MaxIpStrategy(1)];
yield [$multbyTwoJob, $errorJob, new MaxIpStrategy(3)];
yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)];
Expand Down
3 changes: 2 additions & 1 deletion examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Flow\Driver\SwooleDriver;
use Flow\Examples\Model\YFlowData;
use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Flow\YFlow;
use Flow\Ip;
use Flow\Job\YJob;
Expand Down Expand Up @@ -194,7 +195,7 @@
});
};

$flow = Flow::do(static function () use (
$flow = (new FlowFactory())->create(static function () use (
$factorialJob,
$factorialYJobBefore,
$factorialYJob,
Expand Down
92 changes: 2 additions & 90 deletions src/Flow/Flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@
use Flow\DriverInterface;
use Flow\Event;
use Flow\Event\PushEvent;
use Flow\Exception\LogicException;
use Flow\ExceptionInterface;
use Flow\FlowFactory;
use Flow\FlowInterface;
use Flow\Ip;
use Flow\IpStrategy\LinearIpStrategy;
use Flow\IpStrategyInterface;
use Flow\JobInterface;
use Generator;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

use function array_key_exists;
use function is_array;

/**
* @template T1
* @template T2
Expand Down Expand Up @@ -91,38 +87,9 @@ public function __invoke(Ip $ip): void
$this->stream['dispatchers'][0]->dispatch(new PushEvent($ip), Event::PUSH);
}

public static function do(callable $callable, ?array $config = null): FlowInterface
{
/**
* @var Closure|Generator $generator
*/
$generator = $callable();

if ($generator instanceof Generator) {
$flows = [];

while ($generator->valid()) {
$flow = self::flowUnwrap($generator->current(), $config);

$generator->send($flow);

$flows[] = $flow;
}

$return = $generator->getReturn();
if (!empty($return)) {
$flows[] = self::flowUnwrap($return, $config);
}

return self::flowMap($flows);
}

return self::flowUnwrap($generator, $config);
}

public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterface
{
$flow = self::flowUnwrap($flow, ['driver' => $this->driver]);
$flow = (new FlowFactory($this->driver))->createFlow($flow);

$this->stream['fnFlows'][] = [
'job' => $flow->job,
Expand All @@ -137,59 +104,4 @@ public function await(): void
{
$this->driver->await($this->stream);
}

/**
* @param array<mixed>|Closure|FlowInterface<mixed> $flow
* @param ?array<mixed> $config
*
* @return Flow<mixed, mixed>
*
* #param ?array{
* 0: Closure,
* 1?: Closure,
* 2?: IpStrategyInterface,
* 3?: EventDispatcherInterface,
* 4?: AsyncHandlerInterface,
* 5?: DriverInterface
* }|array{
* "ipStrategy"?: IpStrategyInterface,
* "dispatcher"?: EventDispatcherInterface,
* "asyncHandler"?: AsyncHandlerInterface,
* "driver"?: DriverInterface
* } $config
*/
private static function flowUnwrap($flow, ?array $config = null): FlowInterface
{
if ($flow instanceof Closure || $flow instanceof JobInterface) {
return new self(...[...['job' => $flow], ...($config ?? [])]);
}
if (is_array($flow)) {
if (array_key_exists(0, $flow) || array_key_exists('job', $flow)) {
return new self(...[...$flow, ...($config ?? [])]);
}

return self::flowMap($flow);
}

return $flow;
}

/**
* @param array<FlowInterface<mixed>> $flows
*
* @return FlowInterface<mixed>
*/
private static function flowMap(array $flows)
{
$flow = array_shift($flows);
if (null === $flow) {
throw new LogicException('Flow is empty');
}

foreach ($flows as $flowIt) {
$flow = $flow->fn($flowIt);
}

return $flow;
}
}
5 changes: 0 additions & 5 deletions src/Flow/FlowDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ public function fn(array|Closure|FlowInterface|JobInterface $flow): FlowInterfac
return $this->flow->fn($flow);
}

public static function do(callable $callable, ?array $config = null): FlowInterface
{
return Flow::do($callable, $config);
}

public function await(): void
{
$this->flow->await();
Expand Down
158 changes: 158 additions & 0 deletions src/FlowFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
<?php

declare(strict_types=1);

namespace Flow;

use Closure;
use Flow\DriverInterface;
use Flow\Exception\LogicException;
use Flow\Flow\Flow;
use Flow\FlowInterface;
use Flow\JobInterface;
use Generator;

use function array_key_exists;
use function is_array;

class FlowFactory
{
/**
* @param null|DriverInterface<mixed,mixed> $driver
*/
public function __construct(
private ?DriverInterface $driver = null
) {}

/**
* Do-notation a.k.a. for-comprehension.
*
* Syntax sugar for sequential {@see FlowInterface::fn()} calls
*
* Syntax "$flow = yield $wrapedFlow" mean:
* 1) $wrapedFlow can be Closure as Job, array constructor arguments for Flow instanciation, array configuration for Flow instanciation or FlowInterface instance
* 2) $flow is assigned as FlowInterface instance
* 3) optionnaly you can return another wrapedFlow
*
* ```php
* $flow = (new FlowFactory())->create(static function() {
* yield new Flow(fn($a) => $a + 1);
* $flow = yield fn($b) => $b * 2;
* $flow = yield $flow->fn([fn($c) => $c * 4])
* return [$flow, [fn($d) => $d - 8]];
* });
* ```
* $config if provided will be the fallback array configuration for Flow instanciation
*
* @param callable(): Generator|Closure $callable
* @param ?array<mixed> $config
*
* #param ?array{
* 0: Closure|array,
* 1?: Closure|array,
* 2?: IpStrategyInterface<mixed>,
* 3?: EventDispatcherInterface,
* 4?: AsyncHandlerInterface,
* 5?: DriverInterface
* }|array{
* "jobs"?: JobInterface|Closure|array,
* "errorJobs"?: JobInterface|Closure|array,
* "ipStrategy"?: IpStrategyInterface<mixed>,
* "dispatcher"?: EventDispatcherInterface,
* "asyncHandler"?: AsyncHandlerInterface,
* "driver"?: DriverInterface
* } $config
*
* @return FlowInterface<mixed>
*/
public function create(callable $callable, ?array $config = null): FlowInterface
{
/**
* @var Closure|Generator $generator
*/
$generator = $callable();

if ($generator instanceof Generator) {
return $this->createFromGenerator($generator, $config);
}

return $this->createFlow($generator, $config);
}

/**
* @param array<mixed>|Closure|FlowInterface<mixed> $flow
* @param ?array<mixed> $config
*
* @return Flow<mixed, mixed>
*
* #param ?array{
* 0: Closure,
* 1?: Closure,
* 2?: IpStrategyInterface,
* 3?: EventDispatcherInterface,
* 4?: AsyncHandlerInterface,
* 5?: DriverInterface
* }|array{
* "ipStrategy"?: IpStrategyInterface,
* "dispatcher"?: EventDispatcherInterface,
* "asyncHandler"?: AsyncHandlerInterface,
* "driver"?: DriverInterface
* } $config
*/
public function createFlow($flow, ?array $config = null): Flow
{
if ($flow instanceof Closure || $flow instanceof JobInterface) {
return new Flow(...[...['job' => $flow, 'driver' => $this->driver], ...($config ?? [])]);
}
if (is_array($flow)) {
if (array_key_exists(0, $flow) || array_key_exists('job', $flow)) {
return new Flow(...[...$flow, ...['driver' => $this->driver], ...($config ?? [])]);
}

return $this->createFlowMap($flow);
}

return $flow;
}

/**
* @param ?array<mixed> $config
* @return FlowInterface<mixed>
*/
private function createFromGenerator(Generator $generator, ?array $config = null): FlowInterface
{
$flows = [];

while ($generator->valid()) {
$flow = $this->createFlow($generator->current(), $config);
$generator->send($flow);
$flows[] = $flow;
}

$return = $generator->getReturn();
if (!empty($return)) {
$flows[] = $this->createFlow($return, $config);
}

return $this->createFlowMap($flows);
}

/**
* @param array<Flow<mixed, mixed>> $flows
*
* @return Flow<mixed, mixed>
*/
private function createFlowMap(array $flows): Flow
{
$flow = array_shift($flows);
if (null === $flow) {
throw new LogicException('Flow is empty');
}

foreach ($flows as $flowIt) {
$flow = $flow->fn($flowIt);
}

return $flow;
}
}
Loading

0 comments on commit 8e1eff5

Please sign in to comment.