Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add alias handling and atomic swap and remove of old indexes #43

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 140 additions & 23 deletions Classes/Command/IndexCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,33 +143,46 @@ protected function fullSync(InputInterface $input): void
$collection = new Collection($response->getBody());
$this->bibliographyItems = $collection->pluck('data');
$cursor = 0; // set Cursor to 0, not to bulk size
$index = $this->extConf['elasticIndexName'];
$mappingParams = BibElasticMapping::getMappingParams($index);
// we are working with alias names to swap indexes from zotero_temp to zotero after successfully indexing
$tempIndexAlias = $this->extConf['elasticIndexName'].'_temp';
$indexName = $this->extConf['elasticIndexName'] . '_' . date('Ymd_His');
$tempIndexParams = BibElasticMapping::getMappingParams($indexName);

// add alias name 'zotero_temp to this index
// and add a wildcard alias to find all zotero_* indices with the alias zotero-index
$aliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $this->extConf['elasticIndexName'].'_*',
'alias' => $this->extConf['elasticIndexName'].'-index',
],
]
]
]
];

try {
// in older Elasticsearch versions (until 7) exists returns a bool
if ($this->client->indices()->exists(['index' => $index])) {
$this->client->indices()->delete(['index' => $index]);
$this->client->indices()->create($mappingParams);
}
$this->client->indices()->create($tempIndexParams);
$this->client->indices()->updateAliases($aliasParams);
} catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Index: " . $index . " does not exist. Trying to create new index.");
$this->client->indices()->create($mappingParams);
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error creating elasticsearch index.');
throw new \Exception('Bibliography sync unsuccessful.');
}
}

$apiCounter = self::API_TRIALS;

while ($cursor < $this->total) {
try {
$this->sync($cursor, 0);

$this->sync($indexName, $cursor, 0);
$apiCounter = self::API_TRIALS;
$remainingItems = $this->total - $cursor;
$advanceBy = min($remainingItems, $this->bulkSize);
Expand All @@ -188,6 +201,11 @@ protected function fullSync(InputInterface $input): void
}
}
}

// swap alias for index from zotero_temp to zotero and remove old indexes (keep the last one)
$this->swapIndexAliases($indexName, $tempIndexAlias);
//delete old indexes
$this->deleteOldIndexes($indexName);
$this->io->progressFinish();
}

Expand All @@ -196,7 +214,7 @@ protected function versionedSync(int $version): void
$apiCounter = self::API_TRIALS;
while (true) {
try {
$this->sync(0, $version);
$this->sync( $this->extConf['elasticIndexName'], 0, $version);
$this->io->text('done');
return;
} catch (\Exception $e) {
Expand All @@ -214,13 +232,13 @@ protected function versionedSync(int $version): void
}
}

protected function sync(int $cursor = 0, int $version = 0): void
protected function sync(string $indexName, int $cursor = 0, int $version = 0,): void
{
$this->fetchBibliography($cursor, $version);
$this->fetchCitations($cursor, $version);
$this->fetchTeiData($cursor, $version);
$this->buildDataSets();
$this->commitBibliography();
$this->commitBibliography($indexName);
}

protected function getVersion(InputInterface $input): int
Expand Down Expand Up @@ -344,21 +362,18 @@ protected function buildDataSets(): void
});
}

protected function commitBibliography(): void
protected function commitBibliography(string $indexName): void
{
if ($this->dataSets->count() == 0) {
$this->io->text('no new bibliographic entries');
return;
}
$index = $this->extConf['elasticIndexName'];

$params = [ 'body' => [] ];

$bulkCount = 0;
foreach ($this->dataSets as $document) {
$params['body'][] = [ 'index' =>
[
'_index' => $index,
'_index' => $indexName,
'_id' => $document['key']
]
];
Expand All @@ -372,6 +387,108 @@ protected function commitBibliography(): void
$this->client->bulk($params);
}

protected function swapIndexAliases(string $indexName, string $tempIndexAlias): void
{
// get index with alias = zotero
try {
$aliasesRequest = $this->client->indices()->getAlias(['name' => $this->extConf['elasticIndexName']]);
$aliasesArray = $aliasesRequest->asArray();

foreach ($aliasesArray as $index => $aliasArray) {
$this->io->note('Remove alias "' .$this->extConf['elasticIndexName']. '" from index '. $index . 'and add it to ' . $indexName );
// get index name with alias 'zotero'
if (array_key_exists($this->extConf['elasticIndexName'], $aliasArray['aliases'])) {
//swap alias from old to new index
$aliasParams = [
'body' => [
'actions' => [
[
'remove' => [
'index' => $index,
'alias' => $this->extConf['elasticIndexName'],
],
],
[
'add' => [
'index' => $indexName,
'alias' => $this->extConf['elasticIndexName'],
],
]
]
]
];
$this->client->indices()->updateAliases($aliasParams);
}
}
}
catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Alias: " . $this->extConf['elasticIndexName'] . " does not exist. Move alias to ".$indexName);
// rename alias name from temp index to zotero
$aliasParams = [
'body' => [
'actions' => [
[
'remove' => [
'index' => $indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $indexName,
'alias' => $this->extConf['elasticIndexName'],
],
]
]
]
];
$this->client->indices()->updateAliases($aliasParams);

} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $this->extConf['elasticIndexName']);
throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
}
}
}

protected function deleteOldIndexes($indexName): void
{
try {
$aliasesRequest = $this->client->indices()->getAlias(['name' => $this->extConf['elasticIndexName'].'-index']);
$aliasesArray = $aliasesRequest->asArray();

// sort $aliasesArray by key name
ksort($aliasesArray);

// remove current key $indexName from array
unset($aliasesArray[$indexName]);

// remove the last key (we keep the last two indexes)
array_pop($aliasesArray);

foreach ($aliasesArray as $index => $aliasArray) {
$this->io->note("Delete index " . $index);
$this->client->indices()->delete(['index' => $index]);
}

}
catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Nothing to remove, there are no indexes with alias " . $this->extConf['elasticIndexName'].'-index');
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $this->extConf['elasticIndexName'].'-index');
throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
}
}


}

/* protected function commitLocales(): void
{
$localeIndex = $this->extConf['elasticLocaleIndexName'];
Expand Down
Loading