From 8b9a63dcdd6dc068dd01b49b7879a51f47e4a094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 2 Oct 2023 15:27:46 +0200 Subject: [PATCH] Document Kafka Connect, text-processing, vector, re-rank agents (#509) --- .../agents/text/TextSplitterAgent.java | 2 +- ...ction.java => TiktokenLengthFunction.java} | 5 +- .../agents/vector/milvus/MilvusWriter.java | 5 - .../impl/common/AbstractAgentProvider.java | 10 +- .../impl/noop/NoOpAgentNodeProvider.java | 7 + .../impl/uti/ClassConfigValidator.java | 23 ++- .../k8s/agents/IdentityAgentProvider.java | 13 ++ .../agents/KafkaConnectAgentsProvider.java | 60 +++++- .../KubernetesCompositeAgentProvider.java | 7 + .../agents/QueryVectorDBAgentProvider.java | 77 +++++-- .../impl/k8s/agents/ReRankAgentProvider.java | 107 ++++++++++ .../agents/TextProcessingAgentsProvider.java | 178 +++++++++++++++- .../runtime/impl/k8s/CompositeAgentTest.java | 102 ++++----- .../impl/k8s/KafkaConnectAgentsTest.java | 2 - .../k8s/agents/AgentValidationTestUtil.java | 82 ++++++++ .../KafkaConnectAgentsProviderTest.java | 195 ++++++++++++++++++ ...GenAIToolKitFunctionAgentProviderTest.java | 39 +--- .../QueryVectorDBAgentProviderTest.java | 175 ++++++++++++++++ .../k8s/agents/S3SourceAgentProviderTest.java | 39 +--- .../TestGenericSinkAgentProvider.java | 69 ------- .../TestGenericSourceAgentProvider.java | 68 ------ ...i.langstream.api.runtime.AgentNodeProvider | 2 - .../kafka/TextProcessingAgentsRunnerIT.java | 4 +- 23 files changed, 952 insertions(+), 319 deletions(-) rename langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/{TikTokLengthFunction.java => TiktokenLengthFunction.java} (87%) create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/AgentValidationTestUtil.java create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java delete mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java delete mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java diff --git a/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TextSplitterAgent.java b/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TextSplitterAgent.java index 1894db27e..bca1f7e7a 100644 --- a/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TextSplitterAgent.java +++ b/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TextSplitterAgent.java @@ -67,7 +67,7 @@ private void initTextSplitter(Map configuration) { newLengthFunction = String::length; break; default: - newLengthFunction = new TikTokLengthFunction(lengthFunctionName); + newLengthFunction = new TiktokenLengthFunction(lengthFunctionName); } newTextSplitter = new RecursiveCharacterTextSplitter( diff --git a/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TikTokLengthFunction.java b/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TiktokenLengthFunction.java similarity index 87% rename from langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TikTokLengthFunction.java rename to langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TiktokenLengthFunction.java index 6970b588f..90f3bc772 100644 --- a/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TikTokLengthFunction.java +++ b/langstream-agents/langstream-agents-text-processing/src/main/java/ai/langstream/agents/text/TiktokenLengthFunction.java @@ -20,12 +20,13 @@ import com.knuddels.jtokkit.api.EncodingRegistry; import com.knuddels.jtokkit.api.EncodingType; -public class TikTokLengthFunction implements LengthFunction { +/** Java implementation of tiktoken. */ +public class TiktokenLengthFunction implements LengthFunction { private static final EncodingRegistry REGISTRY = Encodings.newDefaultEncodingRegistry(); private final EncodingType encodingType; - public TikTokLengthFunction(String encoding) { + public TiktokenLengthFunction(String encoding) { encodingType = EncodingType.fromName(encoding) .orElseThrow( diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java index b05e1edb0..2a79f145a 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java @@ -23,8 +23,6 @@ import com.alibaba.fastjson.JSONObject; import com.datastax.oss.streaming.ai.TransformContext; import com.datastax.oss.streaming.ai.jstl.JstlEvaluator; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import io.milvus.client.MilvusServiceClient; import io.milvus.grpc.CollectionSchema; import io.milvus.grpc.DescribeCollectionResponse; @@ -45,9 +43,6 @@ @Slf4j public class MilvusWriter implements VectorDatabaseWriterProvider { - private static final ObjectMapper MAPPER = - new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - @Override public boolean supports(Map dataSourceConfig) { return "milvus".equals(dataSourceConfig.get("service")); diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java b/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java index 5c4399d68..afa983650 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java @@ -114,6 +114,10 @@ protected Class getAgentConfigModelClass(String type) { return null; } + protected boolean isAgentConfigModelAllowUnknownProperties(String type) { + return false; + } + protected AgentNodeMetadata computeAgentMetadata( AgentConfiguration agentConfiguration, ExecutionPlan physicalApplicationInstance, @@ -133,7 +137,11 @@ protected Map computeAgentConfiguration( final String type = agentConfiguration.getType(); final Class modelClass = getAgentConfigModelClass(type); if (modelClass != null) { - ClassConfigValidator.validateAgentModelFromClass(agentConfiguration, modelClass); + ClassConfigValidator.validateAgentModelFromClass( + agentConfiguration, + modelClass, + agentConfiguration.getConfiguration(), + isAgentConfigModelAllowUnknownProperties(type)); } return new HashMap<>(agentConfiguration.getConfiguration()); } diff --git a/langstream-core/src/main/java/ai/langstream/impl/noop/NoOpAgentNodeProvider.java b/langstream-core/src/main/java/ai/langstream/impl/noop/NoOpAgentNodeProvider.java index acde479fe..16995f12a 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/noop/NoOpAgentNodeProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/noop/NoOpAgentNodeProvider.java @@ -15,10 +15,12 @@ */ package ai.langstream.impl.noop; +import ai.langstream.api.doc.AgentConfigurationModel; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.runtime.ComponentType; import ai.langstream.impl.common.AbstractAgentProvider; import java.util.List; +import java.util.Map; import java.util.Set; public class NoOpAgentNodeProvider extends AbstractAgentProvider { @@ -31,4 +33,9 @@ public NoOpAgentNodeProvider() { protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { return ComponentType.PROCESSOR; } + + @Override + public Map generateSupportedTypesDocumentation() { + return Map.of(); + } } diff --git a/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java b/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java index bb0674abf..62d75a4d5 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java +++ b/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java @@ -87,13 +87,26 @@ public static void validateAgentModelFromClass( @SneakyThrows public static void validateAgentModelFromClass( AgentConfiguration agentConfiguration, Class modelClazz, Map asMap) { + validateAgentModelFromClass(agentConfiguration, modelClazz, asMap, false); + } + + @SneakyThrows + public static void validateAgentModelFromClass( + AgentConfiguration agentConfiguration, + Class modelClazz, + Map asMap, + boolean allowUnknownProperties) { asMap = validatorMapper.readValue(validatorMapper.writeValueAsBytes(asMap), Map.class); final AgentConfigurationModel agentConfigurationModel = generateAgentModelFromClass(modelClazz); validateProperties( - agentConfiguration, null, asMap, agentConfigurationModel.getProperties()); + agentConfiguration, + null, + asMap, + agentConfigurationModel.getProperties(), + allowUnknownProperties); try { validatorMapper.convertValue(asMap, modelClazz); @@ -129,8 +142,9 @@ private static void validateProperties( AgentConfiguration agentConfiguration, String parentProp, Map asMap, - Map properties) { - if (asMap != null) { + Map properties, + boolean allowUnknownProperties) { + if (!allowUnknownProperties && asMap != null) { for (String key : asMap.keySet()) { if (!properties.containsKey(key)) { final String fullPropertyKey = @@ -176,7 +190,8 @@ private static void validateProperty( agentConfiguration, fullPropertyKey, actualValue == null ? null : (Map) actualValue, - propertyValue.getProperties()); + propertyValue.getProperties(), + false); } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/IdentityAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/IdentityAgentProvider.java index f443205e8..1bcdc0ecc 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/IdentityAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/IdentityAgentProvider.java @@ -17,6 +17,7 @@ import static ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime.CLUSTER_TYPE; +import ai.langstream.api.doc.AgentConfig; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.runtime.ComponentType; import ai.langstream.impl.agents.AbstractComposableAgentProvider; @@ -38,4 +39,16 @@ public IdentityAgentProvider() { protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { return ComponentType.PROCESSOR; } + + @Override + protected Class getAgentConfigModelClass(String type) { + return IdentityAgentConfig.class; + } + + @AgentConfig( + name = "Identity function", + description = + "Simple agent to move data from the input to the output. " + + "Could be used for testing or sample applications.") + public static class IdentityAgentConfig {} } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProvider.java index e378e355e..a446205b7 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProvider.java @@ -15,17 +15,25 @@ */ package ai.langstream.runtime.impl.k8s.agents; +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.runtime.ComponentType; import ai.langstream.impl.common.AbstractAgentProvider; +import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.Set; public class KafkaConnectAgentsProvider extends AbstractAgentProvider { public KafkaConnectAgentsProvider() { - super(Set.of("sink", "source"), List.of(KubernetesClusterRuntime.CLUSTER_TYPE)); + super( + Set.of("sink", "source"), + List.of( + KubernetesClusterRuntime.CLUSTER_TYPE, + NoOpComputeClusterRuntimeProvider.CLUSTER_TYPE)); } @Override @@ -36,4 +44,54 @@ protected ComponentType getComponentType(AgentConfiguration agentConfiguration) default -> throw new IllegalStateException(); }; } + + @Override + protected boolean isAgentConfigModelAllowUnknownProperties(String type) { + return true; + } + + @Override + protected Class getAgentConfigModelClass(String type) { + return switch (type) { + case "sink" -> KafkaSinkConnectAgentConfig.class; + case "source" -> KafkaSourceConnectAgentConfig.class; + default -> throw new IllegalStateException(); + }; + } + + @AgentConfig( + name = "Kafka Connect Sink agent", + description = + """ + Run any Kafka Connect Sink. + All the configuration properties are passed to the Kafka Connect Sink. + """) + public static class KafkaSinkConnectAgentConfig { + @ConfigProperty( + description = + """ + Java main class for the Kafka Sink connector. + """, + required = true) + @JsonProperty("connector.class") + private String connectorClass; + } + + @AgentConfig( + name = "Kafka Connect Source agent", + description = + """ + Run any Kafka Connect Source. + All the configuration properties are passed to the Kafka Connect Source. + """) + public static class KafkaSourceConnectAgentConfig { + @ConfigProperty( + description = + """ + Java main class for the Kafka Source connector. + """, + required = true) + @JsonProperty("connector.class") + private String connectorClass; + } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KubernetesCompositeAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KubernetesCompositeAgentProvider.java index 6c0e97b4f..bb88aa881 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KubernetesCompositeAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/KubernetesCompositeAgentProvider.java @@ -15,10 +15,12 @@ */ package ai.langstream.runtime.impl.k8s.agents; +import ai.langstream.api.doc.AgentConfigurationModel; import ai.langstream.impl.agents.AbstractCompositeAgentProvider; import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; import java.util.List; +import java.util.Map; public class KubernetesCompositeAgentProvider extends AbstractCompositeAgentProvider { public KubernetesCompositeAgentProvider() { @@ -27,4 +29,9 @@ public KubernetesCompositeAgentProvider() { KubernetesClusterRuntime.CLUSTER_TYPE, NoOpComputeClusterRuntimeProvider.CLUSTER_TYPE)); } + + @Override + public Map generateSupportedTypesDocumentation() { + return Map.of(); + } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java index 70b417ed2..0d0e23415 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java @@ -15,6 +15,8 @@ */ package ai.langstream.runtime.impl.k8s.agents; +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.model.Application; import ai.langstream.api.model.Module; @@ -25,26 +27,31 @@ import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.agents.AbstractComposableAgentProvider; +import ai.langstream.impl.agents.ai.steps.QueryConfiguration; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; import java.util.List; import java.util.Map; import java.util.Set; +import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j public class QueryVectorDBAgentProvider extends AbstractComposableAgentProvider { + protected static final String QUERY_VECTOR_DB = "query-vector-db"; + protected static final String VECTOR_DB_SINK = "vector-db-sink"; + public QueryVectorDBAgentProvider() { super( - Set.of("query-vector-db", "vector-db-sink"), - List.of(KubernetesClusterRuntime.CLUSTER_TYPE)); + Set.of(QUERY_VECTOR_DB, VECTOR_DB_SINK), + List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); } @Override protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { return switch (agentConfiguration.getType()) { - case "query-vector-db" -> ComponentType.PROCESSOR; - case "vector-db-sink" -> ComponentType.SINK; + case QUERY_VECTOR_DB -> ComponentType.PROCESSOR; + case VECTOR_DB_SINK -> ComponentType.SINK; default -> throw new IllegalStateException(); }; } @@ -68,14 +75,9 @@ protected Map computeAgentConfiguration( // get the datasource configuration and inject it into the agent configuration String resourceId = (String) originalConfiguration.remove("datasource"); - if (resourceId == null || resourceId.isEmpty()) { - throw new IllegalArgumentException( - "Missing required field 'datasource' in agent definition, type=" - + agentConfiguration.getType() - + ", name=" - + agentConfiguration.getName() - + ", id=" - + agentConfiguration.getId()); + if (resourceId == null) { + throw new IllegalStateException( + "datasource is required but this exception should have been raised before ?"); } generateDataSourceConfiguration( resourceId, @@ -102,16 +104,61 @@ private void generateDataSourceConfiguration( if (!resource.type().equals("datasource") && !resource.type().equals("vector-database")) { throw new IllegalArgumentException( - "Resource " + "Resource '" + resourceId - + " is not type=datasource or type=vector-database"); + + "' is not type=datasource or type=vector-database"); } if (configuration.containsKey("datasource")) { throw new IllegalArgumentException("Only one datasource is supported"); } configuration.put("datasource", resourceImplementation); } else { - throw new IllegalArgumentException("Resource " + resourceId + " not found"); + throw new IllegalArgumentException("Resource '" + resourceId + "' not found"); } } + + @Override + protected Class getAgentConfigModelClass(String type) { + return switch (type) { + case QUERY_VECTOR_DB -> QueryVectorDBConfig.class; + case VECTOR_DB_SINK -> VectorDBSinkConfig.class; + default -> throw new IllegalStateException(type); + }; + } + + @Override + protected boolean isAgentConfigModelAllowUnknownProperties(String type) { + return switch (type) { + case QUERY_VECTOR_DB -> false; + case VECTOR_DB_SINK -> true; + default -> throw new IllegalStateException(type); + }; + } + + @AgentConfig( + name = "Query a vector database", + description = + """ + Query a vector database using Vector Search capabilities. + """) + @Data + public static class QueryVectorDBConfig extends QueryConfiguration {} + + @AgentConfig( + name = "Vector database sink", + description = + """ + Store vectors in a vector database. + Configuration properties depends on the vector database implementation, specified by the "datasource" property. + """) + @Data + public static class VectorDBSinkConfig { + @ConfigProperty( + description = + """ + The defined datasource ID to use to store the vectors. + """, + required = true) + private String datasource; + } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ReRankAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ReRankAgentProvider.java index 9c40f449d..2964221b0 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ReRankAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ReRankAgentProvider.java @@ -15,12 +15,16 @@ */ package ai.langstream.runtime.impl.k8s.agents; +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.runtime.ComponentType; import ai.langstream.impl.agents.AbstractComposableAgentProvider; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.Set; +import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -36,4 +40,107 @@ public ReRankAgentProvider() { protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { return ComponentType.PROCESSOR; } + + @Override + protected Class getAgentConfigModelClass(String type) { + return ReRankConfiguration.class; + } + + @AgentConfig( + name = "Re-rank", + description = + """ + Agent for re-ranking documents based on a query. + """) + @Data + public static class ReRankConfiguration { + @ConfigProperty( + description = + """ + The field that contains the documents to sort. + """, + required = true) + private String field; + + @ConfigProperty( + description = + """ + The field that will hold the results, it can be the same as "field" to override it. + """, + required = true) + @JsonProperty("output-field") + private String outputField; + + @ConfigProperty( + description = + """ + Algorithm to use for re-ranking. 'none' or 'MMR'. + """, + defaultValue = "none") + private String algorithm; + + @ConfigProperty( + description = + """ + Field that contains the embeddings of the documents to sort. + """) + @JsonProperty("query-embeddings") + private String queryEmbeddings; + + @ConfigProperty( + description = + """ + Field that already contains the text that has been embedded. + """) + @JsonProperty("query-text") + private String queryText; + + @ConfigProperty( + description = + """ + Result field for the embeddings. + """) + @JsonProperty("embeddings-field") + private String embeddingsField; + + @ConfigProperty( + description = + """ + Result field for the text. + """) + @JsonProperty("text-field") + private String textField; + + @ConfigProperty( + description = + """ + Maximum number of documents to keep. + """, + defaultValue = "100") + private int max; + + @ConfigProperty( + description = + """ + Parameter for MMR algorithm. + """, + defaultValue = "0.5") + private double lambda; + + @ConfigProperty( + description = + """ + Parameter for B25 algorithm. + """, + defaultValue = "1.5") + private double k1; + + @ConfigProperty( + description = + """ + Parameter for B25 algorithm. + """, + defaultValue = "0.75") + private double b; + } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/TextProcessingAgentsProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/TextProcessingAgentsProvider.java index f4ee112e9..af9df2937 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/TextProcessingAgentsProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/TextProcessingAgentsProvider.java @@ -15,25 +15,34 @@ */ package ai.langstream.runtime.impl.k8s.agents; +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.runtime.ComponentType; import ai.langstream.impl.agents.AbstractComposableAgentProvider; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.Set; +import lombok.Data; import lombok.extern.slf4j.Slf4j; /** Implements support for Text Processing Agents. */ @Slf4j public class TextProcessingAgentsProvider extends AbstractComposableAgentProvider { + protected static final String TEXT_EXTRACTOR = "text-extractor"; + protected static final String LANGUAGE_DETECTOR = "language-detector"; + protected static final String TEXT_SPLITTER = "text-splitter"; + protected static final String TEXT_NORMALISER = "text-normaliser"; + protected static final String DOCUMENT_TO_JSON = "document-to-json"; private static final Set SUPPORTED_AGENT_TYPES = Set.of( - "text-extractor", - "language-detector", - "text-splitter", - "text-normaliser", - "document-to-json"); + TEXT_EXTRACTOR, + LANGUAGE_DETECTOR, + TEXT_SPLITTER, + TEXT_NORMALISER, + DOCUMENT_TO_JSON); public TextProcessingAgentsProvider() { super(SUPPORTED_AGENT_TYPES, List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); @@ -43,4 +52,163 @@ public TextProcessingAgentsProvider() { protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { return ComponentType.PROCESSOR; } + + @Override + protected Class getAgentConfigModelClass(String type) { + return switch (type) { + case TEXT_EXTRACTOR -> TextExtractorConfig.class; + case LANGUAGE_DETECTOR -> LanguageDetectorConfig.class; + case TEXT_SPLITTER -> TextSplitterConfig.class; + case TEXT_NORMALISER -> TextNormaliserConfig.class; + case DOCUMENT_TO_JSON -> DocumentToJsonConfig.class; + default -> throw new IllegalArgumentException("Unsupported agent type: " + type); + }; + } + + @AgentConfig( + name = "Text extractor", + description = + """ + Extracts text content from different document formats like PDF, JSON, XML, ODF, HTML and many others. + """) + @Data + public static class TextExtractorConfig {} + + @AgentConfig( + name = "Language detector", + description = + """ + Detect the language of a message’s data and limit further processing based on language codes. + """) + @Data + public static class LanguageDetectorConfig { + @ConfigProperty( + description = + """ + The name of the message header to write the language code to. + """, + defaultValue = "language") + private String property; + + @ConfigProperty( + description = + """ + Define a list of allowed language codes. If the message language is not in this list, the message is dropped. + """) + private List allowedLanguages; + } + + @AgentConfig( + name = "Text splitter", + description = """ + Split message content in chunks. + """) + @Data + public static class TextSplitterConfig { + @ConfigProperty( + description = + """ + Splitter implementation to use. Currently supported: RecursiveCharacterTextSplitter. + """, + defaultValue = "RecursiveCharacterTextSplitter") + private String splitter_type; + + @ConfigProperty( + description = + """ + RecursiveCharacterTextSplitter splitter option. The separator to use for splitting. + Checkout https://github.com/knuddelsgmbh/jtokkit for more details. + """, + defaultValue = "\"\\n\\n\", \"\\n\", \" \", \"\"") + private List separators; + + @ConfigProperty( + description = + """ + RecursiveCharacterTextSplitter splitter option. Whether or not to keep separators. + Checkout https://github.com/knuddelsgmbh/jtokkit for more details. + """, + defaultValue = "false") + private boolean keep_separator; + + @ConfigProperty( + description = + """ + RecursiveCharacterTextSplitter splitter option. Chunk size of each message. + Checkout https://github.com/knuddelsgmbh/jtokkit for more details. + """, + defaultValue = "200") + private int chunk_size; + + @ConfigProperty( + description = + """ + RecursiveCharacterTextSplitter splitter option. Chunk overlap of the previous message. + Checkout https://github.com/knuddelsgmbh/jtokkit for more details. + """, + defaultValue = "100") + private int chunk_overlap; + + @ConfigProperty( + description = + """ + RecursiveCharacterTextSplitter splitter option. Options are: r50k_base, p50k_base, p50k_edit and cl100k_base. + Checkout https://github.com/knuddelsgmbh/jtokkit for more details. + """, + defaultValue = "cl100k_base") + private String length_function; + } + + @AgentConfig( + name = "Text normaliser", + description = """ + Apply normalisation to the text. + """) + @Data + public static class TextNormaliserConfig { + @ConfigProperty( + description = + """ + Whether to make the text lowercase. + """, + defaultValue = "true") + @JsonProperty("make-lowercase") + private boolean makeLowercase; + + @ConfigProperty( + description = + """ + Whether to trim spaces from the text. + """, + defaultValue = "true") + @JsonProperty("trim-spaces") + private boolean trimSpaces; + } + + @AgentConfig( + name = "Document to JSON", + description = + """ + Convert raw text document to JSON. The result will be a JSON object with the text content in the specified field. + """) + @Data + public static class DocumentToJsonConfig { + @ConfigProperty( + description = + """ + Field name to write the text content to. + """, + defaultValue = "text") + @JsonProperty("text-field") + private String textField; + + @ConfigProperty( + description = + """ + Whether to copy the message properties/headers in the output message. + """, + defaultValue = "true") + @JsonProperty("copy-properties") + private boolean copyProperties; + } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/CompositeAgentTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/CompositeAgentTest.java index 7bde42e30..7d98cff44 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/CompositeAgentTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/CompositeAgentTest.java @@ -74,14 +74,12 @@ public void testMerge2TextProcessorAgents() throws Exception { id: "step1" type: "text-extractor" input: "input-topic" - configuration: - param1: "value1" - name: "language-detector" id: "step2" type: "language-detector" output: "output-topic" configuration: - param2: "value2" + property: "value2" """), buildInstanceYaml(), null) @@ -118,13 +116,10 @@ public void testMerge2TextProcessorAgents() throws Exception { (List>) configuration.get("processors"); assertEquals(2, agents.size()); assertEquals("text-extractor", agents.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) agents.get(0).get("configuration")).get("param1")); assertEquals("language-detector", agents.get(1).get("agentType")); assertEquals( "value2", - ((Map) agents.get(1).get("configuration")).get("param2")); + ((Map) agents.get(1).get("configuration")).get("property")); Topic inputTopic = (Topic) defaultAgentNode.getInputConnectionImplementation(); assertEquals("input-topic", inputTopic.topicName()); @@ -152,19 +147,17 @@ public void testMerge3TextProcessorAgents() throws Exception { id: "step1" type: "text-extractor" input: "input-topic" - configuration: - param1: "value1" - name: "language-detector" id: "step2" type: "language-detector" configuration: - param2: "value2" + property: "value2" - name: "language-detector-2" id: "step3" type: "language-detector" output: "output-topic" configuration: - param3: "value3" + allowedLanguages: ["en"] """), buildInstanceYaml(), null) @@ -201,17 +194,15 @@ public void testMerge3TextProcessorAgents() throws Exception { (List>) configuration.get("processors"); assertEquals(3, agents.size()); assertEquals("text-extractor", agents.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) agents.get(0).get("configuration")).get("param1")); assertEquals("language-detector", agents.get(1).get("agentType")); assertEquals( "value2", - ((Map) agents.get(1).get("configuration")).get("param2")); + ((Map) agents.get(1).get("configuration")).get("property")); assertEquals("language-detector", agents.get(2).get("agentType")); assertEquals( - "value3", - ((Map) agents.get(2).get("configuration")).get("param3")); + List.of("en"), + ((Map) agents.get(2).get("configuration")) + .get("allowedLanguages")); Topic inputTopic = (Topic) defaultAgentNode.getInputConnectionImplementation(); assertEquals("input-topic", inputTopic.topicName()); @@ -239,13 +230,11 @@ public void testMixWithNonComposableAgents() throws Exception { id: "step1" type: "text-extractor" input: "input-topic" - configuration: - param1: "value1" - name: "language-detector" id: "step1b" type: "language-detector" configuration: - param2: "value2" + property: "value2" - name: "drop-if-not-english" id: "step2" type: "drop" @@ -257,7 +246,7 @@ public void testMixWithNonComposableAgents() throws Exception { type: "language-detector" output: "output-topic" configuration: - param3: "value3" + allowedLanguages: ["en"] """), buildInstanceYaml(), null) @@ -312,13 +301,10 @@ public void testMixWithNonComposableAgents() throws Exception { (List>) configuration.get("processors"); assertEquals(2, agents.size()); assertEquals("text-extractor", agents.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) agents.get(0).get("configuration")).get("param1")); assertEquals("language-detector", agents.get(1).get("agentType")); assertEquals( "value2", - ((Map) agents.get(1).get("configuration")).get("param2")); + ((Map) agents.get(1).get("configuration")).get("property")); Topic inputTopic = (Topic) defaultAgentNode.getInputConnectionImplementation(); assertEquals("input-topic", inputTopic.topicName()); @@ -342,7 +328,7 @@ public void testMixWithNonComposableAgents() throws Exception { implementation.getAgentImplementation(module, "step3"); DefaultAgentNode defaultAgentNodeLast = (DefaultAgentNode) agentImplementationLast; Map configurationLast = defaultAgentNodeLast.getConfiguration(); - assertEquals("value3", configurationLast.get("param3")); + assertEquals(List.of("en"), configurationLast.get("allowedLanguages")); Topic inputTopicLast = (Topic) defaultAgentNodeLast.getInputConnectionImplementation(); assertEquals("agent-step3-input", inputTopicLast.topicName()); @@ -373,19 +359,17 @@ public void testMergeSourceWithProcessors() throws Exception { - name: "text-extractor" id: "step1" type: "text-extractor" - configuration: - param1: "value1" - name: "language-detector" id: "step2" type: "language-detector" configuration: - param2: "value2" + property: "value2" - name: "language-detector-2" id: "step3" type: "language-detector" output: "output-topic" configuration: - param3: "value3" + allowedLanguages: ["en"] """), buildInstanceYaml(), null) @@ -424,17 +408,15 @@ public void testMergeSourceWithProcessors() throws Exception { (List>) configuration.get("processors"); assertEquals(3, processors.size()); assertEquals("text-extractor", processors.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) processors.get(0).get("configuration")).get("param1")); assertEquals("language-detector", processors.get(1).get("agentType")); assertEquals( "value2", - ((Map) processors.get(1).get("configuration")).get("param2")); + ((Map) processors.get(1).get("configuration")).get("property")); assertEquals("language-detector", processors.get(2).get("agentType")); assertEquals( - "value3", - ((Map) processors.get(2).get("configuration")).get("param3")); + List.of("en"), + ((Map) processors.get(2).get("configuration")) + .get("allowedLanguages")); Topic outputTopic = (Topic) defaultAgentNode.getOutputConnectionImplementation(); assertEquals("output-topic", outputTopic.topicName()); @@ -460,18 +442,16 @@ public void testMergeSinkWithProcessorAgents() throws Exception { id: "step1" type: "text-extractor" input: "input-topic" - configuration: - param1: "value1" - name: "language-detector" id: "step2" type: "language-detector" configuration: - param2: "value2" + property: "value2" - name: "language-detector-2" id: "step3" type: "language-detector" configuration: - param3: "value3" + allowedLanguages: ["en"] - name: "generic-composable-sink" type: "generic-composable-sink" configuration: @@ -507,17 +487,15 @@ public void testMergeSinkWithProcessorAgents() throws Exception { (List>) configuration.get("processors"); assertEquals(3, processors.size()); assertEquals("text-extractor", processors.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) processors.get(0).get("configuration")).get("param1")); assertEquals("language-detector", processors.get(1).get("agentType")); assertEquals( "value2", - ((Map) processors.get(1).get("configuration")).get("param2")); + ((Map) processors.get(1).get("configuration")).get("property")); assertEquals("language-detector", processors.get(2).get("agentType")); assertEquals( - "value3", - ((Map) processors.get(2).get("configuration")).get("param3")); + List.of("en"), + ((Map) processors.get(2).get("configuration")) + .get("allowedLanguages")); Topic inputTopic = (Topic) defaultAgentNode.getInputConnectionImplementation(); assertEquals("input-topic", inputTopic.topicName()); @@ -549,18 +527,16 @@ public void testMergeSourceAndSinkWithProcessorAgents() throws Exception { - name: "text-extractor" id: "step1" type: "text-extractor" - configuration: - param1: "value1" - name: "language-detector" id: "step2" type: "language-detector" configuration: - param2: "value2" + property: "value2" - name: "language-detector-2" id: "step3" type: "language-detector" configuration: - param3: "value3" + allowedLanguages: ["en"] - name: "generic-composable-sink" type: "generic-composable-sink" configuration: @@ -598,17 +574,15 @@ public void testMergeSourceAndSinkWithProcessorAgents() throws Exception { (List>) configuration.get("processors"); assertEquals(3, processors.size()); assertEquals("text-extractor", processors.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) processors.get(0).get("configuration")).get("param1")); assertEquals("language-detector", processors.get(1).get("agentType")); assertEquals( "value2", - ((Map) processors.get(1).get("configuration")).get("param2")); + ((Map) processors.get(1).get("configuration")).get("property")); assertEquals("language-detector", processors.get(2).get("agentType")); assertEquals( - "value3", - ((Map) processors.get(2).get("configuration")).get("param3")); + List.of("en"), + ((Map) processors.get(2).get("configuration")) + .get("allowedLanguages")); assertNull(defaultAgentNode.getInputConnectionImplementation()); assertNull(defaultAgentNode.getOutputConnectionImplementation()); @@ -639,13 +613,11 @@ public void testMergeSourceAndSinkWithProcessorAgentsWithIntermediateTopics() th - name: "text-extractor" id: "step1" type: "text-extractor" - configuration: - param1: "value1" - name: "language-detector" id: "step2" type: "language-detector" configuration: - param2: "value2" + property: "value2" - name: "requires-buffer-topic" id: "bad-step" type: "drop" @@ -655,7 +627,7 @@ public void testMergeSourceAndSinkWithProcessorAgentsWithIntermediateTopics() th id: "step3" type: "language-detector" configuration: - param3: "value3" + allowedLanguages: ["en"] - name: "generic-composable-sink" type: "generic-composable-sink" configuration: @@ -702,13 +674,10 @@ public void testMergeSourceAndSinkWithProcessorAgentsWithIntermediateTopics() th (List>) configuration.get("processors"); assertEquals(2, processors.size()); assertEquals("text-extractor", processors.get(0).get("agentType")); - assertEquals( - "value1", - ((Map) processors.get(0).get("configuration")).get("param1")); assertEquals("language-detector", processors.get(1).get("agentType")); assertEquals( "value2", - ((Map) processors.get(1).get("configuration")).get("param2")); + ((Map) processors.get(1).get("configuration")).get("property")); Topic outputTopic = (Topic) defaultFirstNode.getOutputConnectionImplementation(); assertEquals("agent-bad-step-input", outputTopic.topicName()); @@ -734,8 +703,9 @@ public void testMergeSourceAndSinkWithProcessorAgentsWithIntermediateTopics() th assertEquals(1, processors.size()); assertEquals("language-detector", processors.get(0).get("agentType")); assertEquals( - "value3", - ((Map) processors.get(0).get("configuration")).get("param3")); + List.of("en"), + ((Map) processors.get(0).get("configuration")) + .get("allowedLanguages")); Map sink = (Map) configurationThirdNode.get("sink"); assertEquals("generic-composable-sink", sink.get("agentType")); diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KafkaConnectAgentsTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KafkaConnectAgentsTest.java index d561f8489..5e6c31b48 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KafkaConnectAgentsTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KafkaConnectAgentsTest.java @@ -100,7 +100,6 @@ public void testConfigureKafkaConnectSink() throws Exception { Map configuration = step.getConfiguration(); log.info("Configuration: {}", configuration); assertEquals("FileStreamSink", configuration.get("connector.class")); - assertEquals("input-topic", configuration.get("topics")); assertEquals("/tmp/test.sink.txt", configuration.get("file")); assertEquals(ComponentType.SINK, step.getComponentType()); } @@ -153,7 +152,6 @@ public void testConfigureKafkaConnectSources() throws Exception { Map configuration = step.getConfiguration(); log.info("Configuration: {}", configuration); assertEquals("FileStreamSource", configuration.get("connector.class")); - assertEquals("output-topic", configuration.get("topic")); assertEquals("/tmp/test.txt", configuration.get("file")); assertEquals(ComponentType.SOURCE, step.getComponentType()); } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/AgentValidationTestUtil.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/AgentValidationTestUtil.java new file mode 100644 index 000000000..3843bc784 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/AgentValidationTestUtil.java @@ -0,0 +1,82 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import static org.junit.jupiter.api.Assertions.fail; + +import ai.langstream.api.model.Application; +import ai.langstream.api.runtime.ClusterRuntimeRegistry; +import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.impl.deploy.ApplicationDeployer; +import ai.langstream.impl.parser.ModelBuilder; +import java.util.Map; +import lombok.Cleanup; + +public class AgentValidationTestUtil { + + public static void validate(String pipeline, String expectErrMessage) throws Exception { + if (expectErrMessage != null && expectErrMessage.isBlank()) { + throw new IllegalArgumentException("expectErrMessage cannot be blank"); + } + Application applicationInstance = + ModelBuilder.buildApplicationInstance( + Map.of( + "module.yaml", + pipeline, + "configuration.yaml", + """ + configuration: + resources: + - type: "datasource" + name: "cassandra" + configuration: + service: "cassandra" + contact-points: "xx" + loadBalancing-localDc: "xx" + port: 999 + """), + """ + instance: + streamingCluster: + type: "noop" + computeCluster: + type: "none" + """, + null) + .getApplication(); + + @Cleanup + ApplicationDeployer deployer = + ApplicationDeployer.builder() + .registry(new ClusterRuntimeRegistry()) + .pluginsRegistry(new PluginsRegistry()) + .build(); + + try { + ExecutionPlan implementation = + deployer.createImplementation("app", applicationInstance); + if (expectErrMessage != null) { + fail("Expected error message: " + expectErrMessage); + } + } catch (IllegalArgumentException e) { + if (expectErrMessage != null && e.getMessage().contains(expectErrMessage)) { + return; + } + fail("Expected error message: " + expectErrMessage + " but got: " + e.getMessage()); + } + } +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java new file mode 100644 index 000000000..206efb5c0 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java @@ -0,0 +1,195 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import ai.langstream.api.doc.AgentConfigurationModel; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.deployer.k8s.util.SerializationUtil; +import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class KafkaConnectAgentsProviderTest { + @Test + @SneakyThrows + public void testValidationSource() { + validate( + """ + topics: + - name: in + - name: out + pipeline: + - name: "my-source" + type: "source" + input: in + output: out + configuration: + connector.class: "io.confluent.connect.s3.S3SourceConnector" + """, + null); + + validate( + """ + topics: + - name: in + - name: out + pipeline: + - name: "my-source" + input: in + output: out + type: "source" + configuration: {} + """, + "Found error on an agent configuration (agent: 'my-source', type: 'source'). Property 'connector.class' is required"); + + validate( + """ + topics: + - name: in + - name: out + pipeline: + - name: "my-source" + type: "source" + input: in + output: out + configuration: + connector.class: "io.confluent.connect.s3.S3SourceConnector" + whatever.config: + inner: "value" + """, + null); + } + + @Test + @SneakyThrows + public void testValidationSink() { + validate( + """ + topics: + - name: in + - name: out + pipeline: + - name: "my-source" + type: "sink" + input: in + output: out + configuration: + connector.class: "io.confluent.connect.s3.S3SourceConnector" + """, + null); + + validate( + """ + topics: + - name: in + - name: out + pipeline: + - name: "my-source" + input: in + output: out + type: "sink" + configuration: {} + """, + "Found error on an agent configuration (agent: 'my-source', type: 'sink'). Property 'connector.class' is required"); + + validate( + """ + topics: + - name: in + - name: out + pipeline: + - name: "my-source" + type: "sink" + input: in + output: out + configuration: + connector.class: "io.confluent.connect.s3.S3SourceConnector" + whatever.config: + inner: "value" + """, + null); + } + + private void validate(String pipeline, String expectErrMessage) throws Exception { + AgentValidationTestUtil.validate(pipeline, expectErrMessage); + } + + @Test + @SneakyThrows + public void testDocumentation() { + final Map model = + new PluginsRegistry() + .lookupAgentImplementation( + "s3-source", + new NoOpComputeClusterRuntimeProvider.NoOpClusterRuntime()) + .generateSupportedTypesDocumentation(); + + Assertions.assertEquals( + """ + { + "s3-source" : { + "name" : "S3 Source", + "description" : "Reads data from S3 bucket", + "properties" : { + "access-key" : { + "description" : "Access key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + }, + "bucketName" : { + "description" : "The name of the bucket to read from.", + "required" : false, + "type" : "string", + "defaultValue" : "langstream-source" + }, + "endpoint" : { + "description" : "The endpoint of the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "http://minio-endpoint.-not-set:9090" + }, + "file-extensions" : { + "description" : "Comma separated list of file extensions to filter by.", + "required" : false, + "type" : "string", + "defaultValue" : "pdf,docx,html,htm,md,txt" + }, + "idle-time" : { + "description" : "Region for the S3 server.", + "required" : false, + "type" : "integer", + "defaultValue" : "5" + }, + "region" : { + "description" : "Region for the S3 server.", + "required" : false, + "type" : "string" + }, + "secret-key" : { + "description" : "Secret key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + } + } + } + }""", + SerializationUtil.prettyPrintJson(model)); + } +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java index ea432099c..6e3c8ada7 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java @@ -18,16 +18,10 @@ import static org.junit.jupiter.api.Assertions.*; import ai.langstream.api.doc.AgentConfigurationModel; -import ai.langstream.api.model.Application; -import ai.langstream.api.runtime.ClusterRuntimeRegistry; -import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.deployer.k8s.util.SerializationUtil; -import ai.langstream.impl.deploy.ApplicationDeployer; import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; -import ai.langstream.impl.parser.ModelBuilder; import java.util.Map; -import lombok.Cleanup; import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -81,38 +75,7 @@ public void testValidationDropFields() { } private void validate(String pipeline, String expectErrMessage) throws Exception { - Application applicationInstance = - ModelBuilder.buildApplicationInstance( - Map.of("module.yaml", pipeline), - """ - instance: - streamingCluster: - type: "noop" - computeCluster: - type: "none" - """, - null) - .getApplication(); - - @Cleanup - ApplicationDeployer deployer = - ApplicationDeployer.builder() - .registry(new ClusterRuntimeRegistry()) - .pluginsRegistry(new PluginsRegistry()) - .build(); - - try { - ExecutionPlan implementation = - deployer.createImplementation("app", applicationInstance); - if (expectErrMessage != null) { - fail("Expected error message: " + expectErrMessage); - } - } catch (IllegalArgumentException e) { - if (expectErrMessage != null && e.getMessage().contains(expectErrMessage)) { - return; - } - fail("Expected error message: " + expectErrMessage + " but got: " + e.getMessage()); - } + AgentValidationTestUtil.validate(pipeline, expectErrMessage); } @Test diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java new file mode 100644 index 000000000..6ff333789 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java @@ -0,0 +1,175 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import static org.junit.jupiter.api.Assertions.*; + +import ai.langstream.api.doc.AgentConfigurationModel; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.deployer.k8s.util.SerializationUtil; +import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class QueryVectorDBAgentProviderTest { + + @Test + @SneakyThrows + public void testValidationQueryDb() { + validate( + """ + pipeline: + - name: "db" + type: "query-vector-db" + configuration: + output-field: result + query: "select xxx" + datasource: "cassandra" + unknown-field: "..." + """, + "Found error on an agent configuration (agent: 'db', type: 'query-vector-db'). Property 'unknown-field' is unknown"); + + validate( + """ + pipeline: + - name: "db" + type: "query-vector-db" + configuration: + output-field: result + query: "select xxx" + datasource: "not exists" + """, + "Resource 'not exists' not found"); + + validate( + """ + pipeline: + - name: "db" + type: "query-vector-db" + configuration: + output-field: result + query: "select xxx" + datasource: "cassandra" + """, + null); + } + + @Test + @SneakyThrows + public void testWriteDb() { + validate( + """ + pipeline: + - name: "db" + type: "vector-db-sink" + configuration: + datasource: "not exists" + unknown-field: "..." + """, + "Resource 'not exists' not found"); + + validate( + """ + pipeline: + - name: "db" + type: "vector-db-sink" + configuration: + unknown-field: "..." + """, + "Found error on an agent configuration (agent: 'db', type: 'vector-db-sink'). Property 'datasource' is required"); + + validate( + """ + pipeline: + - name: "db" + type: "vector-db-sink" + configuration: + datasource: "cassandra" + unknown-field: "..." + """, + null); + } + + private void validate(String pipeline, String expectErrMessage) throws Exception { + AgentValidationTestUtil.validate(pipeline, expectErrMessage); + } + + @Test + @SneakyThrows + public void testDocumentation() { + final Map model = + new PluginsRegistry() + .lookupAgentImplementation( + "s3-source", + new NoOpComputeClusterRuntimeProvider.NoOpClusterRuntime()) + .generateSupportedTypesDocumentation(); + + Assertions.assertEquals( + """ + { + "s3-source" : { + "name" : "S3 Source", + "description" : "Reads data from S3 bucket", + "properties" : { + "access-key" : { + "description" : "Access key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + }, + "bucketName" : { + "description" : "The name of the bucket to read from.", + "required" : false, + "type" : "string", + "defaultValue" : "langstream-source" + }, + "endpoint" : { + "description" : "The endpoint of the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "http://minio-endpoint.-not-set:9090" + }, + "file-extensions" : { + "description" : "Comma separated list of file extensions to filter by.", + "required" : false, + "type" : "string", + "defaultValue" : "pdf,docx,html,htm,md,txt" + }, + "idle-time" : { + "description" : "Region for the S3 server.", + "required" : false, + "type" : "integer", + "defaultValue" : "5" + }, + "region" : { + "description" : "Region for the S3 server.", + "required" : false, + "type" : "string" + }, + "secret-key" : { + "description" : "Secret key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + } + } + } + }""", + SerializationUtil.prettyPrintJson(model)); + } +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java index 5ce23f3bf..05ae34c5e 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java @@ -18,16 +18,10 @@ import static org.junit.jupiter.api.Assertions.*; import ai.langstream.api.doc.AgentConfigurationModel; -import ai.langstream.api.model.Application; -import ai.langstream.api.runtime.ClusterRuntimeRegistry; -import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.deployer.k8s.util.SerializationUtil; -import ai.langstream.impl.deploy.ApplicationDeployer; import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider; -import ai.langstream.impl.parser.ModelBuilder; import java.util.Map; -import lombok.Cleanup; import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -94,38 +88,7 @@ public void testValidation() { } private void validate(String pipeline, String expectErrMessage) throws Exception { - Application applicationInstance = - ModelBuilder.buildApplicationInstance( - Map.of("module.yaml", pipeline), - """ - instance: - streamingCluster: - type: "noop" - computeCluster: - type: "none" - """, - null) - .getApplication(); - - @Cleanup - ApplicationDeployer deployer = - ApplicationDeployer.builder() - .registry(new ClusterRuntimeRegistry()) - .pluginsRegistry(new PluginsRegistry()) - .build(); - - try { - ExecutionPlan implementation = - deployer.createImplementation("app", applicationInstance); - if (expectErrMessage != null) { - fail("Expected error message: " + expectErrMessage); - } - } catch (IllegalArgumentException e) { - if (expectErrMessage != null && e.getMessage().contains(expectErrMessage)) { - return; - } - throw e; - } + AgentValidationTestUtil.validate(pipeline, expectErrMessage); } @Test diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java deleted file mode 100644 index 68d8f8091..000000000 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ai.langstream.runtime.impl.testagents; - -import ai.langstream.api.model.AgentConfiguration; -import ai.langstream.api.model.Module; -import ai.langstream.api.model.Pipeline; -import ai.langstream.api.runtime.ComponentType; -import ai.langstream.api.runtime.ComputeClusterRuntime; -import ai.langstream.api.runtime.ConnectionImplementation; -import ai.langstream.api.runtime.ExecutionPlan; -import ai.langstream.api.runtime.PluginsRegistry; -import ai.langstream.api.runtime.Topic; -import ai.langstream.impl.common.AbstractAgentProvider; -import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class TestGenericSinkAgentProvider extends AbstractAgentProvider { - - public TestGenericSinkAgentProvider() { - super(Set.of("sink"), List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); - } - - @Override - protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { - return ComponentType.SINK; - } - - @Override - protected Map computeAgentConfiguration( - AgentConfiguration agentConfiguration, - Module module, - Pipeline pipeline, - ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime, - PluginsRegistry pluginsRegistry) { - Map copy = - super.computeAgentConfiguration( - agentConfiguration, - module, - pipeline, - executionPlan, - clusterRuntime, - pluginsRegistry); - - // we can auto-wire the "topics" configuration property - ConnectionImplementation connectionImplementation = - executionPlan.getConnectionImplementation(module, agentConfiguration.getInput()); - if (connectionImplementation instanceof Topic topic) { - copy.put("topics", topic.topicName()); - } - return copy; - } -} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java deleted file mode 100644 index b1e613aee..000000000 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ai.langstream.runtime.impl.testagents; - -import ai.langstream.api.model.AgentConfiguration; -import ai.langstream.api.model.Module; -import ai.langstream.api.model.Pipeline; -import ai.langstream.api.runtime.ComponentType; -import ai.langstream.api.runtime.ComputeClusterRuntime; -import ai.langstream.api.runtime.ConnectionImplementation; -import ai.langstream.api.runtime.ExecutionPlan; -import ai.langstream.api.runtime.PluginsRegistry; -import ai.langstream.api.runtime.Topic; -import ai.langstream.impl.common.AbstractAgentProvider; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class TestGenericSourceAgentProvider extends AbstractAgentProvider { - - public TestGenericSourceAgentProvider() { - super(Set.of("source"), List.of("none")); - } - - @Override - protected ComponentType getComponentType(AgentConfiguration agentConfiguration) { - return ComponentType.SOURCE; - } - - @Override - protected Map computeAgentConfiguration( - AgentConfiguration agentConfiguration, - Module module, - Pipeline pipeline, - ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime, - PluginsRegistry pluginsRegistry) { - Map copy = - super.computeAgentConfiguration( - agentConfiguration, - module, - pipeline, - executionPlan, - clusterRuntime, - pluginsRegistry); - - // we can auto-wire the "topic" configuration property - ConnectionImplementation connectionImplementation = - executionPlan.getConnectionImplementation(module, agentConfiguration.getOutput()); - if (connectionImplementation instanceof Topic topic) { - copy.put("topic", topic.topicName()); - } - return copy; - } -} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider index 6d9f62ec0..1483f35f3 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider @@ -1,4 +1,2 @@ ai.langstream.runtime.impl.testagents.TestGenericAgentProvider -ai.langstream.runtime.impl.testagents.TestGenericSinkAgentProvider -ai.langstream.runtime.impl.testagents.TestGenericSourceAgentProvider ai.langstream.runtime.impl.testagents.TestGenericComposableSinkAgentProvider \ No newline at end of file diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java index 79e3e1695..7b99c8b7a 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java @@ -65,8 +65,8 @@ public void testFullLanguageProcessingPipeline() throws Exception { type: "text-normaliser" output: "output-topic" configuration: - makeLowercase: true - trimSpaces: true + make-lowercase: true + trim-spaces: true """); try (ApplicationRuntime applicationRuntime =