Skip to content

Commit

Permalink
🐛 Fix client server examples
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 17, 2024
1 parent 3cd42cf commit dd1710c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 86 deletions.
5 changes: 4 additions & 1 deletion examples/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public function wait(array $handlers): void
}
}

$connection = DriverManager::getConnection(['url' => 'mysql://flow:[email protected]:3306/flow?serverVersion=8.1']);
$connection = DriverManager::getConnection([
'driver' => 'pdo_sqlite',
'path' => __DIR__ . '/flow.sqlite',
]);
$transport = new DoctrineIpTransport($connection, uniqid('transport_', true));

$client = new client($transport, $transport);
Expand Down
45 changes: 29 additions & 16 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
use Flow\Examples\Transport\DoctrineIpTransport;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Flow\TransportFlow;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Symfony\Component\Messenger\Envelope;

$driver = match (random_int(1, 4)) {
$driver = match (random_int(1, 1)) {
1 => new AmpDriver(),
2 => new FiberDriver(),

Check failure on line 23 in examples/server.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Match arm is unreachable because previous comparison is always true.
3 => new ReactDriver(),

Check failure on line 24 in examples/server.php

View workflow job for this annotation

GitHub Actions / Execute PHPStan analysis (8.3)

Match arm is unreachable because previous comparison is always true.
Expand All @@ -25,44 +27,51 @@
};
printf("Use %s\n", $driver::class);

$addOneJob = static function (object $data) use ($driver): void {
printf("Client %s #%d : Calculating %d + 1\n", $data['client'], $data['id'], $data['number']);
$addOneJob = static function (Envelope $envelope) use ($driver): Envelope {
$message = $envelope->getMessage();
printf("Client %s #%d : Calculating %d + 1\n", $message['client'], $message['id'], $message['number']);

// simulating calculating some "light" operation from 0.1 to 1 seconds
$delay = random_int(1, 10) / 10;
$driver->delay($delay);
$data['number']++;
$message['number']++;

return $envelope;
};

$multbyTwoJob = static function (object $data) use ($driver): void {
printf("Client %s #%d : Calculating %d * 2\n", $data['client'], $data['id'], $data['number']);
$multbyTwoJob = static function (Envelope $envelope) use ($driver): Envelope {
$message = $envelope->getMessage();
printf("Client %s #%d : Calculating %d * 2\n", $message['client'], $message['id'], $message['number']);

// simulating calculating some "heavy" operation from from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);

// simulating 1 chance on 3 to produce an exception from the "heavy" operation
if (1 === random_int(1, 3)) {
throw new Error('Failure when processing "Mult by two"');
throw new Error(sprintf('Client %s #%d : Exception Failure when processing "Mult by two"', $message['client'], $message['id']));
}

$data['number'] *= 2;
$message['number'] *= 2;

return $envelope;
};

$minusThreeJob = static function (object $data): void {
printf("Client %s #%d : Calculating %d - 3\n", $data['client'], $data['id'], $data['number']);
$minusThreeJob = static function (Envelope $envelope): Envelope {
$message = $envelope->getMessage();
printf("Client %s #%d : Calculating %d - 3\n", $message['client'], $message['id'], $message['number']);

// simulating calculating some "instant" operation
$data['number'] -= 3;
$message['number'] -= 3;

return $envelope;
};

/**
* @param Ip<ArrayObject> $ip
*/
$errorJob = static function (Ip $ip, Throwable $exception): void {
printf("Client %s #%d : Exception %s\n", $ip->data['client'], $ip->data['id'], $exception->getMessage());

$ip->data->offsetSet('number', null);
$errorJob = static function (ExceptionInterface $exception): void {
printf("%s\n", $exception->getMessage());
};

$flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
Expand All @@ -71,7 +80,10 @@
yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)];
}, ['driver' => $driver]);

$connection = DriverManager::getConnection(['url' => 'mysql://flow:[email protected]:3306/flow?serverVersion=8.1']);
$connection = DriverManager::getConnection([
'driver' => 'pdo_sqlite',
'path' => __DIR__ . '/flow.sqlite',
]);
$transport = new DoctrineIpTransport($connection);

$transportFlow = new TransportFlow(
Expand All @@ -81,3 +93,4 @@
$driver
);
$transportFlow->pull(1);
$flow->await();
24 changes: 0 additions & 24 deletions src/EnvelopeTrait.php

This file was deleted.

61 changes: 16 additions & 45 deletions src/Flow/TransportFlow.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Exception;
use Flow\Driver\FiberDriver;
use Flow\DriverInterface;
use Flow\EnvelopeTrait;
use Flow\FlowInterface;
use Flow\Ip;
use SplObjectStorage;
Expand All @@ -24,18 +23,6 @@
*/
class TransportFlow extends FlowDecorator
{
use EnvelopeTrait;

/**
* @var SplObjectStorage<Ip<T1>, Envelope>
*/
private SplObjectStorage $envelopePool;

/**
* @var array<int, Envelope>
*/
private array $envelopeIds;

/**
* @var DriverInterface<T1,T2>
*/
Expand All @@ -51,8 +38,21 @@ public function __construct(
?DriverInterface $driver = null
) {
parent::__construct($flow);
$this->fn(function (Envelope $envelope) {
try {
$this->consumer->send(Envelope::wrap($envelope->getMessage(), array_reduce($envelope->all(), static function (array $all, array $stamps) {
foreach ($stamps as $stamp) {
$all[] = $stamp;
}

return $all;
}, [])));
$this->producer->ack($envelope);
} catch (Exception $e) {
$this->producer->reject($envelope);
}
});

$this->envelopePool = new SplObjectStorage();
$this->driver = $driver ?? new FiberDriver();
}

Expand All @@ -73,36 +73,7 @@ public function pull(int $interval): Closure

private function emit(Envelope $envelope): void
{
$id = $this->getEnvelopeId($envelope);
if (!isset($this->envelopeIds[$id])) {
$this->envelopeIds[$id] = $envelope;
$ip = new Ip($envelope->getMessage());
$this->envelopePool->offsetSet($ip, $envelope);

try {
$this->flow->fn(function ($ip) {
$envelope = $this->envelopePool->offsetGet($ip);
$this->consumer->send(Envelope::wrap($ip->data, array_reduce($envelope->all(), static function (array $all, array $stamps) {
foreach ($stamps as $stamp) {
$all[] = $stamp;
}

return $all;
}, [])));
$this->producer->ack($envelope);

$this->envelopePool->offsetUnset($ip);
$id = $this->getEnvelopeId($envelope);
unset($this->envelopeIds[$id]);
});
($this->flow)($ip);
} catch (Exception $e) {
$envelope = $this->envelopePool->offsetGet($ip);
$this->producer->reject($envelope);
$this->envelopePool->offsetUnset($ip);
$id = $this->getEnvelopeId($envelope);
unset($this->envelopeIds[$id]);
}
}
$ip = new Ip($envelope);
($this->flow)($ip);
}
}

0 comments on commit dd1710c

Please sign in to comment.