Skip to content

Commit

Permalink
✨ Add Symfony Bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Aug 17, 2024
1 parent d619267 commit 67b2ebd
Show file tree
Hide file tree
Showing 20 changed files with 199 additions and 62 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
- Flow\Flow\YFlow rework
- Add more exemples in `examples/yflow.php` to play with Y-Combinators
- Update DX for Flow\DriverInterface : add `defer` to gain much granular control on asynchronous callbacks
- Add Symfony Bridge
- Flow can now use `Flow\JobInterface` as job input
- new `Flow\Attribute\AsJob` attribute allows cast job on function or class and embed it's name and description

## v1.2.0

Expand Down
20 changes: 20 additions & 0 deletions docs/src/content/en/docs/getting-started/job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
title: "Job"
description: "Job."
lead: "Job."
date: 2020-10-13T15:21:01+02:00
lastmod: 2020-10-13T15:21:01+02:00
draft: false
images: []
menu:
docs:
parent: "getting-started"
weight: 10
toc: true
---

# Job

## Make your own Job

You can make your custom Job by implementing `Flow\JobInterface`.
29 changes: 29 additions & 0 deletions examples/Job/GenericJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Job;

use Closure;
use Flow\JobInterface;

/**
* @template TArgs
* @template TReturn
*
* @implements JobInterface<TArgs,TReturn>
*/
class GenericJob implements JobInterface
{
/**
* @param Closure(TArgs): TReturn $job
*/
public function __construct(private Closure $job) {}

public function __invoke($data): mixed
{
$job = $this->job;

return $job($data);
}
}
5 changes: 3 additions & 2 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Job\GenericJob;
use Flow\Examples\Model\DataA;
use Flow\Examples\Model\DataB;
use Flow\Examples\Model\DataC;
Expand Down Expand Up @@ -49,7 +50,7 @@
return new DataB($dataA->id, $d, $dataA->c);
};

$job2 = static function (DataB $dataB) use ($driver): DataC {
$job2 = new GenericJob(static function (DataB $dataB) use ($driver): DataC {
printf(".* #%d - Job 2 Calculating %d * %d\n", $dataB->id, $dataB->d, $dataB->e);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
Expand All @@ -65,7 +66,7 @@
printf(".* #%d - Job 2 Result for %d * %d = %d and took %.01f seconds\n", $dataB->id, $dataB->d, $dataB->e, $f, $delay);

return new DataC($dataB->id, $f);
};
});

