Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add alias handling and atomic swap and remove of old indexes #43

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 168 additions & 42 deletions Classes/Command/IndexCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,31 @@
class IndexCommand extends Command
{

protected ZoteroApi $bibApi;
const API_TRIALS = 3;

protected string $apiKey;
protected string $zoteroApiKey;
protected Collection $bibliographyItems;
protected Collection $deletedItems;
protected Collection $teiDataSets;
protected Collection $dataSets;
protected int $bulkSize;
protected Client $client;
protected Collection $dataSets;
protected Collection $deletedItems;
protected array $extConf;
readonly string $indexName;
protected InputInterface $input;
protected SymfonyStyle $io;
protected int $bulkSize;
protected int $total;
protected Collection $locales;
protected Collection $localizedCitations;
protected OutputInterface $output;
protected Collection $teiDataSets;
protected int $total;

/**
 * Wires the injected services, loads the extension configuration and derives
 * the name of the index this run will write to.
 *
 * The index name is the configured 'elasticIndexName' followed by an underscore
 * and the current date/time formatted as Ymd_His, so every instance gets its own
 * timestamped index name.
 */
public function __construct(
    private readonly SiteFinder $siteFinder,
    private readonly LoggerInterface $logger
) {
    parent::__construct();

    $extensionConfiguration = GeneralUtility::makeInstance(ExtensionConfiguration::class);
    $this->extConf = $extensionConfiguration->get('liszt_bibliography');

    // sprintf evaluates its arguments left to right: configured base name first, timestamp second
    $this->indexName = sprintf('%s_%s', $this->extConf['elasticIndexName'], date('Ymd_His'));

    $this->initLocales();
}

Expand Down Expand Up @@ -100,20 +104,21 @@ protected function configure(): void

protected function initialize(InputInterface $input, OutputInterface $output): void
{
$this->extConf = GeneralUtility::makeInstance(ExtensionConfiguration::class)->get('liszt_bibliography');
$this->input = $input;
$this->output = $output;
$this->client = ElasticClientBuilder::getClient();
$this->apiKey = $this->extConf['zoteroApiKey'];
$this->io = new SymfonyStyle($input, $output);
$this->zoteroApiKey = $this->extConf['zoteroApiKey'];
$this->io = GeneralUtility::makeInstance(SymfonyStyle::class, $this->input, $this->output);
$this->io->title($this->getDescription());
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->bulkSize = (int) $this->extConf['zoteroBulkSize'];
$version = $this->getVersion($input);
$version = $this->getVersion();
if ($version == 0) {
$this->io->text('Full data synchronization requested.');
$this->fullSync($input);
$this->fullSync();
$this->logger->info('Full data synchronization successful.');
} else {
$this->io->text('Synchronizing all data from version ' . $version);
Expand All @@ -123,17 +128,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return Command::SUCCESS;
}

protected function fullSync(InputInterface $input): void
protected function fullSync(): void
{
$client = new ZoteroApi($this->extConf['zoteroApiKey']);
$client = GeneralUtility::makeInstance(ZoteroApi::class, $this->zoteroApiKey);
$response = $client->
group($this->extConf['zoteroGroupId'])->
items()->
top()->
limit(1)->
send();
if ($input->getOption('total')) {
$this->total = (int) $input->getOption('total');
if ($this->input->getOption('total')) {
$this->total = (int) $this->input->getOption('total');
} else {
$this->total = (int) $response->getHeaders()['Total-Results'][0];
}
Expand All @@ -143,33 +148,45 @@ protected function fullSync(InputInterface $input): void
$collection = new Collection($response->getBody());
$this->bibliographyItems = $collection->pluck('data');
$cursor = 0; // set Cursor to 0, not to bulk size
$index = $this->extConf['elasticIndexName'];
$mappingParams = BibElasticMapping::getMappingParams($index);
// we are working with alias names to swap indexes from zotero_temp to zotero after successfully indexing
$tempIndexAlias = $this->extConf['elasticIndexName'].'_temp';
$tempIndexParams = BibElasticMapping::getMappingParams($this->indexName);

// add the alias name 'zotero_temp' to this index
// and add a wildcard alias to find all zotero_* indices with the alias zotero-index
$aliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $this->indexName,
'alias' => $tempIndexAlias,
],
],
[
'add' => [
'index' => $this->extConf['elasticIndexName'].'_*',
'alias' => $this->extConf['elasticIndexName'].'-index',
],
]
]
]
];

