Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messenger Queue and Scheduler #91

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The only job of the bundle is to store Pimcore elements (assets, documents, data

1. `composer require valantic/pimcore-elastica-bridge`
1. Edit `config/bundles.php` and add `\Valantic\ElasticaBridgeBundle\ValanticElasticaBridgeBundle::class => ['all' => true],`
1. Configure the connection to your Elasticsearch cluster as seen in [`example/app/config/config.yaml`](example/app/config/config.yaml)
1. Configure the connection to your Elasticsearch cluster as seen in [`example/app/config/config.yaml`](/docs/example/config/config.yaml)
1. Don't forget to register your newly created services (implementing `IndexInterface` etc.) in your `services.yaml`
```yml
App\Elasticsearch\:
Expand All @@ -22,7 +22,7 @@ The only job of the bundle is to store Pimcore elements (assets, documents, data

## Usage

Please see the [`docs/example/`](docs/example/) folder for a complete example. The following steps link to the corresponding section in the example and explain in a bit more detail what they are doing.
Please see the [`docs/example/`](/docs/example) folder for a complete example. The following steps link to the corresponding section in the example and explain in a bit more detail what they are doing.

### Define an index

Expand Down Expand Up @@ -67,6 +67,10 @@ valantic_elastica_bridge:
# If true, when a document fails to be indexed, it will be skipped and indexing continue with the next document. If false, indexing that index will be aborted.
should_skip_failing_documents: false
```

### Async Configuration
This bundle supports utilizing the message queue for indexing. To enable this feature, you can find the necessary configuration in the [async](async.md) documentation.

## Events

This project uses Symfony's event dispatcher. Here are the events that you can listen to:
Expand Down
56 changes: 56 additions & 0 deletions async.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Async Settings
## Basic Configuration
The package is capable of running indexing jobs asynchronously. This is done by using the Symfony Messenger component.
To enable this feature, you need to configure the following settings:

```yaml
# config.yaml
framework:
messenger:
transports:
elastica_bridge_populate: '%env(MESSENGER_TRANSPORT_DSN)%'
```

```dotenv
# .env
MESSENGER_TRANSPORT_DSN='doctrine://default?queue_name=elastica_bridge_populate'
```

This configuration will send all relevant messages to the `elastica_bridge_populate` transport. The `elastica_bridge_populate` transport is
configured to use the `doctrine` transport, which stores the messages in the database. The `queue_name` parameter
specifies the name of the queue where the messages are stored.

## Event Listeners
To take full advantage you will need to configure some event listeners.
See [PopulateListener.php](/docs/example/src/EventListener/PopulateListener.php) and [PopulateService.php](/docs/example/src/Service/PopulateService.php) for a full working example.

The package provides the following events:

| Event Description | Possible Use Cases | Event Name | Event Class |
|-------------------------------|-----------------------------------------------------------------------------------------------------------------|------------------------------|---------------------------|
| Before the index is populated | <ul><li>Determine the source of the message and possibly clear previous errors</li></ul> | `PRE_EXECUTE` | `PreExecuteEvent` |
| Before the index is populated | <ul><li>Set expected message count</li></ul> | `PRE_PROCESS_MESSAGES_EVENT` | `PreProcessMessagesEvent` |
| Before a document is created | <ul><li>Stop document creation if execution is locked</li><li>give the remaining messages for logging</li></ul> | `PRE_DOCUMENT_CREATE` | `PreDocumentCreateEvent` |
| After a document is created | <ul><li>Decrement remaining messages</li><li>lock execution if document creation failed</li></ul> | `POST_DOCUMENT_CREATE` | `PostDocumentCreateEvent` |
| Before a index is switched | <ul><li>Skip switch if execution is locked</li><li>update the remaining messages</li></ul> | `PRE_SWITCH_INDEX` | `PreSwitchIndexEvent` |
| Before a index is switched | <ul><li>Check if all messages are consumed</li><li>update the remaining messages</li></ul> | `WAIT_FOR_COMPLETION_EVENT` | `WaitForCompletionEvent` |
| After a index is switched | <ul><li>Log</li><li>Send Notifications</li></ul> | `POST_SWITCH_INDEX` | `PostSwitchIndexEvent` |



## Workers
Workers are preferably setup using a supervisor configuration. The following is an example configuration for a worker:

### Queue Worker
To process the messages, you need to set up a worker. This can be done by running the following command:

```shell
$ bin/console messenger:consume elastica_bridge_populate
```

### Scheduler Worker
To process the messages in a scheduled manner, you can use the following command:

```shell
$ bin/console messenger:consume scheduler_populate_index
```
22 changes: 12 additions & 10 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@
"pimcore/pimcore": "^11.0",
"psr/log": "^3.0",
"ruflin/elastica": "^8.0.0",
"symfony/console": "^6.2",
"symfony/lock": "^6.2"
"symfony/console": "^6.4|^7.1",
"symfony/lock": "^6.4|^7.1",
"symfony/scheduler": "^6.4|^7.1"
},
"require-dev": {
"symfony/routing": "^6.4",
"symfony/http-kernel": "^6.4",
"bamarni/composer-bin-plugin": "^1.8.2",
"phpstan/extension-installer": "^1.4.1",
"phpstan/phpstan": "^1.11.9",
"phpstan/phpstan-deprecation-rules": "^1.2.0",
"phpstan/phpstan-strict-rules": "^1.6.0",
"rector/rector": "^1.2.2",
"roave/security-advisories": "dev-latest",
"sentry/sentry": "^4.8.1",
"symfony/http-client": "^6.4.10"
"phpstan/extension-installer": "^1.4.3",
"phpstan/phpstan": "^1.12.7",
"phpstan/phpstan-deprecation-rules": "^1.2.1",
"phpstan/phpstan-strict-rules": "^1.6.1",
"rector/rector": "^1.2.8",
"sentry/sentry": "^4.9.0",
"symfony/http-client": "^6.4.12|^7.1"
},
"license": "MIT",
"authors": [
Expand Down
10 changes: 10 additions & 0 deletions docs/example/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
valantic_elastica_bridge:
events:
auto_save:
document: false
asset: false
data_object: false
client:
should_add_sentry_breadcrumbs: true
dsn: 'http://localhost:9200'
indexing:
lock_timeout: 3600 # lock timeout for indexing
interval: 600 # interval in which the scheduler is executed
cooldown: 3600 # cooldown between two scheduler runs for each index
103 changes: 103 additions & 0 deletions docs/example/src/EventListener/PopulateListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

namespace App\EventListener;

use App\Service\PopulateService;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Valantic\ElasticaBridgeBundle\Model\Event\ElasticaBridgeEvents;
use Valantic\ElasticaBridgeBundle\Model\Event\PostDocumentCreateEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreDocumentCreateEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreExecuteEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreProcessMessagesEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\PreSwitchIndexEvent;
use Valantic\ElasticaBridgeBundle\Model\Event\WaitForCompletionEvent;

class PopulateListener implements EventSubscriberInterface
{
public function __construct(
private readonly PopulateService $populateService,
) {}

public function onPostDocumentCreate(PostDocumentCreateEvent $event): void
{
if ($event->success) {
$this->populateService->decrementRemainingMessages($event->index->getName());

return;
}

if ($event->willRetry || $event->skipped) {
return;
}

$this->populateService->lockExecution($event->index->getName());
}

public function onPostSwitchIndex(): void {}

public function onPreDocumentCreate(PreDocumentCreateEvent $event): void
{
if ($this->populateService->isExecutionLocked($event->index->getName())) {
$event->stopExecution();

return;
}

$event->setCurrentCount($this->populateService->getRemainingMessages($event->index->getName()));
}

public function onPrePopulateIndex(PreExecuteEvent $prePopulateEvent): void
{
if ($prePopulateEvent->source === PreExecuteEvent::SOURCE_CLI) {
$this->populateService->unlockExecution($prePopulateEvent->index->getName());
}
}

public function onPreSwitchIndex(PreSwitchIndexEvent $event): void
{
if ($this->populateService->isExecutionLocked($event->index->getName())) {
$event->skipSwitch();
$event->initiateCooldown = false;
}

$event->setRemainingMessages($this->populateService->getRemainingMessages($event->index->getName()));
}

public function onWaitForCompletion(WaitForCompletionEvent $event): void
{
if ($this->populateService->isExecutionLocked($event->index->getName())) {
$event->skipSwitch();

return;
}

$retryCount = $event->retries;
$event->setRemainingMessages($this->populateService->getRemainingMessages($event->index->getName()));

if ($retryCount > $event->maximumRetries - 1) {
$remainingMessages = $this->populateService->getActualMessageCount($event->index->getName());
$event->setRemainingMessages($remainingMessages);
$this->populateService->setExpectedMessages($event->index->getName(), $remainingMessages);
}
}

public function preProcessMessagesEvent(PreProcessMessagesEvent $messageQueueInitializedEvent): void
{
$this->populateService->setExpectedMessages($messageQueueInitializedEvent->index->getName(), $messageQueueInitializedEvent->expectedMessages);
}

public static function getSubscribedEvents(): array
{
return [
ElasticaBridgeEvents::PRE_EXECUTE => 'onPrePopulateIndex',
ElasticaBridgeEvents::PRE_PROCESS_MESSAGES_EVENT => 'preProcessMessagesEvent',
ElasticaBridgeEvents::PRE_DOCUMENT_CREATE => 'onPreDocumentCreate',
ElasticaBridgeEvents::POST_DOCUMENT_CREATE => 'onPostDocumentCreate',
ElasticaBridgeEvents::PRE_SWITCH_INDEX => 'onPreSwitchIndex',
ElasticaBridgeEvents::WAIT_FOR_COMPLETION_EVENT => 'onWaitForCompletion',
ElasticaBridgeEvents::POST_SWITCH_INDEX => 'onPostSwitchIndex',
];
}
}
101 changes: 101 additions & 0 deletions docs/example/src/Service/PopulateService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

declare(strict_types=1);

namespace App\Service;

use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;

class PopulateService
{
public const KEY_NAME_FAILURE = 'failure';
private const KEY_PREFIX = 'elasticsearch_populate';
private const REMAINING_MESSAGES = 'remaining_messages';

public function __construct(
#[Autowire(service: 'cache.default_redis_provider')]
private readonly \Redis $redis,
private readonly Connection $connection,
private readonly ConsoleOutputInterface $consoleOutput,
) {}

public function decrementRemainingMessages(string $indexName): void
{
$this->redis->decr($this->getKeyName($indexName, self::REMAINING_MESSAGES));
}

public function incrementRemainingMessages(string $indexName): void
{
$this->redis->incr($this->getKeyName($indexName, self::REMAINING_MESSAGES));
}

public function getRemainingMessages(string $indexName): int
{
return (int) $this->redis->get($this->getKeyName($indexName, self::REMAINING_MESSAGES));
}

public function setExpectedMessages(string $indexName, int $expectedMessages): void
{
$this->redis->set($this->getKeyName($indexName, self::REMAINING_MESSAGES), $expectedMessages);
}

public function getActualMessageCount(string $indexName): int
{
$query = "SELECT
COUNT(mm.id) AS remaining_messages
FROM messenger_messages mm
WHERE mm.queue_name = 'elastica_bridge_populate'
AND mm.body LIKE CONCAT('%\\\\\\\\\"', :indexName, '\\\\\\\\\"%')
AND mm.delivered_at IS NULL
AND mm.body LIKE '%CreateDocument%'";

$count = $this->connection->executeQuery($query, ['indexName' => $indexName, 'indexNameLength' => strlen($indexName)])->fetchOne();

return (int) $count;
}

public function lockExecution(string $document): string
{
$key = $this->getKeyName($document, self::KEY_NAME_FAILURE);

if ($this->isExecutionLocked($document)) {
return $key;
}

$this->redis->set($key, 1, ['NX', 'EX' => 1200]);
$this->consoleOutput->writeln(sprintf('Locking execution for %s (%s)', $document, $key), ConsoleOutputInterface::VERBOSITY_VERBOSE);

return $key;
}

public function unlockExecution(string $document): void
{
$key = $this->getKeyName($document, self::KEY_NAME_FAILURE);

if ($this->redis->exists($key) === 0) {
return;
}

$this->redis->del($key);
$this->consoleOutput->writeln(sprintf('Unlocking execution for %s (%s)', $document, hash('sha256', $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);
}

public function isExecutionLocked(string $document): bool
{
$key = $this->getKeyName($document, self::KEY_NAME_FAILURE);
$exists = $this->redis->exists($key);

if (is_int($exists)) {
return $exists > 0;
}

return false;
}

public function getKeyName(string $document, string $type): string
{
return sprintf('%s_%s_%s', self::KEY_PREFIX, $document, $type);
}
}
Loading
Loading