From 2b162dd1ae40762eaab7df15af649eb4455cbb9a Mon Sep 17 00:00:00 2001
From: jbcr <51637606+jbcr@users.noreply.github.com>
Date: Thu, 10 Oct 2019 11:36:26 +0200
Subject: [PATCH] Initial commit with :heart:
---
.gitignore | 5 +
.php_cs.dist | 12 +
CONTRIBUTING.md | 72 ++++
LICENSE | 19 +
README.md | 385 ++++++++++++++++++
.../Manager/ScheduledDataflowManagerTest.php | 89 ++++
Tests/Registry/DataflowTypeRegistryTest.php | 47 +++
Tests/Runner/PendingDataflowRunnerTest.php | 144 +++++++
.../Constraints/FrequencyValidatorTest.php | 75 ++++
Tests/bootstrap.php | 11 +
composer.json | 57 +++
phpunit.xml | 32 ++
src/CodeRhapsodieDataflowBundle.php | 25 ++
src/Command/AddScheduledDataflowCommand.php | 132 ++++++
src/Command/ChangeScheduleStatusCommand.php | 81 ++++
src/Command/ExecuteDataflowCommand.php | 66 +++
src/Command/JobShowCommand.php | 113 +++++
src/Command/RunPendingDataflowsCommand.php | 69 ++++
src/Command/ScheduleListCommand.php | 59 +++
src/DataflowType/AbstractDataflowType.php | 36 ++
src/DataflowType/Dataflow/Dataflow.php | 111 +++++
.../Dataflow/DataflowInterface.php | 20 +
src/DataflowType/DataflowBuilder.php | 70 ++++
src/DataflowType/DataflowTypeInterface.php | 14 +
src/DataflowType/Result.php | 123 ++++++
src/DataflowType/Writer/PortWriterAdapter.php | 31 ++
src/DataflowType/Writer/WriterInterface.php | 14 +
.../CodeRhapsodieDataflowExtension.php | 25 ++
.../Compiler/DataflowTypeCompilerPass.php | 31 ++
src/Entity/Job.php | 325 +++++++++++++++
src/Entity/ScheduledDataflow.php | 213 ++++++++++
src/Event/Events.php | 11 +
src/Event/ProcessingEvent.php | 35 ++
.../InterruptedProcessingException.php | 12 +
.../UnknownDataflowTypeException.php | 12 +
src/Manager/ScheduledDataflowManager.php | 74 ++++
.../ScheduledDataflowManagerInterface.php | 16 +
src/Registry/DataflowTypeRegistry.php | 55 +++
.../DataflowTypeRegistryInterface.php | 36 ++
src/Repository/JobRepository.php | 65 +++
.../ScheduledDataflowRepository.php | 61 +++
src/Resources/config/services.yaml | 62 +++
src/Runner/PendingDataflowRunner.php | 89 ++++
src/Runner/PendingDataflowRunnerInterface.php | 10 +
src/Validator/Constraints/Frequency.php | 17 +
.../Constraints/FrequencyValidator.php | 49 +++
46 files changed, 3110 insertions(+)
create mode 100644 .gitignore
create mode 100644 .php_cs.dist
create mode 100644 CONTRIBUTING.md
create mode 100644 LICENSE
create mode 100644 README.md
create mode 100644 Tests/Manager/ScheduledDataflowManagerTest.php
create mode 100644 Tests/Registry/DataflowTypeRegistryTest.php
create mode 100644 Tests/Runner/PendingDataflowRunnerTest.php
create mode 100644 Tests/Validator/Constraints/FrequencyValidatorTest.php
create mode 100644 Tests/bootstrap.php
create mode 100644 composer.json
create mode 100644 phpunit.xml
create mode 100644 src/CodeRhapsodieDataflowBundle.php
create mode 100644 src/Command/AddScheduledDataflowCommand.php
create mode 100644 src/Command/ChangeScheduleStatusCommand.php
create mode 100644 src/Command/ExecuteDataflowCommand.php
create mode 100644 src/Command/JobShowCommand.php
create mode 100644 src/Command/RunPendingDataflowsCommand.php
create mode 100644 src/Command/ScheduleListCommand.php
create mode 100644 src/DataflowType/AbstractDataflowType.php
create mode 100644 src/DataflowType/Dataflow/Dataflow.php
create mode 100644 src/DataflowType/Dataflow/DataflowInterface.php
create mode 100644 src/DataflowType/DataflowBuilder.php
create mode 100644 src/DataflowType/DataflowTypeInterface.php
create mode 100644 src/DataflowType/Result.php
create mode 100644 src/DataflowType/Writer/PortWriterAdapter.php
create mode 100644 src/DataflowType/Writer/WriterInterface.php
create mode 100644 src/DependencyInjection/CodeRhapsodieDataflowExtension.php
create mode 100644 src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php
create mode 100644 src/Entity/Job.php
create mode 100644 src/Entity/ScheduledDataflow.php
create mode 100644 src/Event/Events.php
create mode 100644 src/Event/ProcessingEvent.php
create mode 100644 src/Exceptions/InterruptedProcessingException.php
create mode 100644 src/Exceptions/UnknownDataflowTypeException.php
create mode 100644 src/Manager/ScheduledDataflowManager.php
create mode 100644 src/Manager/ScheduledDataflowManagerInterface.php
create mode 100644 src/Registry/DataflowTypeRegistry.php
create mode 100644 src/Registry/DataflowTypeRegistryInterface.php
create mode 100644 src/Repository/JobRepository.php
create mode 100644 src/Repository/ScheduledDataflowRepository.php
create mode 100644 src/Resources/config/services.yaml
create mode 100644 src/Runner/PendingDataflowRunner.php
create mode 100644 src/Runner/PendingDataflowRunnerInterface.php
create mode 100644 src/Validator/Constraints/Frequency.php
create mode 100644 src/Validator/Constraints/FrequencyValidator.php
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..19615b0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+vendor
+composer.lock
+.phpunit.result.cache
+.php_cs.cache
+.php_cs
diff --git a/.php_cs.dist b/.php_cs.dist
new file mode 100644
index 0000000..d4e742b
--- /dev/null
+++ b/.php_cs.dist
@@ -0,0 +1,12 @@
+in(__DIR__.'/src');
+
+return PhpCsFixer\Config::create()
+ ->setRules([
+ '@Symfony' => true,
+ 'declare_strict_types' => true,
+ ])
+ ->setFinder($finder)
+ ->setRiskyAllowed(true)
+;
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..524a6f2
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,72 @@
+Contributing
+============
+
+Thank you for contributing to this project!
+
+Bug reports
+-----------
+
+If you find a bug, please submit an issue. Try to be as detailed as possible
+in your problem description to help us fix the bug.
+
+Feature requests
+----------------
+
+If you wish to propose a feature, please submit an issue. Try to explain your
+use case as fully as possible to help us understand why you think the feature
+should be added.
+
+Creating a pull request (PR)
+----------------------------
+
+First [fork the repository](https://help.github.com/articles/fork-a-repo/) on
+GitHub.
+
+Then clone your fork:
+
+```bash
+$ git clone https://github.com/code-rhapsodie/dataflow-bundle.git
+$ git checkout -b bug-or-feature-description
+```
+
+And install the dependencies:
+
+```bash
+$ composer install
+```
+
+Write your code and add tests. Then run the tests:
+
+```bash
+$ vendor/bin/phpunit
+```
+
+Commit your changes and push them to GitHub:
+
+```bash
+$ git commit -m "Fix nasty bug"
+$ git push -u origin bug-or-feature-description
+```
+
+Then [create a pull request](https://help.github.com/articles/creating-a-pull-request/)
+on GitHub.
+
+If you need to make some changes, commit and push them as you like. When asked
+to squash your commits, do so as follows:
+
+```bash
+git rebase -i
+git push origin bug-or-feature-description -f
+```
+
+Coding standard
+---------------
+
+This project follows the [Symfony](https://symfony.com/doc/current/contributing/code/standards.html) coding style.
+Please make sure your pull requests adhere to this standard.
+
+To fix, execute this command after [download PHP CS Fixer](https://cs.symfony.com/):
+
+```shell script
+$ php php-cs-fixer.phar fix
+```
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..ec8a624
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2019 Code Rhapsodie
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..cb81a5e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,385 @@
+# Code-Rhapsodie Dataflow Bundle
+
+DataflowBundle is a bundle for Symfony 3.4+
+providing an easy way to create import / export dataflow.
+
+# Features
+
+* Define and configure a Dataflow
+* Run the Job scheduled
+* Run one Dataflow from the command line
+* Define the schedule for a Dataflow from the command line
+* Enable/Disable a scheduled Dataflow from the command line
+* Display the list of scheduled Dataflow from the command line
+* Display the result for the last Job for a Dataflow from the command line
+
+
+## Installation
+
+### Add the dependency
+
+To install this bundle, run this command :
+
+```shell script
+$ composer require code-rhapsodie/dataflow
+```
+
+#### Suggest
+
+You can use the generic readers, writers and steps from [PortPHP](https://github.com/portphp/portphp).
+
+For the writers, you must use the adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` like this:
+
+```php
+addWriter(new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter($streamWriter));
+// ...
+```
+
+### Register the bundle
+
+#### Symfony 4 (new tree)
+
+For Symfony 4, add `CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
+` in the `config/bundles.php` file.
+
+Like this:
+
+```php
+ ['all' => true],
+ // ...
+];
+```
+
+#### Symfony 3.4 (old tree)
+
+For Symfony 3.4, add a new line in the `app/AppKernel.php` file.
+
+Like this:
+
+```php
+myReader = $myReader;
+ $this->myWriter = $myWriter;
+ }
+
+ protected function buildDataflow(DataflowBuilder $builder, array $options): void
+ {
+
+ $this->myReader->setFilename($options['fileName']);
+
+ $builder->setReader($this->myReader)
+ ->addStep(function($data) use ($options) {
+ // TODO : Write your code here...
+ return $data;
+ })
+ ->addWriter($this->myWriter)
+ ;
+ }
+
+ protected function configureOptions(OptionsResolver $optionsResolver): void
+ {
+ $optionsResolver->setDefaults([
+ 'my_option' => 'my_default_value',
+ 'fileName' => null,
+ ]);
+ $optionsResolver->setRequired('fileName');
+ }
+
+ public function getLabel(): string
+ {
+ return 'My First Dataflow';
+ }
+
+ public function getAliases(): iterable
+ {
+ return ['mfd'];
+ }
+}
+
+```
+
+The `DataflowTypeInterface` is used by Symfony for auto-configuration our custom datafow type only if the folder is correctly configured (see the `services` configuration file in your projet).
+If you don't use the auto-configuration, you must add this tag `coderhapsodie.dataflow.type` in your dataflow type service configuration:
+
+```yaml
+ CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
+ tags:
+ - { name: coderhapsodie.dataflow.type }
+```
+
+### Use the options for your dataflow type
+
+The `AbstractDataflowType` can help you define the options of your Datataflow type.
+
+Add this method in your DataflowType class:
+
+```php
+setDefaults([
+ 'my_option' => 'my_default_value',
+ 'fileName' => null,
+ ]);
+ $optionsResolver->setRequired('fileName');
+ }
+
+}
+```
+
+With this configuration, the option `fileName` is required. For an advanced usage of the option resolver, read the [Symfony documentation](https://symfony.com/doc/current/components/options_resolver.html).
+
+
+### Check if your DataflowType is ready
+
+Execute this command to check if your DataflowType is correctly registered:
+
+```shell script
+$ bin/console debug:container --tag coderhapsodie.dataflow.type --show-private
+```
+
+The result is like this:
+
+```
+Symfony Container Public and Private Services Tagged with "coderhapsodie.dataflow.type" Tag
+===========================================================================================
+
+ ---------------------------------------------------------------- ----------------------------------------------------------------
+ Service ID Class name
+ ---------------------------------------------------------------- ----------------------------------------------------------------
+ CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType
+ ---------------------------------------------------------------- ----------------------------------------------------------------
+
+```
+
+
+### Readers
+
+*Readers* provide the workflow with elements to import / export. Usually, elements are read from an external resource (file, database, webservice, etc).
+
+A *Reader* must implements `Port\Reader` or return a `iterable` if you use the `Port\Reader\IteratorReader`.
+
+The only constraint on the returned elements typing is that they cannot be `false`.
+
+The reader can be a generator like this example :
+
+```php
+filename = $filename;
+ }
+
+ public function __invoke(): iterable
+ {
+ if (!$this->filename) {
+ throw new \Exception("The file name is not defined. Define it with 'setFilename' method");
+ }
+
+ if (!$fh = fopen($this->filename, 'r')) {
+ throw new \Exception("Unable to open file '".$this->filename."' for read.");
+ }
+
+ while (false === ($read = fread($fh, 1024))) {
+ yield explode("|", $read);
+ }
+ }
+}
+```
+
+To setup your reader in the dataflow builder, you must use `Port\Reader\IteratorReader` like this
+
+```php
+$builder->setReader(new \Port\Reader\IteratorReader($this->myReader))
+```
+
+
+### Steps
+
+*Steps* are operations performed on the elements before they are handled by the *Writers*. Usually, steps are either:
+- converters, that alter the element
+- filters, that conditionally prevents further operations on the element
+
+A *Step* can be any callable, taking the element as its argument, and returning either:
+- the element, possibly altered
+- `false`, if no further operations should be performed on this element
+
+
+### Writers
+
+*Writers* performs the actual import / export operations.
+
+A *Writer* must implements `CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface`.
+As this interface is not compatible with `Port\Writer`, the adapter `CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter` is provided.
+
+This example show how to use the predefined PhpPort Writer :
+
+```php
+$builder->addWriter(new PortWriterAdapter(new \Port\FileWriter()));
+```
+
+Or you own Writer:
+
+```php
+fh = fopen('/path/to/file', 'w')) {
+ throw new \Exception("Unable to open in write mode the output file.");
+ }
+ }
+
+ public function write($item)
+ {
+ fputcsv($this->fh, $item);
+ }
+
+ public function finish()
+ {
+ fclose($this->fh);
+ }
+}
+```
+
+## Queue
+
+All pending dataflow job processes are stored in a queue into the database.
+
+Add this command into your crontab for execute all queued job:
+
+```shell script
+$ SYMFONY_ENV=prod php bin/console code-rhapsodie:dataflow:job:run-pending
+```
+
+## Commands
+
+Many commands are provided.
+
+`code-rhapsodie:dataflow:job:run-pending` Executes job in the queue according to their schedule.
+
+`code-rhapsodie:dataflow:schedule:list` Display the list of dataflows scheduled.
+
+`code-rhapsodie:dataflow:schedule:change-status` Enable or disable a scheduled dataflow
+
+`code-rhapsodie:dataflow:schedule:add` Add the schedule for a dataflow.
+
+`code-rhapsodie:dataflow:job:show` Display the last result of a job.
+
+`code-rhapsodie:dataflow:execute` Lets you execute one dataflow job.
+
+
+# Issues and feature requests
+
+Please report issues and request features at https://github.com/code-rhapsodie/dataflow-bundle/issues.
+
+# Contributing
+
+Contributions are very welcome. Please see [CONTRIBUTING.md](CONTRIBUTING.md) for
+details. Thanks to [everyone who has contributed](https://github.com/code-rhapsodie/dataflow-bundle/graphs/contributors)
+already.
+
+# License
+
+This package is licensed under the [MIT license](LICENSE).
diff --git a/Tests/Manager/ScheduledDataflowManagerTest.php b/Tests/Manager/ScheduledDataflowManagerTest.php
new file mode 100644
index 0000000..22164d1
--- /dev/null
+++ b/Tests/Manager/ScheduledDataflowManagerTest.php
@@ -0,0 +1,89 @@
+em = $this->createMock(EntityManagerInterface::class);
+ $this->scheduledDataflowRepository = $this->createMock(ScheduledDataflowRepository::class);
+ $this->jobRepository = $this->createMock(JobRepository::class);
+
+ $this->manager = new ScheduledDataflowManager($this->em, $this->scheduledDataflowRepository, $this->jobRepository);
+ }
+
+ public function testCreateJobsFromScheduledDataflows()
+ {
+ $scheduled1 = new ScheduledDataflow();
+ $scheduled2 = (new ScheduledDataflow())
+ ->setDataflowType($type = 'testType')
+ ->setOptions($options = ['opt' => 'val'])
+ ->setNext($next = new \DateTime())
+ ->setLabel($label = 'testLabel')
+ ->setFrequency($frequency = '1 year')
+ ;
+
+ $this->scheduledDataflowRepository
+ ->expects($this->once())
+ ->method('findReadyToRun')
+ ->willReturn([$scheduled1, $scheduled2])
+ ;
+
+ $this->jobRepository
+ ->expects($this->exactly(2))
+ ->method('findPendingForScheduledDataflow')
+ ->withConsecutive([$scheduled1], [$scheduled2])
+ ->willReturnOnConsecutiveCalls(new Job(), null)
+ ;
+
+ $this->em
+ ->expects($this->once())
+ ->method('persist')
+ ->with(
+ $this->callback(function (Job $job) use ($type, $options, $next, $label, $scheduled2) {
+ return (
+ $job->getStatus() === Job::STATUS_PENDING
+ && $job->getDataflowType() === $type
+ && $job->getOptions() === $options
+ && $job->getRequestedDate() == $next
+ && $job->getLabel() === $label
+ && $job->getScheduledDataflow() === $scheduled2
+ );
+ })
+ )
+ ;
+
+ $this->em
+ ->expects($this->once())
+ ->method('flush')
+ ;
+
+ $this->manager->createJobsFromScheduledDataflows();
+
+ $this->assertEquals($next->add(\DateInterval::createFromDateString($frequency)), $scheduled2->getNext());
+ }
+}
diff --git a/Tests/Registry/DataflowTypeRegistryTest.php b/Tests/Registry/DataflowTypeRegistryTest.php
new file mode 100644
index 0000000..837e0c1
--- /dev/null
+++ b/Tests/Registry/DataflowTypeRegistryTest.php
@@ -0,0 +1,47 @@
+registry = new DataflowTypeRegistry();
+ }
+
+ public function testEverything()
+ {
+ $alias1 = 'alias1';
+ $alias2 = 'alias2';
+
+ /** @var MockObject|DataflowTypeInterface $type */
+ $type = $this->createMock(DataflowTypeInterface::class);
+ $type
+ ->expects($this->once())
+ ->method('getAliases')
+ ->willReturn([$alias1, $alias2])
+ ;
+
+ $this->registry->registerDataflowType($type);
+
+ $this->assertSame($type, $this->registry->getDataflowType(get_class($type)));
+ $this->assertSame($type, $this->registry->getDataflowType($alias1));
+ $this->assertSame($type, $this->registry->getDataflowType($alias2));
+ $this->assertContains($type, $this->registry->listDataflowTypes());
+ }
+
+ public function testUnknown()
+ {
+ $this->expectException(UnknownDataflowTypeException::class);
+ $this->registry->getDataflowType('unknown');
+ }
+}
diff --git a/Tests/Runner/PendingDataflowRunnerTest.php b/Tests/Runner/PendingDataflowRunnerTest.php
new file mode 100644
index 0000000..077a62c
--- /dev/null
+++ b/Tests/Runner/PendingDataflowRunnerTest.php
@@ -0,0 +1,144 @@
+em = $this->createMock(EntityManagerInterface::class);
+ $this->repository = $this->createMock(JobRepository::class);
+ $this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
+ $this->dispatcher = $this->createMock(EventDispatcherInterface::class);
+
+ $this->runner = new PendingDataflowRunner($this->em, $this->repository, $this->registry, $this->dispatcher);
+ }
+
+ public function testRunPendingDataflows()
+ {
+ $now = new \DateTime();
+ $job1 = (new Job())
+ ->setStatus(Job::STATUS_PENDING)
+ ->setDataflowType($type1 = 'type1')
+ ->setOptions($options1 = ['option1' => 'value1'])
+ ;
+ $job2 = (new Job())
+ ->setStatus(Job::STATUS_PENDING)
+ ->setDataflowType($type2 = 'type2')
+ ->setOptions($options2 = ['option2' => 'value2'])
+ ;
+
+ $this->repository
+ ->expects($this->exactly(3))
+ ->method('findNextPendingDataflow')
+ ->willReturnOnConsecutiveCalls($job1, $job2, null)
+ ;
+
+ $this->dispatcher
+ ->expects($this->exactly(4))
+ ->method('dispatch')
+ ->withConsecutive(
+ [
+ Events::BEFORE_PROCESSING,
+ $this->callback(function (ProcessingEvent $event) use ($job1) {
+ return $event->getJob() === $job1;
+ })
+ ],
+ [
+ Events::AFTER_PROCESSING,
+ $this->callback(function (ProcessingEvent $event) use ($job1) {
+ return $event->getJob() === $job1;
+ })
+ ],
+ [
+ Events::BEFORE_PROCESSING,
+ $this->callback(function (ProcessingEvent $event) use ($job2) {
+ return $event->getJob() === $job2;
+ })
+ ],
+ [
+ Events::AFTER_PROCESSING,
+ $this->callback(function (ProcessingEvent $event) use ($job2) {
+ return $event->getJob() === $job2;
+ })
+ ]
+ )
+ ;
+
+ $dataflowType1 = $this->createMock(DataflowTypeInterface::class);
+ $dataflowType2 = $this->createMock(DataflowTypeInterface::class);
+
+ $this->registry
+ ->expects($this->exactly(2))
+ ->method('getDataflowType')
+ ->withConsecutive([$type1], [$type2])
+ ->willReturnOnConsecutiveCalls($dataflowType1, $dataflowType2)
+ ;
+
+ $bag1 = new \SplObjectStorage();
+ $bag1->attach(new \Exception('message1'));
+ $bag2 = new \SplObjectStorage();
+ $bag2->attach(new \Exception('message2'));
+
+ $result1 = new Result('name', new \DateTime(), $end1 = new \DateTime(), $count1 = 10, $bag1);
+ $result2 = new Result('name', new \DateTime(), $end2 = new \DateTime(), $count2 = 20, $bag2);
+
+ $dataflowType1
+ ->expects($this->once())
+ ->method('process')
+ ->with($options1)
+ ->willReturn($result1)
+ ;
+ $dataflowType2
+ ->expects($this->once())
+ ->method('process')
+ ->with($options2)
+ ->willReturn($result2)
+ ;
+
+ $this->em
+ ->expects($this->exactly(4))
+ ->method('flush')
+ ;
+
+ $this->runner->runPendingDataflows();
+
+ $this->assertGreaterThanOrEqual($now, $job1->getStartTime());
+ $this->assertSame(Job::STATUS_COMPLETED, $job1->getStatus());
+ $this->assertSame($end1, $job1->getEndTime());
+ $this->assertSame($count1 - count($bag1), $job1->getCount());
+
+ $this->assertGreaterThanOrEqual($now, $job2->getStartTime());
+ $this->assertSame(Job::STATUS_COMPLETED, $job2->getStatus());
+ $this->assertSame($end2, $job2->getEndTime());
+ $this->assertSame($count2 - count($bag2), $job2->getCount());
+ }
+}
diff --git a/Tests/Validator/Constraints/FrequencyValidatorTest.php b/Tests/Validator/Constraints/FrequencyValidatorTest.php
new file mode 100644
index 0000000..4db5881
--- /dev/null
+++ b/Tests/Validator/Constraints/FrequencyValidatorTest.php
@@ -0,0 +1,75 @@
+validator->validate($value, new Frequency());
+
+ $this->assertNoViolation();
+ }
+
+ public function getValidValues()
+ {
+ return [
+ ['3 days'],
+ ['2 weeks'],
+ ['1 month'],
+ ['first sunday'],
+ ];
+ }
+
+ public function testInvalidValue()
+ {
+ $constraint = new Frequency([
+ 'invalidMessage' => 'testMessage',
+ ]);
+
+ $this->validator->validate('wrong value', $constraint);
+
+ $this->buildViolation('testMessage')
+ ->setParameter('{{ string }}', 'wrong value')
+ ->assertRaised()
+ ;
+ }
+
+ /**
+ * @dataProvider getNegativeValues
+ */
+ public function testNegativeIntervals($value)
+ {
+ $constraint = new Frequency([
+ 'negativeIntervalMessage' => 'testMessage',
+ ]);
+
+ $this->validator->validate($value, $constraint);
+
+ $this->buildViolation('testMessage')
+ ->setParameter('{{ string }}', $value)
+ ->assertRaised()
+ ;
+ }
+
+ public function getNegativeValues()
+ {
+ return [
+ ['now'],
+ ['-1 day'],
+ ['last month'],
+ ];
+ }
+}
diff --git a/Tests/bootstrap.php b/Tests/bootstrap.php
new file mode 100644
index 0000000..3862c76
--- /dev/null
+++ b/Tests/bootstrap.php
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ ./src/
+
+ Tests/
+ vendor/
+
+
+
+
+
diff --git a/src/CodeRhapsodieDataflowBundle.php b/src/CodeRhapsodieDataflowBundle.php
new file mode 100644
index 0000000..e063e25
--- /dev/null
+++ b/src/CodeRhapsodieDataflowBundle.php
@@ -0,0 +1,25 @@
+addCompilerPass(new DataflowTypeCompilerPass());
+ }
+}
diff --git a/src/Command/AddScheduledDataflowCommand.php b/src/Command/AddScheduledDataflowCommand.php
new file mode 100644
index 0000000..e86ee44
--- /dev/null
+++ b/src/Command/AddScheduledDataflowCommand.php
@@ -0,0 +1,132 @@
+registry = $registry;
+ $this->scheduledDataflowRepository = $scheduledDataflowRepository;
+ $this->validator = $validator;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ $this
+ ->setDescription('Create a scheduled dataflow')
+ ->setHelp('The %command.name% allows you to create a new scheduled dataflow.')
+ ->addOption('label', null, InputOption::VALUE_REQUIRED, 'Label of the scheduled dataflow')
+ ->addOption('type', null, InputOption::VALUE_REQUIRED, 'Type of the scheduled dataflow (FQCN)')
+ ->addOption('options', null, InputOption::VALUE_OPTIONAL, 'Options of the scheduled dataflow (ex: {"option1": "value1", "option2": "value2"})')
+ ->addOption('frequency', null, InputOption::VALUE_REQUIRED, 'Frequency of the scheduled dataflow')
+ ->addOption('first_run', null, InputOption::VALUE_REQUIRED, 'Date for the first run of the scheduled dataflow (Y-m-d H:i:s)')
+ ->addOption('enabled', null, InputOption::VALUE_REQUIRED, 'State of the scheduled dataflow');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $choices = [];
+ $typeMapping = [];
+ foreach ($this->registry->listDataflowTypes() as $fqcn => $dataflowType) {
+ $choices[] = $dataflowType->getLabel();
+ $typeMapping[$dataflowType->getLabel()] = $fqcn;
+ }
+
+ $io = new SymfonyStyle($input, $output);
+ $label = $input->getOption('label');
+ if (!$label) {
+ $label = $io->ask('What is the scheduled dataflow label?');
+ }
+ $type = $input->getOption('type');
+ if (!$type) {
+ $selectedType = $io->choice('What is the scheduled dataflow type?', $choices);
+ $type = $typeMapping[$selectedType];
+ }
+ $options = $input->getOption('options');
+ if (!$options) {
+ $options = $io->ask('What are the launch options for the scheduled dataflow? (ex: {"option1": "value1", "option2": "value2"})', json_encode([]));
+ }
+ $frequency = $input->getOption('frequency');
+ if (!$frequency) {
+ $frequency = $io->choice('What is the frequency for the scheduled dataflow?', ScheduledDataflow::AVAILABLE_FREQUENCIES);
+ }
+ $firstRun = $input->getOption('first_run');
+ if (!$firstRun) {
+ $firstRun = $io->ask('When is the first execution of the scheduled dataflow (format: Y-m-d H:i:s)?');
+ }
+ $enabled = $input->getOption('enabled');
+ if (!$enabled) {
+ $enabled = $io->confirm('Enable the scheduled dataflow?');
+ }
+
+ try {
+ $newScheduledDataflow = $this->createEntityFromArray([
+ 'label' => $label,
+ 'type' => $type,
+ 'options' => $options,
+ 'frequency' => $frequency,
+ 'first_run' => $firstRun,
+ 'enabled' => $enabled,
+ ]);
+
+ $errors = $this->validator->validate($newScheduledDataflow);
+ if (count($errors) > 0) {
+ $io->error((string) $errors);
+
+ return 2;
+ }
+
+ $this->scheduledDataflowRepository->save($newScheduledDataflow);
+ $io->success(sprintf('New scheduled dataflow "%s" (id:%d) was created successfully.', $newScheduledDataflow->getLabel(), $newScheduledDataflow->getId()));
+
+ return 0;
+ } catch (\Exception $e) {
+ $io->error(sprintf('An error occured when creating new scheduled dataflow : "%s".', $e->getMessage()));
+
+ return 1;
+ }
+ }
+
+ private function createEntityFromArray(array $input): ScheduledDataflow
+ {
+ $scheduledDataflow = new ScheduledDataflow();
+ $scheduledDataflow->setLabel($input['label']);
+ $scheduledDataflow->setDataflowType($input['type']);
+ $scheduledDataflow->setOptions(json_decode($input['options'], true));
+ $scheduledDataflow->setFrequency($input['frequency']);
+ $scheduledDataflow->setNext(new \DateTimeImmutable($input['first_run']));
+ $scheduledDataflow->setEnabled($input['enabled']);
+
+ return $scheduledDataflow;
+ }
+}
diff --git a/src/Command/ChangeScheduleStatusCommand.php b/src/Command/ChangeScheduleStatusCommand.php
new file mode 100644
index 0000000..3dd39a1
--- /dev/null
+++ b/src/Command/ChangeScheduleStatusCommand.php
@@ -0,0 +1,81 @@
+scheduledDataflowRepository = $scheduledDataflowRepository;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ $this
+ ->setDescription('Change schedule status')
+ ->setHelp('The %command.name% command able you to change schedule status.')
+ ->addArgument('schedule-id', InputArgument::REQUIRED, 'Id of the schedule')
+ ->addOption('enable', null, InputOption::VALUE_NONE, 'Enable the schedule')
+ ->addOption('disable', null, InputOption::VALUE_NONE, 'Disable the schedule');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $io = new SymfonyStyle($input, $output);
+ /** @var ScheduledDataflow|null $schedule */
+ $schedule = $this->scheduledDataflowRepository->find($input->getArgument('schedule-id'));
+
+ if (!$schedule) {
+ $io->error(sprintf('Cannot find scheduled dataflow with id "%d".', $input->getArgument('schedule-id')));
+
+ return 1;
+ }
+
+ if ($input->getOption('enable') && $input->getOption('disable')) {
+ $io->error('You cannot pass enable and disable options in the same time.');
+
+ return 2;
+ }
+ if (!$input->getOption('enable') && !$input->getOption('disable')) {
+ $io->error('You must pass enable or disable option.');
+
+ return 3;
+ }
+
+ try {
+ $schedule->setEnabled($input->getOption('enable'));
+ $this->scheduledDataflowRepository->save($schedule);
+ $io->success(sprintf('Schedule with id "%s" has been successfully updated.', $schedule->getId()));
+ } catch (\Exception $e) {
+ $io->error(sprintf('An error occured when changing schedule status : "%s".', $e->getMessage()));
+
+ return 4;
+ }
+
+ return 0;
+ }
+}
diff --git a/src/Command/ExecuteDataflowCommand.php b/src/Command/ExecuteDataflowCommand.php
new file mode 100644
index 0000000..e8a39ad
--- /dev/null
+++ b/src/Command/ExecuteDataflowCommand.php
@@ -0,0 +1,66 @@
+registry = $registry;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ $this
+ ->setDescription('Runs one dataflow type with provided options')
+ ->setHelp(<<<'EOF'
+The %command.name% command runs one dataflow with the provided options.
+
+ php %command.full_name% App\Dataflow\MyDataflow '{"option1": "value1", "option2": "value2"}'
+EOF
+ )
+ ->addArgument('fqcn', InputArgument::REQUIRED, 'FQCN or alias of the dataflow type')
+ ->addArgument('options', InputArgument::OPTIONAL, 'Options for the dataflow type as a json string', '[]')
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $fqcnOrAlias = $input->getArgument('fqcn');
+ $options = json_decode($input->getArgument('options'), true);
+
+ $dataflowType = $this->registry->getDataflowType($fqcnOrAlias);
+ $result = $dataflowType->process($options);
+
+ $output->writeln('Executed: '.$result->getName());
+ $output->writeln('Start time: '.$result->getStartTime()->format('Y/m/d H:i:s'));
+ $output->writeln('End time: '.$result->getEndTime()->format('Y/m/d H:i:s'));
+ $output->writeln('Success: '.$result->getSuccessCount());
+
+ return 0;
+ }
+}
diff --git a/src/Command/JobShowCommand.php b/src/Command/JobShowCommand.php
new file mode 100644
index 0000000..6431870
--- /dev/null
+++ b/src/Command/JobShowCommand.php
@@ -0,0 +1,113 @@
+ 'Pending',
+ Job::STATUS_RUNNING => 'Running',
+ Job::STATUS_COMPLETED => 'Completed',
+ ];
+
+ protected static $defaultName = 'code-rhapsodie:dataflow:job:show';
+
+ /** @var JobRepository */
+ private $jobRepository;
+
+ public function __construct(JobRepository $jobRepository)
+ {
+ parent::__construct();
+
+ $this->jobRepository = $jobRepository;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ $this
+ ->setDescription('Display job details for schedule or specific job')
+ ->setHelp('The %command.name% display job details for schedule or specific job.')
+ ->addOption('job-id', null, InputOption::VALUE_REQUIRED, 'Id of the job to get details')
+ ->addOption('schedule-id', null, InputOption::VALUE_REQUIRED, 'Id of schedule for last execution details')
+ ->addOption('details', null, InputOption::VALUE_NONE, 'Display full details');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $io = new SymfonyStyle($input, $output);
+
+ $jobId = (int) $input->getOption('job-id');
+ $scheduleId = (int) $input->getOption('schedule-id');
+ if ($jobId && $scheduleId) {
+ $io->error('You must use `job-id` OR `schedule-id` option, not the 2 in the same time.');
+
+ return 1;
+ }
+
+ if ($scheduleId) {
+ $job = $this->jobRepository->findLastForDataflowId($scheduleId);
+ } elseif ($jobId) {
+ $job = $this->jobRepository->find($jobId);
+ } else {
+ $io->error('You must pass `job-id` or `schedule-id` option.');
+
+ return 2;
+ }
+
+ if (null === $job) {
+ $io->error('Cannot find job :/');
+
+ return 3;
+ }
+
+ /** @var Job $job */
+ $display = [
+ ['Job id', $job->getId()],
+ ['Label', $job->getLabel()],
+ ['Requested at', $job->getRequestedDate()->format('Y-m-d H:i:s')],
+ ['Started at', $job->getStartTime() ? $job->getStartTime()->format('Y-m-d H:i:s') : '-'],
+ ['Ended at', $job->getEndTime() ? $job->getEndTime()->format('Y-m-d H:i:s') : '-'],
+ ['Object number', $job->getCount()],
+ ['Errors', count($job->getExceptions())],
+ ['Status', $this->translateStatus($job->getStatus())],
+ ];
+ if ($input->getOption('details')) {
+ $display[] = ['Type', $job->getDataflowType()];
+ $display[] = ['Options', json_encode($job->getOptions())];
+ $io->section('Summary');
+ }
+
+ $io->table(['Field', 'Value'], $display);
+ if ($input->getOption('details')) {
+ $io->section('Exceptions');
+ $exceptions = array_map(function (string $exception) {
+ return substr($exception, 0, 900).'…';
+ }, $job->getExceptions());
+
+ $io->write($exceptions);
+ }
+
+ return 0;
+ }
+
+ private function translateStatus(int $status): string
+ {
+ return self::STATUS_MAPPING[$status] ?? 'Unknown status';
+ }
+}
diff --git a/src/Command/RunPendingDataflowsCommand.php b/src/Command/RunPendingDataflowsCommand.php
new file mode 100644
index 0000000..6de16c1
--- /dev/null
+++ b/src/Command/RunPendingDataflowsCommand.php
@@ -0,0 +1,69 @@
+manager = $manager;
+ $this->runner = $runner;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ $this
+ ->setDescription('Runs dataflows based on the scheduled defined in the UI.')
+ ->setHelp(<<<'EOF'
+The %command.name% command runs dataflows according to the schedule defined in the UI by the user.
+EOF
+ )
+ ;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ if (!$this->lock()) {
+ $output->writeln('The command is already running in another process.');
+
+ return 0;
+ }
+
+ $this->manager->createJobsFromScheduledDataflows();
+ $this->runner->runPendingDataflows();
+
+ $this->release();
+
+ return 0;
+ }
+}
diff --git a/src/Command/ScheduleListCommand.php b/src/Command/ScheduleListCommand.php
new file mode 100644
index 0000000..7ff2178
--- /dev/null
+++ b/src/Command/ScheduleListCommand.php
@@ -0,0 +1,59 @@
+scheduledDataflowRepository = $scheduledDataflowRepository;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function configure()
+ {
+ $this
+ ->setDescription('List scheduled dataflows')
+ ->setHelp('The %command.name% lists all scheduled dataflows.');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function execute(InputInterface $input, OutputInterface $output)
+ {
+ $io = new SymfonyStyle($input, $output);
+ $display = [];
+ $schedules = $this->scheduledDataflowRepository->listAllOrderedByLabel();
+ foreach ($schedules as $schedule) {
+ $display[] = [
+ $schedule['id'],
+ $schedule['label'],
+ $schedule['enabled'] ? 'yes' : 'no',
+ $schedule['startTime'] ? (new \DateTime($schedule['startTime']))->format('Y-m-d H:i:s') : '-',
+ $schedule['next'] ? $schedule['next']->format('Y-m-d H:i:s') : '-',
+ ];
+ }
+
+ $io->table(['id', 'label', 'enabled?', 'last execution', 'next execution'], $display);
+
+ return 0;
+ }
+}
diff --git a/src/DataflowType/AbstractDataflowType.php b/src/DataflowType/AbstractDataflowType.php
new file mode 100644
index 0000000..c493287
--- /dev/null
+++ b/src/DataflowType/AbstractDataflowType.php
@@ -0,0 +1,36 @@
+configureOptions($optionsResolver);
+ $options = $optionsResolver->resolve($options);
+
+ $builder = (new DataflowBuilder())
+ ->setName($this->getLabel())
+ ;
+ $this->buildDataflow($builder, $options);
+ $dataflow = $builder->getDataflow();
+
+ return $dataflow->process();
+ }
+
+ protected function configureOptions(OptionsResolver $optionsResolver): void
+ {
+ }
+
+ abstract protected function buildDataflow(DataflowBuilder $builder, array $options): void;
+}
diff --git a/src/DataflowType/Dataflow/Dataflow.php b/src/DataflowType/Dataflow/Dataflow.php
new file mode 100644
index 0000000..43e5b89
--- /dev/null
+++ b/src/DataflowType/Dataflow/Dataflow.php
@@ -0,0 +1,111 @@
+reader = $reader;
+ $this->name = $name;
+ }
+
+ /**
+ * @param callable $step
+ *
+ * @return $this
+ */
+ public function addStep(callable $step): self
+ {
+ $this->steps[] = $step;
+
+ return $this;
+ }
+
+ /**
+ * @param WriterInterface $writer
+ *
+ * @return $this
+ */
+ public function addWriter(WriterInterface $writer): self
+ {
+ $this->writers[] = $writer;
+
+ return $this;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function process(): Result
+ {
+ $count = 0;
+ $exceptions = new \SplObjectStorage();
+ $startTime = new \DateTime();
+
+ SignalHandler::create(['SIGTERM', 'SIGINT'], function () {
+ throw new InterruptedProcessingException();
+ });
+
+ foreach ($this->writers as $writer) {
+ $writer->prepare();
+ }
+
+ foreach ($this->reader as $index => $item) {
+ try {
+ $this->processItem($item);
+ } catch (\Exception $e) {
+ $exceptions->attach($e, $index);
+ }
+
+ ++$count;
+ }
+
+ foreach ($this->writers as $writer) {
+ $writer->finish();
+ }
+
+ return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
+ }
+
+ /**
+ * @param mixed $item
+ */
+ private function processItem($item): void
+ {
+ foreach ($this->steps as $step) {
+ $item = call_user_func($step, $item);
+
+ if (false === $item) {
+ return;
+ }
+ }
+
+ foreach ($this->writers as $writer) {
+ $writer->write($item);
+ }
+ }
+}
diff --git a/src/DataflowType/Dataflow/DataflowInterface.php b/src/DataflowType/Dataflow/DataflowInterface.php
new file mode 100644
index 0000000..89c3cba
--- /dev/null
+++ b/src/DataflowType/Dataflow/DataflowInterface.php
@@ -0,0 +1,20 @@
+name = $name;
+
+ return $this;
+ }
+
+ public function setReader(iterable $reader): self
+ {
+ $this->reader = $reader;
+
+ return $this;
+ }
+
+ public function addStep(callable $step, int $priority = 0): self
+ {
+ $this->steps[$priority][] = $step;
+
+ return $this;
+ }
+
+ public function addWriter(WriterInterface $writer): self
+ {
+ $this->writers[] = $writer;
+
+ return $this;
+ }
+
+ public function getDataflow(): DataflowInterface
+ {
+ $dataflow = new Dataflow($this->reader, $this->name);
+
+ krsort($this->steps);
+ foreach ($this->steps as $stepArray) {
+ foreach ($stepArray as $step) {
+ $dataflow->addStep($step);
+ }
+ }
+
+ foreach ($this->writers as $writer) {
+ $dataflow->addWriter($writer);
+ }
+
+ return $dataflow;
+ }
+}
diff --git a/src/DataflowType/DataflowTypeInterface.php b/src/DataflowType/DataflowTypeInterface.php
new file mode 100644
index 0000000..079132c
--- /dev/null
+++ b/src/DataflowType/DataflowTypeInterface.php
@@ -0,0 +1,14 @@
+name = $name;
+ $this->startTime = $startTime;
+ $this->endTime = $endTime;
+ $this->elapsed = $startTime->diff($endTime);
+ $this->totalProcessedCount = $totalCount;
+ $this->errorCount = count($exceptions);
+ $this->successCount = $totalCount - $this->errorCount;
+ $this->exceptions = $exceptions;
+ }
+
+ /**
+ * @return string
+ */
+ public function getName(): string
+ {
+ return $this->name;
+ }
+
+ /**
+ * @return \DateTimeInterface
+ */
+ public function getStartTime(): \DateTimeInterface
+ {
+ return $this->startTime;
+ }
+
+ /**
+ * @return \DateTimeInterface
+ */
+ public function getEndTime(): \DateTimeInterface
+ {
+ return $this->endTime;
+ }
+
+ /**
+ * @return \DateInterval
+ */
+ public function getElapsed(): \DateInterval
+ {
+ return $this->elapsed;
+ }
+
+ /**
+ * @return int
+ */
+ public function getErrorCount(): int
+ {
+ return $this->errorCount;
+ }
+
+ /**
+ * @return int
+ */
+ public function getSuccessCount(): int
+ {
+ return $this->successCount;
+ }
+
+ /**
+ * @return int
+ */
+ public function getTotalProcessedCount(): int
+ {
+ return $this->totalProcessedCount;
+ }
+
+ /**
+ * @return bool
+ */
+ public function hasErrors(): bool
+ {
+ return $this->errorCount > 0;
+ }
+
+ /**
+ * @return \SplObjectStorage
+ */
+ public function getExceptions(): \SplObjectStorage
+ {
+ return $this->exceptions;
+ }
+}
diff --git a/src/DataflowType/Writer/PortWriterAdapter.php b/src/DataflowType/Writer/PortWriterAdapter.php
new file mode 100644
index 0000000..ab3aa31
--- /dev/null
+++ b/src/DataflowType/Writer/PortWriterAdapter.php
@@ -0,0 +1,31 @@
+writer = $writer;
+ }
+
+ public function prepare()
+ {
+ $this->writer->prepare();
+ }
+
+ public function write($item)
+ {
+ $this->writer->writeItem((array) $item);
+ }
+
+ public function finish()
+ {
+ $this->writer->finish();
+ }
+}
diff --git a/src/DataflowType/Writer/WriterInterface.php b/src/DataflowType/Writer/WriterInterface.php
new file mode 100644
index 0000000..8433516
--- /dev/null
+++ b/src/DataflowType/Writer/WriterInterface.php
@@ -0,0 +1,14 @@
+load('services.yaml');
+
+ $container
+ ->registerForAutoconfiguration(DataflowTypeInterface::class)
+ ->addTag('coderhapsodie.dataflow.type')
+ ;
+ }
+}
diff --git a/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php
new file mode 100644
index 0000000..79e591e
--- /dev/null
+++ b/src/DependencyInjection/Compiler/DataflowTypeCompilerPass.php
@@ -0,0 +1,31 @@
+has(DataflowTypeRegistry::class)) {
+ return;
+ }
+
+ $registry = $container->findDefinition(DataflowTypeRegistry::class);
+ foreach ($container->findTaggedServiceIds('coderhapsodie.dataflow.type') as $id => $tags) {
+ $registry->addMethodCall('registerDataflowType', [new Reference($id)]);
+ }
+ }
+}
diff --git a/src/Entity/Job.php b/src/Entity/Job.php
new file mode 100644
index 0000000..4816c5f
--- /dev/null
+++ b/src/Entity/Job.php
@@ -0,0 +1,325 @@
+setStatus(static::STATUS_PENDING)
+ ->setDataflowType($scheduled->getDataflowType())
+ ->setOptions($scheduled->getOptions())
+ ->setRequestedDate(clone $scheduled->getNext())
+ ->setLabel($scheduled->getLabel())
+ ->setScheduledDataflow($scheduled)
+ ;
+ }
+
+ /**
+ * @return int
+ */
+ public function getId(): int
+ {
+ return $this->id;
+ }
+
+ /**
+ * @return int
+ */
+ public function getStatus(): int
+ {
+ return $this->status;
+ }
+
+ /**
+ * @param int $status
+ *
+ * @return Job
+ */
+ public function setStatus(int $status): Job
+ {
+ $this->status = $status;
+
+ return $this;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getLabel(): ?string
+ {
+ return $this->label;
+ }
+
+ /**
+ * @param null|string $label
+ *
+ * @return Job
+ */
+ public function setLabel(?string $label): Job
+ {
+ $this->label = $label;
+
+ return $this;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getDataflowType(): ?string
+ {
+ return $this->dataflowType;
+ }
+
+ /**
+ * @param null|string $dataflowType
+ *
+ * @return Job
+ */
+ public function setDataflowType(?string $dataflowType): Job
+ {
+ $this->dataflowType = $dataflowType;
+
+ return $this;
+ }
+
+ /**
+ * @return array|null
+ */
+ public function getOptions(): ?array
+ {
+ return $this->options;
+ }
+
+ /**
+ * @param array|null $options
+ *
+ * @return Job
+ */
+ public function setOptions(?array $options): Job
+ {
+ $this->options = $options;
+
+ return $this;
+ }
+
+ /**
+ * @return \DateTimeInterface|null
+ */
+ public function getRequestedDate(): ?\DateTimeInterface
+ {
+ return $this->requestedDate;
+ }
+
+ /**
+ * @param \DateTimeInterface|null $requestedDate
+ *
+ * @return Job
+ */
+ public function setRequestedDate(?\DateTimeInterface $requestedDate): Job
+ {
+ $this->requestedDate = $requestedDate;
+
+ return $this;
+ }
+
+ /**
+ * @return ScheduledDataflow|null
+ */
+ public function getScheduledDataflow(): ?ScheduledDataflow
+ {
+ return $this->scheduledDataflow;
+ }
+
+ /**
+ * @param ScheduledDataflow|null $scheduledDataflow
+ *
+ * @return Job
+ */
+ public function setScheduledDataflow(?ScheduledDataflow $scheduledDataflow): Job
+ {
+ $this->scheduledDataflow = $scheduledDataflow;
+
+ return $this;
+ }
+
+ /**
+ * @return int|null
+ */
+ public function getCount(): ?int
+ {
+ return $this->count;
+ }
+
+ /**
+ * @param int|null $count
+ *
+ * @return Job
+ */
+ public function setCount(?int $count): Job
+ {
+ $this->count = $count;
+
+ return $this;
+ }
+
+ /**
+ * @return array|null
+ */
+ public function getExceptions(): ?array
+ {
+ return $this->exceptions;
+ }
+
+ /**
+ * @param array|null $exceptions
+ *
+ * @return Job
+ */
+ public function setExceptions(?array $exceptions): Job
+ {
+ $this->exceptions = $exceptions;
+
+ return $this;
+ }
+
+ /**
+ * @return \DateTimeInterface|null
+ */
+ public function getStartTime(): ?\DateTimeInterface
+ {
+ return $this->startTime;
+ }
+
+ /**
+ * @param \DateTimeInterface|null $startTime
+ *
+ * @return Job
+ */
+ public function setStartTime(?\DateTimeInterface $startTime): Job
+ {
+ $this->startTime = $startTime;
+
+ return $this;
+ }
+
+ /**
+ * @return \DateTimeInterface|null
+ */
+ public function getEndTime(): ?\DateTimeInterface
+ {
+ return $this->endTime;
+ }
+
+ /**
+ * @param \DateTimeInterface|null $endTime
+ *
+ * @return Job
+ */
+ public function setEndTime(?\DateTimeInterface $endTime): Job
+ {
+ $this->endTime = $endTime;
+
+ return $this;
+ }
+}
diff --git a/src/Entity/ScheduledDataflow.php b/src/Entity/ScheduledDataflow.php
new file mode 100644
index 0000000..b28aae4
--- /dev/null
+++ b/src/Entity/ScheduledDataflow.php
@@ -0,0 +1,213 @@
+id;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getLabel(): ?string
+ {
+ return $this->label;
+ }
+
+ /**
+ * @param null|string $label
+ *
+ * @return ScheduledDataflow
+ */
+ public function setLabel(?string $label): ScheduledDataflow
+ {
+ $this->label = $label;
+
+ return $this;
+ }
+
+ /**
+ * @return null|string
+ */
+ public function getDataflowType(): ?string
+ {
+ return $this->dataflowType;
+ }
+
+ /**
+ * @param null|string $dataflowType
+ *
+ * @return ScheduledDataflow
+ */
+ public function setDataflowType(?string $dataflowType): ScheduledDataflow
+ {
+ $this->dataflowType = $dataflowType;
+
+ return $this;
+ }
+
+ /**
+ * @return array|null
+ */
+ public function getOptions(): ?array
+ {
+ return $this->options;
+ }
+
+ /**
+ * @param array|null $options
+ *
+ * @return ScheduledDataflow
+ */
+ public function setOptions(?array $options): ScheduledDataflow
+ {
+ $this->options = $options;
+
+ return $this;
+ }
+
+ /**
+ * @return string|null
+ */
+ public function getFrequency(): ?string
+ {
+ return $this->frequency;
+ }
+
+ /**
+ * @param string|null $frequency
+ *
+ * @return ScheduledDataflow
+ */
+ public function setFrequency(?string $frequency): ScheduledDataflow
+ {
+ $this->frequency = $frequency;
+
+ return $this;
+ }
+
+ /**
+ * @return \DateTimeInterface|null
+ */
+ public function getNext(): ?\DateTimeInterface
+ {
+ return $this->next;
+ }
+
+ /**
+ * @param \DateTimeInterface|null $next
+ *
+ * @return ScheduledDataflow
+ */
+ public function setNext(?\DateTimeInterface $next): ScheduledDataflow
+ {
+ $this->next = $next;
+
+ return $this;
+ }
+
+ /**
+ * @return bool|null
+ */
+ public function getEnabled(): ?bool
+ {
+ return $this->enabled;
+ }
+
+ /**
+ * @param bool|null $enabled
+ *
+ * @return ScheduledDataflow
+ */
+ public function setEnabled(?bool $enabled): ScheduledDataflow
+ {
+ $this->enabled = $enabled;
+
+ return $this;
+ }
+}
diff --git a/src/Event/Events.php b/src/Event/Events.php
new file mode 100644
index 0000000..8b7a28b
--- /dev/null
+++ b/src/Event/Events.php
@@ -0,0 +1,11 @@
+job = $job;
+ }
+
+ /**
+ * @return Job
+ */
+ public function getJob(): Job
+ {
+ return $this->job;
+ }
+}
diff --git a/src/Exceptions/InterruptedProcessingException.php b/src/Exceptions/InterruptedProcessingException.php
new file mode 100644
index 0000000..9ce6263
--- /dev/null
+++ b/src/Exceptions/InterruptedProcessingException.php
@@ -0,0 +1,12 @@
+em = $em;
+ $this->scheduledDataflowRepository = $scheduledDataflowRepository;
+ $this->jobRepository = $jobRepository;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createJobsFromScheduledDataflows(): void
+ {
+ foreach ($this->scheduledDataflowRepository->findReadyToRun() as $scheduled) {
+ if (null !== $this->jobRepository->findPendingForScheduledDataflow($scheduled)) {
+ continue;
+ }
+
+ $this->createPendingForScheduled($scheduled);
+ $this->updateScheduledDataflowNext($scheduled);
+ }
+
+ $this->em->flush();
+ }
+
+ /**
+ * @param ScheduledDataflow $scheduled
+ */
+ private function updateScheduledDataflowNext(ScheduledDataflow $scheduled): void
+ {
+ $interval = \DateInterval::createFromDateString($scheduled->getFrequency());
+ $next = clone $scheduled->getNext();
+ $now = new \DateTime();
+
+ while ($next < $now) {
+ $next->add($interval);
+ }
+
+ $scheduled->setNext($next);
+ }
+
+ /**
+ * @param ScheduledDataflow $scheduled
+ */
+ private function createPendingForScheduled(ScheduledDataflow $scheduled): void
+ {
+ $this->em->persist(Job::createFromScheduledDataflow($scheduled));
+ }
+}
diff --git a/src/Manager/ScheduledDataflowManagerInterface.php b/src/Manager/ScheduledDataflowManagerInterface.php
new file mode 100644
index 0000000..290dfa3
--- /dev/null
+++ b/src/Manager/ScheduledDataflowManagerInterface.php
@@ -0,0 +1,16 @@
+fqcnRegistry[$fqcnOrAlias])) {
+ return $this->fqcnRegistry[$fqcnOrAlias];
+ }
+
+ if (isset($this->aliasesRegistry[$fqcnOrAlias])) {
+ return $this->aliasesRegistry[$fqcnOrAlias];
+ }
+
+ throw new UnknownDataflowTypeException();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function listDataflowTypes(): iterable
+ {
+ return $this->fqcnRegistry;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function registerDataflowType(DataflowTypeInterface $dataflowType): void
+ {
+ $this->fqcnRegistry[get_class($dataflowType)] = $dataflowType;
+ foreach ($dataflowType->getAliases() as $alias) {
+ $this->aliasesRegistry[$alias] = $dataflowType;
+ }
+ }
+}
diff --git a/src/Registry/DataflowTypeRegistryInterface.php b/src/Registry/DataflowTypeRegistryInterface.php
new file mode 100644
index 0000000..245f236
--- /dev/null
+++ b/src/Registry/DataflowTypeRegistryInterface.php
@@ -0,0 +1,36 @@
+findBy([
+ 'scheduledDataflow' => null,
+ 'status' => Job::STATUS_PENDING,
+ ]);
+ }
+
+ public function findPendingForScheduledDataflow(ScheduledDataflow $scheduled): ?Job
+ {
+ return $this->findOneBy([
+ 'scheduledDataflow' => $scheduled->getId(),
+ 'status' => Job::STATUS_PENDING,
+ ]);
+ }
+
+ public function findNextPendingDataflow(): ?Job
+ {
+ $criteria = (new Criteria())
+ ->where(Criteria::expr()->lte('requestedDate', new \DateTime()))
+ ->andWhere(Criteria::expr()->eq('status', Job::STATUS_PENDING))
+ ->orderBy(['requestedDate' => Criteria::ASC])
+ ->setMaxResults(1)
+ ;
+
+ return $this->matching($criteria)->first() ?: null;
+ }
+
+ public function findLastForDataflowId(int $dataflowId): ?Job
+ {
+ return $this->findOneBy(['scheduledDataflow' => $dataflowId], ['requestedDate' => 'desc']);
+ }
+
+ public function findLatests(): iterable
+ {
+ return $this->findBy([], ['requestedDate' => 'desc'], 20);
+ }
+
+ public function findForScheduled(int $id): iterable
+ {
+ return $this->findBy(['scheduledDataflow' => $id], ['requestedDate' => 'desc'], 20);
+ }
+
+ public function save(Job $job)
+ {
+ $this->_em->persist($job);
+ $this->_em->flush();
+ }
+}
diff --git a/src/Repository/ScheduledDataflowRepository.php b/src/Repository/ScheduledDataflowRepository.php
new file mode 100644
index 0000000..e9bc798
--- /dev/null
+++ b/src/Repository/ScheduledDataflowRepository.php
@@ -0,0 +1,61 @@
+where(Criteria::expr()->lte('next', new \DateTime()))
+ ->andWhere(Criteria::expr()->eq('enabled', 1))
+ ->orderBy(['next' => Criteria::ASC])
+ ;
+
+ return $this->matching($criteria);
+ }
+
+ public function findAllOrderedByLabel(): iterable
+ {
+ return $this->findBy([], ['label' => 'asc']);
+ }
+
+ public function listAllOrderedByLabel(): array
+ {
+ $query = $this->createQueryBuilder('w')
+ ->select('w.id', 'w.label', 'w.enabled', 'w.next', 'max(j.startTime) as startTime')
+ ->leftJoin('w.jobs', 'j')
+ ->orderBy('w.label', 'ASC')
+ ->groupBy('w.id');
+
+ return $query->getQuery()->execute();
+ }
+
+ public function save(ScheduledDataflow $scheduledDataflow)
+ {
+ $this->_em->persist($scheduledDataflow);
+ $this->_em->flush();
+ }
+
+ public function delete(int $id): void
+ {
+ $dataflow = $this->find($id);
+
+ $this->_em->remove($dataflow);
+ $this->_em->flush();
+ }
+}
diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml
new file mode 100644
index 0000000..38b3ec4
--- /dev/null
+++ b/src/Resources/config/services.yaml
@@ -0,0 +1,62 @@
+services:
+ _defaults:
+ public: false
+
+ CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry'
+ CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistry:
+
+ CodeRhapsodie\DataflowBundle\Command\AddScheduledDataflowCommand:
+ arguments:
+ $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
+ $scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
+ $validator: '@validator'
+ tags: ['console.command']
+
+ CodeRhapsodie\DataflowBundle\Command\ChangeScheduleStatusCommand:
+ arguments:
+ $scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
+ tags: ['console.command']
+
+ CodeRhapsodie\DataflowBundle\Command\ExecuteDataflowCommand:
+ arguments:
+ $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
+ tags: ['console.command']
+
+ CodeRhapsodie\DataflowBundle\Command\JobShowCommand:
+ arguments:
+ $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
+ tags: ['console.command']
+
+ CodeRhapsodie\DataflowBundle\Command\RunPendingDataflowsCommand:
+ arguments:
+ $manager: '@CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface'
+ $runner: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface'
+ tags: ['console.command']
+
+ CodeRhapsodie\DataflowBundle\Command\ScheduleListCommand:
+ arguments:
+ $scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
+ tags: ['console.command']
+
+ CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository:
+ factory: ['@doctrine.orm.default_entity_manager', 'getRepository']
+ arguments: ['CodeRhapsodie\DataflowBundle\Entity\ScheduledDataflow']
+
+ CodeRhapsodie\DataflowBundle\Repository\JobRepository:
+ factory: ['@doctrine.orm.default_entity_manager', 'getRepository']
+ arguments: ['CodeRhapsodie\DataflowBundle\Entity\Job']
+
+ CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManagerInterface: '@CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager'
+ CodeRhapsodie\DataflowBundle\Manager\ScheduledDataflowManager:
+ arguments:
+ $em: '@doctrine.orm.default_entity_manager'
+ $scheduledDataflowRepository: '@CodeRhapsodie\DataflowBundle\Repository\ScheduledDataflowRepository'
+ $jobRepository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
+
+ CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunnerInterface: '@CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner'
+ CodeRhapsodie\DataflowBundle\Runner\PendingDataflowRunner:
+ arguments:
+ $em: '@doctrine.orm.default_entity_manager'
+ $repository: '@CodeRhapsodie\DataflowBundle\Repository\JobRepository'
+ $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface'
+ $dispatcher: '@event_dispatcher'
diff --git a/src/Runner/PendingDataflowRunner.php b/src/Runner/PendingDataflowRunner.php
new file mode 100644
index 0000000..3aea999
--- /dev/null
+++ b/src/Runner/PendingDataflowRunner.php
@@ -0,0 +1,89 @@
+em = $em;
+ $this->repository = $repository;
+ $this->registry = $registry;
+ $this->dispatcher = $dispatcher;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function runPendingDataflows(): void
+ {
+ while (null !== ($job = $this->repository->findNextPendingDataflow())) {
+ $this->beforeProcessing($job);
+
+ $dataflowType = $this->registry->getDataflowType($job->getDataflowType());
+ $result = $dataflowType->process($job->getOptions());
+
+ $this->afterProcessing($job, $result);
+ }
+ }
+
+ /**
+ * @param Job $job
+ */
+ private function beforeProcessing(Job $job): void
+ {
+ $this->dispatcher->dispatch(Events::BEFORE_PROCESSING, new ProcessingEvent($job));
+
+ $job
+ ->setStatus(Job::STATUS_RUNNING)
+ ->setStartTime(new \DateTime())
+ ;
+ $this->em->flush();
+ }
+
+ /**
+ * @param Job $job
+ * @param Result $result
+ */
+ private function afterProcessing(Job $job, Result $result): void
+ {
+ $exceptions = [];
+ /** @var \Exception $exception */
+ foreach ($result->getExceptions() as $exception) {
+ $exceptions[] = (string) $exception;
+ }
+
+ $job
+ ->setEndTime($result->getEndTime())
+ ->setStatus(Job::STATUS_COMPLETED)
+ ->setCount($result->getSuccessCount())
+ ->setExceptions($exceptions)
+ ;
+ $this->em->flush();
+
+ $this->dispatcher->dispatch(Events::AFTER_PROCESSING, new ProcessingEvent($job));
+ }
+}
diff --git a/src/Runner/PendingDataflowRunnerInterface.php b/src/Runner/PendingDataflowRunnerInterface.php
new file mode 100644
index 0000000..d7cb6f5
--- /dev/null
+++ b/src/Runner/PendingDataflowRunnerInterface.php
@@ -0,0 +1,10 @@
+context->buildViolation($constraint->invalidMessage)
+ ->setParameter('{{ string }}', $value)
+ ->addViolation()
+ ;
+
+ return;
+ }
+
+ $now = new \DateTime();
+ $dt = clone $now;
+ $dt->add($interval);
+
+ if ($dt <= $now) {
+ $this->context->buildViolation($constraint->negativeIntervalMessage)
+ ->setParameter('{{ string }}', $value)
+ ->addViolation()
+ ;
+
+ return;
+ }
+ }
+}