From dcdcd1872105f59de27df2296bee5885776112f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gr=C3=A9gory=20Planchat?= <gregory@kiboko.fr>
Date: Mon, 13 Nov 2023 14:30:00 +0100
Subject: [PATCH] Updated pipeline contracts to v0.5

---
 composer.json                       |   9 +-
 composer.lock                       |  72 +++++++++++--
 src/Extractor/IteratorExtractor.php |   1 +
 src/Pipeline.php                    |  32 ++++--
 src/PipelineRunner.php              |  12 +--
 src/StepCode.php                    |  25 +++++
 tests/unit/PipelineRunnerTest.php   |   4 +-
 tests/unit/PipelineTest.php         | 156 ++++++++++++++++------------
 8 files changed, 215 insertions(+), 96 deletions(-)
 create mode 100644 src/StepCode.php

diff --git a/composer.json b/composer.json
index 6c2ff58..ddd57ee 100644
--- a/composer.json
+++ b/composer.json
@@ -19,7 +19,7 @@
         "php": "^8.2",
         "psr/log": "^3.0",
         "php-etl/bucket": "*",
-        "php-etl/pipeline-contracts": "0.4.*",
+        "php-etl/pipeline-contracts": "0.5.*",
         "php-etl/bucket-contracts": "0.2.*"
     },
     "require-dev": {
@@ -31,7 +31,7 @@
         "rector/rector": "^0.15"
     },
     "provide": {
-        "php-etl/pipeline-implementation": "0.3.0"
+        "php-etl/pipeline-implementation": "0.5.0"
     },
     "autoload": {
         "psr-4": {
@@ -46,12 +46,13 @@
     "config": {
         "bin-dir": "bin",
         "allow-plugins": {
-            "infection/extension-installer": true
+            "infection/extension-installer": true,
+            "php-http/discovery": true
         }
     },
     "extra": {
         "branch-alias": {
-            "dev-main": "0.5.x-dev"
+            "dev-main": "0.6.x-dev"
         }
     }
 }
diff --git a/composer.lock b/composer.lock
index e1a3591..3b14bd4 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,7 +4,7 @@
         "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
         "This file is @generated automatically"
     ],
