Skip to content

Commit

Permalink
Merge pull request #43 from slub/21-copy-index-on-full-sync
Browse files Browse the repository at this point in the history
add alias handling and atomic swap and remove of old indexes
  • Loading branch information
dikastes authored Dec 4, 2024
2 parents bd3ab8e + 6537123 commit 840b887
Showing 1 changed file with 168 additions and 42 deletions.
210 changes: 168 additions & 42 deletions Classes/Command/IndexCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,31 @@
class IndexCommand extends Command
{

protected ZoteroApi $bibApi;
const API_TRIALS = 3;

protected string $apiKey;
protected string $zoteroApiKey;
protected Collection $bibliographyItems;
protected Collection $deletedItems;
protected Collection $teiDataSets;
protected Collection $dataSets;
protected int $bulkSize;
protected Client $client;
protected Collection $dataSets;
protected Collection $deletedItems;
protected array $extConf;
readonly string $indexName;
protected InputInterface $input;
protected SymfonyStyle $io;
protected int $bulkSize;
protected int $total;
protected Collection $locales;
protected Collection $localizedCitations;
protected OutputInterface $output;
protected Collection $teiDataSets;
protected int $total;

public function __construct(
private readonly SiteFinder $siteFinder,
private readonly LoggerInterface $logger
) {
parent::__construct();
$this->extConf = GeneralUtility::makeInstance(ExtensionConfiguration::class)->get('liszt_bibliography');
$this->indexName = $this->extConf['elasticIndexName'] . '_' . date('Ymd_His');
$this->initLocales();
}

Expand Down Expand Up @@ -100,20 +104,21 @@ protected function configure(): void

protected function initialize(InputInterface $input, OutputInterface $output): void
{
$this->extConf = GeneralUtility::makeInstance(ExtensionConfiguration::class)->get('liszt_bibliography');
$this->input = $input;
$this->output = $output;
$this->client = ElasticClientBuilder::getClient();
$this->apiKey = $this->extConf['zoteroApiKey'];
$this->io = new SymfonyStyle($input, $output);
$this->zoteroApiKey = $this->extConf['zoteroApiKey'];
$this->io = GeneralUtility::makeInstance(SymfonyStyle::class, $this->input, $this->output);
$this->io->title($this->getDescription());
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->bulkSize = (int) $this->extConf['zoteroBulkSize'];
$version = $this->getVersion($input);
$version = $this->getVersion();
if ($version == 0) {
$this->io->text('Full data synchronization requested.');
$this->fullSync($input);
$this->fullSync();
$this->logger->info('Full data synchronization successful.');
} else {
$this->io->text('Synchronizing all data from version ' . $version);
Expand All @@ -123,17 +128,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return Command::SUCCESS;
}

protected function fullSync(InputInterface $input): void
protected function fullSync(): void
{
$client = new ZoteroApi($this->extConf['zoteroApiKey']);
$client = GeneralUtility::makeInstance(ZoteroApi::class, $this->zoteroApiKey);
$response = $client->
group($this->extConf['zoteroGroupId'])->
items()->
top()->
limit(1)->
send();
if ($input->getOption('total')) {
$this->total = (int) $input->getOption('total');
if ($this->input->getOption('total')) {
$this->total = (int) $this->input->getOption('total');
} else {
$this->total = (int) $response->getHeaders()['Total-Results'][0];
}
Expand All @@ -143,33 +148,45 @@ protected function fullSync(InputInterface $input): void
$collection = new Collection($response->getBody());
$this->bibliographyItems = $collection->pluck('data');
$cursor = 0; // set Cursor to 0, not to bulk size
$index = $this->extConf['elasticIndexName'];
$mappingParams = BibElasticMapping::getMappingParams($index);
// we are working with alias names to swap indexes from zotero_temp to zotero after successfully indexing
$tempIndexAlias = $this->extConf['elasticIndexName'].'_temp';
$tempIndexParams = BibElasticMapping::getMappingParams($this->indexName);

// add alias name 'zotero_temp to this index
// and add a wildcard alias to find all zotero_* indices with the alias zotero-index
$aliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $this->indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $this->extConf['elasticIndexName'].'_*',
'alias' => $this->extConf['elasticIndexName'].'-index',
],
]
]
]
];

try {
// in older Elasticsearch versions (until 7) exists returns a bool
if ($this->client->indices()->exists(['index' => $index])) {
$this->client->indices()->delete(['index' => $index]);
$this->client->indices()->create($mappingParams);
}
$this->client->indices()->create($tempIndexParams);
$this->client->indices()->updateAliases($aliasParams);
} catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Index: " . $index . " does not exist. Trying to create new index.");
$this->client->indices()->create($mappingParams);
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error creating elasticsearch index.');
throw new \Exception('Bibliography sync unsuccessful.');
}
}

$apiCounter = self::API_TRIALS;

while ($cursor < $this->total) {
try {
$this->sync($cursor, 0);

$apiCounter = self::API_TRIALS;
$remainingItems = $this->total - $cursor;
$advanceBy = min($remainingItems, $this->bulkSize);
Expand All @@ -188,6 +205,11 @@ protected function fullSync(InputInterface $input): void
}
}
}

// swap alias for index from zotero_temp to zotero and remove old indexes (keep the last one)
$this->swapIndexAliases($tempIndexAlias);
//delete old indexes
$this->deleteOldIndexes();
$this->io->progressFinish();
}

