Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Add OpenSearch #338

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ allprojects {

resolutionStrategy.eachDependency { details ->
if (details.requested.group.startsWith('com.fasterxml.jackson.') ) {
details.useVersion "2.15.2"
details.useVersion "2.18.0"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,14 @@ public Any deserialize(JsonParser p, DeserializationContext ctxt) throws IOExcep
JsonNode value = root.get(JSON_VALUE);

if (type == null || !type.isTextual()) {
ctxt.reportMappingException(
ctxt.reportBadDefinition(
type.getClass(),
"invalid '@type' field when deserializing ProtoBuf Any object");
}

if (value == null || !value.isTextual()) {
ctxt.reportMappingException(
ctxt.reportBadDefinition(
type.getClass(),
"invalid '@value' field when deserializing ProtoBuf Any object");
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/QueueDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Component;

import com.netflix.conductor.core.events.queue.Message;

/** DAO responsible for managing queuing for the tasks. */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
public interface QueueDAO {

/**
Expand Down
5 changes: 3 additions & 2 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ Refer to the table below for various supported backend and required configuratio
> | cassandra | conductor.db.type=cassandra |
>

Conductor using Elasticsearch for indexing the workflow data.
Currently, Elasticsearch 6 and 7 are supported.
Conductor is using Elasticsearch or OpenSearch for indexing the workflow data.
Currently, Elasticsearch 6, 7 and OpenSearch 2.18 are supported.

We welcome community contributions for other indexing backends.

Expand All @@ -49,6 +49,7 @@ TODO: Link to the helm charts
| [docker-compose.yaml](docker-compose.yaml) | Redis + Elasticsearch 7 |
| [docker-compose-postgres.yaml](docker-compose-postgres.yaml) | Postgres + Elasticsearch 7 |
| [docker-compose-mysql.yaml](docker-compose-mysql.yaml) | Mysql + Elasticsearch 7 |
| [docker-compose-redis-os.yaml](docker-compose-redis-os.yaml) | Redis + OpenSearch 2.18 |

### Network errors during UI build with yarn

Expand Down
80 changes: 80 additions & 0 deletions docker/docker-compose-redis-os.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
version: '2.3'

services:
conductor-server:
environment:
- CONFIG_PROP=config-redis-os.properties
- JAVA_OPTS=-Dpolyglot.engine.WarnInterpreterOnly=false
image: conductor:server
container_name: conductor-server
build:
context: ../
dockerfile: docker/server/Dockerfile
args:
YARN_OPTS: ${YARN_OPTS}
networks:
- internal
ports:
- 8080:8080
- 8127:5000
healthcheck:
test: ["CMD", "curl","-I" ,"-XGET", "http://localhost:8080/health"]
interval: 60s
timeout: 30s
retries: 12
links:
- conductor-opensearch:os
- conductor-redis:rs
depends_on:
conductor-opensearch:
condition: service_healthy
conductor-redis:
condition: service_healthy
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"

conductor-redis:
image: redis:6.2.3-alpine
volumes:
- ../server/config/redis.conf:/usr/local/etc/redis/redis.conf
networks:
- internal
ports:
- 6379:6379
healthcheck:
test: [ "CMD", "redis-cli","ping" ]

conductor-opensearch:
image: opensearchproject/opensearch:2.18.0
environment:
- plugins.security.disabled=true
- cluster.name=opensearch-cluster # Name the cluster
- node.name=conductor-opensearch # Name the node that will run in this container
- discovery.seed_hosts=conductor-opensearch # Nodes to look for when discovering the cluster
- cluster.initial_cluster_manager_nodes=conductor-opensearch # Nodes eligible to serve as cluster manager
volumes:
- esdata-conductor:/usr/share/opensearch/data
networks:
- internal
ports:
- 9201:9200
healthcheck:
test: curl http://localhost:9200/_cluster/health -o /dev/null
interval: 5s
timeout: 5s
retries: 12
logging:
driver: "json-file"
options:
max-size: "1k"
max-file: "3"

volumes:
esdata-conductor:
driver: local

networks:
internal:
35 changes: 35 additions & 0 deletions docker/server/config/config-redis-os.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Database persistence type.
# Below are the properties for redis
conductor.db.type=redis_standalone
conductor.queue.type=redis_standalone

conductor.redis.hosts=rs:6379:us-east-1c
conductor.redis-lock.serverAddress=redis://rs:6379
conductor.redis.taskDefCacheRefreshInterval=1
conductor.redis.workflowNamespacePrefix=conductor
conductor.redis.queueNamespacePrefix=conductor_queues

conductor.workflow-execution-lock.type=redis
conductor.app.workflowExecutionLockEnabled=true
conductor.app.lockTimeToTry=500

conductor.app.systemTaskWorkerThreadCount=20
conductor.app.systemTaskMaxPollCount=20


# Elastic search instance indexing is enabled.
conductor.indexing.enabled=true
conductor.indexing.type=opensearch
conductor.elasticsearch.url=http://os:9200
conductor.elasticsearch.indexName=conductor
conductor.elasticsearch.version=0
conductor.elasticsearch.indexReplicasCount=0
conductor.elasticsearch.clusterHealthColor=green

# Additional modules for metrics collection exposed to Prometheus (optional)
conductor.metrics-prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus

# Load sample kitchen sink workflow
loadSample=true

52 changes: 52 additions & 0 deletions os-persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
plugins {
id 'com.github.johnrengelman.shadow' version '7.0.0'
id 'java'
}

configurations {
// Prevent shaded dependencies from being published, while keeping them available to tests
shadow.extendsFrom compileOnly
testRuntime.extendsFrom compileOnly
}

dependencies {

implementation project(':conductor-common')
implementation project(':conductor-core')
implementation project(':conductor-common-persistence')

compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.springframework.retry:spring-retry'

implementation "commons-io:commons-io:${revCommonsIo}"
implementation "org.apache.commons:commons-lang3"
implementation "com.google.guava:guava:${revGuava}"

implementation 'com.fasterxml.jackson.core:jackson-core:2.18.0'

implementation 'org.opensearch.client:opensearch-java:2.18.0'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
implementation "org.opensearch.client:opensearch-rest-client:2.18.0"
implementation "org.opensearch.client:opensearch-rest-high-level-client:2.18.0"

testImplementation "net.java.dev.jna:jna:5.7.0"
testImplementation "org.awaitility:awaitility:${revAwaitility}"
testImplementation "org.opensearch:opensearch-testcontainers:2.1.2"
testImplementation "org.testcontainers:testcontainers:1.20.4"
testImplementation project(':conductor-test-util').sourceSets.test.output
testImplementation 'org.springframework.retry:spring-retry'

}

// Drop the classifier and delete jar task actions to replace the regular jar artifact with the shadow artifact
shadowJar {
configurations = [project.configurations.shadow]
archiveClassifier = null

// Service files are not included by default.
mergeServiceFiles {
include 'META-INF/services/*'
include 'META-INF/maven/*'
}
}
jar.dependsOn shadowJar
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.os.config;

import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;

public class OpenSearchConditions {

private OpenSearchConditions() {}

public static class OpenSearchEnabled extends AllNestedConditions {

OpenSearchEnabled() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}

@SuppressWarnings("unused")
@ConditionalOnProperty(
name = "conductor.indexing.enabled",
havingValue = "true",
matchIfMissing = true)
static class enabledIndexing {}

@SuppressWarnings("unused")
@ConditionalOnProperty(name = "conductor.indexing.type", havingValue = "opensearch")
static class enabledOS {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.os.config;

import java.net.URL;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.os.dao.index.OpenSearchRestDAO;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(OpenSearchProperties.class)
@Conditional(OpenSearchConditions.OpenSearchEnabled.class)
public class OpenSearchConfiguration {

private static final Logger log = LoggerFactory.getLogger(OpenSearchConfiguration.class);

@Bean
public RestClient restClient(RestClientBuilder restClientBuilder) {
return restClientBuilder.build();
}

@Bean
public RestClientBuilder osRestClientBuilder(OpenSearchProperties properties) {
RestClientBuilder builder = RestClient.builder(convertToHttpHosts(properties.toURLs()));

if (properties.getRestClientConnectionRequestTimeout() > 0) {
builder.setRequestConfigCallback(
requestConfigBuilder ->
requestConfigBuilder.setConnectionRequestTimeout(
properties.getRestClientConnectionRequestTimeout()));
}

if (properties.getUsername() != null && properties.getPassword() != null) {
log.info(
"Configure OpenSearch with BASIC authentication. User:{}",
properties.getUsername());
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
properties.getUsername(), properties.getPassword()));
builder.setHttpClientConfigCallback(
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
log.info("Configure OpenSearch with no authentication.");
}
return builder;
}

@Primary
@Bean
public IndexDAO osIndexDAO(
RestClientBuilder restClientBuilder,
@Qualifier("osRetryTemplate") RetryTemplate retryTemplate,
OpenSearchProperties properties,
ObjectMapper objectMapper) {
String url = properties.getUrl();
return new OpenSearchRestDAO(restClientBuilder, retryTemplate, properties, objectMapper);
}

@Bean
public RetryTemplate osRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
return retryTemplate;
}

private HttpHost[] convertToHttpHosts(List<URL> hosts) {
return hosts.stream()
.map(host -> new HttpHost(host.getHost(), host.getPort(), host.getProtocol()))
.toArray(HttpHost[]::new);
}
}
Loading
Loading