Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.2.3 #57

Open
wants to merge 14 commits into
base: 1.x
Choose a base branch
from
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

# PHP
/composer.lock
/vendor/
/vendor/
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## v1.2.x

- Add event Event::POOL occurs when Flow needs to count IPs to process.
- Add `Flow\IpPool` for managing pools of Ips.
- Update `Flow\Event\PullEvent` to pull multiple Ips instead one.
- Move `Flow::do` to `FlowFactory::create`
- Add `Flow\Driver\ParallelDriver`

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ composer require darkwood/flow
<?php

use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Ip;

class D1 {
Expand All @@ -45,7 +46,7 @@ class D4 {
public function __construct(public int $n4) {}
}

$flow = Flow::do(static function() {
$flow = (new FlowFactory())->create(static function() {
yield fn (D1 $data1) => new D2($data1->n1 += 1);
yield fn (D2 $data2) => new D3($data2->n2 * 2);
yield function(D3 $data3) {
Expand Down
2 changes: 1 addition & 1 deletion docs/src/content/en/docs/getting-started/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ YFlow use YCombinator to provide recursion.

## Make your own Flow

You can make your custom Flow by implementing `Flow\FlowInterface`.
You can make your custom Flow by implementing `Flow\FlowInterface`.
2 changes: 1 addition & 1 deletion docs/src/content/en/docs/getting-started/ip-strategy.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ You can embed it by a custom strategy with is `LinearIpStrategy` by default.

## Make your Ip Strategy

You can make your custom Ip strategy by implementing `Flow\IpStrategyInterface`
You can make your custom Ip strategy by implementing `Flow\IpStrategyInterface`
2 changes: 1 addition & 1 deletion docs/src/content/en/docs/getting-started/license.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2 changes: 1 addition & 1 deletion docs/src/layouts/_default/section.sitemap.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@
{{ end -}}
{{ end -}}
{{ end -}}
</urlset>
</urlset>
44 changes: 44 additions & 0 deletions examples/Transport/Client.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Worker;

class Client
{
public function __construct(
private SenderInterface $sender,
private ReceiverInterface $receiver
) {}

/**
* @param ?int $delay The delay in milliseconds
*/
public function call(object $data, ?int $delay = null): void
{
$ip = Envelope::wrap($data, $delay ? [new DelayStamp($delay)] : []);
$this->sender->send($ip);
}

/**
* @param callable[][]|HandlerDescriptor[][] $handlers
*/
public function wait(array $handlers): void
{
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator($handlers)),
]);
$worker = new Worker(['transport' => $this->receiver], $bus);
$worker->run();
}
}
41 changes: 2 additions & 39 deletions examples/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,16 @@
require __DIR__ . '/../vendor/autoload.php';

use Doctrine\DBAL\DriverManager;
use Flow\Examples\Transport\Client;
use Flow\Examples\Transport\DoctrineIpTransport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Worker;

class client
{
public function __construct(
private SenderInterface $sender,
private ReceiverInterface $receiver
) {}

/**
* @param ?int $delay The delay in milliseconds
*/
public function call(object $data, ?int $delay = null): void
{
$ip = Envelope::wrap($data, $delay ? [new DelayStamp($delay)] : []);
$this->sender->send($ip);
}

/**
* @param callable[][]|HandlerDescriptor[][] $handlers
*/
public function wait(array $handlers): void
{
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator($handlers)),
]);
$worker = new Worker(['transport' => $this->receiver], $bus);
$worker->run();
}
}

$connection = DriverManager::getConnection([
'driver' => 'pdo_sqlite',
'path' => __DIR__ . '/flow.sqlite',
]);
$transport = new DoctrineIpTransport($connection, uniqid('transport_', true));

$client = new client($transport, $transport);
$client = new Client($transport, $transport);

$ip = long2ip(random_int(ip2long('10.0.0.0'), ip2long('10.255.255.255')));
for ($i = 0; $i < 3; $i++) {
Expand Down
8 changes: 5 additions & 3 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ParallelDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
Expand All @@ -14,7 +15,7 @@
use Flow\Examples\Model\DataC;
use Flow\Examples\Model\DataD;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Flow\Job\ClosureJob;
Expand All @@ -25,6 +26,7 @@
3 => new ReactDriver(),
4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
// 6 => new ParallelDriver(),
};
printf("Use %s\n", $driver::class);
printf("Calculating:\n");
Expand All @@ -35,7 +37,7 @@
$job1 = static function (DataA $dataA) use ($driver): DataB {
printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b);

// simulating calculating some "light" operation from 0.1 to 1 seconds
// simulating calculating some "light" operation from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$d = $dataA->a + $dataA->b;
Expand Down Expand Up @@ -92,7 +94,7 @@
$asyncTask = static function ($job1, $job2, $job3, $errorJob1, $errorJob2, $driver) {
echo "begin - flow asynchronous\n";

$flow = Flow::do(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
$flow = (new FlowFactory())->create(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
yield [$job1, $errorJob1, new MaxIpStrategy(2)];
yield [$job2, $errorJob2, new MaxIpStrategy(2)];
yield $job3;
Expand Down
4 changes: 2 additions & 2 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
use Flow\Driver\SwooleDriver;
use Flow\Examples\Transport\DoctrineIpTransport;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Flow\TransportFlow;
use Flow\FlowFactory;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Symfony\Component\Messenger\Envelope;
Expand Down Expand Up @@ -74,7 +74,7 @@
printf("%s\n", $exception->getMessage());
};

$flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
$flow = (new FlowFactory())->create(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
yield [$addOneJob, $errorJob, new MaxIpStrategy(1)];
yield [$multbyTwoJob, $errorJob, new MaxIpStrategy(3)];
yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)];
Expand Down
Loading
Loading