$job3 = static function (DataC $dataC): DataD {
printf("** #%d - Job 3 Result is %d\n", $dataC->id, $dataC->f);
Expand Down
16 changes: 16 additions & 0 deletions src/Attribute/AsJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Flow\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_CLASS | Attribute::TARGET_FUNCTION)]
class AsJob
{
public function __construct(
public string $name = '',
public string $description = '',
) {}
}
9 changes: 5 additions & 4 deletions src/Driver/AmpDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver;
use RuntimeException as NativeRuntimeException;
Expand Down Expand Up @@ -49,10 +50,10 @@ public function __construct(?Driver $driver = null)
/**
* @return Closure(TArgs): Future<TReturn>
*/
public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return async(static function (Closure $callback, array $args) {
return async(static function (Closure|JobInterface $callback, array $args) {
try {
return $callback(...$args, ...($args = []));
} catch (Throwable $exception) {
Expand Down Expand Up @@ -86,7 +87,7 @@ public function defer(Closure $callback): Future

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

Expand All @@ -99,7 +100,7 @@ public function await(array &$stream): void
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function (Closure $map) use ($job) {
/** @var Closure(TReturn): mixed $map */
$future = $this->defer($job);
Expand Down
11 changes: 6 additions & 5 deletions src/Driver/FiberDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use Throwable;

use function array_key_exists;
Expand All @@ -34,7 +35,7 @@ class FiberDriver implements DriverInterface
*/
private array $ticks = [];

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return new Fiber(static function () use ($callback, $args) {
Expand Down Expand Up @@ -69,7 +70,7 @@ public function defer(Closure $callback): mixed
public function await(array &$stream): void
{
$async = function ($isTick) use (&$fiberDatas) {
return function (Closure $job) use (&$fiberDatas, $isTick) {
return function (Closure|JobInterface $job) use (&$fiberDatas, $isTick) {
return function (mixed $data) use (&$fiberDatas, $isTick, $job) {
$async = $this->async($job);

Expand Down Expand Up @@ -97,7 +98,7 @@ public function await(array &$stream): void
};

$defer = static function ($isTick) {
return static function (Closure $job) use ($isTick) {
return static function (Closure|JobInterface $job) use ($isTick) {
return static function (Closure $next) use ($isTick, $job) {
$fiber = new Fiber(static function () use ($isTick, $job, $next) {
try {
Expand Down Expand Up @@ -136,9 +137,9 @@ public function await(array &$stream): void
foreach ($stream['dispatchers'] as $index => $dispatcher) {
$nextIp = $dispatcher->dispatch(new PullEvent(), Event::PULL)->getIp();
if ($nextIp !== null) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure $job) use ($async) {
$stream['dispatchers'][$index]->dispatch(new AsyncEvent(static function (Closure|JobInterface $job) use ($async) {
return $async(false)($job);
}, static function (Closure $job) use ($defer) {
}, static function (Closure|JobInterface $job) use ($defer) {
return $defer(false)($job);
}, $stream['fnFlows'][$index]['job'], $nextIp, static function ($data) use (&$stream, $index, $nextIp) {
if ($data instanceof RuntimeException and array_key_exists($index, $stream['fnFlows']) and $stream['fnFlows'][$index]['errorJob'] !== null) {
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/ReactDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
Expand Down Expand Up @@ -46,7 +47,7 @@ public function __construct(?LoopInterface $eventLoop = null)
$this->eventLoop = $eventLoop ?? Loop::get();
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return async(static function () use ($callback, $args) {
Expand Down Expand Up @@ -81,7 +82,7 @@ public function defer(Closure $callback): Promise

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

Expand All @@ -93,7 +94,7 @@ public function await(array &$stream): void
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function ($then) use ($job) {
$promise = $this->defer($job);
$promise->then($then);
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/SpatieDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use RuntimeException as NativeRuntimeException;
use Spatie\Async\Pool;
use Throwable;
Expand Down Expand Up @@ -45,7 +46,7 @@ public function __construct()
}
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return function (...$args) use ($callback) {
return function ($onResolve) use ($callback, $args) {
Expand All @@ -67,15 +68,15 @@ public function defer(Closure $callback): mixed

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

return $async($data);
};
};

$defer = function (Closure $job) {
$defer = function (Closure|JobInterface $job) {
return function (Closure $onResolve) use ($job) {
$this->pool->add(static function () use ($job, $onResolve) {
return $job($onResolve, static function ($fn, $next) {
Expand Down
7 changes: 4 additions & 3 deletions src/Driver/SwooleDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Flow\Event\PushEvent;
use Flow\Exception\RuntimeException;
use Flow\Ip;
use Flow\JobInterface;
use OpenSwoole\Timer;
use RuntimeException as NativeRuntimeException;
use Throwable;
Expand All @@ -38,7 +39,7 @@ public function __construct()
}
}

public function async(Closure $callback): Closure
public function async(Closure|JobInterface $callback): Closure
{
return static function (...$args) use ($callback) {
return static function ($onResolve) use ($callback, $args) {
Expand All @@ -61,15 +62,15 @@ public function defer(Closure $callback): mixed

public function await(array &$stream): void
{
$async = function (Closure $job) {
$async = function (Closure|JobInterface $job) {
return function (mixed $data) use ($job) {
$async = $this->async($job);

return $async($data);
};
};

$defer = static function (Closure $job) {
$defer = static function (Closure|JobInterface $job) {
return static function (Closure $onResolve) use ($job) {
go(static function () use ($job, $onResolve) {
try {
Expand Down
6 changes: 3 additions & 3 deletions src/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
interface DriverInterface
{
/**
* #return Closure(TArgs): void when called this start async $callback.
* #return JobInterface<TArgs,void>|Closure(TArgs): void when called this start async $callback.
*
* @param Closure(TArgs): TReturn $callback
* @param Closure(TArgs): TReturn|JobInterface<TArgs,TReturn> $callback
*/
public function async(Closure $callback): Closure;
public function async(Closure|JobInterface $callback): Closure;

/**
* This allow more granular control on async
Expand Down
15 changes: 10 additions & 5 deletions src/Event/AsyncEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@

use Closure;
use Flow\Ip;
use Flow\JobInterface;
use Symfony\Contracts\EventDispatcher\Event;

/**
* @template T
* @template T1
*/
final class AsyncEvent extends Event
{
/**
* @param Ip<T> $ip
* @param Closure|JobInterface<mixed,mixed> $job
* @param Ip<T1> $ip
*/
public function __construct(
private Closure $async,
private Closure $defer,
private Closure $job,
private Closure|JobInterface $job,
private Ip $ip,
private Closure $callback
) {}
Expand All @@ -34,13 +36,16 @@ public function getDefer(): Closure
return $this->defer;
}

public function getJob(): Closure
/**
* @return Closure|JobInterface<mixed,mixed>
*/
public function getJob(): Closure|JobInterface
{
return $this->job;
}

/**
* @return Ip<T>
* @return Ip<T1>
*/
public function getIp(): Ip
{
Expand Down
Loading

0 comments on commit 67b2ebd

Please sign in to comment.