Skip to content

Commit

Permalink
Add symfony messenger handler
Browse files Browse the repository at this point in the history
  • Loading branch information
TheGrimmChester committed Jun 6, 2024
1 parent c87a80d commit fe33e87
Show file tree
Hide file tree
Showing 120 changed files with 1,977 additions and 1,583 deletions.
2 changes: 1 addition & 1 deletion install/Application/config/packages/test/akeneo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
Synolia\SyliusAkeneoPlugin\Factory\ProductModelPipelineFactory: ~
Synolia\SyliusAkeneoPlugin\Factory\AssociationTypePipelineFactory: ~
Synolia\SyliusAkeneoPlugin\Filter\ProductFilter: ~
Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider: ~
Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider: ~
Synolia\SyliusAkeneoPlugin\Provider\TaskProvider: ~
Synolia\SyliusAkeneoPlugin\Retriever\FamilyRetriever: ~
Synolia\SyliusAkeneoPlugin\Repository\ProductAttributeRepository: ~
Expand Down
395 changes: 50 additions & 345 deletions ruleset/phpstan-baseline.neon

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/Builder/Attribute/AssetAttributeValueValueBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AssetCollectionAttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\BooleanAttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\CollectionReferenceEntityAttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\DatabaseMappingAttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use DateTime;
use DateTimeInterface;
use LogicException;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\DateAttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\IntegerAttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\MetricAttributeTypeMatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Transformer\AttributeOptionValueDataTransformerInterface;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\MultiSelectAttributeTypeMatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\MissingLocaleTranslationOrScopeException;
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\MissingScopeException;
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\TranslationNotFoundException;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoReferenceEntityAttributeDataProviderInterface;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoReferenceEntityAttributeDataProviderInterface;
use Synolia\SyliusAkeneoPlugin\Provider\ProductRefEntityAttributeValueValueBuilderProviderInterface;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\ReferenceEntityAttributeTypeMatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Transformer\AttributeOptionValueDataTransformerInterface;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\SelectAttributeTypeMatcher;
Expand Down
2 changes: 1 addition & 1 deletion src/Builder/Attribute/TableAttributeValueValueBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Sylius\Component\Product\Model\ProductAttributeInterface;
use Sylius\Component\Resource\Repository\RepositoryInterface;
use Synolia\SyliusAkeneoPlugin\Exceptions\Processor\MissingAkeneoProductAttributeValueProcessorException;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\ProductAttributeValue\TableAttributeValueProcessorProviderInterface;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\TableAttributeTypeMatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Synolia\SyliusAkeneoPlugin\Builder\Attribute;

use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\TextareaAttributeTypeMatcher;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\TextAttributeTypeMatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use Synolia\SyliusAkeneoPlugin\Event\ProductOptionValueTranslation\AfterProcessingProductOptionValueTranslationEvent;
use Synolia\SyliusAkeneoPlugin\Event\ProductOptionValueTranslation\BeforeProcessingProductOptionValueTranslationEvent;
use Synolia\SyliusAkeneoPlugin\Exceptions\Builder\ProductOptionValueTranslation\ProductOptionValueTranslationBuilderNotFoundException;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\SyliusAkeneoLocaleCodeProvider;
use Synolia\SyliusAkeneoPlugin\Transformer\ProductOptionValueDataTransformerInterface;
use Webmozart\Assert\Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
use Synolia\SyliusAkeneoPlugin\Exceptions\Retriever\FamilyMeasureNotFoundException;
use Synolia\SyliusAkeneoPlugin\Exceptions\Retriever\MeasurableNotFoundException;
use Synolia\SyliusAkeneoPlugin\Exceptions\UnsupportedAttributeTypeException;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributeDataProviderInterface;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributeDataProviderInterface;
use Synolia\SyliusAkeneoPlugin\Provider\Data\AkeneoAttributePropertiesProvider;
use Synolia\SyliusAkeneoPlugin\Provider\ProductFilterRulesProviderInterface;
use Synolia\SyliusAkeneoPlugin\Retriever\FamilyMeasureRetriever;
use Synolia\SyliusAkeneoPlugin\TypeMatcher\Attribute\AttributeTypeMatcher;
Expand Down
1 change: 1 addition & 0 deletions src/Command/AbstractImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected function configure(): void
->addOption('max-concurrency', 'c', InputOption::VALUE_OPTIONAL, 'Max process concurrency', 5)
->addOption('batch-after-fetch', 'a', InputOption::VALUE_OPTIONAL, 'Fetch all pages then start processing the batches', true)
->addOption('filter', 'f', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Add filter')
->addOption('handler', 'i', InputOption::VALUE_OPTIONAL, 'Specify batch handler')
;
}

