Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.2.2 #55

Merged
merged 3 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
- Add Symfony Bridge
- new `Flow\Attribute\AsJob` attribute allows cast job on function or class and embed it's name and description

## v1.2.1

- Add new Interface Flow\AsyncHandlerInterface to control the Event::SYNC step when processing an IP
Expand Down
4 changes: 2 additions & 2 deletions docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"keywords": [
"flow"
],
"version": "1.2.1",
"version": "1.2.2",
"browserslist": [
"defaults"
],
Expand Down Expand Up @@ -91,4 +91,4 @@
}
]
}
}
}
30 changes: 30 additions & 0 deletions docs/src/content/en/docs/getting-started/job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
title: "Job"
description: "Job."
lead: "Job."
date: 2020-10-13T15:21:01+02:00
lastmod: 2020-10-13T15:21:01+02:00
draft: false
images: []
menu:
docs:
parent: "getting-started"
weight: 10
toc: true
---

# Job

When using Flow, you can pass Closure or JobInterface, it's useful when you want to specialize your Job, that come with dependecy injection.

## ClosureJob

ClosureJob simplifies job handling by allowing the use of closures or custom job classes, providing a versatile solution for managing jobs in your application.

## YJob

The YJob class defines the Y combinator to recursively apply the job function, making it particularly useful in scenarios where you need to perform recursive tasks without explicitly writing recursive functions.

## Make your own Job