try {
// in older Elasticsearch versions (until 7) exists returns a bool
if ($this->client->indices()->exists(['index' => $index])) {
$this->client->indices()->delete(['index' => $index]);
$this->client->indices()->create($mappingParams);
}
$this->client->indices()->create($tempIndexParams);
$this->client->indices()->updateAliases($aliasParams);
} catch (\Exception $e) {
// other versions return a Message object
if ($e->getCode() === 404) {
$this->io->note("Index: " . $index . " does not exist. Trying to create new index.");
$this->client->indices()->create($mappingParams);
} else {
$this->io->error("Exception: " . $e->getMessage());
$this->logger->error('Bibliography sync unsuccessful. Error creating elasticsearch index.');
throw new \Exception('Bibliography sync unsuccessful.');
}
}

$apiCounter = self::API_TRIALS;

while ($cursor < $this->total) {
try {
$this->sync($cursor, 0);

$apiCounter = self::API_TRIALS;
$remainingItems = $this->total - $cursor;
$advanceBy = min($remainingItems, $this->bulkSize);
Expand All @@ -188,6 +205,11 @@ protected function fullSync(InputInterface $input): void
}
}
}

// swap alias for index from zotero_temp to zotero and remove old indexes (keep the last one)
$this->swapIndexAliases($tempIndexAlias);
//delete old indexes
$this->deleteOldIndexes();
$this->io->progressFinish();
}

Expand Down Expand Up @@ -223,22 +245,22 @@ protected function sync(int $cursor = 0, int $version = 0): void
$this->commitBibliography();
}

