Skip to content

Commit

Permalink
Merge pull request #338 from ubisoft/feature/add-opensearch
Browse files Browse the repository at this point in the history
[FEATURE] Add OpenSearch
  • Loading branch information
v1r3n authored Dec 20, 2024
2 parents 17ef680 + 4d2d07e commit 20bf7a9
Show file tree
Hide file tree
Showing 58 changed files with 33,702 additions and 1,976 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ allprojects {

resolutionStrategy.eachDependency { details ->
if (details.requested.group.startsWith('com.fasterxml.jackson.') ) {
details.useVersion "2.15.2"
details.useVersion "2.18.0"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,14 @@ public Any deserialize(JsonParser p, DeserializationContext ctxt) throws IOExcep
JsonNode value = root.get(JSON_VALUE);

if (type == null || !type.isTextual()) {
ctxt.reportMappingException(
ctxt.reportBadDefinition(
type.getClass(),
"invalid '@type' field when deserializing ProtoBuf Any object");
}

if (value == null || !value.isTextual()) {
ctxt.reportMappingException(
ctxt.reportBadDefinition(
type.getClass(),
"invalid '@value' field when deserializing ProtoBuf Any object");
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Component;

import com.netflix.conductor.core.events.queue.Message;

/** DAO responsible for managing queuing for the tasks. */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public interface QueueDAO {

/**
Expand Down
5 changes: 3 additions & 2 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ Refer to the table below for various supported backend and required configuratio
> | cassandra | conductor.db.type=cassandra |
>
Conductor using Elasticsearch for indexing the workflow data.
Currently, Elasticsearch 6 and 7 are supported.
Conductor is using Elasticsearch or OpenSearch for indexing the workflow data.
Currently, Elasticsearch 6, 7 and OpenSearch 2.18 are supported.

We welcome community contributions for other indexing backends.

Expand All @@ -49,6 +49,7 @@ TODO: Link to the helm charts
| [docker-compose.yaml](docker-compose.yaml) | Redis + Elasticsearch 7 |
| [docker-compose-postgres.yaml](docker-compose-postgres.yaml) | Postgres + Elasticsearch 7 |
| [docker-compose-mysql.yaml](docker-compose-mysql.yaml) | Mysql + Elasticsearch 7 |
| [docker-compose-redis-os.yaml](docker-compose-redis-os.yaml) | Redis + OpenSearch 2.18 |

### Network errors during UI build with yarn

Expand Down
80 changes: 80 additions & 0 deletions docker/docker-compose-redis-os.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
version: '2.3'

services:
conductor-server:
environment:
- CONFIG_PROP=config-redis-os.properties
- JAVA_OPTS=-Dpolyglot.engine.WarnInterpreterOnly=false
image: conductor:server
container_name: conductor-server
build:
context: ../
dockerfile: docker/server/Dockerfile
args:
YARN_OPTS: ${YARN_OPTS}
networks:
- internal
ports:
- 8080:8080
- 8127:5000
healthcheck:
test: ["CMD", "curl","-I" ,"-XGET", "http://localhost:8080/health"]
interval: 60s
timeout: 30s
retries: 12
links:
- conductor-opensearch:os
- conductor-redis:rs
depends_on:
conductor-opensearch:
condition: service_healthy
conductor-redis:
condition: service_healthy
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"

conductor-redis:
image: redis:6.2.3-alpine
volumes:
- ../server/config/redis.conf:/usr/local/etc/redis/redis.conf
networks:
- internal
ports:
- 6379:6379
healthcheck:
test: [ "CMD", "redis-cli","ping" ]

conductor-opensearch:
image: opensearchproject/opensearch:2.18.0
environment:
- plugins.security.disabled=true
- cluster.name=opensearch-cluster # Name the cluster
- node.name=conductor-opensearch # Name the node that will run in this container
- discovery.seed_hosts=conductor-opensearch # Nodes to look for when discovering the cluster
- cluster.initial_cluster_manager_nodes=conductor-opensearch # Nodes eligible to serve as cluster manager
volumes:
- esdata-conductor:/usr/share/opensearch/data
networks:
- internal
ports:
- 9201:9200
healthcheck:
test: curl http://localhost:9200/_cluster/health -o /dev/null
interval: 5s
timeout: 5s
retries: 12
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"

volumes:
esdata-conductor:
driver: local

networks:
internal:
35 changes: 35 additions & 0 deletions docker/server/config/config-redis-os.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Database persistence type.
# Below are the properties for redis
conductor.db.type=redis_standalone
conductor.queue.type=redis_standalone

conductor.redis.hosts=rs:6379:us-east-1c
conductor.redis-lock.serverAddress=redis://rs:6379
conductor.redis.taskDefCacheRefreshInterval=1
conductor.redis.workflowNamespacePrefix=conductor
conductor.redis.queueNamespacePrefix=conductor_queues

conductor.workflow-execution-lock.type=redis
conductor.app.workflowExecutionLockEnabled=true
conductor.app.lockTimeToTry=500

conductor.app.systemTaskWorkerThreadCount=20
conductor.app.systemTaskMaxPollCount=20


# Elastic search instance indexing is enabled.
conductor.indexing.enabled=true
conductor.indexing.type=opensearch
conductor.elasticsearch.url=http://os:9200
conductor.elasticsearch.indexName=conductor
conductor.elasticsearch.version=0
conductor.elasticsearch.indexReplicasCount=0
conductor.elasticsearch.clusterHealthColor=green

# Additional modules for metrics collection exposed to Prometheus (optional)
conductor.metrics-prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus

# Load sample kitchen sink workflow
loadSample=true

52 changes: 52 additions & 0 deletions os-persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
plugins {
id 'com.github.johnrengelman.shadow' version '7.0.0'
id 'java'
}

configurations {
// Prevent shaded dependencies from being published, while keeping them available to tests
shadow.extendsFrom compileOnly
testRuntime.extendsFrom compileOnly
}

dependencies {

implementation project(':conductor-common')
implementation project(':conductor-core')
implementation project(':conductor-common-persistence')

compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.springframework.retry:spring-retry'

implementation "commons-io:commons-io:${revCommonsIo}"
implementation "org.apache.commons:commons-lang3"
implementation "com.google.guava:guava:${revGuava}"

implementation 'com.fasterxml.jackson.core:jackson-core:2.18.0'

implementation 'org.opensearch.client:opensearch-java:2.18.0'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
implementation "org.opensearch.client:opensearch-rest-client:2.18.0"
implementation "org.opensearch.client:opensearch-rest-high-level-client:2.18.0"

testImplementation "net.java.dev.jna:jna:5.7.0"
testImplementation "org.awaitility:awaitility:${revAwaitility}"
testImplementation "org.opensearch:opensearch-testcontainers:2.1.2"
testImplementation "org.testcontainers:testcontainers:1.20.4"
testImplementation project(':conductor-test-util').sourceSets.test.output
testImplementation 'org.springframework.retry:spring-retry'

}

// Drop the classifier and delete jar task actions to replace the regular jar artifact with the shadow artifact
shadowJar {
configurations = [project.configurations.shadow]
archiveClassifier = null

// Service files are not included by default.
mergeServiceFiles {
include 'META-INF/services/*'
include 'META-INF/maven/*'
}
}
jar.dependsOn shadowJar
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.os.config;

import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;

public class OpenSearchConditions {

private OpenSearchConditions() {}

public static class OpenSearchEnabled extends AllNestedConditions {

OpenSearchEnabled() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}

@SuppressWarnings("unused")
@ConditionalOnProperty(
name = "conductor.indexing.enabled",
havingValue = "true",
matchIfMissing = true)
static class enabledIndexing {}

@SuppressWarnings("unused")
@ConditionalOnProperty(name = "conductor.indexing.type", havingValue = "opensearch")
static class enabledOS {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.os.config;

import java.net.URL;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.os.dao.index.OpenSearchRestDAO;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(OpenSearchProperties.class)
@Conditional(OpenSearchConditions.OpenSearchEnabled.class)
public class OpenSearchConfiguration {

private static final Logger log = LoggerFactory.getLogger(OpenSearchConfiguration.class);

@Bean
public RestClient restClient(RestClientBuilder restClientBuilder) {
return restClientBuilder.build();
}

@Bean
public RestClientBuilder osRestClientBuilder(OpenSearchProperties properties) {
RestClientBuilder builder = RestClient.builder(convertToHttpHosts(properties.toURLs()));

if (properties.getRestClientConnectionRequestTimeout() > 0) {
builder.setRequestConfigCallback(
requestConfigBuilder ->
requestConfigBuilder.setConnectionRequestTimeout(
properties.getRestClientConnectionRequestTimeout()));
}

if (properties.getUsername() != null && properties.getPassword() != null) {
log.info(
"Configure OpenSearch with BASIC authentication. User:{}",
properties.getUsername());
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
properties.getUsername(), properties.getPassword()));
builder.setHttpClientConfigCallback(
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
log.info("Configure OpenSearch with no authentication.");
}
return builder;
}

@Primary
@Bean
public IndexDAO osIndexDAO(
RestClientBuilder restClientBuilder,
@Qualifier("osRetryTemplate") RetryTemplate retryTemplate,
OpenSearchProperties properties,
ObjectMapper objectMapper) {
String url = properties.getUrl();
return new OpenSearchRestDAO(restClientBuilder, retryTemplate, properties, objectMapper);
}

@Bean
public RetryTemplate osRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
return retryTemplate;
}

private HttpHost[] convertToHttpHosts(List<URL> hosts) {
return hosts.stream()
.map(host -> new HttpHost(host.getHost(), host.getPort(), host.getProtocol()))
.toArray(HttpHost[]::new);
}
}
Loading

0 comments on commit 20bf7a9

Please sign in to comment.