Expand Down
4 changes: 4 additions & 0 deletions src/Configuration/ConfigurationContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ public function disableBatching(): self;
public function getFilters(): array;

public function setFilters(array $filters): self;

public function getHandler(): string;

public function setHandler(string $handler): self;
}
15 changes: 15 additions & 0 deletions src/Configuration/ConfigurationContextTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Synolia\SyliusAkeneoPlugin\Configuration;

use Symfony\Component\Console\Output\OutputInterface;
use Synolia\SyliusAkeneoPlugin\Handler\Task\SymfonyProcessTaskHandler;

trait ConfigurationContextTrait
{
Expand All @@ -24,6 +25,8 @@ trait ConfigurationContextTrait

private array $filters = [];

private string $handler = SymfonyProcessTaskHandler::HANDLER_CODE;

public function getBatchSize(): int
{
return $this->batchSize;
Expand Down Expand Up @@ -132,4 +135,16 @@ public function setFilters(array $filters): ConfigurationContextInterface

return $this;
}

public function getHandler(): string
{
return $this->handler;
}

public function setHandler(string $handler): ConfigurationContextInterface
{
$this->handler = $handler;

return $this;
}
}
1 change: 1 addition & 0 deletions src/Entity/Asset.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
*/
#[ORM\Entity(repositoryClass: AssetRepository::class)]
#[ORM\Table(name: 'akeneo_assets')]
#[ORM\Index(columns: ['family_code', 'asset_code', 'attribute_code'], name: 'asset_idx')]
class Asset implements AssetInterface
{
/**
Expand Down
9 changes: 9 additions & 0 deletions src/Exceptions/NotImplementedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Exceptions;

class NotImplementedException extends \LogicException
{
}
16 changes: 16 additions & 0 deletions src/Factory/Message/Batch/BatchMessageFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Factory\Message\Batch;

use Synolia\SyliusAkeneoPlugin\Message\Batch\BatchMessageInterface;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;

class BatchMessageFactory implements BatchMessageFactoryInterface
{
public static function createFromPayload(PipelinePayloadInterface $payload, array $items): BatchMessageInterface
{
return $payload->createBatchMessage($items);
}
}
13 changes: 13 additions & 0 deletions src/Factory/Message/Batch/BatchMessageFactoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Factory\Message\Batch;

use Synolia\SyliusAkeneoPlugin\Message\Batch\BatchMessageInterface;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;

interface BatchMessageFactoryInterface
{
public static function createFromPayload(PipelinePayloadInterface $payload, array $items): BatchMessageInterface;
}
1 change: 1 addition & 0 deletions src/Factory/PayloadFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private function createContext(
->setBatchSize((int) $input->getOption('batch-size'))
->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency'))
->setFilters((array) ($input->getOption('filter') ?: []))
->setHandler($input->getOption('handler') ?? $context->getHandler())
;

if (!$isBatchingAllowed) {
Expand Down
8 changes: 4 additions & 4 deletions src/Factory/ProductPipelineFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
use League\Pipeline\PipelineInterface;
use Synolia\SyliusAkeneoPlugin\Pipeline\Processor;
use Synolia\SyliusAkeneoPlugin\Task\Product\ProcessProductsTask;
use Synolia\SyliusAkeneoPlugin\Task\Product\SetupProductTask;
use Synolia\SyliusAkeneoPlugin\Task\Product\TearDownProductTask;
use Synolia\SyliusAkeneoPlugin\Task\SetupTask;
use Synolia\SyliusAkeneoPlugin\Task\TearDownTask;

final class ProductPipelineFactory extends AbstractPipelineFactory
{
Expand All @@ -18,9 +18,9 @@ public function create(): PipelineInterface
$pipeline = new Pipeline(new Processor($this->dispatcher));

return $pipeline
->pipe($this->taskProvider->get(SetupProductTask::class))
->pipe($this->taskProvider->get(SetupTask::class))
->pipe($this->taskProvider->get(ProcessProductsTask::class))
->pipe($this->taskProvider->get(TearDownProductTask::class))
->pipe($this->taskProvider->get(TearDownTask::class))
;
}
}
124 changes: 124 additions & 0 deletions src/Handler/Task/SymfonyMessengerTaskHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Handler\Task;

use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Synolia\SyliusAkeneoPlugin\Factory\Message\Batch\BatchMessageFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;

class SymfonyMessengerTaskHandler implements TaskHandlerInterface
{
public const HANDLER_CODE = 'messenger';

public function __construct(
protected EntityManagerInterface $entityManager,
protected LoggerInterface $logger,
private MessageBusInterface $bus,
private BatchMessageFactoryInterface $batchMessageFactory,
) {
}

public function support(PipelinePayloadInterface $pipelinePayload): bool
{
return $pipelinePayload->getHandler() === self::HANDLER_CODE;
}

public function batch(PipelinePayloadInterface $pipelinePayload, array $items): void
{
$this->bus->dispatch($this->batchMessageFactory->createFromPayload($pipelinePayload, $items));
}

public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
): void {
$count = 0;
$items = [];

if ($handleType instanceof PageInterface) {
$this->handleByPage($pipelinePayload, $handleType, $count, $items);
} else {
$this->handleByCursor($pipelinePayload, $handleType, $count, $items);
}
}

private function handleByPage(
PipelinePayloadInterface $payload,
PageInterface $page,
int &$count = 0,
array &$items = [],
): void {
while (
($page instanceof Page && $page->hasNextPage()) ||
($page instanceof Page && !$page->hasPreviousPage()) ||
$page instanceof Page
) {
foreach ($page->getItems() as $item) {
++$count;
$items[] = $item;
$identifiers[] = $item['code'] ?? $item['identifier'];

if (0 === $count % $payload->getBatchSize()) {
$this->logger->notice('Batching', ['codes' => $identifiers]);
$this->batch($payload, $items);
$items = [];
}
}

$page = $page->getNextPage();
}

if ($items !== []) {
$this->batch($payload, $items);
$items = [];
}
}

private function handleByCursor(
PipelinePayloadInterface $payload,
ResourceCursorInterface $resourceCursor,
int &$count = 0,
array &$items = [],
): void {
/**
* @var array<string, mixed> $item
*/
foreach ($resourceCursor as $item) {
++$count;
$items[] = $item;
$identifiers[] = $item['code'] ?? $item['identifier'];

if (0 === $count % $payload->getBatchSize()) {
$this->logger->notice('Batching', ['codes' => $identifiers]);
$this->batch($payload, $items);
$items = [];
}
}

if ($items !== []) {
$this->batch($payload, $items);
$items = [];
}
}

public function setUp(PipelinePayloadInterface $pipelinePayload): PipelinePayloadInterface
{
return $pipelinePayload;
}

public function tearDown(PipelinePayloadInterface $pipelinePayload): PipelinePayloadInterface
{
return $pipelinePayload;
}

public function continue(PipelinePayloadInterface $pipelinePayload): void
{
}
}
Loading

0 comments on commit fe33e87

Please sign in to comment.