protected function getVersion(InputInterface $input): int
protected function getVersion(): int
{
// if -a is specified, perform a full update
if ($input->getOption('all')) {
if ($this->input->getOption('all')) {
return 0;
}

// also set version to 0 for dev tests if the total results are limited
if ($input->getOption('total')) {
$this->io->text('Total results limited to: '. $input->getOption('total'));
if ($this->input->getOption('total')) {
$this->io->text('Total results limited to: '. $this->input->getOption('total'));
return 0;
}


// if a version is manually specified, perform sync from this version
$argumentVersion = $input->getArgument('version');
$argumentVersion = $this->input->getArgument('version');
if ($argumentVersion > 0) {
return (int) $argumentVersion;
}
Expand Down Expand Up @@ -275,7 +297,7 @@ protected function getVersion(InputInterface $input): int

protected function fetchBibliography(int $cursor, int $version): void
{
$client = new ZoteroApi($this->extConf['zoteroApiKey']);
$client = GeneralUtility::makeInstance(ZoteroApi::class, $this->zoteroApiKey);
$response = $client->
group($this->extConf['zoteroGroupId'])->
items()->
Expand All @@ -291,7 +313,6 @@ protected function fetchBibliography(int $cursor, int $version): void

protected function fetchCitations(int $cursor, int $version): void
{
$this->localizedCitations = new Collection();
$this->locales->each(function($locale) use($cursor, $version) { $this->fetchCitationLocale($locale, $cursor, $version); });
}

Expand Down Expand Up @@ -320,7 +341,7 @@ protected function fetchCitationLocale(string $locale, int $cursor, int $version

protected function fetchTeiData(int $cursor, int $version): void
{
$client = new ZoteroApi($this->extConf['zoteroApiKey']);
$client = GeneralUtility::makeInstance(ZoteroApi::class, $this->zoteroApiKey);
$response = $client->
group($this->extConf['zoteroGroupId'])->
items()->
Expand Down Expand Up @@ -350,15 +371,12 @@ protected function commitBibliography(): void
$this->io->text('no new bibliographic entries');
return;
}
$index = $this->extConf['elasticIndexName'];

$params = [ 'body' => [] ];

$bulkCount = 0;
foreach ($this->dataSets as $document) {
$params['body'][] = [ 'index' =>
[
'_index' => $index,
'_index' => $this->indexName,
'_id' => $document['key']
]
];
Expand All @@ -372,6 +390,114 @@ protected function commitBibliography(): void
$this->client->bulk($params);
}

/**
 * Moves the live alias (the configured 'elasticIndexName') onto the index built by this run.
 *
 * All alias changes are sent in ONE updateAliases request:
 *  - remove the live alias from every other index that currently carries it,
 *  - add the live alias to $this->indexName,
 *  - remove the temporary alias from $this->indexName.
 *
 * Sending a single request avoids the failure of the former per-index loop, where the
 * second iteration tried to remove the temporary alias again after the first iteration
 * had already removed it.
 *
 * @param string $tempIndexAlias temporary alias attached to $this->indexName while indexing
 *
 * @throws \Exception if looking up the live alias fails with anything other than a 404,
 *                    or if the alias update itself fails
 */
protected function swapIndexAliases(string $tempIndexAlias): void
{
    $liveAlias = $this->extConf['elasticIndexName'];
    $previousIndexes = [];

    // step 1: find the indexes that currently carry the live alias
    try {
        $aliasesArray = $this->client->indices()->getAlias(['name' => $liveAlias])->asArray();

        foreach ($aliasesArray as $index => $aliasArray) {
            $index = (string) $index;
            // the new index must not be scheduled for a 'remove' of the alias it is about to receive
            if ($index === $this->indexName) {
                continue;
            }
            if (array_key_exists($liveAlias, $aliasArray['aliases'] ?? [])) {
                $previousIndexes[] = $index;
            }
        }
    } catch (\Exception $e) {
        // a 404 only means that no index carries the alias yet (e.g. very first run)
        if ($e->getCode() !== 404) {
            $this->io->error("Exception: " . $e->getMessage());
            $this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $liveAlias);
            throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
        }
        $this->io->note("Alias: " . $liveAlias . " does not exist. Move alias to " . $this->indexName);
    }

    // step 2: assemble every alias change into one action list
    $actions = [];
    foreach ($previousIndexes as $index) {
        $this->io->note('Remove alias "' . $liveAlias . '" from index ' . $index . ' and add it to ' . $this->indexName);
        $actions[] = [
            'remove' => [
                'index' => $index,
                'alias' => $liveAlias,
            ],
        ];
    }
    $actions[] = [
        'add' => [
            'index' => $this->indexName,
            'alias' => $liveAlias,
        ],
    ];
    $actions[] = [
        'remove' => [
            'index' => $this->indexName,
            'alias' => $tempIndexAlias,
        ],
    ];

    // step 3: apply all changes with a single request
    try {
        $this->client->indices()->updateAliases(['body' => ['actions' => $actions]]);
    } catch (\Exception $e) {
        $this->io->error("Exception: " . $e->getMessage());
        $this->logger->error('Bibliography sync unsuccessful. Error swapping alias: ' . $liveAlias);
        throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
    }
}

/**
 * Deletes outdated indexes that carry the collective '<elasticIndexName>-index' alias.
 *
 * The index names returned for that alias are sorted ascending; the index of the
 * current run ($this->indexName) and the last remaining name in sort order are kept,
 * every other index is deleted.
 *
 * A 404 while doing so is reported as "nothing to remove"; any other exception is
 * reported, logged and rethrown.
 *
 * @throws \Exception if the lookup or a deletion fails with a code other than 404
 */
protected function deleteOldIndexes(): void
{
    $collectiveAlias = $this->extConf['elasticIndexName'] . '-index';

    try {
        $indexesByName = $this->client->indices()->getAlias(['name' => $collectiveAlias])->asArray();

        // order the index names ascending
        ksort($indexesByName);
        $indexNames = array_keys($indexesByName);

        // never touch the index that was just built
        $olderIndexes = array_diff($indexNames, [$this->indexName]);

        // spare the last of the older indexes (so two indexes are kept in total)
        $obsoleteIndexes = array_slice($olderIndexes, 0, -1);

        foreach ($obsoleteIndexes as $obsoleteIndex) {
            $this->io->note("Delete index " . $obsoleteIndex);
            $this->client->indices()->delete(['index' => $obsoleteIndex]);
        }
    } catch (\Exception $e) {
        if ($e->getCode() === 404) {
            $this->io->note("Nothing to remove, there are no indexes with alias " . $collectiveAlias);
            return;
        }

        $this->io->error("Exception: " . $e->getMessage());
        $this->logger->error('Bibliography sync unsuccessful. Error getting alias: ' . $collectiveAlias);
        throw new \Exception('Bibliography sync unsuccessful.', 0, $e);
    }
}

/* protected function commitLocales(): void
{
$localeIndex = $this->extConf['elasticLocaleIndexName'];
Expand Down
Loading