diff --git a/docs/LAUNCH.md b/docs/LAUNCH.md index c8cfb0d8..2f042f54 100644 --- a/docs/LAUNCH.md +++ b/docs/LAUNCH.md @@ -45,6 +45,10 @@ Import from where it stopped. This option is not fetching new data from akeneo. It only processes the rest of the data in the temp table. +`--batch-after-fetch` + +Fetch all pages then start processing the batches. +This allows you to recover the import process by launching the command again with the `--continue` argument ### Logs diff --git a/src/Command/AbstractImportCommand.php b/src/Command/AbstractImportCommand.php index 10fbe181..f2c7be69 100644 --- a/src/Command/AbstractImportCommand.php +++ b/src/Command/AbstractImportCommand.php @@ -41,10 +41,14 @@ protected function configure(): void ->addOption('disable-batch', 'd', InputOption::VALUE_NONE, 'Disable batch processing') ->addOption('batch-size', 's', InputOption::VALUE_OPTIONAL, 'Batch Size', 100) ->addOption('max-concurrency', 'c', InputOption::VALUE_OPTIONAL, 'Max process concurrency', 5) + ->addOption('batch-after-fetch', 'a', InputOption::VALUE_OPTIONAL, 'Process batches as soon as data is arriving', true) ->addOption('filter', 'f', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Add filter') ; } + /** + * @throws CommandLockedException + */ protected function preExecute(): void { if (!$this->lock()) { diff --git a/src/Factory/PayloadFactory.php b/src/Factory/PayloadFactory.php index 2b266a7b..c1297fdc 100644 --- a/src/Factory/PayloadFactory.php +++ b/src/Factory/PayloadFactory.php @@ -4,6 +4,7 @@ namespace Synolia\SyliusAkeneoPlugin\Factory; +use const FILTER_VALIDATE_BOOLEAN; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface; @@ -42,11 +43,13 @@ private function createContext( $isBatchingAllowed = !($input->getOption('disable-batch') ?? true); $isParallelAllowed = $input->getOption('parallel') ?? false; + $batchAfterFetch = $input->getOption('batch-after-fetch') ?? false; $context ->setIsContinue($input->getOption('continue') ?? false) ->setAllowParallel($isParallelAllowed) ->setBatchingAllowed($isBatchingAllowed) + ->setProcessAsSoonAsPossible(filter_var($batchAfterFetch, FILTER_VALIDATE_BOOLEAN)) ->setBatchSize((int) $input->getOption('batch-size')) ->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency')) ->setFilters((array) ($input->getOption('filter') ?: [])) diff --git a/src/Manager/ProcessManager.php b/src/Manager/ProcessManager.php new file mode 100644 index 00000000..16272baf --- /dev/null +++ b/src/Manager/ProcessManager.php @@ -0,0 +1,66 @@ +processManager->setNumberOfParallelProcesses($numberOfParallelProcesses); + } + + public function addProcess(Process $process, callable $callback = null, array $env = []): ProcessManagerInterface + { + if ($this->instantProcessing) { + $this->processManager->addProcess($process, $callback, $env); + + return $this; + } + + $this->processes[] = $process; + + return $this; + } + + public function waitForAllProcesses(): void + { + $this->processManager->waitForAllProcesses(); + } + + public function startAll(): void + { + if ($this->instantProcessing) { + $this->waitForAllProcesses(); + + return; + } + + foreach ($this->processes as $process) { + $this->processManager->addProcess($process); + } + + $this->waitForAllProcesses(); + } + + public function setInstantProcessing(bool $instantProcessing): self + { + $this->instantProcessing = $instantProcessing; + + return $this; + } +} diff --git a/src/Manager/ProcessManagerInterface.php b/src/Manager/ProcessManagerInterface.php new file mode 100644 index 00000000..50bf44ac --- /dev/null +++ b/src/Manager/ProcessManagerInterface.php @@ -0,0 +1,20 @@ +maxRunningProcessQueueSize = $commandContext->getMaxRunningProcessQueueSize(); $this->verbosity = $commandContext->getVerbosity(); $this->isContinue = $commandContext->isContinue(); + $this->processAsSoonAsPossible = $commandContext->getProcessAsSoonAsPossible(); } } diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 4303cae5..73e25458 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -155,7 +155,8 @@ services: - !tagged_iterator { tag: !php/const Synolia\SyliusAkeneoPlugin\Processor\ProductAttributeValue\ReferenceEntity\ReferenceEntityAttributeValueProcessorInterface::TAG_ID, default_priority_method: getDefaultPriority } BluePsyduck\SymfonyProcessManager\ProcessManagerInterface: - class: BluePsyduck\SymfonyProcessManager\ProcessManager + class: Synolia\SyliusAkeneoPlugin\Manager\ProcessManager + BluePsyduck\SymfonyProcessManager\ProcessManager: ~ Synolia\SyliusAkeneoPlugin\Provider\Asset\AssetValueBuilderProviderInterface: class: Synolia\SyliusAkeneoPlugin\Provider\Asset\AssetValueBuilderProvider diff --git a/src/Task/AbstractProcessTask.php b/src/Task/AbstractProcessTask.php index d75b11ea..8cc9934b 100644 --- a/src/Task/AbstractProcessTask.php +++ b/src/Task/AbstractProcessTask.php @@ -7,16 +7,14 @@ use Akeneo\Pim\ApiClient\Pagination\Page; use Akeneo\Pim\ApiClient\Pagination\PageInterface; use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface; -use BluePsyduck\SymfonyProcessManager\ProcessManager; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\DBAL\ParameterType; use Doctrine\DBAL\Result; use Doctrine\DBAL\Statement; use Doctrine\ORM\EntityManagerInterface; -use LogicException; use Psr\Log\LoggerInterface; use Symfony\Component\Process\Process; use Synolia\SyliusAkeneoPlugin\Logger\Messages; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Throwable; @@ -83,11 +81,6 @@ protected function batch( array $ids, ): void { if ($payload->allowParallel()) { - if (!$this->processManager instanceof ProcessManager) { - throw new LogicException('ProcessManager'); - } - $this->processManager->setNumberOfParallelProcesses($payload->getMaxRunningProcessQueueSize()); - $processArguments = [ 'php', 'bin/console', @@ -105,6 +98,9 @@ protected function batch( $isTtySupported = Process::isTtySupported(); $process->setTty($isTtySupported); $this->processManager->addProcess($process); + $this->logger->info('Added batch process', [ + 'ids' => $ids, + ]); return; } @@ -118,6 +114,9 @@ abstract protected function createBatchPayload(PipelinePayloadInterface $payload protected function process(PipelinePayloadInterface $initialPayload): void { + $this->processManager->setInstantProcessing($initialPayload->getProcessAsSoonAsPossible()); + $this->processManager->setNumberOfParallelProcesses($initialPayload->getMaxRunningProcessQueueSize()); + $this->logger->debug(self::class); $this->type = $initialPayload->getType(); $this->logger->notice(Messages::createOrUpdate($this->type)); @@ -145,7 +144,8 @@ protected function process(PipelinePayloadInterface $initialPayload): void $query = $this->prepareSelectBatchIdsQuery($initialPayload->getTmpTableName(), (int) $result['id'], $initialPayload->getBatchSize()); $queryResult = $query->executeQuery(); } - $this->processManager->waitForAllProcesses(); + + $this->processManager->startAll(); } catch (Throwable $throwable) { $this->logger->warning($throwable->getMessage()); @@ -165,6 +165,9 @@ protected function handle( PipelinePayloadInterface $payload, \Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface|\Akeneo\Pim\ApiClient\Pagination\PageInterface $handleType, ): void { + $this->processManager->setInstantProcessing($payload->getProcessAsSoonAsPossible()); + $this->processManager->setNumberOfParallelProcesses($payload->getMaxRunningProcessQueueSize()); + $count = 0; $ids = []; diff --git a/src/Task/Asset/ProcessAssetTask.php b/src/Task/Asset/ProcessAssetTask.php index c3c47c1e..26bf8d10 100644 --- a/src/Task/Asset/ProcessAssetTask.php +++ b/src/Task/Asset/ProcessAssetTask.php @@ -4,11 +4,11 @@ namespace Synolia\SyliusAkeneoPlugin\Task\Asset; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Synolia\SyliusAkeneoPlugin\Checker\EditionCheckerInterface; use Synolia\SyliusAkeneoPlugin\Logger\Messages; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\Asset\AssetPayload; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Synolia\SyliusAkeneoPlugin\Task\AbstractProcessTask; diff --git a/src/Task/AssociationType/ProcessAssociationTypeTask.php b/src/Task/AssociationType/ProcessAssociationTypeTask.php index 040755b0..ee083a50 100644 --- a/src/Task/AssociationType/ProcessAssociationTypeTask.php +++ b/src/Task/AssociationType/ProcessAssociationTypeTask.php @@ -4,12 +4,12 @@ namespace Synolia\SyliusAkeneoPlugin\Task\AssociationType; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; use Synolia\SyliusAkeneoPlugin\Event\FilterEvent; use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\Association\AssociationTypePayload; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface; diff --git a/src/Task/Attribute/ProcessAttributeTask.php b/src/Task/Attribute/ProcessAttributeTask.php index 31fdbfbe..5778f12f 100644 --- a/src/Task/Attribute/ProcessAttributeTask.php +++ b/src/Task/Attribute/ProcessAttributeTask.php @@ -4,12 +4,12 @@ namespace Synolia\SyliusAkeneoPlugin\Task\Attribute; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; use Synolia\SyliusAkeneoPlugin\Event\FilterEvent; use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\Attribute\AttributePayload; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface; diff --git a/src/Task/Family/ProcessFamilyTask.php b/src/Task/Family/ProcessFamilyTask.php index 64ec3f27..9d4bd3dd 100644 --- a/src/Task/Family/ProcessFamilyTask.php +++ b/src/Task/Family/ProcessFamilyTask.php @@ -4,13 +4,13 @@ namespace Synolia\SyliusAkeneoPlugin\Task\Family; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; use Synolia\SyliusAkeneoPlugin\Event\FilterEvent; use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException; use Synolia\SyliusAkeneoPlugin\Logger\Messages; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\Family\FamilyPayload; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface; diff --git a/src/Task/Product/ProcessProductsTask.php b/src/Task/Product/ProcessProductsTask.php index ed80822d..e1460012 100644 --- a/src/Task/Product/ProcessProductsTask.php +++ b/src/Task/Product/ProcessProductsTask.php @@ -6,7 +6,6 @@ use Akeneo\Pim\ApiClient\Pagination\Page; use Akeneo\Pim\ApiClient\Pagination\PageInterface; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\DBAL\ParameterType; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; @@ -15,6 +14,7 @@ use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException; use Synolia\SyliusAkeneoPlugin\Filter\ProductFilterInterface; use Synolia\SyliusAkeneoPlugin\Logger\Messages; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Synolia\SyliusAkeneoPlugin\Payload\Product\ProductPayload; use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface; diff --git a/src/Task/ProductModel/ProcessProductModelsTask.php b/src/Task/ProductModel/ProcessProductModelsTask.php index cb7e0a4f..c248c366 100644 --- a/src/Task/ProductModel/ProcessProductModelsTask.php +++ b/src/Task/ProductModel/ProcessProductModelsTask.php @@ -4,7 +4,6 @@ namespace Synolia\SyliusAkeneoPlugin\Task\ProductModel; -use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface; use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -12,10 +11,12 @@ use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException; use Synolia\SyliusAkeneoPlugin\Filter\ProductFilter; use Synolia\SyliusAkeneoPlugin\Logger\Messages; +use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface; use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface; use Synolia\SyliusAkeneoPlugin\Payload\ProductModel\ProductModelPayload; use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface; use Synolia\SyliusAkeneoPlugin\Task\AbstractProcessTask; +use Throwable; final class ProcessProductModelsTask extends AbstractProcessTask { @@ -34,6 +35,8 @@ public function __construct( /** * @param ProductModelPayload $payload + * + * @throws Throwable */ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInterface { @@ -66,7 +69,8 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte ); $this->handle($payload, $resources); - $this->processManager->waitForAllProcesses(); + + $this->processManager->startAll(); return $payload; }