From 769671f9efa3832b1afc6fe1f91f0904a01560b7 Mon Sep 17 00:00:00 2001 From: Mirko Zizzari Date: Thu, 7 Sep 2023 14:30:31 +0200 Subject: [PATCH] Issue #557: - getIndexTemplate async - handle 404 Index Template not found --- .../datasource/listener/JobScheduler.java | 98 +++++++++++-------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java b/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java index da69b79a8..4354add0e 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobScheduler.java @@ -25,9 +25,12 @@ import io.openk9.datasource.sql.TransactionInvoker; import io.openk9.datasource.util.ActorActionListener; import io.openk9.datasource.util.CborSerializable; +import io.openk9.datasource.util.UniActionListener; import io.vavr.Function3; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.IndicesClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; @@ -36,6 +39,7 @@ import org.elasticsearch.client.indices.PutComposableIndexTemplateRequest; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -561,49 +565,59 @@ private static Behavior onCopyIndexTemplate( IndicesClient indices = restHighLevelClient.indices(); - try { - - GetComposableIndexTemplatesResponse indexTemplate = indices.getIndexTemplate( - new GetComposableIndexTemplateRequest(oldDataIndex.getName() + "-template"), - RequestOptions.DEFAULT); - - for (ComposableIndexTemplate composableIndexTemplate : indexTemplate.getIndexTemplates().values()) { - - PutComposableIndexTemplateRequest request = - new PutComposableIndexTemplateRequest(); - - Template template = composableIndexTemplate.template(); - - ComposableIndexTemplate newComposableIndexTemplate = - new ComposableIndexTemplate( - List.of(newDataIndexName), - new Template( - template.settings(), - template.mappings(), - template.aliases() - ), - composableIndexTemplate.composedOf(), - composableIndexTemplate.priority(), - composableIndexTemplate.version(), - composableIndexTemplate.metadata() - ); - - request - .name(newDataIndexName + "-template") - .indexTemplate(newComposableIndexTemplate); - - indices.putIndexTemplateAsync( - request, - RequestOptions.DEFAULT, - ActorActionListener.of(ctx.getSelf(), (r, t) -> - new PersistSchedulerInternal(tenantName, scheduler, t)) - ); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - + indices.getIndexTemplateAsync( + new GetComposableIndexTemplateRequest(oldDataIndex.getName() + "-template"), + RequestOptions.DEFAULT, new ActionListener<>() { + @Override + public void onResponse(GetComposableIndexTemplatesResponse indexTemplate) { + for (ComposableIndexTemplate composableIndexTemplate : indexTemplate.getIndexTemplates().values()) { + + PutComposableIndexTemplateRequest request = + new PutComposableIndexTemplateRequest(); + + Template template = composableIndexTemplate.template(); + + ComposableIndexTemplate newComposableIndexTemplate = + new ComposableIndexTemplate( + List.of(newDataIndexName), + new Template( + template.settings(), + template.mappings(), + template.aliases() + ), + composableIndexTemplate.composedOf(), + composableIndexTemplate.priority(), + composableIndexTemplate.version(), + composableIndexTemplate.metadata() + ); + + request + .name(newDataIndexName + "-template") + .indexTemplate(newComposableIndexTemplate); + + indices.putIndexTemplateAsync( + request, + RequestOptions.DEFAULT, + ActorActionListener.of(ctx.getSelf(), (r, t) -> + new PersistSchedulerInternal(tenantName, scheduler, t)) + ); + } + } + @Override + public void onFailure(Exception e) { + if (e instanceof ElasticsearchStatusException + && ((ElasticsearchStatusException)e).status() == RestStatus.NOT_FOUND) { + log.warn("Cannot Copy Index Template", e); + ctx.getSelf().tell( + new PersistSchedulerInternal(tenantName, scheduler, null)); + } + else { + ctx.getSelf().tell( + new PersistSchedulerInternal(null, null, e)); + } + } + }); return Behaviors.same(); }