Expand Down Expand Up @@ -223,22 +245,22 @@ protected function sync(int $cursor = 0, int $version = 0): void
$this->commitBibliography();
}

protected function getVersion(InputInterface $input): int
protected function getVersion(): int
{
// if -a is specified, perfom a full update
if ($input->getOption('all')) {
if ($this->input->getOption('all')) {
return 0;
}

// also set version to 0 for dev tests if the total results are limited
if ($input->getOption('total')) {
$this->io->text('Total results limited to: '. $input->getOption('total'));
if ($this->input->getOption('total')) {
$this->io->text('Total results limited to: '. $this->input->getOption('total'));
return 0;
}


// if a version is manually specified, perform sync from this version
$argumentVersion = $input->getArgument('version');
$argumentVersion = $this->input->getArgument('version');
if ($argumentVersion > 0) {
return (int) $argumentVersion;
}
Expand Down Expand Up @@ -275,7 +297,7 @@ protected function getVersion(InputInterface $input): int

protected function fetchBibliography(int $cursor, int $version): void
{
$client = new ZoteroApi($this->extConf['zoteroApiKey']);
$client = GeneralUtility::makeInstance(ZoteroApi::class, $this->zoteroApiKey);
$response = $client->
group($this->extConf['zoteroGroupId'])->
items()->
Expand All @@ -291,7 +313,6 @@ protected function fetchBibliography(int $cursor, int $version): void

protected function fetchCitations(int $cursor, int $version): void
{
$this->localizedCitations = new Collection();
$this->locales->each(function($locale) use($cursor, $version) { $this->fetchCitationLocale($locale, $cursor, $version); });
}

Expand Down Expand Up @@ -320,7 +341,7 @@ protected function fetchCitationLocale(string $locale, int $cursor, int $version

protected function fetchTeiData(int $cursor, int $version): void
{
$client = new ZoteroApi($this->extConf['zoteroApiKey']);
$client = GeneralUtility::makeInstance(ZoteroApi::class, $this->zoteroApiKey);
$response = $client->
group($this->extConf['zoteroGroupId'])->
items()->
Expand Down Expand Up @@ -350,15 +371,12 @@ protected function commitBibliography(): void
$this->io->text('no new bibliographic entries');
return;
}
$index = $this->extConf['elasticIndexName'];

$params = [ 'body' => [] ];

$bulkCount = 0;
foreach ($this->dataSets as $document) {
$params['body'][] = [ 'index' =>
[
'_index' => $index,
'_index' => $this->indexName,
'_id' => $document['key']
]
];
Expand All @@ -372,6 +390,114 @@ protected function commitBibliography(): void
$this->client->bulk($params);
}

protected function swapIndexAliases(string $tempIndexAlias): void
{
// get index with alias = zotero
try {
$aliasesRequest = $this->client->indices()->getAlias(['name' => $this->extConf['elasticIndexName']]);
$aliasesArray = $aliasesRequest->asArray();

foreach ($aliasesArray as $index => $aliasArray) {
$this->io->note('Remove alias "' .$this->extConf['elasticIndexName']. '" from index '. $index . ' and add it to ' . $this->indexName );
// get index name with alias 'zotero'
if (array_key_exists($this->extConf['elasticIndexName'], $aliasArray['aliases'])) {
//swap alias from old to new index
$aliasParams = [
'body' => [
'actions' => [
[
'remove' => [
'index' => $index,
'alias' => $this->extConf['elasticIndexName'],
],
],
[
'add' => [
'index' => $this->indexName,
'alias' => $this->extConf['elasticIndexName'],
],
],
[
'remove' => [
'index' => $this->indexName,
'alias' => $tempIndexAlias,
],
]
]
]
];
$this->client->indices()->updateAliases($aliasParams);
}
}
}
catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Alias: " . $this->extConf['elasticIndexName'] . " does not exist. Move alias to ".$this->indexName);
// rename alias name from temp index to zotero
$aliasParams = [
'body' => [
'actions' => [
[
'remove' => [
'index' => $this->indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $this->indexName,
'alias' => $this->extConf['elasticIndexName'],
],
]
]
]
];
$this->client->indices()->updateAliases($aliasParams);

} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $this->extConf['elasticIndexName']);
throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
}
}
}

protected function deleteOldIndexes(): void
{
try {
$aliasesRequest = $this->client->indices()->getAlias(['name' => $this->extConf['elasticIndexName'].'-index']);
$aliasesArray = $aliasesRequest->asArray();

// sort $aliasesArray by key name
ksort($aliasesArray);

// remove current key $indexName from array
unset($aliasesArray[$this->indexName]);

// remove the last key (we keep the last two indexes)
array_pop($aliasesArray);

foreach ($aliasesArray as $index => $aliasArray) {
$this->io->note("Delete index " . $index);
$this->client->indices()->delete(['index' => $index]);
}

}
catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Nothing to remove, there are no indexes with alias " . $this->extConf['elasticIndexName'].'-index');
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $this->extConf['elasticIndexName'].'-index');
throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
}
}


}

/* protected function commitLocales(): void
{
$localeIndex = $this->extConf['elasticLocaleIndexName'];
Expand Down

0 comments on commit 840b887

Please sign in to comment.