Skip to content

Commit

Permalink
fix AlgoliaReindexAllJob to batch into steps
Browse files Browse the repository at this point in the history
Complete overhaul of this class to actually do what it says in the setup description - do the heavy lifting / batching at setup and then step through them 1 chunk at a time using QueuedJob's currentStep.
  • Loading branch information
matt-in-a-hat committed Jul 31, 2024
1 parent c584e2a commit 92adb22
Showing 1 changed file with 75 additions and 67 deletions.
142 changes: 75 additions & 67 deletions src/Jobs/AlgoliaReindexAllJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Wilr\Silverstripe\Algolia\Jobs;

use Exception;
use SilverStripe\Core\Config\Configurable;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\ORM\DataObject;
Expand Down Expand Up @@ -60,103 +59,112 @@ public function setup()
$this->currentStep = 0;

$filters = $this->config()->get('reindexing_default_filters');
$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');

// find all classes we have to index and add them to the indexData map
// in groups of batch size, this setup operation does the heavy lifting
// and process simply handles one batch at a time.
foreach ($algoliaService->indexes as $index) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : null;
$indexFilters = (isset($index['includeFilters'])) ? $index['includeFilters'] : null;

if ($classes) {
foreach ($classes as $candidate) {
$filter = (isset($filters[$candidate])) ? $filters[$candidate] : '';
$count = 0;

foreach ($task->getItems($candidate, $filter, $indexFilters)->column('ID') as $id) {
$count++;

if (!isset($this->indexData[$candidate])) {
$this->indexData[$candidate] = [];
foreach ($classes as $class) {
$filter = (isset($filters[$class])) ? $filters[$class] : '';
$ids = $task->getItems($class, $filter, $indexFilters)->column('ID');
if (count($ids)) {
if ($batching && $batchSize > 1) {
foreach (array_chunk($ids, $batchSize) as $chunk) {
$this->indexData[] = [
'indexName' => $indexName,
'class' => $class,
'ids' => $chunk,
];
}
} else {
foreach ($ids as $id) {
$this->indexData[] = [
'indexName' => $indexName,
'class' => $class,
'id' => $id,
];
}
}

$this->indexData[$candidate][] = $id;
$this->totalSteps++;
$this->addMessage('[' . $indexName . '] Indexing ' . count($ids) . ' ' . $class . ' instances with filters: ' . ($filter ?: '(none)'));
} else {
$this->addMessage('[' . $indexName . '] 0 ' . $class . ' instances to index with filters: ' . ($filter ?: '(none) - skipping.'));
}

$this->addMessage('Indexing ' . $count . ' ' . $candidate . ' instances with filters ' . $filter);
}
}
}
$this->totalSteps += count($this->indexData);
}

/**
* Index data is in groups of 20.
* Index data is an array of steps to process, each step either looks like this with batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'ids' => array of int,
* ]
* or this without batching:
* [
* 'indexName' => string,
* 'class' => string,
* 'id' => int,
* ]
* We process one step / batch / id per call.
*/
public function process()
{
$remainingChildren = $this->indexData;

if (!$remainingChildren || empty($remainingChildren)) {
if ($this->currentStep >= $this->totalSteps || !isset($this->indexData[$this->currentStep])) {
$this->isComplete = true;
$this->addMessage('Done!');

return;
}
if (!isset($this->indexData[$this->currentStep])) {
$this->isComplete = true;
$this->addMessage('Somehow we ran out of job data before all steps were processed. So we will assume we are done!');
$this->addMessage('Dumping out the jop data for debug purposes: ' . json_encode($this->indexData));
return;
}

$algoliaService = Injector::inst()->create(AlgoliaService::class);
$task = new AlgoliaReindex();

$batchSize = $task->config()->get('batch_size');
$batching = $this->config()->get('use_batching');
$stepData = $this->indexData[$this->currentStep];
$class = $stepData['class'];

foreach ($remainingChildren as $class => $ids) {
foreach ($algoliaService->indexes as $indexName => $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : [];
try {
$task = new AlgoliaReindex();

if (!in_array($class, $classes)) {
continue;
if (isset($stepData['ids'])) {
if ($task->indexItems($stepData['indexName'], DataObject::get($class)->filter('ID', $stepData['ids']), false)) {
$this->addMessage('Successfully indexed ' . $class . ' [' . implode(', ', $stepData['ids']) . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . implode(', ', $stepData['ids']) . ']');
}

$take = array_slice($ids, 0, $batchSize);
$this->indexData[$class] = array_slice($ids, $batchSize);

if (!empty($take)) {
$this->currentStep += count($take);
$errors = [];

try {
if ($batching) {
if ($task->indexItems($indexName, DataObject::get($class)->filter('ID', $take), false)) {
$this->addMessage('Successfully indexing ' . $class . ' [' . implode(', ', $take) . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . implode(', ', $take) . ']');
}
} else {
$items = DataObject::get($class)->filter('ID', $take);

foreach ($items as $item) {
if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' [' . $item->ID . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . $item->ID . ']');
}
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
} else {
$item = DataObject::get($class)->byID($stepData['id']);
if ($item) {
if ($task->indexItem($item)) {
$this->addMessage('Successfully indexed ' . $class . ' [' . $item->ID . ']');
} else {
$this->addMessage('Error indexing ' . $class . ' [' . $item->ID . ']');
}
} else {
unset($this->indexData[$class]);
$this->addMessage('Error indexing ' . $class . ' [' . $stepData['id'] . '] - failed to load item from DB');
}
}

$errors = $task->getErrors();
} catch (Throwable $e) {
$errors[] = $e->getMessage();
}

if (!empty($errors)) {
$this->addMessage(implode(', ', $errors));
$task->clearErrors();
}

$this->currentStep++;
}
}

0 comments on commit 92adb22

Please sign in to comment.