diff --git a/src/Processor/ProductGroup/ProductGroupProcessor.php b/src/Processor/ProductGroup/ProductGroupProcessor.php index a5f55ca0..eb7c6398 100644 --- a/src/Processor/ProductGroup/ProductGroupProcessor.php +++ b/src/Processor/ProductGroup/ProductGroupProcessor.php @@ -54,7 +54,6 @@ private function createGroupForCodeAndFamily( $productGroup->setModel($code); $productGroup->setFamily($family); $productGroup->setFamilyVariant($familyVariant); - $this->entityManager->persist($productGroup); return; } diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index f45ec21c..6b3ad69e 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -1,3 +1,7 @@ +parameters: + env(SYNOLIA_AKENEO_MAX_RETRY_COUNT): 30 + env(SYNOLIA_AKENEO_RETRY_WAIT_TIME): 5000 + services: _defaults: autowire: true @@ -5,6 +9,8 @@ services: public: false bind: $projectDir: '%kernel.project_dir%' + $maxRetryCount: '%env(int:SYNOLIA_AKENEO_MAX_RETRY_COUNT)%' + $retryWaitTime: '%env(int:SYNOLIA_AKENEO_RETRY_WAIT_TIME)%' Synolia\SyliusAkeneoPlugin\: resource: '../../*' diff --git a/src/Task/ProductModel/BatchProductModelTask.php b/src/Task/ProductModel/BatchProductModelTask.php index 49f8ca32..403ea260 100644 --- a/src/Task/ProductModel/BatchProductModelTask.php +++ b/src/Task/ProductModel/BatchProductModelTask.php @@ -5,7 +5,6 @@ namespace Synolia\SyliusAkeneoPlugin\Task\ProductModel; use Doctrine\DBAL\Exception; -use Doctrine\DBAL\Result; use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\ORMInvalidArgumentException; use Doctrine\Persistence\ManagerRegistry; @@ -41,6 +40,9 @@ public function __construct( private IsProductProcessableCheckerInterface $isProductProcessableChecker, private ProductGroupProcessor $productGroupProcessor, private ManagerRegistry $managerRegistry, + private int $maxRetryCount, + private int $retryWaitTime, + private int $retryCount = 0, ) { parent::__construct($entityManager); } @@ -57,34 +59,36 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte $this->logger->notice(Messages::createOrUpdate($this->type)); $query = $this->getSelectStatement($payload); - /** @var Result $queryResult */ $queryResult = $query->executeQuery(); while ($results = $queryResult->fetchAll()) { + /** @var array $result */ foreach ($results as $result) { - /** @var array $resource */ - $resource = json_decode($result['values'], true); - - $this->handleProductGroup($resource); - try { - $this->dispatcher->dispatch(new BeforeProcessingProductEvent($resource)); + $this->handleProductModel($result); - $this->entityManager->beginTransaction(); + unset($resource, $product); + $this->removeEntry($payload, (int) $result['id']); + } catch (ORMInvalidArgumentException $ORMInvalidArgumentException) { + if ($this->retryCount === $this->maxRetryCount) { + $this->retryCount = 0; + unset($resource, $product); + $this->removeEntry($payload, (int) $result['id']); - if ($this->isProductProcessableChecker->check($resource)) { - $product = $this->process($resource); - $this->dispatcher->dispatch(new AfterProcessingProductEvent($resource, $product)); + continue; } - $this->entityManager->flush(); - $this->entityManager->commit(); + usleep($this->retryWaitTime); - unset($resource, $product); - $this->removeEntry($payload, (int) $result['id']); + $this->logger->error('Retrying import', [ + 'product' => $result, + 'retry_count' => $this->retryCount, + 'error' => $ORMInvalidArgumentException->getMessage(), + ]); + + return $this->__invoke($payload); } catch (\Throwable $throwable) { - $this->entityManager->rollback(); - $this->logger->warning($throwable->getMessage()); + $this->logger->error($throwable->getMessage()); $this->removeEntry($payload, (int) $result['id']); } } @@ -93,23 +97,40 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte return $payload; } + private function handleProductModel(array $result): void + { + /** @var array $resource */ + $resource = json_decode($result['values'], true); + + $this->handleProductGroup($resource); + $this->dispatcher->dispatch(new BeforeProcessingProductEvent($resource)); + + if ($this->isProductProcessableChecker->check($resource)) { + $product = $this->process($resource); + $this->dispatcher->dispatch(new AfterProcessingProductEvent($resource, $product)); + } + + $this->entityManager->flush(); + } + private function handleProductGroup(array $resource): void { try { - $this->entityManager->beginTransaction(); - $this->productGroupProcessor->process($resource); - $this->entityManager->flush(); - $this->entityManager->commit(); - } catch (ORMInvalidArgumentException) { - if ($this->entityManager->getConnection()->isTransactionActive()) { - $this->entityManager->rollback(); - } + $this->logger->info('Processed product group', [ + 'code' => $resource['parent'] ?? $resource['code'], + ]); + } catch (ORMInvalidArgumentException $ORMInvalidArgumentException) { if (!$this->entityManager->isOpen()) { + $this->logger->warning('Recreating entity manager'); $this->entityManager = $this->getNewEntityManager(); } + + ++$this->retryCount; + + throw $ORMInvalidArgumentException; } }