diff --git a/dependencies.gradle b/dependencies.gradle
index 65df86a3c..42753f3ad 100644
--- a/dependencies.gradle
+++ b/dependencies.gradle
@@ -61,4 +61,5 @@ ext {
revKafka = '2.6.0'
revMicrometer = '1.6.2'
revPrometheus = '0.9.0'
+ revElasticSearch7 = '7.17.13'
}
diff --git a/es7-persistence/README.md b/es7-persistence/README.md
new file mode 100644
index 000000000..85143daad
--- /dev/null
+++ b/es7-persistence/README.md
@@ -0,0 +1,95 @@
+# ES7 Persistence
+
+This module provides ES7 persistence for indexing workflows and tasks.
+
+### ES Breaking changes
+
+The move from ES6 to ES7 introduced significant breaking changes that affected the implementation of this module:
+* Mapping type deprecation
+* Templates API
+* TransportClient deprecation
+
+More information can be found here: https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking-changes-7.0.html
+
+
+## Build
+
+1. In order to use ES7, you must change the following files from ES6 to ES7:
+
+https://github.com/Netflix/conductor/blob/main/build.gradle
+https://github.com/Netflix/conductor/blob/main/server/src/main/resources/application.properties
+
+In file:
+
+- /build.gradle
+
+change `ext['elasticsearch.version']` from `revElasticSearch6` to `revElasticSearch7`
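+
+After this change, the relevant line in `/build.gradle` pins the ES client version via the new property (the same line this module's own `build.gradle` uses):
+
+```
+ext['elasticsearch.version'] = revElasticSearch7
+```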
+
+
+In file:
+
+- /server/src/main/resources/application.properties
+
+change `conductor.elasticsearch.version` from `6` to `7`
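+
+With that, the property in `/server/src/main/resources/application.properties` reads:
+
+```properties
+conductor.elasticsearch.version=7
+```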
+
+You also need to regenerate the dependencies.lock files with the ES7 dependencies. To do that, delete all dependencies.lock files and then run:
+
+```
+./gradlew generateLock updateLock saveLock
+```
+
+
+2. To use ES7 for all modules, including test-harness, you must also change the following files:
+
+https://github.com/Netflix/conductor/blob/main/test-harness/build.gradle
+https://github.com/Netflix/conductor/blob/main/test-harness/src/test/java/com/netflix/conductor/test/integration/AbstractEndToEndTest.java
+
+In file:
+
+- /test-harness/build.gradle
+
+* change the module inclusion from `es6-persistence` to `es7-persistence`
+
+In file:
+
+- /test-harness/src/test/java/com/netflix/conductor/test/integration/AbstractEndToEndTest.java
+
+* change `conductor.elasticsearch.version` from `6` to `7`
+* change `DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss").withTag("6.8.12")` to `DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss").withTag("7.6.2")`, as shown below
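+
+After that change, the container setup in the test looks roughly like this (a sketch; the field name and surrounding wiring are assumptions that should follow the existing code in AbstractEndToEndTest):
+
+```java
+// Hypothetical sketch: start an ES7 test container instead of the ES6 one
+ElasticsearchContainer container =
+        new ElasticsearchContainer(
+                DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss")
+                        .withTag("7.6.2"));
+```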
+
+
+
+### Configuration
+(Default values shown below)
+
+This module uses the following configuration options:
+```properties
+# A comma separated list of scheme/host/port entries for the ES nodes to communicate with.
+# The scheme can be `http` or `https`; if it is omitted, `http` is used.
+# Since ES deprecated the TransportClient, Conductor only uses the REST transport protocol.
+conductor.elasticsearch.url=
+
+# The name of the workflow and task index.
+conductor.elasticsearch.indexPrefix=conductor
+
+# Worker queue size used in the executor service for async methods in IndexDAO.
+conductor.elasticsearch.asyncWorkerQueueSize=100
+
+# Maximum thread pool size in the executor service for async methods in IndexDAO.
+conductor.elasticsearch.asyncMaxPoolSize=12
+
+# Timeout (in seconds) after which the in-memory buffer is flushed if not explicitly indexed.
+conductor.elasticsearch.asyncBufferFlushTimeout=10
+```
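+
+For example, to point Conductor at a two-node cluster over plain HTTP (the hostnames are illustrative):
+
+```properties
+conductor.elasticsearch.url=http://es-node1:9200,http://es-node2:9200
+```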
+
+
+### BASIC Authentication
+If you need to pass a username/password to connect to ES, add the following properties to your config file:
+* conductor.elasticsearch.username
+* conductor.elasticsearch.password
+
+Example:
+```properties
+conductor.elasticsearch.username=someusername
+conductor.elasticsearch.password=somepassword
+```
diff --git a/es7-persistence/build.gradle b/es7-persistence/build.gradle
new file mode 100644
index 000000000..fdb652950
--- /dev/null
+++ b/es7-persistence/build.gradle
@@ -0,0 +1,54 @@
+plugins {
+ id 'com.github.johnrengelman.shadow' version '7.0.0'
+ id 'java'
+}
+
+configurations {
+ // Prevent shaded dependencies from being published, while keeping them available to tests
+ shadow.extendsFrom compileOnly
+ testRuntime.extendsFrom compileOnly
+}
+
+ext['elasticsearch.version'] = revElasticSearch7
+
+dependencies {
+
+ implementation project(':conductor-common')
+ implementation project(':conductor-core')
+ implementation project(':conductor-common-persistence')
+
+ compileOnly 'org.springframework.boot:spring-boot-starter'
+ compileOnly 'org.springframework.retry:spring-retry'
+
+ implementation "commons-io:commons-io:${revCommonsIo}"
+ implementation "org.apache.commons:commons-lang3"
+ implementation "com.google.guava:guava:${revGuava}"
+
+ implementation "com.fasterxml.jackson.core:jackson-databind"
+ implementation "com.fasterxml.jackson.core:jackson-core"
+
+ implementation "org.elasticsearch.client:elasticsearch-rest-client:${revElasticSearch7}"
+ implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:${revElasticSearch7}"
+
+ testImplementation "net.java.dev.jna:jna:5.7.0"
+ testImplementation "org.awaitility:awaitility:${revAwaitility}"
+ testImplementation "org.testcontainers:elasticsearch:${revTestContainer}"
+ testImplementation project(':conductor-test-util').sourceSets.test.output
+ testImplementation 'org.springframework.retry:spring-retry'
+
+}
+
+// Drop the classifier and disable the regular jar task so the shadow artifact replaces the default jar artifact
+shadowJar {
+ configurations = [project.configurations.shadow]
+ classifier = null
+
+ // Service files are not included by default.
+ mergeServiceFiles {
+ include 'META-INF/services/*'
+ include 'META-INF/maven/*'
+ }
+}
+
+jar.enabled = false
+jar.dependsOn shadowJar
diff --git a/es7-persistence/dependencies.lock b/es7-persistence/dependencies.lock
new file mode 100644
index 000000000..9411dbc82
--- /dev/null
+++ b/es7-persistence/dependencies.lock
@@ -0,0 +1,325 @@
+{
+ "annotationProcessor": {
+ "org.springframework.boot:spring-boot-configuration-processor": {
+ "locked": "2.7.16"
+ }
+ },
+ "compileClasspath": {
+ "com.fasterxml.jackson.core:jackson-core": {
+ "locked": "2.13.5"
+ },
+ "com.fasterxml.jackson.core:jackson-databind": {
+ "locked": "2.13.5"
+ },
+ "com.google.guava:guava": {
+ "locked": "32.1.2-jre"
+ },
+ "com.netflix.conductor:conductor-common": {
+ "locked": "3.15.0"
+ },
+ "com.netflix.conductor:conductor-common-persistence": {
+ "project": true
+ },
+ "com.netflix.conductor:conductor-core": {
+ "locked": "3.15.0"
+ },
+ "commons-io:commons-io": {
+ "locked": "2.7"
+ },
+ "org.apache.commons:commons-lang3": {
+ "locked": "3.12.0"
+ },
+ "org.apache.logging.log4j:log4j-api": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-core": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-jul": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-slf4j-impl": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-web": {
+ "locked": "2.17.2"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-client": {
+ "locked": "7.17.15"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-high-level-client": {
+ "locked": "7.17.15"
+ },
+ "org.springframework.boot:spring-boot-starter": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.retry:spring-retry": {
+ "locked": "1.3.4"
+ }
+ },
+ "runtimeClasspath": {
+ "com.fasterxml.jackson.core:jackson-core": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.13.5"
+ },
+ "com.fasterxml.jackson.core:jackson-databind": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.13.5"
+ },
+ "com.google.guava:guava": {
+ "locked": "32.1.2-jre"
+ },
+ "com.netflix.conductor:conductor-common": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "3.15.0"
+ },
+ "com.netflix.conductor:conductor-common-persistence": {
+ "project": true
+ },
+ "com.netflix.conductor:conductor-core": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "3.15.0"
+ },
+ "commons-io:commons-io": {
+ "locked": "2.7"
+ },
+ "org.apache.commons:commons-lang3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "3.12.0"
+ },
+ "org.apache.logging.log4j:log4j-api": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-core": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-jul": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-slf4j-impl": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-web": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-client": {
+ "locked": "7.17.15"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-high-level-client": {
+ "locked": "7.17.15"
+ }
+ },
+ "shadow": {
+ "org.springframework.boot:spring-boot-starter": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.retry:spring-retry": {
+ "locked": "1.3.4"
+ }
+ },
+ "testCompileClasspath": {
+ "com.fasterxml.jackson.core:jackson-core": {
+ "locked": "2.13.5"
+ },
+ "com.fasterxml.jackson.core:jackson-databind": {
+ "locked": "2.13.5"
+ },
+ "com.google.guava:guava": {
+ "locked": "32.1.2-jre"
+ },
+ "com.netflix.conductor:conductor-common": {
+ "locked": "3.15.0"
+ },
+ "com.netflix.conductor:conductor-common-persistence": {
+ "project": true
+ },
+ "com.netflix.conductor:conductor-core": {
+ "locked": "3.15.0"
+ },
+ "commons-io:commons-io": {
+ "locked": "2.7"
+ },
+ "junit:junit": {
+ "locked": "4.13.2"
+ },
+ "net.java.dev.jna:jna": {
+ "locked": "5.7.0"
+ },
+ "org.apache.commons:commons-lang3": {
+ "locked": "3.12.0"
+ },
+ "org.apache.logging.log4j:log4j-api": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-core": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-jul": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-slf4j-impl": {
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-web": {
+ "locked": "2.17.2"
+ },
+ "org.awaitility:awaitility": {
+ "locked": "3.1.6"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-client": {
+ "locked": "7.17.15"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-high-level-client": {
+ "locked": "7.17.15"
+ },
+ "org.junit.vintage:junit-vintage-engine": {
+ "locked": "5.8.2"
+ },
+ "org.springframework.boot:spring-boot-starter-log4j2": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.boot:spring-boot-starter-test": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.retry:spring-retry": {
+ "locked": "1.3.4"
+ },
+ "org.testcontainers:elasticsearch": {
+ "locked": "1.18.3"
+ }
+ },
+ "testRuntime": {
+ "org.springframework.boot:spring-boot-starter": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.retry:spring-retry": {
+ "locked": "1.3.4"
+ }
+ },
+ "testRuntimeClasspath": {
+ "com.fasterxml.jackson.core:jackson-core": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.13.5"
+ },
+ "com.fasterxml.jackson.core:jackson-databind": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.13.5"
+ },
+ "com.google.guava:guava": {
+ "locked": "32.1.2-jre"
+ },
+ "com.netflix.conductor:conductor-common": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "3.15.0"
+ },
+ "com.netflix.conductor:conductor-common-persistence": {
+ "project": true
+ },
+ "com.netflix.conductor:conductor-core": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "3.15.0"
+ },
+ "commons-io:commons-io": {
+ "locked": "2.7"
+ },
+ "junit:junit": {
+ "locked": "4.13.2"
+ },
+ "net.java.dev.jna:jna": {
+ "locked": "5.7.0"
+ },
+ "org.apache.commons:commons-lang3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "3.12.0"
+ },
+ "org.apache.logging.log4j:log4j-api": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-core": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-jul": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-slf4j-impl": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.apache.logging.log4j:log4j-web": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common-persistence"
+ ],
+ "locked": "2.17.2"
+ },
+ "org.awaitility:awaitility": {
+ "locked": "3.1.6"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-client": {
+ "locked": "7.17.15"
+ },
+ "org.elasticsearch.client:elasticsearch-rest-high-level-client": {
+ "locked": "7.17.15"
+ },
+ "org.junit.vintage:junit-vintage-engine": {
+ "locked": "5.8.2"
+ },
+ "org.springframework.boot:spring-boot-starter-log4j2": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.boot:spring-boot-starter-test": {
+ "locked": "2.7.16"
+ },
+ "org.springframework.retry:spring-retry": {
+ "locked": "1.3.4"
+ },
+ "org.testcontainers:elasticsearch": {
+ "locked": "1.18.3"
+ }
+ }
+}
\ No newline at end of file
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchConditions.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchConditions.java
new file mode 100644
index 000000000..9875f1776
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchConditions.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.config;
+
+import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+
+public class ElasticSearchConditions {
+
+ private ElasticSearchConditions() {}
+
+ public static class ElasticSearchV7Enabled extends AllNestedConditions {
+
+ ElasticSearchV7Enabled() {
+ super(ConfigurationPhase.PARSE_CONFIGURATION);
+ }
+
+ @SuppressWarnings("unused")
+ @ConditionalOnProperty(
+ name = "conductor.indexing.enabled",
+ havingValue = "true",
+ matchIfMissing = true)
+        static class EnabledIndexing {}
+
+ @SuppressWarnings("unused")
+ @ConditionalOnProperty(
+ name = "conductor.elasticsearch.version",
+ havingValue = "7",
+ matchIfMissing = true)
+        static class EnabledES7 {}
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchProperties.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchProperties.java
new file mode 100644
index 000000000..d4ed352a7
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchProperties.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.config;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.convert.DurationUnit;
+
+@ConfigurationProperties("conductor.elasticsearch")
+public class ElasticSearchProperties {
+
+ /**
+ * The comma separated list of urls for the elasticsearch cluster. Format --
+ * host1:port1,host2:port2
+ */
+ private String url = "localhost:9300";
+
+ /** The index prefix to be used when creating indices */
+ private String indexPrefix = "conductor";
+
+    /** The color of the elasticsearch cluster to wait for to confirm healthy status */
+ private String clusterHealthColor = "green";
+
+ /** The size of the batch to be used for bulk indexing in async mode */
+ private int indexBatchSize = 1;
+
+ /** The size of the queue used for holding async indexing tasks */
+ private int asyncWorkerQueueSize = 100;
+
+ /** The maximum number of threads allowed in the async pool */
+ private int asyncMaxPoolSize = 12;
+
+ /**
+ * The time in seconds after which the async buffers will be flushed (if no activity) to prevent
+ * data loss
+ */
+ @DurationUnit(ChronoUnit.SECONDS)
+ private Duration asyncBufferFlushTimeout = Duration.ofSeconds(10);
+
+ /** The number of shards that the index will be created with */
+ private int indexShardCount = 5;
+
+ /** The number of replicas that the index will be configured to have */
+ private int indexReplicasCount = 1;
+
+ /** The number of task log results that will be returned in the response */
+ private int taskLogResultLimit = 10;
+
+ /** The timeout in milliseconds used when requesting a connection from the connection manager */
+ private int restClientConnectionRequestTimeout = -1;
+
+ /** Used to control if index management is to be enabled or will be controlled externally */
+ private boolean autoIndexManagementEnabled = true;
+
+ /**
+ * Document types are deprecated in ES6 and removed from ES7. This property can be used to
+ * disable the use of specific document types with an override. This property is currently used
+ * in ES6 module.
+ *
+ *
+     * <p>Note that this property will only take effect if {@link
+ * ElasticSearchProperties#isAutoIndexManagementEnabled} is set to false and index management is
+ * handled outside of this module.
+ */
+ private String documentTypeOverride = "";
+
+ /** Elasticsearch basic auth username */
+ private String username;
+
+ /** Elasticsearch basic auth password */
+ private String password;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getIndexPrefix() {
+ return indexPrefix;
+ }
+
+ public void setIndexPrefix(String indexPrefix) {
+ this.indexPrefix = indexPrefix;
+ }
+
+ public String getClusterHealthColor() {
+ return clusterHealthColor;
+ }
+
+ public void setClusterHealthColor(String clusterHealthColor) {
+ this.clusterHealthColor = clusterHealthColor;
+ }
+
+ public int getIndexBatchSize() {
+ return indexBatchSize;
+ }
+
+ public void setIndexBatchSize(int indexBatchSize) {
+ this.indexBatchSize = indexBatchSize;
+ }
+
+ public int getAsyncWorkerQueueSize() {
+ return asyncWorkerQueueSize;
+ }
+
+ public void setAsyncWorkerQueueSize(int asyncWorkerQueueSize) {
+ this.asyncWorkerQueueSize = asyncWorkerQueueSize;
+ }
+
+ public int getAsyncMaxPoolSize() {
+ return asyncMaxPoolSize;
+ }
+
+ public void setAsyncMaxPoolSize(int asyncMaxPoolSize) {
+ this.asyncMaxPoolSize = asyncMaxPoolSize;
+ }
+
+ public Duration getAsyncBufferFlushTimeout() {
+ return asyncBufferFlushTimeout;
+ }
+
+ public void setAsyncBufferFlushTimeout(Duration asyncBufferFlushTimeout) {
+ this.asyncBufferFlushTimeout = asyncBufferFlushTimeout;
+ }
+
+ public int getIndexShardCount() {
+ return indexShardCount;
+ }
+
+ public void setIndexShardCount(int indexShardCount) {
+ this.indexShardCount = indexShardCount;
+ }
+
+ public int getIndexReplicasCount() {
+ return indexReplicasCount;
+ }
+
+ public void setIndexReplicasCount(int indexReplicasCount) {
+ this.indexReplicasCount = indexReplicasCount;
+ }
+
+ public int getTaskLogResultLimit() {
+ return taskLogResultLimit;
+ }
+
+ public void setTaskLogResultLimit(int taskLogResultLimit) {
+ this.taskLogResultLimit = taskLogResultLimit;
+ }
+
+ public int getRestClientConnectionRequestTimeout() {
+ return restClientConnectionRequestTimeout;
+ }
+
+ public void setRestClientConnectionRequestTimeout(int restClientConnectionRequestTimeout) {
+ this.restClientConnectionRequestTimeout = restClientConnectionRequestTimeout;
+ }
+
+ public boolean isAutoIndexManagementEnabled() {
+ return autoIndexManagementEnabled;
+ }
+
+ public void setAutoIndexManagementEnabled(boolean autoIndexManagementEnabled) {
+ this.autoIndexManagementEnabled = autoIndexManagementEnabled;
+ }
+
+ public String getDocumentTypeOverride() {
+ return documentTypeOverride;
+ }
+
+ public void setDocumentTypeOverride(String documentTypeOverride) {
+ this.documentTypeOverride = documentTypeOverride;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
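+    /**
+     * Splits the comma separated {@link #url} value into individual hosts and converts each one
+     * to a {@link URL}, defaulting to the {@code http} scheme when none is given.
+     */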
+    public List<URL> toURLs() {
+ String clusterAddress = getUrl();
+ String[] hosts = clusterAddress.split(",");
+ return Arrays.stream(hosts)
+ .map(
+ host ->
+ (host.startsWith("http://") || host.startsWith("https://"))
+ ? toURL(host)
+ : toURL("http://" + host))
+ .collect(Collectors.toList());
+ }
+
+ private URL toURL(String url) {
+ try {
+ return new URL(url);
+ } catch (MalformedURLException e) {
+            throw new IllegalArgumentException(url + " can not be converted to java.net.URL");
+ }
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchV7Configuration.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchV7Configuration.java
new file mode 100644
index 000000000..3bbdb0fab
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/config/ElasticSearchV7Configuration.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.config;
+
+import java.net.URL;
+import java.util.List;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.retry.backoff.FixedBackOffPolicy;
+import org.springframework.retry.support.RetryTemplate;
+
+import com.netflix.conductor.dao.IndexDAO;
+import com.netflix.conductor.es7.dao.index.ElasticSearchRestDAOV7;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Configuration(proxyBeanMethods = false)
+@EnableConfigurationProperties(ElasticSearchProperties.class)
+@Conditional(ElasticSearchConditions.ElasticSearchV7Enabled.class)
+public class ElasticSearchV7Configuration {
+
+ private static final Logger log = LoggerFactory.getLogger(ElasticSearchV7Configuration.class);
+
+ @Bean
+ public RestClient restClient(ElasticSearchProperties properties) {
+ RestClientBuilder restClientBuilder =
+ RestClient.builder(convertToHttpHosts(properties.toURLs()));
+ if (properties.getRestClientConnectionRequestTimeout() > 0) {
+ restClientBuilder.setRequestConfigCallback(
+ requestConfigBuilder ->
+ requestConfigBuilder.setConnectionRequestTimeout(
+ properties.getRestClientConnectionRequestTimeout()));
+ }
+ return restClientBuilder.build();
+ }
+
+ @Bean
+ public RestClientBuilder restClientBuilder(ElasticSearchProperties properties) {
+ RestClientBuilder builder = RestClient.builder(convertToHttpHosts(properties.toURLs()));
+
+ if (properties.getUsername() != null && properties.getPassword() != null) {
+ log.info(
+ "Configure ElasticSearch with BASIC authentication. User:{}",
+ properties.getUsername());
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(
+ properties.getUsername(), properties.getPassword()));
+ builder.setHttpClientConfigCallback(
+ httpClientBuilder ->
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ } else {
+ log.info("Configure ElasticSearch with no authentication.");
+ }
+ return builder;
+ }
+
+    // If you are including this project, it's assumed you want ES to be your indexing mechanism
+    @Primary
+ @Bean
+ public IndexDAO es7IndexDAO(
+ RestClientBuilder restClientBuilder,
+ @Qualifier("es7RetryTemplate") RetryTemplate retryTemplate,
+ ElasticSearchProperties properties,
+ ObjectMapper objectMapper) {
+ return new ElasticSearchRestDAOV7(
+ restClientBuilder, retryTemplate, properties, objectMapper);
+ }
+
+ @Bean
+ public RetryTemplate es7RetryTemplate() {
+ RetryTemplate retryTemplate = new RetryTemplate();
+ FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
+ fixedBackOffPolicy.setBackOffPeriod(1000L);
+ retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
+ return retryTemplate;
+ }
+
+    private HttpHost[] convertToHttpHosts(List<URL> hosts) {
+ return hosts.stream()
+ .map(host -> new HttpHost(host.getHost(), host.getPort(), host.getProtocol()))
+ .toArray(HttpHost[]::new);
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/BulkRequestBuilderWrapper.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/BulkRequestBuilderWrapper.java
new file mode 100644
index 000000000..a41f38568
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/BulkRequestBuilderWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.index;
+
+import java.util.Objects;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.springframework.lang.NonNull;
+
+/** Thread-safe wrapper for {@link BulkRequestBuilder}. */
+public class BulkRequestBuilderWrapper {
+ private final BulkRequestBuilder bulkRequestBuilder;
+
+ public BulkRequestBuilderWrapper(@NonNull BulkRequestBuilder bulkRequestBuilder) {
+ this.bulkRequestBuilder = Objects.requireNonNull(bulkRequestBuilder);
+ }
+
+ public void add(@NonNull UpdateRequest req) {
+ synchronized (bulkRequestBuilder) {
+ bulkRequestBuilder.add(Objects.requireNonNull(req));
+ }
+ }
+
+ public void add(@NonNull IndexRequest req) {
+ synchronized (bulkRequestBuilder) {
+ bulkRequestBuilder.add(Objects.requireNonNull(req));
+ }
+ }
+
+ public int numberOfActions() {
+ synchronized (bulkRequestBuilder) {
+ return bulkRequestBuilder.numberOfActions();
+ }
+ }
+
+    public ActionFuture<BulkResponse> execute() {
+ synchronized (bulkRequestBuilder) {
+ return bulkRequestBuilder.execute();
+ }
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/BulkRequestWrapper.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/BulkRequestWrapper.java
new file mode 100644
index 000000000..66d3a26b4
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/BulkRequestWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.index;
+
+import java.util.Objects;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.springframework.lang.NonNull;
+
+/** Thread-safe wrapper for {@link BulkRequest}. */
+class BulkRequestWrapper {
+ private final BulkRequest bulkRequest;
+
+ BulkRequestWrapper(@NonNull BulkRequest bulkRequest) {
+ this.bulkRequest = Objects.requireNonNull(bulkRequest);
+ }
+
+ public void add(@NonNull UpdateRequest req) {
+ synchronized (bulkRequest) {
+ bulkRequest.add(Objects.requireNonNull(req));
+ }
+ }
+
+ public void add(@NonNull IndexRequest req) {
+ synchronized (bulkRequest) {
+ bulkRequest.add(Objects.requireNonNull(req));
+ }
+ }
+
+ BulkRequest get() {
+ return bulkRequest;
+ }
+
+ int numberOfActions() {
+ synchronized (bulkRequest) {
+ return bulkRequest.numberOfActions();
+ }
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchBaseDAO.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchBaseDAO.java
new file mode 100644
index 000000000..6abb4c742
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchBaseDAO.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+
+import com.netflix.conductor.dao.IndexDAO;
+import com.netflix.conductor.es7.dao.query.parser.Expression;
+import com.netflix.conductor.es7.dao.query.parser.internal.ParserException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+abstract class ElasticSearchBaseDAO implements IndexDAO {
+
+ String indexPrefix;
+ ObjectMapper objectMapper;
+
+ String loadTypeMappingSource(String path) throws IOException {
+ return applyIndexPrefixToTemplate(
+ IOUtils.toString(ElasticSearchBaseDAO.class.getResourceAsStream(path)));
+ }
+
+ private String applyIndexPrefixToTemplate(String text) throws JsonProcessingException {
+ String indexPatternsFieldName = "index_patterns";
+ JsonNode root = objectMapper.readTree(text);
+ if (root != null) {
+ JsonNode indexPatternsNodeValue = root.get(indexPatternsFieldName);
+ if (indexPatternsNodeValue != null && indexPatternsNodeValue.isArray()) {
+                ArrayList<String> patternsWithPrefix = new ArrayList<>();
+ indexPatternsNodeValue.forEach(
+ v -> {
+ String patternText = v.asText();
+ StringBuilder sb = new StringBuilder();
+ if (patternText.startsWith("*")) {
+ sb.append("*")
+ .append(indexPrefix)
+ .append("_")
+ .append(patternText.substring(1));
+ } else {
+ sb.append(indexPrefix).append("_").append(patternText);
+ }
+ patternsWithPrefix.add(sb.toString());
+ });
+ ((ObjectNode) root)
+ .set(indexPatternsFieldName, objectMapper.valueToTree(patternsWithPrefix));
+ return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
+ }
+ }
+ return text;
+ }
+
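+    /**
+     * Builds a bool query that combines the structured filter expression (when present) with the
+     * free text query string.
+     */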
+ BoolQueryBuilder boolQueryBuilder(String expression, String queryString)
+ throws ParserException {
+ QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
+ if (StringUtils.isNotEmpty(expression)) {
+ Expression exp = Expression.fromString(expression);
+ queryBuilder = exp.getFilterBuilder();
+ }
+ BoolQueryBuilder filterQuery = QueryBuilders.boolQuery().must(queryBuilder);
+ QueryStringQueryBuilder stringQuery = QueryBuilders.queryStringQuery(queryString);
+ return QueryBuilders.boolQuery().must(stringQuery).must(filterQuery);
+ }
+
+ protected String getIndexName(String documentType) {
+ return indexPrefix + "_" + documentType;
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java
new file mode 100644
index 000000000..05491872d
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java
@@ -0,0 +1,1344 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.index;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NByteArrayEntity;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.core.CountRequest;
+import org.elasticsearch.client.core.CountResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xcontent.XContentType;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.retry.support.RetryTemplate;
+
+import com.netflix.conductor.annotations.Trace;
+import com.netflix.conductor.common.metadata.events.EventExecution;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.core.events.queue.Message;
+import com.netflix.conductor.core.exception.NonTransientException;
+import com.netflix.conductor.core.exception.TransientException;
+import com.netflix.conductor.dao.IndexDAO;
+import com.netflix.conductor.es7.config.ElasticSearchProperties;
+import com.netflix.conductor.es7.dao.query.parser.internal.ParserException;
+import com.netflix.conductor.metrics.Monitors;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import jakarta.annotation.*;
+
+@Trace
+public class ElasticSearchRestDAOV7 extends ElasticSearchBaseDAO implements IndexDAO {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDAOV7.class);
+
+ private static final String CLASS_NAME = ElasticSearchRestDAOV7.class.getSimpleName();
+
+ private static final int CORE_POOL_SIZE = 6;
+ private static final long KEEP_ALIVE_TIME = 1L;
+
+ private static final String WORKFLOW_DOC_TYPE = "workflow";
+ private static final String TASK_DOC_TYPE = "task";
+ private static final String LOG_DOC_TYPE = "task_log";
+ private static final String EVENT_DOC_TYPE = "event";
+ private static final String MSG_DOC_TYPE = "message";
+
+ private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
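+    // Suffix used to create rolling log/event/message indices (year + month + week-of-month)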
+ private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMWW");
+
+ private @interface HttpMethod {
+
+ String GET = "GET";
+ String POST = "POST";
+ String PUT = "PUT";
+ String HEAD = "HEAD";
+ }
+
+ private static final String className = ElasticSearchRestDAOV7.class.getSimpleName();
+
+ private final String workflowIndexName;
+ private final String taskIndexName;
+ private final String eventIndexPrefix;
+ private String eventIndexName;
+ private final String messageIndexPrefix;
+ private String messageIndexName;
+ private String logIndexName;
+ private final String logIndexPrefix;
+
+ private final String clusterHealthColor;
+ private final RestHighLevelClient elasticSearchClient;
+ private final RestClient elasticSearchAdminClient;
+ private final ExecutorService executorService;
+ private final ExecutorService logExecutorService;
+    private final ConcurrentHashMap<String, BulkRequests> bulkRequests;
+ private final int indexBatchSize;
+ private final int asyncBufferFlushTimeout;
+ private final ElasticSearchProperties properties;
+ private final RetryTemplate retryTemplate;
+
+ static {
+ SIMPLE_DATE_FORMAT.setTimeZone(GMT);
+ }
+
+ public ElasticSearchRestDAOV7(
+ RestClientBuilder restClientBuilder,
+ RetryTemplate retryTemplate,
+ ElasticSearchProperties properties,
+ ObjectMapper objectMapper) {
+
+ this.objectMapper = objectMapper;
+ this.elasticSearchAdminClient = restClientBuilder.build();
+ this.elasticSearchClient = new RestHighLevelClient(restClientBuilder);
+ this.clusterHealthColor = properties.getClusterHealthColor();
+ this.bulkRequests = new ConcurrentHashMap<>();
+ this.indexBatchSize = properties.getIndexBatchSize();
+ this.asyncBufferFlushTimeout = (int) properties.getAsyncBufferFlushTimeout().getSeconds();
+ this.properties = properties;
+
+ this.indexPrefix = properties.getIndexPrefix();
+
+ this.workflowIndexName = getIndexName(WORKFLOW_DOC_TYPE);
+ this.taskIndexName = getIndexName(TASK_DOC_TYPE);
+ this.logIndexPrefix = this.indexPrefix + "_" + LOG_DOC_TYPE;
+ this.messageIndexPrefix = this.indexPrefix + "_" + MSG_DOC_TYPE;
+ this.eventIndexPrefix = this.indexPrefix + "_" + EVENT_DOC_TYPE;
+ int workerQueueSize = properties.getAsyncWorkerQueueSize();
+ int maximumPoolSize = properties.getAsyncMaxPoolSize();
+
+ // Set up a workerpool for performing async operations.
+ this.executorService =
+ new ThreadPoolExecutor(
+ CORE_POOL_SIZE,
+ maximumPoolSize,
+ KEEP_ALIVE_TIME,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(workerQueueSize),
+ (runnable, executor) -> {
+ logger.warn(
+ "Request {} to async dao discarded in executor {}",
+ runnable,
+ executor);
+ Monitors.recordDiscardedIndexingCount("indexQueue");
+ });
+
+ // Set up a workerpool for performing async operations for task_logs, event_executions,
+ // message
+ int corePoolSize = 1;
+ maximumPoolSize = 2;
+ long keepAliveTime = 30L;
+ this.logExecutorService =
+ new ThreadPoolExecutor(
+ corePoolSize,
+ maximumPoolSize,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(workerQueueSize),
+ (runnable, executor) -> {
+ logger.warn(
+ "Request {} to async log dao discarded in executor {}",
+ runnable,
+ executor);
+ Monitors.recordDiscardedIndexingCount("logQueue");
+ });
+
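+        // Periodically flush buffered bulk requests so partially filled batches are not lost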
+ Executors.newSingleThreadScheduledExecutor()
+ .scheduleAtFixedRate(this::flushBulkRequests, 60, 30, TimeUnit.SECONDS);
+ this.retryTemplate = retryTemplate;
+ }
+
+ @PreDestroy
+ private void shutdown() {
+ logger.info("Gracefully shutdown executor service");
+ shutdownExecutorService(logExecutorService);
+ shutdownExecutorService(executorService);
+ }
+
+ private void shutdownExecutorService(ExecutorService execService) {
+ try {
+ execService.shutdown();
+ if (execService.awaitTermination(30, TimeUnit.SECONDS)) {
+ logger.debug("tasks completed, shutting down");
+ } else {
+ logger.warn("Forcing shutdown after waiting for 30 seconds");
+ execService.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ logger.warn(
+ "Shutdown interrupted, invoking shutdownNow on scheduledThreadPoolExecutor for delay queue");
+ execService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ @PostConstruct
+ public void setup() throws Exception {
+ waitForHealthyCluster();
+
+ if (properties.isAutoIndexManagementEnabled()) {
+ createIndexesTemplates();
+ createWorkflowIndex();
+ createTaskIndex();
+ }
+ }
+
+ private void createIndexesTemplates() {
+ try {
+ initIndexesTemplates();
+ updateIndexesNames();
+ Executors.newScheduledThreadPool(1)
+ .scheduleAtFixedRate(this::updateIndexesNames, 0, 1, TimeUnit.HOURS);
+ } catch (Exception e) {
+ logger.error("Error creating index templates!", e);
+ }
+ }
+
+ private void initIndexesTemplates() {
+ initIndexTemplate(LOG_DOC_TYPE);
+ initIndexTemplate(EVENT_DOC_TYPE);
+ initIndexTemplate(MSG_DOC_TYPE);
+ }
+
+ /** Initializes the index with the required templates and mappings. */
+ private void initIndexTemplate(String type) {
+ String template = "template_" + type;
+ try {
+ if (doesResourceNotExist("/_template/" + template)) {
+ logger.info("Creating the index template '" + template + "'");
+ InputStream stream =
+ ElasticSearchRestDAOV7.class.getResourceAsStream("/" + template + ".json");
+ byte[] templateSource = IOUtils.toByteArray(stream);
+
+ HttpEntity entity =
+ new NByteArrayEntity(templateSource, ContentType.APPLICATION_JSON);
+ Request request = new Request(HttpMethod.PUT, "/_template/" + template);
+ request.setEntity(entity);
+                elasticSearchAdminClient.performRequest(request);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to init " + template, e);
+ }
+ }
+
+ private void updateIndexesNames() {
+ logIndexName = updateIndexName(LOG_DOC_TYPE);
+ eventIndexName = updateIndexName(EVENT_DOC_TYPE);
+ messageIndexName = updateIndexName(MSG_DOC_TYPE);
+ }
+
+ private String updateIndexName(String type) {
+ String indexName =
+ this.indexPrefix + "_" + type + "_" + SIMPLE_DATE_FORMAT.format(new Date());
+ try {
+ addIndex(indexName);
+ return indexName;
+ } catch (IOException e) {
+ logger.error("Failed to update log index name: {}", indexName, e);
+ throw new NonTransientException(e.getMessage(), e);
+ }
+ }
+
+ private void createWorkflowIndex() {
+ String indexName = getIndexName(WORKFLOW_DOC_TYPE);
+ try {
+ addIndex(indexName, "/mappings_docType_workflow.json");
+ } catch (IOException e) {
+ logger.error("Failed to initialize index '{}'", indexName, e);
+ }
+ }
+
+ private void createTaskIndex() {
+ String indexName = getIndexName(TASK_DOC_TYPE);
+ try {
+ addIndex(indexName, "/mappings_docType_task.json");
+ } catch (IOException e) {
+ logger.error("Failed to initialize index '{}'", indexName, e);
+ }
+ }
+
+ /**
+     * Waits for the ES cluster to reach the configured health color (green by default).
+ *
+ * @throws Exception If there is an issue connecting with the ES cluster.
+ */
+ private void waitForHealthyCluster() throws Exception {
+        Map<String, String> params = new HashMap<>();
+ params.put("wait_for_status", this.clusterHealthColor);
+ params.put("timeout", "30s");
+ Request request = new Request("GET", "/_cluster/health");
+ request.addParameters(params);
+ elasticSearchAdminClient.performRequest(request);
+ }
+
+ /**
+ * Adds an index to elasticsearch if it does not exist.
+ *
+ * @param index The name of the index to create.
+ * @param mappingFilename Index mapping filename
+ * @throws IOException If an error occurred during requests to ES.
+ */
+ private void addIndex(String index, final String mappingFilename) throws IOException {
+ logger.info("Adding index '{}'...", index);
+ String resourcePath = "/" + index;
+ if (doesResourceNotExist(resourcePath)) {
+ try {
+ ObjectNode indexSetting = objectMapper.createObjectNode();
+ ObjectNode root = objectMapper.createObjectNode();
+ indexSetting.put("number_of_shards", properties.getIndexShardCount());
+ indexSetting.put("number_of_replicas", properties.getIndexReplicasCount());
+ JsonNode mappingNodeValue =
+ objectMapper.readTree(loadTypeMappingSource(mappingFilename));
+ root.set("settings", indexSetting);
+ root.set("mappings", mappingNodeValue);
+ Request request = new Request(HttpMethod.PUT, resourcePath);
+ request.setEntity(
+ new NStringEntity(
+ objectMapper.writeValueAsString(root),
+ ContentType.APPLICATION_JSON));
+ elasticSearchAdminClient.performRequest(request);
+ logger.info("Added '{}' index", index);
+ } catch (ResponseException e) {
+
+ boolean errorCreatingIndex = true;
+
+ Response errorResponse = e.getResponse();
+ if (errorResponse.getStatusLine().getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
+ JsonNode root =
+ objectMapper.readTree(EntityUtils.toString(errorResponse.getEntity()));
+ String errorCode = root.get("error").get("type").asText();
+ if ("index_already_exists_exception".equals(errorCode)) {
+ errorCreatingIndex = false;
+ }
+ }
+
+ if (errorCreatingIndex) {
+ throw e;
+ }
+ }
+ } else {
+ logger.info("Index '{}' already exists", index);
+ }
+ }
+
+ /**
+ * Adds an index to elasticsearch if it does not exist.
+ *
+ * @param index The name of the index to create.
+ * @throws IOException If an error occurred during requests to ES.
+ */
+ private void addIndex(final String index) throws IOException {
+
+ logger.info("Adding index '{}'...", index);
+
+ String resourcePath = "/" + index;
+
+ if (doesResourceNotExist(resourcePath)) {
+
+ try {
+ ObjectNode setting = objectMapper.createObjectNode();
+ ObjectNode indexSetting = objectMapper.createObjectNode();
+
+ indexSetting.put("number_of_shards", properties.getIndexShardCount());
+ indexSetting.put("number_of_replicas", properties.getIndexReplicasCount());
+
+ setting.set("settings", indexSetting);
+
+ Request request = new Request(HttpMethod.PUT, resourcePath);
+ request.setEntity(
+ new NStringEntity(setting.toString(), ContentType.APPLICATION_JSON));
+ elasticSearchAdminClient.performRequest(request);
+ logger.info("Added '{}' index", index);
+ } catch (ResponseException e) {
+
+ boolean errorCreatingIndex = true;
+
+ Response errorResponse = e.getResponse();
+ if (errorResponse.getStatusLine().getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
+ JsonNode root =
+ objectMapper.readTree(EntityUtils.toString(errorResponse.getEntity()));
+ String errorCode = root.get("error").get("type").asText();
+ if ("index_already_exists_exception".equals(errorCode)) {
+ errorCreatingIndex = false;
+ }
+ }
+
+ if (errorCreatingIndex) {
+ throw e;
+ }
+ }
+ } else {
+ logger.info("Index '{}' already exists", index);
+ }
+ }
+
+ /**
+ * Adds a mapping type to an index if it does not exist.
+ *
+ * @param index The name of the index.
+ * @param mappingType The name of the mapping type.
+ * @param mappingFilename The name of the mapping file to use to add the mapping if it does not
+ * exist.
+ * @throws IOException If an error occurred during requests to ES.
+ */
+ private void addMappingToIndex(
+ final String index, final String mappingType, final String mappingFilename)
+ throws IOException {
+
+ logger.info("Adding '{}' mapping to index '{}'...", mappingType, index);
+
+ String resourcePath = "/" + index + "/_mapping";
+
+ if (doesResourceNotExist(resourcePath)) {
+ HttpEntity entity =
+ new NByteArrayEntity(
+ loadTypeMappingSource(mappingFilename).getBytes(),
+ ContentType.APPLICATION_JSON);
+ Request request = new Request(HttpMethod.PUT, resourcePath);
+ request.setEntity(entity);
+ elasticSearchAdminClient.performRequest(request);
+ logger.info("Added '{}' mapping", mappingType);
+ } else {
+ logger.info("Mapping '{}' already exists", mappingType);
+ }
+ }
+
+ /**
+ * Determines whether a resource exists in ES. This will call a GET method to a particular path
+ * and return true if status 200; false otherwise.
+ *
+ * @param resourcePath The path of the resource to get.
+ * @return True if it exists; false otherwise.
+ * @throws IOException If an error occurred during requests to ES.
+ */
+ public boolean doesResourceExist(final String resourcePath) throws IOException {
+ Request request = new Request(HttpMethod.HEAD, resourcePath);
+ Response response = elasticSearchAdminClient.performRequest(request);
+ return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
+ }
+
+ /**
+ * The inverse of doesResourceExist.
+ *
+ * @param resourcePath The path of the resource to check.
+ * @return True if it does not exist; false otherwise.
+ * @throws IOException If an error occurred during requests to ES.
+ */
+ public boolean doesResourceNotExist(final String resourcePath) throws IOException {
+ return !doesResourceExist(resourcePath);
+ }
+
+ @Override
+ public void indexWorkflow(WorkflowSummary workflow) {
+ try {
+ long startTime = Instant.now().toEpochMilli();
+ String workflowId = workflow.getWorkflowId();
+ byte[] docBytes = objectMapper.writeValueAsBytes(workflow);
+
+ IndexRequest request =
+ new IndexRequest(workflowIndexName)
+ .id(workflowId)
+ .source(docBytes, XContentType.JSON);
+ elasticSearchClient.index(request, RequestOptions.DEFAULT);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for indexing workflow: {}", endTime - startTime, workflowId);
+ Monitors.recordESIndexTime("index_workflow", WORKFLOW_DOC_TYPE, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ } catch (Exception e) {
+ Monitors.error(className, "indexWorkflow");
+ logger.error("Failed to index workflow: {}", workflow.getWorkflowId(), e);
+ }
+ }
+
+ @Override
+    public CompletableFuture<Void> asyncIndexWorkflow(WorkflowSummary workflow) {
+ return CompletableFuture.runAsync(() -> indexWorkflow(workflow), executorService);
+ }
+
+ @Override
+ public void indexTask(TaskSummary task) {
+ try {
+ long startTime = Instant.now().toEpochMilli();
+ String taskId = task.getTaskId();
+
+ indexObject(taskIndexName, TASK_DOC_TYPE, taskId, task);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for indexing task:{} in workflow: {}",
+ endTime - startTime,
+ taskId,
+ task.getWorkflowId());
+ Monitors.recordESIndexTime("index_task", TASK_DOC_TYPE, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ } catch (Exception e) {
+ logger.error("Failed to index task: {}", task.getTaskId(), e);
+ }
+ }
+
+ @Override
+    public CompletableFuture<Void> asyncIndexTask(TaskSummary task) {
+ return CompletableFuture.runAsync(() -> indexTask(task), executorService);
+ }
+
+ @Override
+    public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
+ if (taskExecLogs.isEmpty()) {
+ return;
+ }
+
+ long startTime = Instant.now().toEpochMilli();
+ BulkRequest bulkRequest = new BulkRequest();
+ for (TaskExecLog log : taskExecLogs) {
+
+ byte[] docBytes;
+ try {
+ docBytes = objectMapper.writeValueAsBytes(log);
+ } catch (JsonProcessingException e) {
+                logger.error("Failed to convert task log to JSON for task {}", log.getTaskId(), e);
+ continue;
+ }
+
+ IndexRequest request = new IndexRequest(logIndexName);
+ request.source(docBytes, XContentType.JSON);
+ bulkRequest.add(request);
+ }
+
+ try {
+ elasticSearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug("Time taken {} for indexing taskExecutionLogs", endTime - startTime);
+ Monitors.recordESIndexTime(
+ "index_task_execution_logs", LOG_DOC_TYPE, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "logQueue", ((ThreadPoolExecutor) logExecutorService).getQueue().size());
+ } catch (Exception e) {
+            List<String> taskIds =
+ taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList());
+ logger.error("Failed to index task execution logs for tasks: {}", taskIds, e);
+ }
+ }
+
+ @Override
+    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
+ return CompletableFuture.runAsync(() -> addTaskExecutionLogs(logs), logExecutorService);
+ }
+
+ @Override
+    public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
+ try {
+ BoolQueryBuilder query = boolQueryBuilder("taskId='" + taskId + "'", "*");
+
+ // Create the searchObjectIdsViaExpression source
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(query);
+ searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.ASC));
+ searchSourceBuilder.size(properties.getTaskLogResultLimit());
+
+ // Generate the actual request to send to ES.
+ SearchRequest searchRequest = new SearchRequest(logIndexPrefix + "*");
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse response =
+ elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
+
+ return mapTaskExecLogsResponse(response);
+ } catch (Exception e) {
+ logger.error("Failed to get task execution logs for task: {}", taskId, e);
+ }
+ return null;
+ }
+
+    private List<TaskExecLog> mapTaskExecLogsResponse(SearchResponse response) throws IOException {
+ SearchHit[] hits = response.getHits().getHits();
+        List<TaskExecLog> logs = new ArrayList<>(hits.length);
+ for (SearchHit hit : hits) {
+ String source = hit.getSourceAsString();
+ TaskExecLog tel = objectMapper.readValue(source, TaskExecLog.class);
+ logs.add(tel);
+ }
+ return logs;
+ }
+
+ @Override
+    public List<Message> getMessages(String queue) {
+ try {
+ BoolQueryBuilder query = boolQueryBuilder("queue='" + queue + "'", "*");
+
+ // Create the search source builder.
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(query);
+ searchSourceBuilder.sort(new FieldSortBuilder("created").order(SortOrder.ASC));
+
+ // Generate the actual request to send to ES.
+ SearchRequest searchRequest = new SearchRequest(messageIndexPrefix + "*");
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse response =
+ elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
+ return mapGetMessagesResponse(response);
+ } catch (Exception e) {
+ logger.error("Failed to get messages for queue: {}", queue, e);
+ }
+ return null;
+ }
+
+ private List<Message> mapGetMessagesResponse(SearchResponse response) throws IOException {
+ SearchHit[] hits = response.getHits().getHits();
+ TypeFactory factory = TypeFactory.defaultInstance();
+ MapType type = factory.constructMapType(HashMap.class, String.class, String.class);
+ List<Message> messages = new ArrayList<>(hits.length);
+ for (SearchHit hit : hits) {
+ String source = hit.getSourceAsString();
+ Map<String, String> mapSource = objectMapper.readValue(source, type);
+ Message msg = new Message(mapSource.get("messageId"), mapSource.get("payload"), null);
+ messages.add(msg);
+ }
+ return messages;
+ }
+
+ @Override
+ public List<EventExecution> getEventExecutions(String event) {
+ try {
+ BoolQueryBuilder query = boolQueryBuilder("event='" + event + "'", "*");
+
+ // Create the search source builder.
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(query);
+ searchSourceBuilder.sort(new FieldSortBuilder("created").order(SortOrder.ASC));
+
+ // Generate the actual request to send to ES.
+ SearchRequest searchRequest = new SearchRequest(eventIndexPrefix + "*");
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse response =
+ elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
+
+ return mapEventExecutionsResponse(response);
+ } catch (Exception e) {
+ logger.error("Failed to get executions for event: {}", event, e);
+ }
+ return null;
+ }
+
+ private List<EventExecution> mapEventExecutionsResponse(SearchResponse response)
+ throws IOException {
+ SearchHit[] hits = response.getHits().getHits();
+ List<EventExecution> executions = new ArrayList<>(hits.length);
+ for (SearchHit hit : hits) {
+ String source = hit.getSourceAsString();
+ EventExecution execution = objectMapper.readValue(source, EventExecution.class);
+ executions.add(execution);
+ }
+ return executions;
+ }
+
+ @Override
+ public void addMessage(String queue, Message message) {
+ try {
+ long startTime = Instant.now().toEpochMilli();
+ Map<String, Object> doc = new HashMap<>();
+ doc.put("messageId", message.getId());
+ doc.put("payload", message.getPayload());
+ doc.put("queue", queue);
+ doc.put("created", System.currentTimeMillis());
+
+ indexObject(messageIndexName, MSG_DOC_TYPE, doc);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for indexing message: {}",
+ endTime - startTime,
+ message.getId());
+ Monitors.recordESIndexTime("add_message", MSG_DOC_TYPE, endTime - startTime);
+ } catch (Exception e) {
+ logger.error("Failed to index message: {}", message.getId(), e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddMessage(String queue, Message message) {
+ return CompletableFuture.runAsync(() -> addMessage(queue, message), executorService);
+ }
+
+ @Override
+ public void addEventExecution(EventExecution eventExecution) {
+ try {
+ long startTime = Instant.now().toEpochMilli();
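+ // Build a deterministic composite id so re-indexing the same event execution
+ // overwrites the existing document instead of creating a duplicate.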
+ String id =
+ eventExecution.getName()
+ + "."
+ + eventExecution.getEvent()
+ + "."
+ + eventExecution.getMessageId()
+ + "."
+ + eventExecution.getId();
+
+ indexObject(eventIndexName, EVENT_DOC_TYPE, id, eventExecution);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for indexing event execution: {}",
+ endTime - startTime,
+ eventExecution.getId());
+ Monitors.recordESIndexTime("add_event_execution", EVENT_DOC_TYPE, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "logQueue", ((ThreadPoolExecutor) logExecutorService).getQueue().size());
+ } catch (Exception e) {
+ logger.error("Failed to index event execution: {}", eventExecution.getId(), e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
+ return CompletableFuture.runAsync(
+ () -> addEventExecution(eventExecution), logExecutorService);
+ }
+
+ @Override
+ public SearchResult<String> searchWorkflows(
+ String query, String freeText, int start, int count, List<String> sort) {
+ try {
+ return searchObjectIdsViaExpression(
+ query, start, count, sort, freeText, WORKFLOW_DOC_TYPE);
+ } catch (Exception e) {
+ throw new NonTransientException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public SearchResult<WorkflowSummary> searchWorkflowSummary(
+ String query, String freeText, int start, int count, List<String> sort) {
+ try {
+ return searchObjectsViaExpression(
+ query,
+ start,
+ count,
+ sort,
+ freeText,
+ WORKFLOW_DOC_TYPE,
+ false,
+ WorkflowSummary.class);
+ } catch (Exception e) {
+ throw new TransientException(e.getMessage(), e);
+ }
+ }
+
+ private <T> SearchResult<T> searchObjectsViaExpression(
+ String structuredQuery,
+ int start,
+ int size,
+ List<String> sortOptions,
+ String freeTextQuery,
+ String docType,
+ boolean idOnly,
+ Class<T> clazz)
+ throws ParserException, IOException {
+ QueryBuilder queryBuilder = boolQueryBuilder(structuredQuery, freeTextQuery);
+ return searchObjects(
+ getIndexName(docType), queryBuilder, start, size, sortOptions, idOnly, clazz);
+ }
+
+ @Override
+ public SearchResult<String> searchTasks(
+ String query, String freeText, int start, int count, List<String> sort) {
+ try {
+ return searchObjectIdsViaExpression(query, start, count, sort, freeText, TASK_DOC_TYPE);
+ } catch (Exception e) {
+ throw new NonTransientException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public SearchResult<TaskSummary> searchTaskSummary(
+ String query, String freeText, int start, int count, List<String> sort) {
+ try {
+ return searchObjectsViaExpression(
+ query, start, count, sort, freeText, TASK_DOC_TYPE, false, TaskSummary.class);
+ } catch (Exception e) {
+ throw new TransientException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void removeWorkflow(String workflowId) {
+ long startTime = Instant.now().toEpochMilli();
+ DeleteRequest request = new DeleteRequest(workflowIndexName, workflowId);
+
+ try {
+ DeleteResponse response = elasticSearchClient.delete(request, RequestOptions.DEFAULT);
+
+ if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
+ logger.error("Index removal failed - document not found by id: {}", workflowId);
+ }
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for removing workflow: {}", endTime - startTime, workflowId);
+ Monitors.recordESIndexTime("remove_workflow", WORKFLOW_DOC_TYPE, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ } catch (IOException e) {
+ logger.error("Failed to remove workflow {} from index", workflowId, e);
+ Monitors.error(className, "remove");
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
+ return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService);
+ }
+
+ @Override
+ public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
+ try {
+ if (keys.length != values.length) {
+ throw new NonTransientException("Number of keys and values do not match");
+ }
+
+ long startTime = Instant.now().toEpochMilli();
+ UpdateRequest request = new UpdateRequest(workflowIndexName, workflowInstanceId);
+ Map<String, Object> source =
+ IntStream.range(0, keys.length)
+ .boxed()
+ .collect(Collectors.toMap(i -> keys[i], i -> values[i]));
+ request.doc(source);
+
+ logger.debug("Updating workflow {} with {}", workflowInstanceId, source);
+ elasticSearchClient.update(request, RequestOptions.DEFAULT);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for updating workflow: {}",
+ endTime - startTime,
+ workflowInstanceId);
+ Monitors.recordESIndexTime("update_workflow", WORKFLOW_DOC_TYPE, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ } catch (Exception e) {
+ logger.error("Failed to update workflow {}", workflowInstanceId, e);
+ Monitors.error(className, "update");
+ }
+ }
+
+ @Override
+ public void removeTask(String workflowId, String taskId) {
+ long startTime = Instant.now().toEpochMilli();
+
+ SearchResult<String> taskSearchResult =
+ searchTasks(
+ String.format("(taskId='%s') AND (workflowId='%s')", taskId, workflowId),
+ "*",
+ 0,
+ 1,
+ null);
+
+ if (taskSearchResult.getTotalHits() == 0) {
+ logger.error("Task: {} does not belong to workflow: {}", taskId, workflowId);
+ Monitors.error(className, "removeTask");
+ return;
+ }
+
+ DeleteRequest request = new DeleteRequest(taskIndexName, taskId);
+
+ try {
+ DeleteResponse response = elasticSearchClient.delete(request, RequestOptions.DEFAULT);
+
+ if (response.getResult() != DocWriteResponse.Result.DELETED) {
+ logger.error("Index removal failed - task not found by id: {}", workflowId);
+ Monitors.error(className, "removeTask");
+ return;
+ }
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for removing task:{} of workflow: {}",
+ endTime - startTime,
+ taskId,
+ workflowId);
+ Monitors.recordESIndexTime("remove_task", "", endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ } catch (IOException e) {
+ logger.error(
+ "Failed to remove task {} of workflow: {} from index", taskId, workflowId, e);
+ Monitors.error(className, "removeTask");
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
+ return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService);
+ }
+
+ @Override
+ public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {
+ try {
+ if (keys.length != values.length) {
+ throw new IllegalArgumentException("Number of keys and values do not match");
+ }
+
+ long startTime = Instant.now().toEpochMilli();
+ UpdateRequest request = new UpdateRequest(taskIndexName, taskId);
+ Map<String, Object> source =
+ IntStream.range(0, keys.length)
+ .boxed()
+ .collect(Collectors.toMap(i -> keys[i], i -> values[i]));
+ request.doc(source);
+
+ logger.debug("Updating task: {} of workflow: {} with {}", taskId, workflowId, source);
+ elasticSearchClient.update(request, RequestOptions.DEFAULT);
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for updating task: {} of workflow: {}",
+ endTime - startTime,
+ taskId,
+ workflowId);
+ Monitors.recordESIndexTime("update_task", "", endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ } catch (Exception e) {
+ logger.error("Failed to update task: {} of workflow: {}", taskId, workflowId, e);
+ Monitors.error(className, "update");
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncUpdateTask(
+ String workflowId, String taskId, String[] keys, Object[] values) {
+ return CompletableFuture.runAsync(
+ () -> updateTask(workflowId, taskId, keys, values), executorService);
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncUpdateWorkflow(
+ String workflowInstanceId, String[] keys, Object[] values) {
+ return CompletableFuture.runAsync(
+ () -> updateWorkflow(workflowInstanceId, keys, values), executorService);
+ }
+
+ @Override
+ public String get(String workflowInstanceId, String fieldToGet) {
+ GetRequest request = new GetRequest(workflowIndexName, workflowInstanceId);
+ GetResponse response;
+ try {
+ response = elasticSearchClient.get(request, RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ logger.error(
+ "Unable to get Workflow: {} from ElasticSearch index: {}",
+ workflowInstanceId,
+ workflowIndexName,
+ e);
+ return null;
+ }
+
+ if (response.isExists()) {
+ Map<String, Object> sourceAsMap = response.getSourceAsMap();
+ if (sourceAsMap.get(fieldToGet) != null) {
+ return sourceAsMap.get(fieldToGet).toString();
+ }
+ }
+
+ logger.debug(
+ "Unable to find Workflow: {} in ElasticSearch index: {}.",
+ workflowInstanceId,
+ workflowIndexName);
+ return null;
+ }
+
+ private SearchResult<String> searchObjectIdsViaExpression(
+ String structuredQuery,
+ int start,
+ int size,
+ List<String> sortOptions,
+ String freeTextQuery,
+ String docType)
+ throws ParserException, IOException {
+ QueryBuilder queryBuilder = boolQueryBuilder(structuredQuery, freeTextQuery);
+ return searchObjectIds(getIndexName(docType), queryBuilder, start, size, sortOptions);
+ }
+
+ private <T> SearchResult<T> searchObjectIdsViaExpression(
+ String structuredQuery,
+ int start,
+ int size,
+ List<String> sortOptions,
+ String freeTextQuery,
+ String docType,
+ Class<T> clazz)
+ throws ParserException, IOException {
+ QueryBuilder queryBuilder = boolQueryBuilder(structuredQuery, freeTextQuery);
+ return searchObjects(
+ getIndexName(docType), queryBuilder, start, size, sortOptions, false, clazz);
+ }
+
+ private SearchResult<String> searchObjectIds(
+ String indexName, QueryBuilder queryBuilder, int start, int size) throws IOException {
+ return searchObjectIds(indexName, queryBuilder, start, size, null);
+ }
+
+ /**
+ * Tries to find object ids for a given query in an index.
+ *
+ * @param indexName The name of the index.
+ * @param queryBuilder The query to use for searching.
+ * @param start The offset of the first result to return.
+ * @param size The maximum number of results to return.
+ * @param sortOptions A list of sort options in the form FIELD:ORDER, where ORDER is optional
+ * and either ASC or DESC (defaults to ASC).
+ * @return The SearchResults which includes the count and IDs that were found.
+ * @throws IOException If we cannot communicate with ES.
+ */
+ private SearchResult<String> searchObjectIds(
+ String indexName,
+ QueryBuilder queryBuilder,
+ int start,
+ int size,
+ List<String> sortOptions)
+ throws IOException {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(queryBuilder);
+ searchSourceBuilder.from(start);
+ searchSourceBuilder.size(size);
+
+ if (sortOptions != null && !sortOptions.isEmpty()) {
+
+ for (String sortOption : sortOptions) {
+ SortOrder order = SortOrder.ASC;
+ String field = sortOption;
+ int index = sortOption.indexOf(":");
+ if (index > 0) {
+ field = sortOption.substring(0, index);
+ order = SortOrder.valueOf(sortOption.substring(index + 1));
+ }
+ searchSourceBuilder.sort(new FieldSortBuilder(field).order(order));
+ }
+ }
+
+ // Generate the actual request to send to ES.
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse response = elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
+
+ List<String> result = new LinkedList<>();
+ response.getHits().forEach(hit -> result.add(hit.getId()));
+ long count = response.getHits().getTotalHits().value;
+ return new SearchResult<>(count, result);
+ }
+
+ private <T> SearchResult<T> searchObjects(
+ String indexName,
+ QueryBuilder queryBuilder,
+ int start,
+ int size,
+ List<String> sortOptions,
+ boolean idOnly,
+ Class<T> clazz)
+ throws IOException {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(queryBuilder);
+ searchSourceBuilder.from(start);
+ searchSourceBuilder.size(size);
+ if (idOnly) {
+ searchSourceBuilder.fetchSource(false);
+ }
+
+ if (sortOptions != null && !sortOptions.isEmpty()) {
+
+ for (String sortOption : sortOptions) {
+ SortOrder order = SortOrder.ASC;
+ String field = sortOption;
+ int index = sortOption.indexOf(":");
+ if (index > 0) {
+ field = sortOption.substring(0, index);
+ order = SortOrder.valueOf(sortOption.substring(index + 1));
+ }
+ searchSourceBuilder.sort(new FieldSortBuilder(field).order(order));
+ }
+ }
+
+ // Generate the actual request to send to ES.
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.source(searchSourceBuilder);
+
+ SearchResponse response = elasticSearchClient.search(searchRequest, RequestOptions.DEFAULT);
+ return mapSearchResult(response, idOnly, clazz);
+ }
+
+ private <T> SearchResult<T> mapSearchResult(
+ SearchResponse response, boolean idOnly, Class<T> clazz) {
+ SearchHits searchHits = response.getHits();
+ long count = searchHits.getTotalHits().value;
+ List<T> result;
+ if (idOnly) {
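+ // Document ids are Strings, so idOnly searches expect clazz to be String.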
+ result =
+ Arrays.stream(searchHits.getHits())
+ .map(hit -> clazz.cast(hit.getId()))
+ .collect(Collectors.toList());
+ } else {
+ result =
+ Arrays.stream(searchHits.getHits())
+ .map(
+ hit -> {
+ try {
+ return objectMapper.readValue(
+ hit.getSourceAsString(), clazz);
+ } catch (JsonProcessingException e) {
+ logger.error(
+ "Failed to de-serialize elasticsearch from source: {}",
+ hit.getSourceAsString(),
+ e);
+ }
+ return null;
+ })
+ .collect(Collectors.toList());
+ }
+ return new SearchResult<>(count, result);
+ }
+
+ @Override
+ public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
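+ // Match workflows that ended within the one-day window starting (archiveTtlDays + 1)
+ // days ago, reached a terminal status, and have not yet been marked as archived.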
+ QueryBuilder q =
+ QueryBuilders.boolQuery()
+ .must(
+ QueryBuilders.rangeQuery("endTime")
+ .lt(LocalDate.now().minusDays(archiveTtlDays).toString())
+ .gte(
+ LocalDate.now()
+ .minusDays(archiveTtlDays)
+ .minusDays(1)
+ .toString()))
+ .should(QueryBuilders.termQuery("status", "COMPLETED"))
+ .should(QueryBuilders.termQuery("status", "FAILED"))
+ .should(QueryBuilders.termQuery("status", "TIMED_OUT"))
+ .should(QueryBuilders.termQuery("status", "TERMINATED"))
+ .mustNot(QueryBuilders.existsQuery("archived"))
+ .minimumShouldMatch(1);
+
+ SearchResult<String> workflowIds;
+ try {
+ workflowIds = searchObjectIds(indexName, q, 0, 1000);
+ } catch (IOException e) {
+ logger.error("Unable to communicate with ES to find archivable workflows", e);
+ return Collections.emptyList();
+ }
+
+ return workflowIds.getResults();
+ }
+
+ @Override
+ public long getWorkflowCount(String query, String freeText) {
+ try {
+ return getObjectCounts(query, freeText, WORKFLOW_DOC_TYPE);
+ } catch (Exception e) {
+ throw new NonTransientException(e.getMessage(), e);
+ }
+ }
+
+ private long getObjectCounts(String structuredQuery, String freeTextQuery, String docType)
+ throws ParserException, IOException {
+ QueryBuilder queryBuilder = boolQueryBuilder(structuredQuery, freeTextQuery);
+
+ String indexName = getIndexName(docType);
+ CountRequest countRequest = new CountRequest(new String[] {indexName}, queryBuilder);
+ CountResponse countResponse =
+ elasticSearchClient.count(countRequest, RequestOptions.DEFAULT);
+ return countResponse.getCount();
+ }
+
+ public List<String> searchRecentRunningWorkflows(
+ int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) {
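+ // Find RUNNING workflows whose updateTime lies between
+ // (now - lastModifiedHoursAgoFrom) and (now - lastModifiedHoursAgoTo).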
+ DateTime dateTime = new DateTime();
+ QueryBuilder q =
+ QueryBuilders.boolQuery()
+ .must(
+ QueryBuilders.rangeQuery("updateTime")
+ .gt(dateTime.minusHours(lastModifiedHoursAgoFrom)))
+ .must(
+ QueryBuilders.rangeQuery("updateTime")
+ .lt(dateTime.minusHours(lastModifiedHoursAgoTo)))
+ .must(QueryBuilders.termQuery("status", "RUNNING"));
+
+ SearchResult<String> workflowIds;
+ try {
+ workflowIds =
+ searchObjectIds(
+ workflowIndexName,
+ q,
+ 0,
+ 5000,
+ Collections.singletonList("updateTime:ASC"));
+ } catch (IOException e) {
+ logger.error("Unable to communicate with ES to find recent running workflows", e);
+ return Collections.emptyList();
+ }
+
+ return workflowIds.getResults();
+ }
+
+ private void indexObject(final String index, final String docType, final Object doc) {
+ indexObject(index, docType, null, doc);
+ }
+
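+ /**
+ * Buffers the document into the per-docType bulk request and flushes the buffer once it
+ * reaches {@code indexBatchSize} actions (see also {@link #flushBulkRequests()}).
+ */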
+ private void indexObject(
+ final String index, final String docType, final String docId, final Object doc) {
+
+ byte[] docBytes;
+ try {
+ docBytes = objectMapper.writeValueAsBytes(doc);
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to convert {} '{}' to byte string", docType, docId);
+ return;
+ }
+ IndexRequest request = new IndexRequest(index);
+ request.id(docId).source(docBytes, XContentType.JSON);
+
+ if (bulkRequests.get(docType) == null) {
+ bulkRequests.put(
+ docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
+ }
+
+ bulkRequests.get(docType).getBulkRequest().add(request);
+ if (bulkRequests.get(docType).getBulkRequest().numberOfActions() >= this.indexBatchSize) {
+ indexBulkRequest(docType);
+ }
+ }
+
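+ /** Flushes the buffered bulk request for the given document type and resets the buffer. */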
+ private synchronized void indexBulkRequest(String docType) {
+ if (bulkRequests.get(docType).getBulkRequest() != null
+ && bulkRequests.get(docType).getBulkRequest().numberOfActions() > 0) {
+ synchronized (bulkRequests.get(docType).getBulkRequest()) {
+ indexWithRetry(
+ bulkRequests.get(docType).getBulkRequest().get(),
+ "Bulk Indexing " + docType,
+ docType);
+ bulkRequests.put(
+ docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
+ }
+ }
+ }
+
+ /**
+ * Performs an index operation with a retry.
+ *
+ * @param request The bulk index request that we want to perform.
+ * @param operationDescription A description of the operation being performed.
+ * @param docType The document type, used when recording metrics.
+ */
+ private void indexWithRetry(
+ final BulkRequest request, final String operationDescription, String docType) {
+ try {
+ long startTime = Instant.now().toEpochMilli();
+ retryTemplate.execute(
+ context -> elasticSearchClient.bulk(request, RequestOptions.DEFAULT));
+ long endTime = Instant.now().toEpochMilli();
+ logger.debug(
+ "Time taken {} for indexing object of type: {}", endTime - startTime, docType);
+ Monitors.recordESIndexTime("index_object", docType, endTime - startTime);
+ Monitors.recordWorkerQueueSize(
+ "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
+ Monitors.recordWorkerQueueSize(
+ "logQueue", ((ThreadPoolExecutor) logExecutorService).getQueue().size());
+ } catch (Exception e) {
+ Monitors.error(className, "index");
+ logger.error("Failed to index {} for request type: {}", request, docType, e);
+ }
+ }
+
+ /**
+ * Flushes the buffers if bulk requests have not been indexed for the past {@link
+ * ElasticSearchProperties#getAsyncBufferFlushTimeout()} seconds. This prevents data loss in
+ * case the instance is terminated while the buffer still holds documents to be indexed.
+ */
+ private void flushBulkRequests() {
+ bulkRequests.entrySet().stream()
+ .filter(
+ entry ->
+ (System.currentTimeMillis() - entry.getValue().getLastFlushTime())
+ >= asyncBufferFlushTimeout * 1000L)
+ .filter(
+ entry ->
+ entry.getValue().getBulkRequest() != null
+ && entry.getValue().getBulkRequest().numberOfActions() > 0)
+ .forEach(
+ entry -> {
+ logger.debug(
+ "Flushing bulk request buffer for type {}, size: {}",
+ entry.getKey(),
+ entry.getValue().getBulkRequest().numberOfActions());
+ indexBulkRequest(entry.getKey());
+ });
+ }
+
+ private static class BulkRequests {
+
+ private final long lastFlushTime;
+ private final BulkRequestWrapper bulkRequest;
+
+ long getLastFlushTime() {
+ return lastFlushTime;
+ }
+
+ BulkRequestWrapper getBulkRequest() {
+ return bulkRequest;
+ }
+
+ BulkRequests(long lastFlushTime, BulkRequest bulkRequest) {
+ this.lastFlushTime = lastFlushTime;
+ this.bulkRequest = new BulkRequestWrapper(bulkRequest);
+ }
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/Expression.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/Expression.java
new file mode 100644
index 000000000..7cb3d7f3b
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/Expression.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+
+import com.netflix.conductor.es7.dao.query.parser.internal.AbstractNode;
+import com.netflix.conductor.es7.dao.query.parser.internal.BooleanOp;
+import com.netflix.conductor.es7.dao.query.parser.internal.ParserException;
+
+/**
+ * @author Viren
+ */
+public class Expression extends AbstractNode implements FilterProvider {
+
+ private NameValue nameVal;
+
+ private GroupedExpression ge;
+
+ private BooleanOp op;
+
+ private Expression rhs;
+
+ public Expression(InputStream is) throws ParserException {
+ super(is);
+ }
+
+ @Override
+ protected void _parse() throws Exception {
+ byte[] peeked = peek(1);
+
+ if (peeked[0] == '(') {
+ this.ge = new GroupedExpression(is);
+ } else {
+ this.nameVal = new NameValue(is);
+ }
+
+ peeked = peek(3);
+ if (isBoolOpr(peeked)) {
+ // we have an expression next
+ this.op = new BooleanOp(is);
+ this.rhs = new Expression(is);
+ }
+ }
+
+ public boolean isBinaryExpr() {
+ return this.op != null;
+ }
+
+ public BooleanOp getOperator() {
+ return this.op;
+ }
+
+ public Expression getRightHandSide() {
+ return this.rhs;
+ }
+
+ public boolean isNameValue() {
+ return this.nameVal != null;
+ }
+
+ public NameValue getNameValue() {
+ return this.nameVal;
+ }
+
+ public GroupedExpression getGroupedExpression() {
+ return this.ge;
+ }
+
+ @Override
+ public QueryBuilder getFilterBuilder() {
+ QueryBuilder lhs = null;
+ if (nameVal != null) {
+ lhs = nameVal.getFilterBuilder();
+ } else {
+ lhs = ge.getFilterBuilder();
+ }
+
+ if (this.isBinaryExpr()) {
+ QueryBuilder rhsFilter = rhs.getFilterBuilder();
+ if (this.op.isAnd()) {
+ return QueryBuilders.boolQuery().must(lhs).must(rhsFilter);
+ } else {
+ return QueryBuilders.boolQuery().should(lhs).should(rhsFilter);
+ }
+ } else {
+ return lhs;
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (isBinaryExpr()) {
+ return "" + (nameVal == null ? ge : nameVal) + op + rhs;
+ } else {
+ return "" + (nameVal == null ? ge : nameVal);
+ }
+ }
+
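+ /**
+ * Parses the given string into an expression tree, e.g. (illustrative query)
+ * {@code workflowType = 'test_workflow' AND status IN (RUNNING,COMPLETED)}.
+ */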
+ public static Expression fromString(String value) throws ParserException {
+ return new Expression(new BufferedInputStream(new ByteArrayInputStream(value.getBytes())));
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/FilterProvider.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/FilterProvider.java
new file mode 100644
index 000000000..f4de4d36b
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/FilterProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser;
+
+import org.elasticsearch.index.query.QueryBuilder;
+
+/**
+ * @author Viren
+ */
+public interface FilterProvider {
+
+ /**
+ * @return the Elasticsearch {@link QueryBuilder} for this filter
+ */
+ QueryBuilder getFilterBuilder();
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/GroupedExpression.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/GroupedExpression.java
new file mode 100644
index 000000000..67aa8965c
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/GroupedExpression.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser;
+
+import java.io.InputStream;
+
+import org.elasticsearch.index.query.QueryBuilder;
+
+import com.netflix.conductor.es7.dao.query.parser.internal.AbstractNode;
+import com.netflix.conductor.es7.dao.query.parser.internal.ParserException;
+
+/**
+ * @author Viren
+ */
+public class GroupedExpression extends AbstractNode implements FilterProvider {
+
+ private Expression expression;
+
+ public GroupedExpression(InputStream is) throws ParserException {
+ super(is);
+ }
+
+ @Override
+ protected void _parse() throws Exception {
+ byte[] buffer = read(1);
+ assertExpected(buffer, "(");
+
+ this.expression = new Expression(is);
+
+ buffer = read(1);
+ assertExpected(buffer, ")");
+ }
+
+ @Override
+ public String toString() {
+ return "(" + expression + ")";
+ }
+
+ /**
+ * @return the expression
+ */
+ public Expression getExpression() {
+ return expression;
+ }
+
+ @Override
+ public QueryBuilder getFilterBuilder() {
+ return expression.getFilterBuilder();
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/NameValue.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/NameValue.java
new file mode 100644
index 000000000..b81c9f94b
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/NameValue.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser;
+
+import java.io.InputStream;
+
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+
+import com.netflix.conductor.es7.dao.query.parser.internal.AbstractNode;
+import com.netflix.conductor.es7.dao.query.parser.internal.ComparisonOp;
+import com.netflix.conductor.es7.dao.query.parser.internal.ComparisonOp.Operators;
+import com.netflix.conductor.es7.dao.query.parser.internal.ConstValue;
+import com.netflix.conductor.es7.dao.query.parser.internal.ListConst;
+import com.netflix.conductor.es7.dao.query.parser.internal.Name;
+import com.netflix.conductor.es7.dao.query.parser.internal.ParserException;
+import com.netflix.conductor.es7.dao.query.parser.internal.Range;
+
+/**
+ * @author Viren
+ *
+ * Represents an expression of the form:
+ * key OPR value
+ * where OPR is one of the following comparison operators:
+ * >, <, =, !=, IN, BETWEEN, IS, STARTS_WITH
+ *
+ */
+public class NameValue extends AbstractNode implements FilterProvider {
+
+ private Name name;
+
+ private ComparisonOp op;
+
+ private ConstValue value;
+
+ private Range range;
+
+ private ListConst valueList;
+
+ public NameValue(InputStream is) throws ParserException {
+ super(is);
+ }
+
+ @Override
+ protected void _parse() throws Exception {
+ this.name = new Name(is);
+ this.op = new ComparisonOp(is);
+
+ if (this.op.getOperator().equals(Operators.BETWEEN.value())) {
+ this.range = new Range(is);
+ }
+ if (this.op.getOperator().equals(Operators.IN.value())) {
+ this.valueList = new ListConst(is);
+ } else {
+ this.value = new ConstValue(is);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "" + name + op + value;
+ }
+
+ /**
+ * @return the name
+ */
+ public Name getName() {
+ return name;
+ }
+
+ /**
+ * @return the op
+ */
+ public ComparisonOp getOp() {
+ return op;
+ }
+
+ /**
+ * @return the value
+ */
+ public ConstValue getValue() {
+ return value;
+ }
+
+ @Override
+ public QueryBuilder getFilterBuilder() {
+ if (op.getOperator().equals(Operators.EQUALS.value())) {
+ return QueryBuilders.queryStringQuery(
+ name.getName() + ":" + value.getValue().toString());
+ } else if (op.getOperator().equals(Operators.BETWEEN.value())) {
+ return QueryBuilders.rangeQuery(name.getName())
+ .from(range.getLow())
+ .to(range.getHigh());
+ } else if (op.getOperator().equals(Operators.IN.value())) {
+ return QueryBuilders.termsQuery(name.getName(), valueList.getList());
+ } else if (op.getOperator().equals(Operators.NOT_EQUALS.value())) {
+ return QueryBuilders.queryStringQuery(
+ "NOT " + name.getName() + ":" + value.getValue().toString());
+ } else if (op.getOperator().equals(Operators.GREATER_THAN.value())) {
+ return QueryBuilders.rangeQuery(name.getName())
+ .from(value.getValue())
+ .includeLower(false)
+ .includeUpper(false);
+ } else if (op.getOperator().equals(Operators.IS.value())) {
+ if (value.getSysConstant().equals(ConstValue.SystemConsts.NULL)) {
+ // IS NULL: match documents where the field is missing or has no value.
+ return QueryBuilders.boolQuery()
+ .mustNot(QueryBuilders.existsQuery(name.getName()));
+ } else if (value.getSysConstant().equals(ConstValue.SystemConsts.NOT_NULL)) {
+ // IS NOT NULL: match documents where the field is present with a value.
+ return QueryBuilders.existsQuery(name.getName());
+ }
+ } else if (op.getOperator().equals(Operators.LESS_THAN.value())) {
+ return QueryBuilders.rangeQuery(name.getName())
+ .to(value.getValue())
+ .includeLower(false)
+ .includeUpper(false);
+ } else if (op.getOperator().equals(Operators.STARTS_WITH.value())) {
+ return QueryBuilders.prefixQuery(name.getName(), value.getUnquotedValue());
+ }
+
+ throw new IllegalStateException("Incorrect/unsupported operators");
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/AbstractNode.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/AbstractNode.java
new file mode 100644
index 000000000..bfbd3988f
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/AbstractNode.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser.internal;
+
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * @author Viren
+ */
+public abstract class AbstractNode {
+
+ public static final Pattern WHITESPACE = Pattern.compile("\\s");
+
+ protected static Set<Character> comparisonOprs = new HashSet<>();
+
+ static {
+ comparisonOprs.add('>');
+ comparisonOprs.add('<');
+ comparisonOprs.add('=');
+ }
+
+ protected InputStream is;
+
+ protected AbstractNode(InputStream is) throws ParserException {
+ this.is = is;
+ this.parse();
+ }
+
+ protected boolean isNumber(String test) {
+ try {
+ // If you can convert to a big decimal value, then it is a number.
+ new BigDecimal(test);
+ return true;
+
+ } catch (NumberFormatException e) {
+ // Ignore
+ }
+ return false;
+ }
+
+ protected boolean isBoolOpr(byte[] buffer) {
+ if (buffer.length > 1 && buffer[0] == 'O' && buffer[1] == 'R') {
+ return true;
+ } else if (buffer.length > 2 && buffer[0] == 'A' && buffer[1] == 'N' && buffer[2] == 'D') {
+ return true;
+ }
+ return false;
+ }
+
+ protected boolean isComparisonOpr(byte[] buffer) {
+ if (buffer[0] == 'I' && buffer[1] == 'N') {
+ return true;
+ } else if (buffer[0] == '!' && buffer[1] == '=') {
+ return true;
+ } else {
+ return comparisonOprs.contains((char) buffer[0]);
+ }
+ }
+
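+ /** Returns the next {@code length} bytes without consuming them (uses mark/reset). */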
+ protected byte[] peek(int length) throws Exception {
+ return read(length, true);
+ }
+
+ protected byte[] read(int length) throws Exception {
+ return read(length, false);
+ }
+
+ protected String readToken() throws Exception {
+ skipWhitespace();
+ StringBuilder sb = new StringBuilder();
+ while (is.available() > 0) {
+ char c = (char) peek(1)[0];
+ if (c == ' ' || c == '\t' || c == '\n' || c == '\r') {
+ is.skip(1);
+ break;
+ } else if (c == '=' || c == '>' || c == '<' || c == '!') {
+ // do not skip
+ break;
+ }
+ sb.append(c);
+ is.skip(1);
+ }
+ return sb.toString().trim();
+ }
+
+ protected boolean isNumeric(char c) {
+ return c == '-' || c == 'e' || (c >= '0' && c <= '9') || c == '.';
+ }
+
+ protected void assertExpected(byte[] found, String expected) throws ParserException {
+ assertExpected(new String(found), expected);
+ }
+
+ protected void assertExpected(String found, String expected) throws ParserException {
+ if (!found.equals(expected)) {
+ throw new ParserException("Expected " + expected + ", found " + found);
+ }
+ }
+
+ protected void assertExpected(char found, char expected) throws ParserException {
+ if (found != expected) {
+ throw new ParserException("Expected " + expected + ", found " + found);
+ }
+ }
+
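+ /** Invokes {@code consumer} for each index from 0 to length - 1, propagating any exception. */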
+ protected static void efor(int length, FunctionThrowingException<Integer> consumer)
+ throws Exception {
+ for (int i = 0; i < length; i++) {
+ consumer.accept(i);
+ }
+ }
+
+ protected abstract void _parse() throws Exception;
+
+ // Public stuff here
+ private void parse() throws ParserException {
+ // skip white spaces
+ skipWhitespace();
+ try {
+ _parse();
+ } catch (Exception e) {
+ System.out.println("\t" + this.getClass().getSimpleName() + "->" + this.toString());
+ if (!(e instanceof ParserException)) {
+ throw new ParserException("Error parsing", e);
+ } else {
+ throw (ParserException) e;
+ }
+ }
+ skipWhitespace();
+ }
+
+ // Private methods
+
+ private byte[] read(int length, boolean peekOnly) throws Exception {
+ byte[] buf = new byte[length];
+ if (peekOnly) {
+ is.mark(length);
+ }
+ efor(length, (Integer c) -> buf[c] = (byte) is.read());
+ if (peekOnly) {
+ is.reset();
+ }
+ return buf;
+ }
+
+ protected void skipWhitespace() throws ParserException {
+ try {
+ while (is.available() > 0) {
+ byte c = peek(1)[0];
+ if (c == ' ' || c == '\t' || c == '\n' || c == '\r') {
+ // skip
+ read(1);
+ } else {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ throw new ParserException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/BooleanOp.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/BooleanOp.java
new file mode 100644
index 000000000..f37c5f200
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/BooleanOp.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser.internal;
+
+import java.io.InputStream;
+
+/**
+ * @author Viren
+ */
+public class BooleanOp extends AbstractNode {
+
+ private String value;
+
+ public BooleanOp(InputStream is) throws ParserException {
+ super(is);
+ }
+
+ @Override
+ protected void _parse() throws Exception {
+ byte[] buffer = peek(3);
+ if (buffer.length > 1 && buffer[0] == 'O' && buffer[1] == 'R') {
+ this.value = "OR";
+ } else if (buffer.length > 2 && buffer[0] == 'A' && buffer[1] == 'N' && buffer[2] == 'D') {
+ this.value = "AND";
+ } else {
+ throw new ParserException("No valid boolean operator found...");
+ }
+ read(this.value.length());
+ }
+
+ @Override
+ public String toString() {
+ return " " + value + " ";
+ }
+
+ public String getOperator() {
+ return value;
+ }
+
+ public boolean isAnd() {
+ return "AND".equals(value);
+ }
+
+ public boolean isOr() {
+ return "OR".equals(value);
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ComparisonOp.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ComparisonOp.java
new file mode 100644
index 000000000..55d8d49a8
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ComparisonOp.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser.internal;
+
+import java.io.InputStream;
+
+/**
+ * @author Viren
+ */
+public class ComparisonOp extends AbstractNode {
+
+ public enum Operators {
+ BETWEEN("BETWEEN"),
+ EQUALS("="),
+ LESS_THAN("<"),
+ GREATER_THAN(">"),
+ IN("IN"),
+ NOT_EQUALS("!="),
+ IS("IS"),
+ STARTS_WITH("STARTS_WITH");
+
+ private final String value;
+
+ Operators(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+ }
+
+ static {
+ int max = 0;
+ for (Operators op : Operators.values()) {
+ max = Math.max(max, op.value().length());
+ }
+ maxOperatorLength = max;
+ }
+
+ private static final int maxOperatorLength;
+
+ private static final int betweenLen = Operators.BETWEEN.value().length();
+ private static final int startsWithLen = Operators.STARTS_WITH.value().length();
+
+ private String value;
+
+ public ComparisonOp(InputStream is) throws ParserException {
+ super(is);
+ }
+
+ @Override
+ protected void _parse() throws Exception {
+ byte[] peeked = peek(maxOperatorLength);
+ if (peeked[0] == '=' || peeked[0] == '>' || peeked[0] == '<') {
+ this.value = new String(peeked, 0, 1);
+ } else if (peeked[0] == 'I' && peeked[1] == 'N') {
+ this.value = "IN";
+ } else if (peeked[0] == 'I' && peeked[1] == 'S') {
+ this.value = "IS";
+ } else if (peeked[0] == '!' && peeked[1] == '=') {
+ this.value = "!=";
+ } else if (peeked.length >= betweenLen
+ && peeked[0] == 'B'
+ && peeked[1] == 'E'
+ && peeked[2] == 'T'
+ && peeked[3] == 'W'
+ && peeked[4] == 'E'
+ && peeked[5] == 'E'
+ && peeked[6] == 'N') {
+ this.value = Operators.BETWEEN.value();
+ } else if (peeked.length == startsWithLen
+ && new String(peeked).equals(Operators.STARTS_WITH.value())) {
+ this.value = Operators.STARTS_WITH.value();
+ } else {
+ throw new ParserException(
+ "Expecting an operator (=, >, <, !=, IS, BETWEEN, IN, STARTS_WITH), but found none. Peeked=>"
+ + new String(peeked));
+ }
+
+ read(this.value.length());
+ }
+
+ @Override
+ public String toString() {
+ return " " + value + " ";
+ }
+
+ public String getOperator() {
+ return value;
+ }
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ConstValue.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ConstValue.java
new file mode 100644
index 000000000..edc9513c1
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ConstValue.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser.internal;
+
+import java.io.InputStream;
+
+/**
+ * @author Viren Constant value can be:
+ *     <p>1. List of values (a,b,c) <br>
+ *     2. Range of values (m AND n) <br>
+ *     3. A value (x) <br>
+ *     4. A value is either a string or a number
+ */
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/FunctionThrowingException.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/FunctionThrowingException.java
new file mode 100644
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/FunctionThrowingException.java
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser.internal;
+
+/**
+ * @author Viren
+ */
+@FunctionalInterface
+public interface FunctionThrowingException<T> {
+
+ void accept(T t) throws Exception;
+}
diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ListConst.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ListConst.java
new file mode 100644
index 000000000..87227a863
--- /dev/null
+++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/query/parser/internal/ListConst.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2023 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.es7.dao.query.parser.internal;
+
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author Viren List of constants
+ */
+public class ListConst extends AbstractNode {
+
+ private List