Skip to content

Commit

Permalink
NEW batching for AlgoliaReindexAll job
Browse files Browse the repository at this point in the history
  • Loading branch information
wilr committed Jul 26, 2021
1 parent 9e0c1de commit a3dcbed
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 62 deletions.
8 changes: 7 additions & 1 deletion src/Jobs/AlgoliaIndexItemJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,18 @@ public function process()

$obj = DataObject::get_by_id($this->itemClass, $id);

if ($obj && $obj->canIndexInAlgolia()) {
if (!$obj) {
$this->addMessage('Record #'. $id . ' not found');
} elseif (!$obj->canIndexInAlgolia()) {
$this->addMessage('Record #'. $id .' not indexed, canIndexInAlgolia returned false');
} else {
if (!$obj->AlgoliaUUID) {
$obj->assignAlgoliaUUID();
}
$obj->doImmediateIndexInAlgolia();

$this->addMessage('Record #'. $id .' indexed as objectID '. $obj->AlgoliaUUID);

unset($obj);
}

Expand Down
58 changes: 31 additions & 27 deletions src/Jobs/AlgoliaReindexAllJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
use Wilr\SilverStripe\Algolia\Tasks\AlgoliaReindex;

/**
* Reindex everything via a queued job (when AlgoliaReindex task won't do)
*
* Reindex everything via a queued job (when AlgoliaReindex task won't do). This
* supports reindexing via batch operations. Algolia limits apply.
*/
class AlgoliaReindexAllJob extends AbstractQueuedJob implements QueuedJob
{
Expand All @@ -34,7 +34,7 @@ public function __construct($params = array())

public function getTitle()
{
return 'Algolia reindexing everything';
return 'Algolia re-indexing all records';
}

public function getJobType()
Expand All @@ -54,59 +54,63 @@ public function setup()

$filters = $this->config()->get('reindexing_default_filters');

// find all classes we have to index and do so
// find all classes we have to index and add their record IDs to the
// indexData map, keyed by class name. This setup operation does the heavy
// lifting; process() then simply handles one batch at a time.
foreach ($algoliaService->indexes as $index) {
$classes = (isset($index['includeClasses'])) ? $index['includeClasses'] : null;

if ($classes) {
foreach ($classes as $candidate) {
$filter = (isset($filters[$candidate])) ? $filters[$candidate] : '';
$count = 0;

foreach ($task->getItems($candidate, $filter)->column('ID') as $id) {
$key = $candidate . '|'. $id;
$count++;

$this->indexData[$key] = $key;
if (!isset($this->indexData[$candidate])) {
$this->indexData[$candidate] = [];
}

$this->indexData[$candidate][] = $id;
$this->totalSteps++;
}

$this->addMessage('Indexing '. $count . ' '. $candidate . ' instances with filters '. $filter );
}
}
}
}

/**
* Index data is keyed by class; each call handles one batch of IDs per class, sized by the AlgoliaReindex `batch_size` config.
*/
public function process()
{
$remainingChildren = $this->indexData;

if (!count($remainingChildren)) {
if (!$remainingChildren || empty($remainingChildren)) {
$this->isComplete = true;

return;
}

$this->currentStep++;

list($class, $id) = explode('|', array_shift($remainingChildren));
$task = new AlgoliaReindex();
$batchSize = $task->config()->get('batch_size');

$obj = DataObject::get_by_id($class, $id);
foreach ($remainingChildren as $class => $ids) {
$take = array_slice($ids, 0, $batchSize);
$this->indexData[$class] = array_slice($ids, $batchSize);

if ($obj && $obj->canIndexInAlgolia()) {
if (!$obj->AlgoliaUUID) {
$obj->assignAlgoliaUUID();
}
$take = array_slice($ids, 0, $batchSize);

if ($obj->AlgoliaUUID) {
$obj->doImmediateIndexInAlgolia();
if (!empty($take)) {
$this->currentStep += count($take);
$task->indexItems($class, '', DataObject::get($class)->filter('ID', $take), false);
$this->addMessage('Indexing '. $class . ' ['. implode(', ', $take) . ']');
} else {
unset($this->indexData[$class]);
}
}

$this->addMessage(sprintf('[%s/%s], %s', $this->currentStep, $this->totalSteps, $class . '#'. $id));

$this->indexData = $remainingChildren;

if (!count($remainingChildren)) {
$this->isComplete = true;

return;
}
}
}
79 changes: 45 additions & 34 deletions src/Tasks/AlgoliaReindex.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,29 @@ public function getItems($targetClass, $filter = '')
* @param string $targetClass
* @param string $filter
* @param DataList? $items
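* @param bool $output Whether to echo progress and summary output (and pause between batches); pass false to run silently.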
*/
public function indexItems($targetClass, $filter = '', $items = null)
public function indexItems($targetClass, $filter = '', $items = null, $output = true)
{
$algoliaService = Injector::inst()->create(AlgoliaService::class);
$algoliaService = Injector::inst()->get(AlgoliaService::class);
$count = 0;
$skipped = 0;
$total = ($items) ? $items->count() : 0;
$batchSize = $this->config()->get('batch_size');
$batchesTotal = ($total > 0) ? (ceil($total / $batchSize)) : 0;
$indexer = Injector::inst()->create(AlgoliaIndexer::class);

echo sprintf(
'Found %s %s remaining to index which match filter (%s), will export in batches of %s, %s batches total %s',
$total,
$targetClass,
$filter,
$batchSize,
$batchesTotal,
PHP_EOL
);
$indexer = Injector::inst()->get(AlgoliaIndexer::class);

if ($output) {
echo sprintf(
'Found %s %s remaining to index which match filter (%s), will export in batches of %s, %s batches total %s',
$total,
$targetClass,
$filter,
$batchSize,
$batchesTotal,
PHP_EOL
);
}

$pos = 0;

Expand All @@ -152,10 +155,12 @@ public function indexItems($targetClass, $filter = '', $items = null)
foreach ($limitedSize as $item) {
$pos++;

echo '.';
if ($output) {
echo '.';

if ($pos % 50 == 0) {
echo sprintf(' [%s/%s]%s', $pos, $total, PHP_EOL);
if ($pos % 50 == 0) {
echo sprintf(' [%s/%s]%s', $pos, $total, PHP_EOL);
}
}

// fetch the actual instance
Expand Down Expand Up @@ -187,7 +192,9 @@ public function indexItems($targetClass, $filter = '', $items = null)

unset($currentBatches[$batchKey]);

sleep(1);
if ($output) {
sleep(1);
}
}
}
}
Expand All @@ -196,26 +203,30 @@ public function indexItems($targetClass, $filter = '', $items = null)
if (count($currentBatches[$class]) > 0) {
$this->indexbatch($currentBatches[$class]);

sleep(1);
if ($output) {
sleep(1);
}
}
}

Debug::message(
sprintf(
"Number of objects indexed: %s, Skipped %s",
$count,
$skipped
)
);

Debug::message(
sprintf(
"See index at <a href='https://www.algolia.com/apps/%s/explorer/indices' target='_blank'>".
"algolia.com/apps/%s/explorer/indices</a>",
$algoliaService->applicationId,
$algoliaService->applicationId
)
);
if ($output) {
Debug::message(
sprintf(
"Number of objects indexed: %s, Skipped %s",
$count,
$skipped
)
);

Debug::message(
sprintf(
"See index at <a href='https://www.algolia.com/apps/%s/explorer/indices' target='_blank'>".
"algolia.com/apps/%s/explorer/indices</a>",
$algoliaService->applicationId,
$algoliaService->applicationId
)
);
}
}

/**
Expand Down

0 comments on commit a3dcbed

Please sign in to comment.