-    "content-hash": "7a6f0d7130f4a8796f0f72c4b0ab06cd",
+    "content-hash": "2303e9c21649835612b5cdf4c66f5d0b",
     "packages": [
         {
             "name": "php-etl/bucket",
@@ -118,31 +118,33 @@
         },
         {
             "name": "php-etl/pipeline-contracts",
-            "version": "v0.4.0",
+            "version": "dev-main",
             "source": {
                 "type": "git",
                 "url": "https://github.com/php-etl/pipeline-contracts.git",
-                "reference": "47e1e32aedb425c4624c1f6d9acf5ae839287fdf"
+                "reference": "ad3984db5bbf68104b1c01f3dc8a19c31a95ad4f"
             },
             "dist": {
                 "type": "zip",
-                "url": "https://api.github.com/repos/php-etl/pipeline-contracts/zipball/47e1e32aedb425c4624c1f6d9acf5ae839287fdf",
-                "reference": "47e1e32aedb425c4624c1f6d9acf5ae839287fdf",
+                "url": "https://api.github.com/repos/php-etl/pipeline-contracts/zipball/ad3984db5bbf68104b1c01f3dc8a19c31a95ad4f",
+                "reference": "ad3984db5bbf68104b1c01f3dc8a19c31a95ad4f",
                 "shasum": ""
             },
             "require": {
                 "php": "^8.2",
-                "php-etl/bucket-contracts": "0.2.0"
+                "php-etl/bucket-contracts": "0.2.*",
+                "php-etl/satellite-contracts": "0.1.*"
             },
             "require-dev": {
                 "friendsofphp/php-cs-fixer": "^3.0",
                 "phpstan/phpstan": "^1.10",
                 "rector/rector": "^0.15"
             },
+            "default-branch": true,
             "type": "library",
             "extra": {
                 "branch-alias": {
-                    "dev-main": "0.4.x-dev"
+                    "dev-main": "0.5.x-dev"
                 }
             },
             "autoload": {
@@ -167,9 +169,61 @@
             "description": "This library describes contracts for the Extract-Transform-Load pattern.",
             "support": {
                 "issues": "https://github.com/php-etl/pipeline-contracts/issues",
-                "source": "https://github.com/php-etl/pipeline-contracts/tree/v0.4.0"
+                "source": "https://github.com/php-etl/pipeline-contracts/tree/main"
             },
-            "time": "2023-04-06T10:08:37+00:00"
+            "time": "2023-11-13T13:11:27+00:00"
+        },
+        {
+            "name": "php-etl/satellite-contracts",
+            "version": "v0.1.0",
+            "source": {
+                "type": "git",
+                "url": "https://github.com/php-etl/satellite-contracts.git",
+                "reference": "1d2bc6822bfdb3efc6a1f490e706db995c99ef41"
+            },
+            "dist": {
+                "type": "zip",
+                "url": "https://api.github.com/repos/php-etl/satellite-contracts/zipball/1d2bc6822bfdb3efc6a1f490e706db995c99ef41",
+                "reference": "1d2bc6822bfdb3efc6a1f490e706db995c99ef41",
+                "shasum": ""
+            },
+            "require": {
+                "php": "^8.2"
+            },
+            "require-dev": {
+                "rector/rector": "^0.15.0"
+            },
+            "type": "library",
+            "extra": {
+                "branch-alias": {
+                    "dev-main": "0.1.x-dev"
+                }
+            },
+            "autoload": {
+                "psr-4": {
+                    "Kiboko\\Contract\\Satellite\\": "src/"
+                }
+            },
+            "notification-url": "https://packagist.org/downloads/",
+            "license": [
+                "MIT"
+            ],
+            "authors": [
+                {
+                    "name": "Kiboko SAS",
+                    "homepage": "http://kiboko.fr"
+                },
+                {
+                    "name": "Grégory Planchat",
+                    "email": "gregory@kiboko.fr"
+                }
+            ],
+            "description": "This library describes contracts for defining satellite formats",
+            "support": {
+                "issues": "https://github.com/php-etl/satellite-contracts/issues",
+                "source": "https://github.com/php-etl/satellite-contracts/tree/v0.1.0"
+            },
+            "time": "2023-04-18T13:53:22+00:00"
         },
         {
             "name": "psr/log",
diff --git a/src/Extractor/IteratorExtractor.php b/src/Extractor/IteratorExtractor.php
index 86de9fb..d628c56 100644
--- a/src/Extractor/IteratorExtractor.php
+++ b/src/Extractor/IteratorExtractor.php
@@ -4,6 +4,7 @@
 
 namespace Kiboko\Component\Pipeline\Extractor;
 
+use Kiboko\Component\Bucket\AcceptanceResultBucket;
 use Kiboko\Contract\Pipeline\ExtractorInterface;
 
 class IteratorExtractor implements ExtractorInterface
diff --git a/src/Pipeline.php b/src/Pipeline.php
index d52da51..c476d7a 100644
--- a/src/Pipeline.php
+++ b/src/Pipeline.php
@@ -14,6 +14,9 @@
 use Kiboko\Contract\Pipeline\RejectionInterface;
 use Kiboko\Contract\Pipeline\RunnableInterface;
 use Kiboko\Contract\Pipeline\StateInterface;
+use Kiboko\Contract\Pipeline\StepCodeInterface;
+use Kiboko\Contract\Pipeline\StepRejectionInterface;
+use Kiboko\Contract\Pipeline\StepStateInterface;
 use Kiboko\Contract\Pipeline\TransformerInterface;
 use Kiboko\Contract\Pipeline\TransformingInterface;
 use Kiboko\Contract\Pipeline\WalkableInterface;
@@ -23,8 +26,11 @@ class Pipeline implements PipelineInterface, WalkableInterface, RunnableInterfac
     private readonly \AppendIterator $source;
     private iterable $subject;
 
-    public function __construct(private readonly PipelineRunnerInterface $runner, ?\Iterator $source = null)
-    {
+    public function __construct(
+        private readonly PipelineRunnerInterface $runner,
+        private readonly StateInterface $state,
+        ?\Iterator $source = null
+    ) {
         $this->source = new \AppendIterator();
         $this->source->append($source ?? new \EmptyIterator());
 
@@ -39,14 +45,14 @@ public function feed(...$data): void
     private function passThroughCoroutine(): \Generator
     {
         $line = yield;
-        while ($line = yield $line) {
-        }
+        while ($line = yield $line);
     }
 
     public function extract(
+        StepCodeInterface $stepCode,
         ExtractorInterface $extractor,
-        RejectionInterface $rejection,
-        StateInterface $state,
+        StepRejectionInterface $rejection,
+        StepStateInterface $state,
     ): ExtractingInterface {
         $extract = $extractor->extract();
         if (\is_array($extract)) {
@@ -84,9 +90,10 @@ public function extract(
     }
 
     public function transform(
+        StepCodeInterface $stepCode,
         TransformerInterface $transformer,
-        RejectionInterface $rejection,
-        StateInterface $state,
+        StepRejectionInterface $rejection,
+        StepStateInterface $state,
     ): TransformingInterface {
         if ($transformer instanceof FlushableInterface) {
             $iterator = new \AppendIterator();
@@ -125,9 +132,10 @@ public function transform(
     }
 
     public function load(
+        StepCodeInterface $stepCode,
         LoaderInterface $loader,
-        RejectionInterface $rejection,
-        StateInterface $state,
+        StepRejectionInterface $rejection,
+        StepStateInterface $state,
     ): LoadingInterface {
         if ($loader instanceof FlushableInterface) {
             $iterator = new \AppendIterator();
@@ -168,7 +176,11 @@ public function load(
 
     public function walk(): \Iterator
     {
+        $this->state->initialize();
+
         yield from $this->subject;
+
+        $this->state->teardown();
     }
 
     public function run(int $interval = 1000): int
diff --git a/src/PipelineRunner.php b/src/PipelineRunner.php
index a61dbbf..10e2469 100644
--- a/src/PipelineRunner.php
+++ b/src/PipelineRunner.php
@@ -10,6 +10,8 @@
 use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
 use Kiboko\Contract\Pipeline\RejectionInterface;
 use Kiboko\Contract\Pipeline\StateInterface;
+use Kiboko\Contract\Pipeline\StepRejectionInterface;
+use Kiboko\Contract\Pipeline\StepStateInterface;
 use Psr\Log\LoggerInterface;
 use Psr\Log\LogLevel;
 use Psr\Log\NullLogger;
@@ -23,12 +25,9 @@ public function __construct(private readonly LoggerInterface $logger = new NullL
     public function run(
         \Iterator $source,
         \Generator $coroutine,
-        RejectionInterface $rejection,
-        StateInterface $state,
+        StepRejectionInterface $rejection,
+        StepStateInterface $state,
     ): \Iterator {
-        $state->initialize();
-        $rejection->initialize();
-
         $wrapper = new GeneratorWrapper();
         $wrapper->rewind($source, $coroutine);
 
@@ -69,8 +68,5 @@ public function run(
 
             $wrapper->next($source);
         }
-
-        $state->teardown();
-        $rejection->teardown();
     }
 }
diff --git a/src/StepCode.php b/src/StepCode.php
new file mode 100644
index 0000000..2d92f90
--- /dev/null
+++ b/src/StepCode.php
@@ -0,0 +1,25 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Kiboko\Component\Pipeline;
+
+use Kiboko\Contract\Pipeline\StepCodeInterface;
+
+final class StepCode implements StepCodeInterface
+{
+    private function __construct(
+        private readonly string $reference,
+    ) {
+    }
+
+    public static function fromString(string $reference): self
+    {
+        return new self($reference);
+    }
+
+    public function __toString(): string
+    {
+        return $this->reference;
+    }
+}
diff --git a/tests/unit/PipelineRunnerTest.php b/tests/unit/PipelineRunnerTest.php
index 9a21f6a..c60fae9 100644
--- a/tests/unit/PipelineRunnerTest.php
+++ b/tests/unit/PipelineRunnerTest.php
@@ -9,6 +9,8 @@
 use Kiboko\Component\Pipeline\PipelineRunner;
 use Kiboko\Contract\Pipeline\NullRejection;
 use Kiboko\Contract\Pipeline\NullState;
+use Kiboko\Contract\Pipeline\NullStepRejection;
+use Kiboko\Contract\Pipeline\NullStepState;
 use PHPUnit\Framework\TestResult;
 use Psr\Log\NullLogger;
 
@@ -115,7 +117,7 @@ public function testRun(\Iterator $source, callable $callback, array $expected):
     {
         $run = new PipelineRunner(new NullLogger());
 
-        $it = $run->run($source, $callback(), new NullRejection(), new NullState());
+        $it = $run->run($source, $callback(), new NullStepRejection(), new NullStepState());
 
         $this->assertIteration(new \ArrayIterator($expected), $it);
     }
diff --git a/tests/unit/PipelineTest.php b/tests/unit/PipelineTest.php
index 7a368d1..d5f104c 100644
--- a/tests/unit/PipelineTest.php
+++ b/tests/unit/PipelineTest.php
@@ -7,12 +7,15 @@
 use Kiboko\Component\Bucket\AcceptanceResultBucket;
 use Kiboko\Component\Pipeline\Pipeline;
 use Kiboko\Component\Pipeline\PipelineRunner;
+use Kiboko\Component\Pipeline\StepCode;
 use Kiboko\Contract\Bucket\ResultBucketInterface;
 use Kiboko\Contract\Pipeline\ExtractorInterface;
 use Kiboko\Contract\Pipeline\FlushableInterface;
 use Kiboko\Contract\Pipeline\LoaderInterface;
 use Kiboko\Contract\Pipeline\NullRejection;
 use Kiboko\Contract\Pipeline\NullState;
+use Kiboko\Contract\Pipeline\NullStepRejection;
+use Kiboko\Contract\Pipeline\NullStepState;
 use Kiboko\Contract\Pipeline\TransformerInterface;
 use Psr\Log\NullLogger;
 
@@ -23,16 +26,21 @@ final class PipelineTest extends IterableTestCase
 {
     public function testExtractorWithoutFlush(): void
     {
-        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
-
-        $pipeline->extract(new class() implements ExtractorInterface {
-            public function extract(): iterable
-            {
-                yield new AcceptanceResultBucket('lorem');
-                yield new AcceptanceResultBucket('ipsum');
-                yield new AcceptanceResultBucket('dolor');
-            }
-        }, new NullRejection(), new NullState());
+        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());
+
+        $pipeline->extract(
+            StepCode::fromString('extractor'),
+            new class() implements ExtractorInterface {
+                public function extract(): iterable
+                {
+                    yield new AcceptanceResultBucket('lorem');
+                    yield new AcceptanceResultBucket('ipsum');
+                    yield new AcceptanceResultBucket('dolor');
+                }
+            },
+            new NullStepRejection(),
+            new NullStepState()
+        );
 
         $this->assertIteration(
             new \ArrayIterator(['lorem', 'ipsum', 'dolor']),
@@ -42,98 +50,118 @@ public function extract(): iterable
 
     public function testTransformerWithoutFlush(): void
     {
-        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
+        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());
 
         $pipeline->feed(['lorem'], ['ipsum'], ['dolor']);
 
-        $pipeline->transform(new class() implements TransformerInterface {
-            public function transform(): \Generator
-            {
-                $line = yield;
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                yield new AcceptanceResultBucket(str_rot13((string) $line));
-            }
-        }, new NullRejection(), new NullState());
+        $pipeline->transform(
+            StepCode::fromString('transformer'),
+            new class() implements TransformerInterface {
+                public function transform(): \Generator
+                {
+                    $line = yield;
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                }
+            },
+            new NullStepRejection(),
+            new NullStepState()
+        );
 
         $this->assertIteration(
-            new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']),
+            new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe']]),
             $pipeline->walk()
         );
     }
 
     public function testTransformerWithFlush(): void
     {
-        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
+        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());
 
         $pipeline->feed(['lorem'], ['ipsum'], ['dolor']);
 
-        $pipeline->transform(new class() implements TransformerInterface, FlushableInterface {
-            public function transform(): \Generator
-            {
-                $line = yield;
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                yield new AcceptanceResultBucket(str_rot13((string) $line));
-            }
-
-            public function flush(): ResultBucketInterface
-            {
-                return new AcceptanceResultBucket(str_rot13('sit amet'));
-            }
-        }, new NullRejection(), new NullState());
+        $pipeline->transform(
+            StepCode::fromString('transformer'),
+            new class() implements TransformerInterface, FlushableInterface {
+                public function transform(): \Generator
+                {
+                    $line = yield;
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                }
+
+                public function flush(): ResultBucketInterface
+                {
+                    return new AcceptanceResultBucket([str_rot13('sit amet')]);
+                }
+            },
+            new NullStepRejection(),
+            new NullStepState()
+        );
 
         $this->assertIteration(
-            new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']),
+            new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe'], ['fvg nzrg']]),
             $pipeline->walk()
         );
     }
 
     public function testLoaderWithoutFlush(): void
     {
-        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
+        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());
 
         $pipeline->feed(['lorem'], ['ipsum'], ['dolor']);
 
-        $pipeline->load(new class() implements LoaderInterface {
-            public function load(): \Generator
-            {
-                $line = yield;
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                yield new AcceptanceResultBucket(str_rot13((string) $line));
-            }
-        }, new NullRejection(), new NullState());
+        $pipeline->load(
+            StepCode::fromString('loader'),
+            new class() implements LoaderInterface {
+                public function load(): \Generator
+                {
+                    $line = yield;
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                }
+            },
+            new NullStepRejection(),
+            new NullStepState()
+        );
 
         $this->assertIteration(
-            new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']),
+            new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe']]),
             $pipeline->walk()
         );
     }
 
     public function testLoaderWithFlush(): void
     {
-        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()));
+        $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState());
 
         $pipeline->feed(['lorem'], ['ipsum'], ['dolor']);
 
-        $pipeline->load(new class() implements LoaderInterface, FlushableInterface {
-            public function load(): \Generator
-            {
-                $line = yield;
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                $line = yield new AcceptanceResultBucket(str_rot13((string) $line));
-                yield new AcceptanceResultBucket(str_rot13((string) $line));
-            }
-
-            public function flush(): ResultBucketInterface
-            {
-                return new AcceptanceResultBucket(str_rot13('sit amet'));
-            }
-        }, new NullRejection(), new NullState());
+        $pipeline->load(
+            StepCode::fromString('loader'),
+            new class() implements LoaderInterface, FlushableInterface {
+                public function load(): \Generator
+                {
+                    $line = yield;
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                    yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line));
+                }
+
+                public function flush(): ResultBucketInterface
+                {
+                    return new AcceptanceResultBucket([str_rot13('sit amet')]);
+                }
+            },
+            new NullStepRejection(),
+            new NullStepState()
+        );
 
         $this->assertIteration(
-            new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']),
+            new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe'], ['fvg nzrg']]),
             $pipeline->walk()
         );
     }