diff --git a/composer.json b/composer.json index 6c2ff58..ddd57ee 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ "php": "^8.2", "psr/log": "^3.0", "php-etl/bucket": "*", - "php-etl/pipeline-contracts": "0.4.*", + "php-etl/pipeline-contracts": "0.5.*", "php-etl/bucket-contracts": "0.2.*" }, "require-dev": { @@ -31,7 +31,7 @@ "rector/rector": "^0.15" }, "provide": { - "php-etl/pipeline-implementation": "0.3.0" + "php-etl/pipeline-implementation": "0.5.0" }, "autoload": { "psr-4": { @@ -46,12 +46,13 @@ "config": { "bin-dir": "bin", "allow-plugins": { - "infection/extension-installer": true + "infection/extension-installer": true, + "php-http/discovery": true } }, "extra": { "branch-alias": { - "dev-main": "0.5.x-dev" + "dev-main": "0.6.x-dev" } } } diff --git a/composer.lock b/composer.lock index e1a3591..3b14bd4 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "7a6f0d7130f4a8796f0f72c4b0ab06cd", + "content-hash": "2303e9c21649835612b5cdf4c66f5d0b", "packages": [ { "name": "php-etl/bucket", @@ -118,31 +118,33 @@ }, { "name": "php-etl/pipeline-contracts", - "version": "v0.4.0", + "version": "dev-main", "source": { "type": "git", "url": "https://github.com/php-etl/pipeline-contracts.git", - "reference": "47e1e32aedb425c4624c1f6d9acf5ae839287fdf" + "reference": "ad3984db5bbf68104b1c01f3dc8a19c31a95ad4f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/php-etl/pipeline-contracts/zipball/47e1e32aedb425c4624c1f6d9acf5ae839287fdf", - "reference": "47e1e32aedb425c4624c1f6d9acf5ae839287fdf", + "url": "https://api.github.com/repos/php-etl/pipeline-contracts/zipball/ad3984db5bbf68104b1c01f3dc8a19c31a95ad4f", + "reference": "ad3984db5bbf68104b1c01f3dc8a19c31a95ad4f", "shasum": "" }, "require": { "php": "^8.2", - "php-etl/bucket-contracts": "0.2.0" + "php-etl/bucket-contracts": "0.2.*", + "php-etl/satellite-contracts": "0.1.*" }, "require-dev": { "friendsofphp/php-cs-fixer": "^3.0", "phpstan/phpstan": "^1.10", "rector/rector": "^0.15" }, + "default-branch": true, "type": "library", "extra": { "branch-alias": { - "dev-main": "0.4.x-dev" + "dev-main": "0.5.x-dev" } }, "autoload": { @@ -167,9 +169,61 @@ "description": "This library describes contracts for the Extract-Transform-Load pattern.", "support": { "issues": "https://github.com/php-etl/pipeline-contracts/issues", - "source": "https://github.com/php-etl/pipeline-contracts/tree/v0.4.0" + "source": "https://github.com/php-etl/pipeline-contracts/tree/main" }, - "time": "2023-04-06T10:08:37+00:00" + "time": "2023-11-13T13:11:27+00:00" + }, + { + "name": "php-etl/satellite-contracts", + "version": "v0.1.0", + "source": { + "type": "git", + "url": "https://github.com/php-etl/satellite-contracts.git", + "reference": "1d2bc6822bfdb3efc6a1f490e706db995c99ef41" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-etl/satellite-contracts/zipball/1d2bc6822bfdb3efc6a1f490e706db995c99ef41", + "reference": "1d2bc6822bfdb3efc6a1f490e706db995c99ef41", + "shasum": "" + }, + "require": { + "php": "^8.2" + }, + "require-dev": { + "rector/rector": "^0.15.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "0.1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Kiboko\\Contract\\Satellite\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Kiboko SAS", + "homepage": "http://kiboko.fr" + }, + { + "name": "Grégory Planchat", + "email": "gregory@kiboko.fr" + } + ], + "description": "This library describes contracts for defining satellite formats", + "support": { + "issues": "https://github.com/php-etl/satellite-contracts/issues", + "source": "https://github.com/php-etl/satellite-contracts/tree/v0.1.0" + }, + "time": "2023-04-18T13:53:22+00:00" }, { "name": "psr/log", diff --git a/src/Extractor/IteratorExtractor.php b/src/Extractor/IteratorExtractor.php index 86de9fb..d628c56 100644 --- a/src/Extractor/IteratorExtractor.php +++ b/src/Extractor/IteratorExtractor.php @@ -4,6 +4,7 @@ namespace Kiboko\Component\Pipeline\Extractor; +use Kiboko\Component\Bucket\AcceptanceResultBucket; use Kiboko\Contract\Pipeline\ExtractorInterface; class IteratorExtractor implements ExtractorInterface diff --git a/src/Pipeline.php b/src/Pipeline.php index d52da51..c476d7a 100644 --- a/src/Pipeline.php +++ b/src/Pipeline.php @@ -14,6 +14,9 @@ use Kiboko\Contract\Pipeline\RejectionInterface; use Kiboko\Contract\Pipeline\RunnableInterface; use Kiboko\Contract\Pipeline\StateInterface; +use Kiboko\Contract\Pipeline\StepCodeInterface; +use Kiboko\Contract\Pipeline\StepRejectionInterface; +use Kiboko\Contract\Pipeline\StepStateInterface; use Kiboko\Contract\Pipeline\TransformerInterface; use Kiboko\Contract\Pipeline\TransformingInterface; use Kiboko\Contract\Pipeline\WalkableInterface; @@ -23,8 +26,11 @@ class Pipeline implements PipelineInterface, WalkableInterface, RunnableInterfac private readonly \AppendIterator $source; private iterable $subject; - public function __construct(private readonly PipelineRunnerInterface $runner, ?\Iterator $source = null) - { + public function __construct( + private readonly PipelineRunnerInterface $runner, + private readonly StateInterface $state, + ?\Iterator $source = null + ) { $this->source = new \AppendIterator(); $this->source->append($source ?? new \EmptyIterator()); @@ -39,14 +45,14 @@ public function feed(...$data): void private function passThroughCoroutine(): \Generator { $line = yield; - while ($line = yield $line) { - } + while ($line = yield $line); } public function extract( + StepCodeInterface $stepCode, ExtractorInterface $extractor, - RejectionInterface $rejection, - StateInterface $state, + StepRejectionInterface $rejection, + StepStateInterface $state, ): ExtractingInterface { $extract = $extractor->extract(); if (\is_array($extract)) { @@ -84,9 +90,10 @@ public function extract( } public function transform( + StepCodeInterface $stepCode, TransformerInterface $transformer, - RejectionInterface $rejection, - StateInterface $state, + StepRejectionInterface $rejection, + StepStateInterface $state, ): TransformingInterface { if ($transformer instanceof FlushableInterface) { $iterator = new \AppendIterator(); @@ -125,9 +132,10 @@ public function transform( } public function load( + StepCodeInterface $stepCode, LoaderInterface $loader, - RejectionInterface $rejection, - StateInterface $state, + StepRejectionInterface $rejection, + StepStateInterface $state, ): LoadingInterface { if ($loader instanceof FlushableInterface) { $iterator = new \AppendIterator(); @@ -168,7 +176,11 @@ public function load( public function walk(): \Iterator { + $this->state->initialize(); + yield from $this->subject; + + $this->state->teardown(); } public function run(int $interval = 1000): int diff --git a/src/PipelineRunner.php b/src/PipelineRunner.php index a61dbbf..10e2469 100644 --- a/src/PipelineRunner.php +++ b/src/PipelineRunner.php @@ -10,6 +10,8 @@ use Kiboko\Contract\Pipeline\PipelineRunnerInterface; use Kiboko\Contract\Pipeline\RejectionInterface; use Kiboko\Contract\Pipeline\StateInterface; +use Kiboko\Contract\Pipeline\StepRejectionInterface; +use Kiboko\Contract\Pipeline\StepStateInterface; use Psr\Log\LoggerInterface; use Psr\Log\LogLevel; use Psr\Log\NullLogger; @@ -23,12 +25,9 @@ public function __construct(private readonly LoggerInterface $logger = new NullL public function run( \Iterator $source, \Generator $coroutine, - RejectionInterface $rejection, - StateInterface $state, + StepRejectionInterface $rejection, + StepStateInterface $state, ): \Iterator { - $state->initialize(); - $rejection->initialize(); - $wrapper = new GeneratorWrapper(); $wrapper->rewind($source, $coroutine); @@ -69,8 +68,5 @@ public function run( $wrapper->next($source); } - - $state->teardown(); - $rejection->teardown(); } } diff --git a/src/StepCode.php b/src/StepCode.php new file mode 100644 index 0000000..2d92f90 --- /dev/null +++ b/src/StepCode.php @@ -0,0 +1,25 @@ +reference; + } +} diff --git a/tests/unit/PipelineRunnerTest.php b/tests/unit/PipelineRunnerTest.php index 9a21f6a..c60fae9 100644 --- a/tests/unit/PipelineRunnerTest.php +++ b/tests/unit/PipelineRunnerTest.php @@ -9,6 +9,8 @@ use Kiboko\Component\Pipeline\PipelineRunner; use Kiboko\Contract\Pipeline\NullRejection; use Kiboko\Contract\Pipeline\NullState; +use Kiboko\Contract\Pipeline\NullStepRejection; +use Kiboko\Contract\Pipeline\NullStepState; use PHPUnit\Framework\TestResult; use Psr\Log\NullLogger; @@ -115,7 +117,7 @@ public function testRun(\Iterator $source, callable $callback, array $expected): { $run = new PipelineRunner(new NullLogger()); - $it = $run->run($source, $callback(), new NullRejection(), new NullState()); + $it = $run->run($source, $callback(), new NullStepRejection(), new NullStepState()); $this->assertIteration(new \ArrayIterator($expected), $it); } diff --git a/tests/unit/PipelineTest.php b/tests/unit/PipelineTest.php index 7a368d1..d5f104c 100644 --- a/tests/unit/PipelineTest.php +++ b/tests/unit/PipelineTest.php @@ -7,12 +7,15 @@ use Kiboko\Component\Bucket\AcceptanceResultBucket; use Kiboko\Component\Pipeline\Pipeline; use Kiboko\Component\Pipeline\PipelineRunner; +use Kiboko\Component\Pipeline\StepCode; use Kiboko\Contract\Bucket\ResultBucketInterface; use Kiboko\Contract\Pipeline\ExtractorInterface; use Kiboko\Contract\Pipeline\FlushableInterface; use Kiboko\Contract\Pipeline\LoaderInterface; use Kiboko\Contract\Pipeline\NullRejection; use Kiboko\Contract\Pipeline\NullState; +use Kiboko\Contract\Pipeline\NullStepRejection; +use Kiboko\Contract\Pipeline\NullStepState; use Kiboko\Contract\Pipeline\TransformerInterface; use Psr\Log\NullLogger; @@ -23,16 +26,21 @@ final class PipelineTest extends IterableTestCase { public function testExtractorWithoutFlush(): void { - $pipeline = new Pipeline(new PipelineRunner(new NullLogger())); - - $pipeline->extract(new class() implements ExtractorInterface { - public function extract(): iterable - { - yield new AcceptanceResultBucket('lorem'); - yield new AcceptanceResultBucket('ipsum'); - yield new AcceptanceResultBucket('dolor'); - } - }, new NullRejection(), new NullState()); + $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState()); + + $pipeline->extract( + StepCode::fromString('extractor'), + new class() implements ExtractorInterface { + public function extract(): iterable + { + yield new AcceptanceResultBucket('lorem'); + yield new AcceptanceResultBucket('ipsum'); + yield new AcceptanceResultBucket('dolor'); + } + }, + new NullStepRejection(), + new NullStepState() + ); $this->assertIteration( new \ArrayIterator(['lorem', 'ipsum', 'dolor']), @@ -42,98 +50,118 @@ public function extract(): iterable public function testTransformerWithoutFlush(): void { - $pipeline = new Pipeline(new PipelineRunner(new NullLogger())); + $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState()); $pipeline->feed(['lorem'], ['ipsum'], ['dolor']); - $pipeline->transform(new class() implements TransformerInterface { - public function transform(): \Generator - { - $line = yield; - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - yield new AcceptanceResultBucket(str_rot13((string) $line)); - } - }, new NullRejection(), new NullState()); + $pipeline->transform( + StepCode::fromString('transformer'), + new class() implements TransformerInterface { + public function transform(): \Generator + { + $line = yield; + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + } + }, + new NullStepRejection(), + new NullStepState() + ); $this->assertIteration( - new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']), + new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe']]), $pipeline->walk() ); } public function testTransformerWithFlush(): void { - $pipeline = new Pipeline(new PipelineRunner(new NullLogger())); + $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState()); $pipeline->feed(['lorem'], ['ipsum'], ['dolor']); - $pipeline->transform(new class() implements TransformerInterface, FlushableInterface { - public function transform(): \Generator - { - $line = yield; - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - yield new AcceptanceResultBucket(str_rot13((string) $line)); - } - - public function flush(): ResultBucketInterface - { - return new AcceptanceResultBucket(str_rot13('sit amet')); - } - }, new NullRejection(), new NullState()); + $pipeline->transform( + StepCode::fromString('transformer'), + new class() implements TransformerInterface, FlushableInterface { + public function transform(): \Generator + { + $line = yield; + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + } + + public function flush(): ResultBucketInterface + { + return new AcceptanceResultBucket([str_rot13('sit amet')]); + } + }, + new NullStepRejection(), + new NullStepState() + ); $this->assertIteration( - new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']), + new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe'], ['fvg nzrg']]), $pipeline->walk() ); } public function testLoaderWithoutFlush(): void { - $pipeline = new Pipeline(new PipelineRunner(new NullLogger())); + $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState()); $pipeline->feed(['lorem'], ['ipsum'], ['dolor']); - $pipeline->load(new class() implements LoaderInterface { - public function load(): \Generator - { - $line = yield; - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - yield new AcceptanceResultBucket(str_rot13((string) $line)); - } - }, new NullRejection(), new NullState()); + $pipeline->load( + StepCode::fromString('loader'), + new class() implements LoaderInterface { + public function load(): \Generator + { + $line = yield; + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + } + }, + new NullStepRejection(), + new NullStepState() + ); $this->assertIteration( - new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']), + new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe']]), $pipeline->walk() ); } public function testLoaderWithFlush(): void { - $pipeline = new Pipeline(new PipelineRunner(new NullLogger())); + $pipeline = new Pipeline(new PipelineRunner(new NullLogger()), new NullState()); $pipeline->feed(['lorem'], ['ipsum'], ['dolor']); - $pipeline->load(new class() implements LoaderInterface, FlushableInterface { - public function load(): \Generator - { - $line = yield; - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - $line = yield new AcceptanceResultBucket(str_rot13((string) $line)); - yield new AcceptanceResultBucket(str_rot13((string) $line)); - } - - public function flush(): ResultBucketInterface - { - return new AcceptanceResultBucket(str_rot13('sit amet')); - } - }, new NullRejection(), new NullState()); + $pipeline->load( + StepCode::fromString('loader'), + new class() implements LoaderInterface, FlushableInterface { + public function load(): \Generator + { + $line = yield; + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + $line = yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + yield new AcceptanceResultBucket(array_map(fn (string $item) => str_rot13($item), $line)); + } + + public function flush(): ResultBucketInterface + { + return new AcceptanceResultBucket([str_rot13('sit amet')]); + } + }, + new NullStepRejection(), + new NullStepState() + ); $this->assertIteration( - new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']), + new \ArrayIterator([['yberz'], ['vcfhz'], ['qbybe'], ['fvg nzrg']]), $pipeline->walk() ); }