Skip to content

Commit

Permalink
Fetch all pages then start processing the batches
Browse files Browse the repository at this point in the history
  • Loading branch information
TheGrimmChester committed Nov 1, 2023
1 parent c4917ac commit d83b159
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 17 deletions.
4 changes: 4 additions & 0 deletions docs/LAUNCH.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ Import from where it stopped.
This option is not fetching new data from akeneo.
It only processes the rest of the data in the temp table.

`--batch-after-fetch`

Fetch all pages then start processing the batches.
This allows you to recover the import process by launching the command again with the `--continue` argument

### Logs

Expand Down
4 changes: 4 additions & 0 deletions src/Command/AbstractImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ protected function configure(): void
->addOption('disable-batch', 'd', InputOption::VALUE_NONE, 'Disable batch processing')
->addOption('batch-size', 's', InputOption::VALUE_OPTIONAL, 'Batch Size', 100)
->addOption('max-concurrency', 'c', InputOption::VALUE_OPTIONAL, 'Max process concurrency', 5)
->addOption('batch-after-fetch', 'a', InputOption::VALUE_OPTIONAL, 'Process batches as soon as data is arriving', true)
->addOption('filter', 'f', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Add filter')
;
}

/**
* @throws CommandLockedException
*/
protected function preExecute(): void
{
if (!$this->lock()) {
Expand Down
3 changes: 3 additions & 0 deletions src/Factory/PayloadFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Factory;

use const FILTER_VALIDATE_BOOLEAN;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
Expand Down Expand Up @@ -42,11 +43,13 @@ private function createContext(

$isBatchingAllowed = !($input->getOption('disable-batch') ?? true);
$isParallelAllowed = $input->getOption('parallel') ?? false;
$batchAfterFetch = $input->getOption('batch-after-fetch') ?? false;

$context
->setIsContinue($input->getOption('continue') ?? false)
->setAllowParallel($isParallelAllowed)
->setBatchingAllowed($isBatchingAllowed)
->setProcessAsSoonAsPossible(filter_var($batchAfterFetch, FILTER_VALIDATE_BOOLEAN))
->setBatchSize((int) $input->getOption('batch-size'))
->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency'))
->setFilters((array) ($input->getOption('filter') ?: []))
Expand Down
66 changes: 66 additions & 0 deletions src/Manager/ProcessManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Manager;

use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;

class ProcessManager implements ProcessManagerInterface
{
private array $processes = [];

private bool $instantProcessing = false;

public function __construct(
protected \BluePsyduck\SymfonyProcessManager\ProcessManager $processManager,
protected LoggerInterface $logger,
) {
}

public function setNumberOfParallelProcesses(int $numberOfParallelProcesses): void
{
$this->processManager->setNumberOfParallelProcesses($numberOfParallelProcesses);
}

public function addProcess(Process $process, callable $callback = null, array $env = []): ProcessManagerInterface
{
if ($this->instantProcessing) {
$this->processManager->addProcess($process, $callback, $env);

return $this;
}

$this->processes[] = $process;

return $this;
}

public function waitForAllProcesses(): void
{
$this->processManager->waitForAllProcesses();
}

public function startAll(): void
{
if ($this->instantProcessing) {
$this->waitForAllProcesses();

return;
}

foreach ($this->processes as $process) {
$this->processManager->addProcess($process);
}

$this->waitForAllProcesses();
}

public function setInstantProcessing(bool $instantProcessing): self
{
$this->instantProcessing = $instantProcessing;

return $this;
}
}
20 changes: 20 additions & 0 deletions src/Manager/ProcessManagerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Manager;

use Symfony\Component\Process\Process;

interface ProcessManagerInterface
{
public function startAll(): void;

public function addProcess(Process $process, callable $callback = null, array $env = []): self;

public function setInstantProcessing(bool $instantProcessing): self;

public function waitForAllProcesses(): void;

public function setNumberOfParallelProcesses(int $numberOfParallelProcesses): void;
}
1 change: 1 addition & 0 deletions src/Payload/AbstractPayload.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function __construct(
$this->maxRunningProcessQueueSize = $commandContext->getMaxRunningProcessQueueSize();
$this->verbosity = $commandContext->getVerbosity();
$this->isContinue = $commandContext->isContinue();
$this->processAsSoonAsPossible = $commandContext->getProcessAsSoonAsPossible();
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ services:
- !tagged_iterator { tag: !php/const Synolia\SyliusAkeneoPlugin\Processor\ProductAttributeValue\ReferenceEntity\ReferenceEntityAttributeValueProcessorInterface::TAG_ID, default_priority_method: getDefaultPriority }

BluePsyduck\SymfonyProcessManager\ProcessManagerInterface:
class: BluePsyduck\SymfonyProcessManager\ProcessManager
class: Synolia\SyliusAkeneoPlugin\Manager\ProcessManager
BluePsyduck\SymfonyProcessManager\ProcessManager: ~

Synolia\SyliusAkeneoPlugin\Provider\Asset\AssetValueBuilderProviderInterface:
class: Synolia\SyliusAkeneoPlugin\Provider\Asset\AssetValueBuilderProvider
Expand Down
21 changes: 12 additions & 9 deletions src/Task/AbstractProcessTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use BluePsyduck\SymfonyProcessManager\ProcessManager;
use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Statement;
use Doctrine\ORM\EntityManagerInterface;
use LogicException;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Throwable;

Expand Down Expand Up @@ -83,11 +81,6 @@ protected function batch(
array $ids,
): void {
if ($payload->allowParallel()) {
if (!$this->processManager instanceof ProcessManager) {
throw new LogicException('ProcessManager');
}
$this->processManager->setNumberOfParallelProcesses($payload->getMaxRunningProcessQueueSize());

$processArguments = [
'php',
'bin/console',
Expand All @@ -105,6 +98,9 @@ protected function batch(
$isTtySupported = Process::isTtySupported();
$process->setTty($isTtySupported);
$this->processManager->addProcess($process);
$this->logger->info('Added batch process', [
'ids' => $ids,
]);

return;
}
Expand All @@ -118,6 +114,9 @@ abstract protected function createBatchPayload(PipelinePayloadInterface $payload

protected function process(PipelinePayloadInterface $initialPayload): void
{
$this->processManager->setInstantProcessing($initialPayload->getProcessAsSoonAsPossible());
$this->processManager->setNumberOfParallelProcesses($initialPayload->getMaxRunningProcessQueueSize());

$this->logger->debug(self::class);
$this->type = $initialPayload->getType();
$this->logger->notice(Messages::createOrUpdate($this->type));
Expand Down Expand Up @@ -145,7 +144,8 @@ protected function process(PipelinePayloadInterface $initialPayload): void
$query = $this->prepareSelectBatchIdsQuery($initialPayload->getTmpTableName(), (int) $result['id'], $initialPayload->getBatchSize());
$queryResult = $query->executeQuery();
}
$this->processManager->waitForAllProcesses();

$this->processManager->startAll();
} catch (Throwable $throwable) {
$this->logger->warning($throwable->getMessage());

Expand All @@ -165,6 +165,9 @@ protected function handle(
PipelinePayloadInterface $payload,
\Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface|\Akeneo\Pim\ApiClient\Pagination\PageInterface $handleType,
): void {
$this->processManager->setInstantProcessing($payload->getProcessAsSoonAsPossible());
$this->processManager->setNumberOfParallelProcesses($payload->getMaxRunningProcessQueueSize());

$count = 0;
$ids = [];

Expand Down
2 changes: 1 addition & 1 deletion src/Task/Asset/ProcessAssetTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace Synolia\SyliusAkeneoPlugin\Task\Asset;

use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Synolia\SyliusAkeneoPlugin\Checker\EditionCheckerInterface;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Asset\AssetPayload;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Synolia\SyliusAkeneoPlugin\Task\AbstractProcessTask;
Expand Down
2 changes: 1 addition & 1 deletion src/Task/AssociationType/ProcessAssociationTypeTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

namespace Synolia\SyliusAkeneoPlugin\Task\AssociationType;

use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Synolia\SyliusAkeneoPlugin\Event\FilterEvent;
use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Association\AssociationTypePayload;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface;
Expand Down
2 changes: 1 addition & 1 deletion src/Task/Attribute/ProcessAttributeTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

namespace Synolia\SyliusAkeneoPlugin\Task\Attribute;

use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Synolia\SyliusAkeneoPlugin\Event\FilterEvent;
use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Attribute\AttributePayload;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface;
Expand Down
2 changes: 1 addition & 1 deletion src/Task/Family/ProcessFamilyTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

namespace Synolia\SyliusAkeneoPlugin\Task\Family;

use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Synolia\SyliusAkeneoPlugin\Event\FilterEvent;
use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Family\FamilyPayload;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface;
Expand Down
2 changes: 1 addition & 1 deletion src/Task/Product/ProcessProductsTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\DBAL\ParameterType;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
Expand All @@ -15,6 +14,7 @@
use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException;
use Synolia\SyliusAkeneoPlugin\Filter\ProductFilterInterface;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Product\ProductPayload;
use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface;
Expand Down
8 changes: 6 additions & 2 deletions src/Task/ProductModel/ProcessProductModelsTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@

namespace Synolia\SyliusAkeneoPlugin\Task\ProductModel;

use BluePsyduck\SymfonyProcessManager\ProcessManagerInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Synolia\SyliusAkeneoPlugin\Event\FilterEvent;
use Synolia\SyliusAkeneoPlugin\Exceptions\Payload\CommandContextIsNullException;
use Synolia\SyliusAkeneoPlugin\Filter\ProductFilter;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Manager\ProcessManagerInterface;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;
use Synolia\SyliusAkeneoPlugin\Payload\ProductModel\ProductModelPayload;
use Synolia\SyliusAkeneoPlugin\Provider\Configuration\Api\ApiConnectionProviderInterface;
use Synolia\SyliusAkeneoPlugin\Task\AbstractProcessTask;
use Throwable;

final class ProcessProductModelsTask extends AbstractProcessTask
{
Expand All @@ -34,6 +35,8 @@ public function __construct(

/**
* @param ProductModelPayload $payload
*
* @throws Throwable
*/
public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInterface
{
Expand Down Expand Up @@ -66,7 +69,8 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte
);

$this->handle($payload, $resources);
$this->processManager->waitForAllProcesses();

$this->processManager->startAll();

return $payload;
}
Expand Down

0 comments on commit d83b159

Please sign in to comment.