Skip to content

Commit

Permalink
Updated code to be compatible with php-etl/pipeline-contracts:0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
gplanchat committed Nov 21, 2023
1 parent 8932e8c commit 80dd3ed
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 29 deletions.
19 changes: 18 additions & 1 deletion src/Rejection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Bunny\Channel;
use Bunny\Client;
use Kiboko\Contract\Pipeline\RejectionInterface;
use Kiboko\Contract\Pipeline\StepCodeInterface;

final readonly class Rejection implements RejectionInterface
{
Expand Down Expand Up @@ -70,7 +71,23 @@ public static function withAuthentication(
return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange);
}

public function reject(object|array $rejection, ?\Throwable $exception = null): void
public function reject(StepCodeInterface $step, object|array $rejection, ?\Throwable $exception = null): void
{
$this->channel->publish(
json_encode([
'item' => $rejection,
'exception' => $exception,
'step' => $this->stepUuid,
], \JSON_THROW_ON_ERROR),
[
'content-type' => 'application/json',
],
$this->exchange,
$this->topic,
);
}

public function rejectWithReason(StepCodeInterface $step, object|array $rejection, string $reason, ?\Throwable $exception = null): void
{
$this->channel->publish(
json_encode([
Expand Down
36 changes: 13 additions & 23 deletions src/State.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace Kiboko\Component\Flow\RabbitMQ;

use Kiboko\Contract\Pipeline\StateInterface;
use Kiboko\Contract\Pipeline\StepCodeInterface;
use Kiboko\Contract\Pipeline\StepStateInterface;

final class State implements StateInterface
final class State implements StepStateInterface
{
private array $steps = [];

Check failure on line 12 in src/State.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\State::$steps is never read, only written.

Check failure on line 12 in src/State.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\State::$steps type has no value type specified in iterable type array.

Check failure on line 12 in src/State.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\State::$steps is never read, only written.

Check failure on line 12 in src/State.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\State::$steps is never read, only written.
private int $acceptMetric = 0;
private int $rejectMetric = 0;
private int $errorMetric = 0;
Expand All @@ -19,37 +21,25 @@ public function __construct(
) {
}

public function initialize(int $start = 0): void
public function accept(int $count = 1): void
{
$this->acceptMetric = 0;
$this->rejectMetric = 0;
$this->errorMetric = 0;
}

public function accept(int $step = 1): void
{
$this->acceptMetric += $step;
$this->acceptMetric += $count;

$this->manager->trySend($step);
$this->manager->trySend($this->stepCode);
}

public function reject(int $step = 1): void
public function reject(int $count = 1): void
{
$this->rejectMetric += $step;
$this->rejectMetric += $count;

$this->manager->trySend($step);
$this->manager->trySend($this->stepCode);
}

public function error(int $step = 1): void
public function error(int $count = 1): void
{
$this->errorMetric += $step;
$this->errorMetric += $count;

$this->manager->trySend($step);
}

public function teardown(): void
{
$this->manager->teardown($this);
$this->manager->trySend($this->stepCode);
}

public function toArray(): array

Check failure on line 45 in src/State.php

View workflow job for this annotation

GitHub Actions / phpstan

Method Kiboko\Component\Flow\RabbitMQ\State::toArray() return type has no value type specified in iterable type array.
Expand Down
10 changes: 5 additions & 5 deletions src/StateManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

class StateManager
{
/** @var list<State> */
private array $states = [];

Check failure on line 13 in src/StateManager.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\StateManager::$states is never read, only written.

Check failure on line 13 in src/StateManager.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\StateManager::$states is never read, only written.

Check failure on line 13 in src/StateManager.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\StateManager::$states is never read, only written.
private array $steps = [];

Check failure on line 14 in src/StateManager.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\StateManager::$steps type has no value type specified in iterable type array.
private array $tearedDown = [];

Check failure on line 15 in src/StateManager.php

View workflow job for this annotation

GitHub Actions / phpstan

Property Kiboko\Component\Flow\RabbitMQ\StateManager::$tearedDown type has no value type specified in iterable type array.
private int $messageCount = 0;
Expand Down Expand Up @@ -37,11 +39,9 @@ public function __destruct()
$this->channel->close();
}

public function stepState(
string $stepCode,
string $stepLabel,
): State {
return $this->steps[] = new State($this, $stepCode, $stepLabel);
public function stepState(string $stepCode, string $stepLabel): State
{
return $this->steps[$stepCode] = new State($this, $stepCode, $stepLabel);
}

public function trySend($count): void
Expand Down

0 comments on commit 80dd3ed

Please sign in to comment.