You can make your custom Job by implementing `Flow\JobInterface`.
1 change: 1 addition & 0 deletions docs/src/content/en/docs/getting-started/ressources.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Video of Y-Combinator : [https://www.youtube.com/watch?v=QSS_ZcO8Q1g](https://ww
- Combinator : [https://github.com/loophp/combinator](https://github.com/loophp/combinator)
- Lambda-php : [https://github.com/igorw/lambda-php](https://github.com/igorw/lambda-php)
- Deriving the y combinator in 7 easy steps : [https://gist.github.com/igstan/388351](https://gist.github.com/igstan/388351)
- Y combinator real life application: recursive memoization in clojure : [https://blog.klipse.tech/lambda/2016/08/10/y-combinator-app.html](https://blog.klipse.tech/lambda/2016/08/10/y-combinator-app.html)

## Messaging approach with East oriented code from [Frédéric Hardy](https://twitter.com/mageekguy)

Expand Down
5 changes: 3 additions & 2 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Flow\Job\ClosureJob;

$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
Expand Down Expand Up @@ -49,7 +50,7 @@
return new DataB($dataA->id, $d, $dataA->c);
};

$job2 = static function (DataB $dataB) use ($driver): DataC {
$job2 = new ClosureJob(static function (DataB $dataB) use ($driver): DataC {
printf(".* #%d - Job 2 Calculating %d * %d\n", $dataB->id, $dataB->d, $dataB->e);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
Expand All @@ -65,7 +66,7 @@
printf(".* #%d - Job 2 Result for %d * %d = %d and took %.01f seconds\n", $dataB->id, $dataB->d, $dataB->e, $f, $delay);

return new DataC($dataB->id, $f);
};
});

$job3 = static function (DataC $dataC): DataD {
printf("** #%d - Job 3 Result is %d\n", $dataC->id, $dataC->f);
Expand Down
23 changes: 13 additions & 10 deletions examples/yflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
use Flow\Flow\Flow;
use Flow\Flow\YFlow;
use Flow\Ip;
use Flow\Job\YJob;
use Flow\JobInterface;

$driver = match (random_int(1, 4)) {
1 => new AmpDriver(),
Expand All @@ -29,12 +31,14 @@ function factorial(int $n): int
return ($n <= 1) ? 1 : $n * factorial($n - 1);
}

function Ywrap(callable $func, callable $wrapperFunc): Closure
/**
* @return JobInterface<mixed, mixed>
*/
function Ywrap(callable $func, callable $wrapperFunc): JobInterface
{
$U = static fn ($f) => $f($f);
$Y = static fn (callable $f, callable $g) => $U(static fn (Closure $x) => $f($g(static fn ($y) => $U($x)($y))));
$wrappedFunc = static fn ($recurse) => $wrapperFunc(static fn (...$args) => $func($recurse)(...$args));

return $Y($func, $wrapperFunc);
return new YJob($wrappedFunc);
}

function memoWrapperGenerator(callable $f): Closure
Expand All @@ -50,7 +54,10 @@ function memoWrapperGenerator(callable $f): Closure
};
}

function Ymemo(callable $f): Closure
/**
* @return JobInterface<mixed, mixed>
*/
function Ymemo(callable $f): JobInterface
{
return Ywrap($f, 'memoWrapperGenerator');
}
Expand Down Expand Up @@ -110,17 +117,13 @@ function factorialYMemo(int $n): int
return new YFlowData($data->id, $data->number);
};

// Define the Y-Combinator
$U = static fn (Closure $f) => $f($f);
$Y = static fn (Closure $f) => $U(static fn (Closure $x) => $f(static fn ($y) => $U($x)($y)));

$factorialYJobDeferBefore = static function (YFlowData $data) {
printf("...* #%d - Job 4 : Calculating factorialYJobDefer(%d)\n", $data->id, $data->number);

return new YFlowData($data->id, $data->number, $data->number);
};

$factorialYJobDefer = $Y(static function ($factorial) {
$factorialYJobDefer = new YJob(static function ($factorial) {
return static function ($args) use ($factorial) {
[$data, $defer] = $args;

Expand Down
16 changes: 16 additions & 0 deletions src/Attribute/AsJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Flow\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_FUNCTION | Attribute::TARGET_METHOD)]
class AsJob
{
public function __construct(
public string $name = '',
public string $description = '',
) {}
}
16 changes: 16 additions & 0 deletions src/Bridge/Symfony/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\DependencyInjection;

use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

class Configuration implements ConfigurationInterface
{
public function getConfigTreeBuilder(): TreeBuilder
{
return new TreeBuilder('flow');
}
}
13 changes: 13 additions & 0 deletions src/Bridge/Symfony/DependencyInjection/FlowExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\DependencyInjection;

use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\Extension;

class FlowExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container): void {}
}
9 changes: 9 additions & 0 deletions src/Bridge/Symfony/FlowBundle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony;

use Symfony\Component\HttpKernel\Bundle\Bundle;

class FlowBundle extends Bundle {}
7 changes: 7 additions & 0 deletions src/Bridge/Symfony/Resources/config/services.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

return static function (ContainerConfigurator $container) {};
9 changes: 5 additions & 4 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use RuntimeException as NativeRuntimeException;
Expand Down Expand Up @@ -49,10 +50,10 @@ public function __construct(?Driver $driver = null)
/**
* @return Closure(TArgs): Future<TReturn>
*/
public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return async(static function (Closure $callback, array $args) {
return async(static function (Closure|JobInterface $callback, array $args) {
try {
return $callback(...$args, ...($args = []));
} catch (Throwable $exception) {
Expand Down Expand Up @@ -86,7 +87,7 @@ public function defer(Closure $callback): Future

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

Expand All @@ -99,7 +100,7 @@ public function await(array &$stream): void
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function (Closure $map) use ($job) {
/** @var Closure(TReturn): mixed $map */
$future = $this->defer($job);
Expand Down
11 changes: 6 additions & 5 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use Throwable;

use function array_key_exists;
Expand All @@ -34,7 +35,7 @@ class FiberDriver implements DriverInterface
*/
private array $ticks = [];

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return new Fiber(static function () use ($callback, $args) {
Expand Down Expand Up @@ -69,7 +70,7 @@ public function defer(Closure $callback): mixed
public function await(array &$stream): void
{
$async = function ($isTick) use (&$fiberDatas) {
return function (Closure $job) use (&$fiberDatas, $isTick) {
return function (Closure|JobInterface $job) use (&$fiberDatas, $isTick) {
return function (mixed $data) use (&$fiberDatas, $isTick, $job) {
$async = $this->async($job);

Expand Down Expand Up @@ -97,7 +98,7 @@ public function await(array &$stream): void
};

$defer = static function ($isTick) {
return static function (Closure $job) use ($isTick) {
return static function (Closure|JobInterface $job) use ($isTick) {
return static function (Closure $next) use ($isTick, $job) {
$fiber = new Fiber(static function () use ($isTick, $job, $next) {
try {
Expand Down Expand Up @@ -136,9 +137,9 @@ public function await(array &$stream): void
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
}, static function (Closure $job) use ($defer) {
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
Expand Down Expand Up @@ -46,7 +47,7 @@ public function __construct(?LoopInterface $eventLoop = null)
$this->eventLoop = $eventLoop ?? Loop::get();
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return async(static function () use ($callback, $args) {
Expand Down Expand Up @@ -81,7 +82,7 @@ public function defer(Closure $callback): Promise

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

Expand All @@ -93,7 +94,7 @@ public function await(array &$stream): void
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function ($then) use ($job) {
$promise = $this->defer($job);
$promise->then($then);
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use RuntimeException as NativeRuntimeException;
use Spatie\Async\Pool;
use Throwable;
Expand Down Expand Up @@ -45,7 +46,7 @@ public function __construct()
}
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return function (...$args) use ($callback) {
return function ($onResolve) use ($callback, $args) {
Expand All @@ -67,15 +68,15 @@ public function defer(Closure $callback): mixed

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

return $async($data);
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function (Closure $onResolve) use ($job) {
$this->pool->add(static function () use ($job, $onResolve) {
return $job($onResolve, static function ($fn, $next) {
Expand Down
Loading
Loading