diff --git a/src/Rejection.php b/src/Rejection.php index f7f6859..418522d 100644 --- a/src/Rejection.php +++ b/src/Rejection.php @@ -7,6 +7,7 @@ use Bunny\Channel; use Bunny\Client; use Kiboko\Contract\Pipeline\RejectionInterface; +use Kiboko\Contract\Pipeline\StepCodeInterface; final readonly class Rejection implements RejectionInterface { @@ -70,7 +71,23 @@ public static function withAuthentication( return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange); } - public function reject(object|array $rejection, ?\Throwable $exception = null): void + public function reject(StepCodeInterface $step, object|array $rejection, ?\Throwable $exception = null): void + { + $this->channel->publish( + json_encode([ + 'item' => $rejection, + 'exception' => $exception, + 'step' => $this->stepUuid, + ], \JSON_THROW_ON_ERROR), + [ + 'content-type' => 'application/json', + ], + $this->exchange, + $this->topic, + ); + } + + public function rejectWithReason(StepCodeInterface $step, object|array $rejection, string $reason, ?\Throwable $exception = null): void { $this->channel->publish( json_encode([ diff --git a/src/State.php b/src/State.php index 9f82696..03363e0 100644 --- a/src/State.php +++ b/src/State.php @@ -4,10 +4,12 @@ namespace Kiboko\Component\Flow\RabbitMQ; -use Kiboko\Contract\Pipeline\StateInterface; +use Kiboko\Contract\Pipeline\StepCodeInterface; +use Kiboko\Contract\Pipeline\StepStateInterface; -final class State implements StateInterface +final class State implements StepStateInterface { + private array $steps = []; private int $acceptMetric = 0; private int $rejectMetric = 0; private int $errorMetric = 0; @@ -19,37 +21,25 @@ public function __construct( ) { } - public function initialize(int $start = 0): void + public function accept(int $count = 1): void { - $this->acceptMetric = 0; - $this->rejectMetric = 0; - $this->errorMetric = 0; - } - - public function accept(int $step = 1): void - { - $this->acceptMetric += $step; + $this->acceptMetric += $count; - $this->manager->trySend($step); + $this->manager->trySend($this->stepCode); } - public function reject(int $step = 1): void + public function reject(int $count = 1): void { - $this->rejectMetric += $step; + $this->rejectMetric += $count; - $this->manager->trySend($step); + $this->manager->trySend($this->stepCode); } - public function error(int $step = 1): void + public function error(int $count = 1): void { - $this->errorMetric += $step; + $this->errorMetric += $count; - $this->manager->trySend($step); - } - - public function teardown(): void - { - $this->manager->teardown($this); + $this->manager->trySend($this->stepCode); } public function toArray(): array diff --git a/src/StateManager.php b/src/StateManager.php index 77df561..de5ccda 100644 --- a/src/StateManager.php +++ b/src/StateManager.php @@ -9,6 +9,8 @@ class StateManager { + /** @var list */ + private array $states = []; private array $steps = []; private array $tearedDown = []; private int $messageCount = 0; @@ -37,11 +39,9 @@ public function __destruct() $this->channel->close(); } - public function stepState( - string $stepCode, - string $stepLabel, - ): State { - return $this->steps[] = new State($this, $stepCode, $stepLabel); + public function stepState(string $stepCode, string $stepLabel): State + { + return $this->steps[$stepCode] = new State($this, $stepCode, $stepLabel); } public function trySend($count): void