From 34c00ae803b27c2deb5e77b43660ae6559210ee5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 2 Oct 2023 12:59:39 +0200 Subject: [PATCH] Add doc for Gen AI toolkit agents (#508) --- .../ai/GenAIToolKitFunctionAgentProvider.java | 313 ++--------- .../steps/AIChatCompletionsConfiguration.java | 229 ++++++++ .../steps/AITextCompletionsConfiguration.java | 209 ++++++++ .../agents/ai/steps/CastConfiguration.java | 61 +++ .../ComputeAIEmbeddingsConfiguration.java | 124 +++++ .../agents/ai/steps/ComputeConfiguration.java | 88 ++++ .../agents/ai/steps/DropConfiguration.java | 37 ++ .../ai/steps/DropFieldsConfiguration.java | 14 +- .../agents/ai/steps/FlattenConfiguration.java | 53 ++ .../ai/steps/MergeKeyValueConfiguration.java | 35 ++ .../agents/ai/steps/QueryConfiguration.java | 110 ++++ .../ai/steps/UnwrapKeyValueConfiguration.java | 45 ++ .../runtime/impl/k8s/GenAIAgentsTest.java | 8 +- .../KubernetesClusterRuntimeDockerTest.java | 48 +- ...GenAIToolKitFunctionAgentProviderTest.java | 487 +++++++++++++++++- 15 files changed, 1554 insertions(+), 307 deletions(-) create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AIChatCompletionsConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AITextCompletionsConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/CastConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeAIEmbeddingsConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/FlattenConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/MergeKeyValueConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/QueryConfiguration.java create mode 100644 langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/UnwrapKeyValueConfiguration.java diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java index 2933cb36c..3d7228434 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java @@ -21,17 +21,25 @@ import ai.langstream.api.model.Module; import ai.langstream.api.model.Pipeline; import ai.langstream.api.model.Resource; -import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runtime.ComponentType; import ai.langstream.api.runtime.ComputeClusterRuntime; import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.impl.agents.ai.steps.AIChatCompletionsConfiguration; +import ai.langstream.impl.agents.ai.steps.AITextCompletionsConfiguration; import ai.langstream.impl.agents.ai.steps.BaseGenAIStepConfiguration; +import ai.langstream.impl.agents.ai.steps.CastConfiguration; +import ai.langstream.impl.agents.ai.steps.ComputeAIEmbeddingsConfiguration; +import ai.langstream.impl.agents.ai.steps.ComputeConfiguration; +import ai.langstream.impl.agents.ai.steps.DropConfiguration; import ai.langstream.impl.agents.ai.steps.DropFieldsConfiguration; +import ai.langstream.impl.agents.ai.steps.FlattenConfiguration; +import ai.langstream.impl.agents.ai.steps.MergeKeyValueConfiguration; +import ai.langstream.impl.agents.ai.steps.QueryConfiguration; +import ai.langstream.impl.agents.ai.steps.UnwrapKeyValueConfiguration; import ai.langstream.impl.common.AbstractAgentProvider; import ai.langstream.impl.uti.ClassConfigValidator; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -42,235 +50,6 @@ @Slf4j public class GenAIToolKitFunctionAgentProvider extends AbstractAgentProvider { - protected static final StepConfigurationInitializer UNWRAP_KEY_VALUE = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map originalConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - optionalField( - step, agentConfiguration, originalConfiguration, "unwrapKey", null); - } - }; - protected static final StepConfigurationInitializer CAST = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map originalConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - requiredField(step, agentConfiguration, originalConfiguration, "schema-type"); - optionalField(step, agentConfiguration, originalConfiguration, "part", null); - } - }; - protected static final StepConfigurationInitializer FLATTEN = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map originalConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - optionalField( - step, agentConfiguration, originalConfiguration, "delimiter", null); - optionalField(step, agentConfiguration, originalConfiguration, "part", null); - } - }; - protected static final StepConfigurationInitializer COMPUTE = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map originalConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - requiredField(step, agentConfiguration, originalConfiguration, "fields"); - } - }; - protected static final StepConfigurationInitializer COMPUTE_AI_EMBEDDINGS = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map originalConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - optionalField( - step, - agentConfiguration, - originalConfiguration, - "model", - "text-embedding-ada-002"); - optionalField( - step, agentConfiguration, originalConfiguration, "batch-size", null); - optionalField( - step, agentConfiguration, originalConfiguration, "concurrency", null); - optionalField( - step, - agentConfiguration, - originalConfiguration, - "flush-interval", - null); - requiredField( - step, agentConfiguration, originalConfiguration, "embeddings-field"); - requiredField(step, agentConfiguration, originalConfiguration, "text"); - } - }; - protected static final StepConfigurationInitializer QUERY = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map originalConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - - // reference to datasource - String datasource = (String) originalConfiguration.remove("datasource"); - if (datasource == null) { - // error - requiredField( - step, agentConfiguration, originalConfiguration, "datasource"); - return; - } - dataSourceConfigurationGenerator.generateDataSourceConfiguration(datasource); - - requiredField(step, agentConfiguration, originalConfiguration, "fields"); - requiredField(step, agentConfiguration, originalConfiguration, "query"); - requiredField(step, agentConfiguration, originalConfiguration, "output-field"); - optionalField( - step, agentConfiguration, originalConfiguration, "only-first", null); - } - }; - protected static final StepConfigurationInitializer AI_CHAT_COMPLETIONS = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map newConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - requiredField(step, agentConfiguration, newConfiguration, "completion-field"); - optionalField(step, agentConfiguration, newConfiguration, "log-field", null); - optionalField( - step, - agentConfiguration, - newConfiguration, - "min-chunks-per-message", - null); - optionalField( - step, - agentConfiguration, - newConfiguration, - "stream-response-completion-field", - null); - String streamTopic = - optionalField( - step, - agentConfiguration, - newConfiguration, - "stream-to-topic", - null); - if (streamTopic != null) { - Map topicConfiguration = - topicConfigurationGenerator.generateTopicConfiguration(streamTopic); - newConfiguration.put("streamTopicConfiguration", topicConfiguration); - } - Object messages = - requiredField(step, agentConfiguration, newConfiguration, "messages"); - if (messages instanceof Collection collection) { - for (Object o : collection) { - if (o instanceof Map map) { - map.keySet() - .forEach( - k -> { - if (!"role".equals(k) && !"content".equals(k)) { - throw new IllegalArgumentException( - "messages must be a list of objects, [{role: 'user', " - + "content: 'template'}]"); - } - }); - } else { - throw new IllegalArgumentException( - "messages must be a list of objects, [{role: 'user', content: 'template'}]"); - } - } - } else { - throw new IllegalArgumentException( - "messages must be a list of objects: [{role: 'user', content: 'template'}]"); - } - requiredField(step, agentConfiguration, newConfiguration, "model"); - optionalField(step, agentConfiguration, newConfiguration, "temperature", null); - optionalField(step, agentConfiguration, newConfiguration, "top-p", null); - optionalField(step, agentConfiguration, newConfiguration, "logit-bias", null); - optionalField(step, agentConfiguration, newConfiguration, "stop", null); - optionalField(step, agentConfiguration, newConfiguration, "max-tokens", null); - optionalField( - step, agentConfiguration, newConfiguration, "presence-penalty", null); - optionalField( - step, agentConfiguration, newConfiguration, "frequency-penalty", null); - optionalField(step, agentConfiguration, newConfiguration, "user", null); - } - }; - protected static final StepConfigurationInitializer AI_TEXT_COMPLETIONS = - new StepConfigurationInitializer() { - @Override - public void generateSteps( - Map step, - Map newConfiguration, - AgentConfiguration agentConfiguration, - DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { - requiredField(step, agentConfiguration, newConfiguration, "completion-field"); - optionalField(step, agentConfiguration, newConfiguration, "log-field", null); - optionalField( - step, - agentConfiguration, - newConfiguration, - "min-chunks-per-message", - null); - optionalField( - step, - agentConfiguration, - newConfiguration, - "stream-response-completion-field", - null); - String streamTopic = - optionalField( - step, - agentConfiguration, - newConfiguration, - "stream-to-topic", - null); - if (streamTopic != null) { - Map topicConfiguration = - topicConfigurationGenerator.generateTopicConfiguration(streamTopic); - newConfiguration.put("streamTopicConfiguration", topicConfiguration); - } - requiredField(step, agentConfiguration, newConfiguration, "prompt"); - requiredField(step, agentConfiguration, newConfiguration, "model"); - optionalField(step, agentConfiguration, newConfiguration, "temperature", null); - optionalField(step, agentConfiguration, newConfiguration, "top-p", null); - optionalField(step, agentConfiguration, newConfiguration, "logit-bias", null); - optionalField(step, agentConfiguration, newConfiguration, "stop", null); - optionalField(step, agentConfiguration, newConfiguration, "max-tokens", null); - optionalField( - step, agentConfiguration, newConfiguration, "presence-penalty", null); - optionalField( - step, agentConfiguration, newConfiguration, "frequency-penalty", null); - optionalField(step, agentConfiguration, newConfiguration, "user", null); - } - }; private static final Map STEP_TYPES; protected static final String SERVICE_VERTEX = "vertex-configuration"; protected static final String SERVICE_HUGGING_FACE = "hugging-face-configuration"; @@ -289,16 +68,16 @@ public Class getAgentConfigurationModelClass() { final Map steps = new HashMap<>(); steps.put("drop-fields", DropFieldsConfiguration.STEP); - steps.put("merge-key-value", baseConfig); - steps.put("unwrap-key-value", UNWRAP_KEY_VALUE); - steps.put("cast", CAST); - steps.put("flatten", FLATTEN); - steps.put("drop", baseConfig); - steps.put("compute", COMPUTE); - steps.put("compute-ai-embeddings", COMPUTE_AI_EMBEDDINGS); - steps.put("query", QUERY); - steps.put("ai-chat-completions", AI_CHAT_COMPLETIONS); - steps.put("ai-text-completions", AI_TEXT_COMPLETIONS); + steps.put("merge-key-value", MergeKeyValueConfiguration.STEP); + steps.put("unwrap-key-value", UnwrapKeyValueConfiguration.STEP); + steps.put("cast", CastConfiguration.STEP); + steps.put("flatten", FlattenConfiguration.STEP); + steps.put("drop", DropConfiguration.STEP); + steps.put("compute", ComputeConfiguration.STEP); + steps.put("compute-ai-embeddings", ComputeAIEmbeddingsConfiguration.STEP); + steps.put("query", QueryConfiguration.STEP); + steps.put("ai-chat-completions", AIChatCompletionsConfiguration.STEP); + steps.put("ai-text-completions", AITextCompletionsConfiguration.STEP); STEP_TYPES = Collections.unmodifiableMap(steps); } @@ -311,8 +90,8 @@ protected final ComponentType getComponentType(AgentConfiguration agentConfigura return ComponentType.PROCESSOR; } - interface TopicConfigurationGenerator { - Map generateTopicConfiguration(String topicName); + public interface TopicConfigurationGenerator { + void generateTopicConfiguration(String topicName); } public interface StepConfigurationInitializer { @@ -325,7 +104,8 @@ default void generateSteps( Map originalConfiguration, AgentConfiguration agentConfiguration, DataSourceConfigurationGenerator dataSourceConfigurationGenerator, - TopicConfigurationGenerator topicConfigurationGenerator) { + TopicConfigurationGenerator topicConfigurationGenerator, + AIServiceConfigurationGenerator aiServiceConfigurationGenerator) { final Class modelClazz = getAgentConfigurationModelClass(); ClassConfigValidator.validateAgentModelFromClass( agentConfiguration, modelClazz, originalConfiguration); @@ -338,8 +118,9 @@ default void generateSteps( if (value == null && !e.getValue().isRequired() && e.getValue().getDefaultValue() != null) { - step.put(e.getKey(), e.getValue().getDefaultValue()); - } else { + value = e.getValue().getDefaultValue(); + } + if (value != null) { step.put(e.getKey(), value); } }); @@ -361,9 +142,6 @@ protected void generateSteps( // we are mapping the original name to the ai-tools function name step.put("type", agentConfiguration.getType()); - // on every step you can put a "when" clause - optionalField(step, agentConfiguration, originalConfiguration, "when", null); - DataSourceConfigurationGenerator dataSourceConfigurationInjector = (resourceId) -> generateDataSourceConfiguration( @@ -375,10 +153,19 @@ protected void generateSteps( TopicConfigurationGenerator topicConfigurationGenerator = (topicName) -> { - TopicDefinition topicDefinition = module.resolveTopic(topicName); - return topicDefinition.getConfig(); + // only resolve the topic definition to verify topic is declared + module.resolveTopic(topicName); }; + AIServiceConfigurationGenerator aiServiceConfigurationGenerator = + (resourceId) -> + generateAIServiceConfiguration( + resourceId, + application, + configuration, + computeClusterRuntime, + pluginsRegistry); + STEP_TYPES .get(agentConfiguration.getType()) .generateSteps( @@ -386,28 +173,31 @@ protected void generateSteps( originalConfiguration, agentConfiguration, dataSourceConfigurationInjector, - topicConfigurationGenerator); + topicConfigurationGenerator, + aiServiceConfigurationGenerator); step.remove("composable"); steps.add(step); } - interface DataSourceConfigurationGenerator { + public interface DataSourceConfigurationGenerator { void generateDataSourceConfiguration(String resourceId); } - private void generateAIProvidersConfiguration( + public interface AIServiceConfigurationGenerator { + void generateAIServiceConfiguration(String resourceId); + } + + private void generateAIServiceConfiguration( + String resourceId, Application applicationInstance, - Map originalConfiguration, Map configuration, ComputeClusterRuntime clusterRuntime, PluginsRegistry pluginsRegistry) { - // let the user force the provider or detect it automatically - String resourceId = (String) originalConfiguration.get("ai-service"); if (resourceId != null) { Resource resource = applicationInstance.getResources().get(resourceId); - log.info("Generating ai provider configuration for {}", resourceId); + log.info("Generating ai service configuration for {}", resourceId); if (resource != null) { if (!AI_SERVICES.contains(resource.type())) { throw new IllegalArgumentException( @@ -486,13 +276,6 @@ protected Map computeAgentConfiguration( pluginsRegistry); Map configuration = new HashMap<>(); - generateAIProvidersConfiguration( - executionPlan.getApplication(), - originalConfiguration, - configuration, - clusterRuntime, - pluginsRegistry); - generateSteps( module, originalConfiguration, diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AIChatCompletionsConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AIChatCompletionsConfiguration.java new file mode 100644 index 000000000..e2ee6da5c --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AIChatCompletionsConfiguration.java @@ -0,0 +1,229 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import lombok.Data; + +@AgentConfig( + name = "Compute chat completions", + description = + """ + Sends the messages to the AI Service to compute chat completions. The result is stored in the specified field. + """) +@Data +public class AIChatCompletionsConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return AIChatCompletionsConfiguration.class; + } + + @Override + public void generateSteps( + Map step, + Map originalConfiguration, + AgentConfiguration agentConfiguration, + GenAIToolKitFunctionAgentProvider.DataSourceConfigurationGenerator + dataSourceConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.TopicConfigurationGenerator + topicConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.AIServiceConfigurationGenerator + aiServiceConfigurationGenerator) { + GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer.super + .generateSteps( + step, + originalConfiguration, + agentConfiguration, + dataSourceConfigurationGenerator, + topicConfigurationGenerator, + aiServiceConfigurationGenerator); + + final String streamTopic = (String) step.get("stream-to-topic"); + if (streamTopic != null) { + topicConfigurationGenerator.generateTopicConfiguration(streamTopic); + } + + aiServiceConfigurationGenerator.generateAIServiceConfiguration( + (String) step.remove("ai-service")); + } + }; + + @Data + public static class ChatMessage { + @ConfigProperty( + description = + """ + Role of the message. The role is used to identify the speaker in the chat. + """, + required = true) + private String role; + + @ConfigProperty( + description = + """ + Content of the message. You can use the Mustache syntax. + """, + required = true) + private String content; + } + + @ConfigProperty( + description = + """ + The model to use for chat completions. The model must be available in the AI Service. + """, + required = true) + private String model; + + @ConfigProperty( + description = + """ + Messages to use for chat completions. You can use the Mustache syntax. + """, + required = true) + private List messages; + + @ConfigProperty( + description = + """ + Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks. The entire messages will be sent to the output topic instead. + """) + @JsonProperty(value = "stream-to-topic") + private String streamToTopic; + + @ConfigProperty( + description = + """ + Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value." to write the result in a specific field. + """) + @JsonProperty(value = "stream-response-completion-field") + private String streamResponseCompletionField; + + @ConfigProperty( + description = + """ + Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. + The chunks are sent in the order they are received from the AI Service. + To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value. + """, + defaultValue = "20") + @JsonProperty(value = "min-chunks-per-message") + private int minChunksPerMessage = 20; + + @ConfigProperty( + description = + """ + Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value." to write the result in a specific field. + """) + @JsonProperty(value = "completion-field") + private String completionField; + + @ConfigProperty( + description = + """ + Enable streaming of the results. Use in conjunction with the stream-to-topic parameter. + """, + defaultValue = "true") + private boolean stream = true; + + @ConfigProperty( + description = + """ + Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value." to write the result in a specific field. + The log contains useful information for debugging the completion prompts. + """) + @JsonProperty(value = "log-field") + private String logField; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "max-tokens") + private Integer maxTokens; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + private Double temperature; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "top-p") + private Double topP; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "logit-bias") + private Map logitBias; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "user") + private String user; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "stop") + private List stop; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "presence-penalty") + private Double presencePenalty; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "frequency-penalty") + private Double frequencyPenalty; + + @ConfigProperty( + description = + """ + In case of multiple AI services configured, specify the id of the AI service to use. + """) + @JsonProperty(value = "ai-service") + private String aiService; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AITextCompletionsConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AITextCompletionsConfiguration.java new file mode 100644 index 000000000..7f6264372 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/AITextCompletionsConfiguration.java @@ -0,0 +1,209 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import lombok.Data; + +@AgentConfig( + name = "Compute text completions", + description = + """ + Sends the text to the AI Service to compute text completions. The result is stored in the specified field. + """) +@Data +public class AITextCompletionsConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return AITextCompletionsConfiguration.class; + } + + @Override + public void generateSteps( + Map step, + Map originalConfiguration, + AgentConfiguration agentConfiguration, + GenAIToolKitFunctionAgentProvider.DataSourceConfigurationGenerator + dataSourceConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.TopicConfigurationGenerator + topicConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.AIServiceConfigurationGenerator + aiServiceConfigurationGenerator) { + GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer.super + .generateSteps( + step, + originalConfiguration, + agentConfiguration, + dataSourceConfigurationGenerator, + topicConfigurationGenerator, + aiServiceConfigurationGenerator); + + final String streamTopic = (String) step.get("stream-to-topic"); + if (streamTopic != null) { + topicConfigurationGenerator.generateTopicConfiguration(streamTopic); + } + aiServiceConfigurationGenerator.generateAIServiceConfiguration( + (String) step.remove("ai-service")); + } + }; + + @ConfigProperty( + description = + """ + The model to use for text completions. The model must be available in the AI Service. + """, + required = true) + private String model; + + @ConfigProperty( + description = + """ + Prompt to use for text completions. You can use the Mustache syntax. + """, + required = true) + private List prompt; + + @ConfigProperty( + description = + """ + Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks. The entire messages will be sent to the output topic instead. + """) + @JsonProperty(value = "stream-to-topic") + private String streamToTopic; + + @ConfigProperty( + description = + """ + Field to use to store the completion results in the stream-to-topic topic. Use "value" to write the result without a structured schema. Use "value." to write the result in a specific field. + """) + @JsonProperty(value = "stream-response-completion-field") + private String streamResponseCompletionField; + + @ConfigProperty( + description = + """ + Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available. + The chunks are sent in the order they are received from the AI Service. + To improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value. + """, + defaultValue = "20") + @JsonProperty(value = "min-chunks-per-message") + private int minChunksPerMessage = 20; + + @ConfigProperty( + description = + """ + Field to use to store the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value." to write the result in a specific field. + """) + @JsonProperty(value = "completion-field") + private String completionField; + + @ConfigProperty( + description = + """ + Enable streaming of the results. Use in conjunction with the stream-to-topic parameter. + """, + defaultValue = "true") + private boolean stream = true; + + @ConfigProperty( + description = + """ + Field to use to store the log of the completion results in the output topic. Use "value" to write the result without a structured schema. Use "value." to write the result in a specific field. + The log contains useful information for debugging the completion prompts. + """) + @JsonProperty(value = "log-field") + private String logField; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "max-tokens") + private Integer maxTokens; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + private Double temperature; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "top-p") + private Double topP; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "logit-bias") + private Map logitBias; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "user") + private String user; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "stop") + private List stop; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "presence-penalty") + private Double presencePenalty; + + @ConfigProperty( + description = + """ + Parameter for the completion request. The parameters are passed to the AI Service as is. + """) + @JsonProperty(value = "frequency-penalty") + private Double frequencyPenalty; + + @ConfigProperty( + description = + """ + In case of multiple AI services configured, specify the id of the AI service to use. + """) + @JsonProperty(value = "ai-service") + private String aiService; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/CastConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/CastConfiguration.java new file mode 100644 index 000000000..d7cb3bdad --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/CastConfiguration.java @@ -0,0 +1,61 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@AgentConfig( + name = "Cast record to another schema", + description = + """ + Transforms the data to a target compatible schema. + Some step operations like cast or compute involve conversions from a type to another. When this happens the rules are: + - timestamp, date and time related object conversions assume UTC time zone if it is not explicit. + - date and time related object conversions to/from STRING use the RFC3339 format. + - timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z). + - date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01). + - time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00). + """) +@Data +public class CastConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return CastConfiguration.class; + } + }; + + @ConfigProperty( + description = + """ + The target schema type. + """, + required = true) + @JsonProperty("schema-type") + private String schemaType; + + @ConfigProperty( + description = + """ + When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value. + """) + private String part; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeAIEmbeddingsConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeAIEmbeddingsConfiguration.java new file mode 100644 index 000000000..80906be02 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeAIEmbeddingsConfiguration.java @@ -0,0 +1,124 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import lombok.Data; + +@AgentConfig( + name = "Compute embeddings of the record", + description = + """ + Compute embeddings of the record. The embeddings are stored in the record under a specific field. + """) +@Data +public class ComputeAIEmbeddingsConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return ComputeAIEmbeddingsConfiguration.class; + } + + @Override + public void generateSteps( + Map step, + Map originalConfiguration, + AgentConfiguration agentConfiguration, + GenAIToolKitFunctionAgentProvider.DataSourceConfigurationGenerator + dataSourceConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.TopicConfigurationGenerator + topicConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.AIServiceConfigurationGenerator + aiServiceConfigurationGenerator) { + GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer.super + .generateSteps( + step, + originalConfiguration, + agentConfiguration, + dataSourceConfigurationGenerator, + topicConfigurationGenerator, + aiServiceConfigurationGenerator); + aiServiceConfigurationGenerator.generateAIServiceConfiguration( + (String) step.remove("ai-service")); + } + }; + + @ConfigProperty( + description = + """ + Model to use for the embeddings. The model must be available in the configured AI Service. + """, + defaultValue = "text-embedding-ada-002") + private String model = "text-embedding-ada-002"; + + @ConfigProperty( + description = + """ + Text to create embeddings from. You can use Mustache syntax to compose multiple fields into a single text. Example: + text: "{{{ value.field1 }}} {{{ value.field2 }}}" + """, + required = true) + private String text; + + @ConfigProperty( + description = + """ + Field where to store the embeddings. + """, + required = true) + @JsonProperty("embeddings-field") + private String embeddingsField; + + @ConfigProperty( + description = + """ + Batch size for submitting the embeddings requests. + """, + defaultValue = "10") + @JsonProperty("batch-size") + private int batchSize = 10; + + @ConfigProperty( + description = + """ + Max number of concurrent requests to the AI Service. + """, + defaultValue = "4") + private int concurrency = 4; + + @ConfigProperty( + description = + """ + Flushing is disabled by default in order to avoid latency spikes. + You should enable this feature in the case of background processing.""", + defaultValue = "0") + @JsonProperty("flush-interval") + private int flushInterval; + + @ConfigProperty( + description = + """ + In case of multiple AI services configured, specify the id of the AI service to use. + """) + @JsonProperty(value = "ai-service") + private String aiService; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeConfiguration.java new file mode 100644 index 000000000..3cbef2153 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/ComputeConfiguration.java @@ -0,0 +1,88 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import java.util.List; +import lombok.Data; + +@AgentConfig( + name = "Compute values from the record", + description = + """ + Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten. + """) +@Data +public class ComputeConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return ComputeConfiguration.class; + } + }; + + @Data + public static class Field { + @ConfigProperty( + description = + """ + The name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message. + In addition, you can compute values on the following message headers [destinationTopic, messageKey, properties.]. + Please note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0.""", + required = true) + private String name; + + @ConfigProperty( + description = + """ + It is evaluated at runtime and the result of the evaluation is assigned to the field. + Refer to the language expression documentation for more information on the expression syntax. + """, + required = true) + private String expression; + + @ConfigProperty( + description = + """ + The type of the computed field. This + will translate to the schema type of the new field in the transformed message. + The following types are currently supported :STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT. + The type field is not required for the message headers [destinationTopic, messageKey, properties.] and STRING will be used. + For the value and key, if it is not provided, then the type will be inferred from the result of the expression evaluation. + """, + required = true) + private String type; + + @ConfigProperty( + description = + """ + If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression. + """, + defaultValue = "false") + private boolean optional; + } + + @ConfigProperty( + description = + """ + An array of objects describing how to calculate the field values + """, + required = true) + private List fields; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropConfiguration.java new file mode 100644 index 000000000..7dbbf069f --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropConfiguration.java @@ -0,0 +1,37 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import lombok.Data; + +@AgentConfig( + name = "Drop the record", + description = + """ + Drops the record from further processing. Use in conjunction with when to selectively drop records. + """) +@Data +public class DropConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return DropConfiguration.class; + } + }; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropFieldsConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropFieldsConfiguration.java index 67195aa11..eeb011a31 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropFieldsConfiguration.java +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/DropFieldsConfiguration.java @@ -21,7 +21,11 @@ import java.util.List; import lombok.Data; -@AgentConfig(name = "Drop fields from the input record") +@AgentConfig( + name = "Drop fields", + description = """ + Drops the record fields. + """) @Data public class DropFieldsConfiguration extends BaseGenAIStepConfiguration { public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = @@ -35,15 +39,15 @@ public Class getAgentConfigurationModelClass() { @ConfigProperty( description = """ - Fields to drop from the input record. - """, + Fields to drop from the input record. + """, required = true) private List fields; @ConfigProperty( description = """ - Part to drop. (value or key) - """) + Part to drop. (value or key) + """) private String part; } diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/FlattenConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/FlattenConfiguration.java new file mode 100644 index 000000000..f591eec1d --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/FlattenConfiguration.java @@ -0,0 +1,53 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import lombok.Data; + +@AgentConfig( + name = "Flatten record fields", + description = + """ + Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names. + """) +@Data +public class FlattenConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return FlattenConfiguration.class; + } + }; + + @ConfigProperty( + description = + """ + The delimiter to use when concatenating the field names. + """, + defaultValue = "_") + private String delimiter; + + @ConfigProperty( + description = + """ + When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value. + """) + private String part; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/MergeKeyValueConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/MergeKeyValueConfiguration.java new file mode 100644 index 000000000..0b74310e5 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/MergeKeyValueConfiguration.java @@ -0,0 +1,35 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import lombok.Data; + +@AgentConfig( + name = "Merge key-value format", + description = + "Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Only AVRO and JSON are supported.") +@Data +public class MergeKeyValueConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return MergeKeyValueConfiguration.class; + } + }; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/QueryConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/QueryConfiguration.java new file mode 100644 index 000000000..bb73724a5 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/QueryConfiguration.java @@ -0,0 +1,110 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import lombok.Data; + +@AgentConfig( + name = "Query", + description = + """ + Perform a vector search or simple query against a datasource. + """) +@Data +public class QueryConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return QueryConfiguration.class; + } + + @Override + public void generateSteps( + Map step, + Map originalConfiguration, + AgentConfiguration agentConfiguration, + GenAIToolKitFunctionAgentProvider.DataSourceConfigurationGenerator + dataSourceConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.TopicConfigurationGenerator + topicConfigurationGenerator, + GenAIToolKitFunctionAgentProvider.AIServiceConfigurationGenerator + aiServiceConfigurationGenerator) { + GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer.super + .generateSteps( + step, + originalConfiguration, + agentConfiguration, + dataSourceConfigurationGenerator, + topicConfigurationGenerator, + aiServiceConfigurationGenerator); + String datasource = (String) step.remove("datasource"); + if (datasource == null) { + throw new IllegalStateException( + "datasource is required but this exception should have been raised before ?"); + } + dataSourceConfigurationGenerator.generateDataSourceConfiguration(datasource); + } + }; + + @ConfigProperty( + description = + """ + The query to use to extract the data. + """, + required = true) + private String query; + + @ConfigProperty( + description = + """ + Fields of the record to use as input parameters for the query. + """) + private List fields; + + @ConfigProperty( + description = + """ + The name of the field to use to store the query result. + """, + required = true) + @JsonProperty("output-field") + private String outputField; + + @ConfigProperty( + description = + """ + If true, only the first result of the query is stored in the output field. + """, + defaultValue = "false") + @JsonProperty("only-first") + private boolean onlyFirst; + + @ConfigProperty( + description = + """ + Reference to a datasource id configured in the application. + """, + required = true) + private String datasource; +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/UnwrapKeyValueConfiguration.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/UnwrapKeyValueConfiguration.java new file mode 100644 index 000000000..2a64a9031 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/steps/UnwrapKeyValueConfiguration.java @@ -0,0 +1,45 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.agents.ai.steps; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.impl.agents.ai.GenAIToolKitFunctionAgentProvider; +import lombok.Data; + +@AgentConfig( + name = "Unwrap key-value format", + description = + "If the record value is in KeyValue format, extracts the" + + " KeyValue's key or value and make it the record value.") +@Data +public class UnwrapKeyValueConfiguration extends BaseGenAIStepConfiguration { + public static final GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer STEP = + new GenAIToolKitFunctionAgentProvider.StepConfigurationInitializer() { + @Override + public Class getAgentConfigurationModelClass() { + return UnwrapKeyValueConfiguration.class; + } + }; + + @ConfigProperty( + description = + """ + Whether to unwrap the key instead of the value. + """, + defaultValue = "false") + private boolean unwrapKey; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/GenAIAgentsTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/GenAIAgentsTest.java index 61a7ef13d..7ded63dba 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/GenAIAgentsTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/GenAIAgentsTest.java @@ -344,13 +344,7 @@ public void testDontMergeGenAIToolKitAgentsWithExplicitLogTopic() throws Excepti DefaultAgentNode step = (DefaultAgentNode) agentImplementation; Map configuration = step.getConfiguration(); log.info("Configuration: {}", configuration); - Map openAIConfiguration = - (Map) configuration.get("openai"); - log.info("openAIConfiguration: {}", openAIConfiguration); - assertEquals("http://something", openAIConfiguration.get("url")); - assertEquals("xxcxcxc", openAIConfiguration.get("access-key")); - assertEquals("azure", openAIConfiguration.get("provider")); - + assertNull(configuration.get("openai")); List> steps = (List>) configuration.get("steps"); assertEquals(1, steps.size()); diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java index 9d85f4aca..cefb6a027 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/KubernetesClusterRuntimeDockerTest.java @@ -34,6 +34,7 @@ import ai.langstream.api.webservice.application.ApplicationCodeInfo; import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; import ai.langstream.deployer.k8s.api.crds.agents.AgentCustomResource; +import ai.langstream.deployer.k8s.util.SerializationUtil; import ai.langstream.impl.common.DefaultAgentNode; import ai.langstream.impl.deploy.ApplicationDeployer; import ai.langstream.impl.k8s.tests.KubeTestServer; @@ -203,28 +204,33 @@ public void testOpenAIComputeEmbeddingFunction() throws Exception { defaultErrorsAsMap.put("onFailure", "fail"); defaultErrorsAsMap.put("retries", 0); assertEquals( - new AgentSpec( - AgentSpec.ComponentType.PROCESSOR, - tenant, - "step1", - "app", - "compute-ai-embeddings", - Map.of( - "steps", - List.of( + SerializationUtil.prettyPrintJson( + new AgentSpec( + AgentSpec.ComponentType.PROCESSOR, + tenant, + "step1", + "app", + "compute-ai-embeddings", + Map.of( + "steps", + List.of( + Map.of( + "type", "compute-ai-embeddings", + "model", "text-embedding-ada-002", + "embeddings-field", + "value.embeddings", + "text", + "{{ value.name }} {{ value.description }}", + "batch-size", "10", + "concurrency", "4", + "flush-interval", "0")), + "openai", Map.of( - "type", "compute-ai-embeddings", - "model", "text-embedding-ada-002", - "embeddings-field", "value.embeddings", - "text", - "{{ value.name }} {{ value.description }}")), - "openai", - Map.of( - "url", "http://something", - "access-key", "xxcxcxc", - "provider", "azure")), - defaultErrorsAsMap), - runtimePodConfiguration.agent()); + "url", "http://something", + "access-key", "xxcxcxc", + "provider", "azure")), + defaultErrorsAsMap)), + SerializationUtil.prettyPrintJson(runtimePodConfiguration.agent())); assertEquals( new StreamingCluster( "kafka", diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java index 29d300aec..ea432099c 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java @@ -128,12 +128,382 @@ public void testStepsDoc() { Assertions.assertEquals( """ { - "ai-chat-completions" : { }, - "ai-text-completions" : { }, - "cast" : { }, - "compute" : { }, - "compute-ai-embeddings" : { }, + "ai-chat-completions" : { + "name" : "Compute chat completions", + "description" : "Sends the messages to the AI Service to compute chat completions. The result is stored in the specified field.", + "properties" : { + "ai-service" : { + "description" : "In case of multiple AI services configured, specify the id of the AI service to use.", + "required" : false, + "type" : "string" + }, + "completion-field" : { + "description" : "Field to use to store the completion results in the output topic. Use \\"value\\" to write the result without a structured schema. Use \\"value.\\" to write the result in a specific field.", + "required" : false, + "type" : "string" + }, + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "frequency-penalty" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "log-field" : { + "description" : "Field to use to store the log of the completion results in the output topic. Use \\"value\\" to write the result without a structured schema. Use \\"value.\\" to write the result in a specific field.\\nThe log contains useful information for debugging the completion prompts.", + "required" : false, + "type" : "string" + }, + "logit-bias" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "object" + }, + "max-tokens" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "integer" + }, + "messages" : { + "description" : "Messages to use for chat completions. You can use the Mustache syntax.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Messages to use for chat completions. You can use the Mustache syntax.", + "required" : true, + "type" : "object", + "properties" : { + "content" : { + "description" : "Content of the message. You can use the Mustache syntax.", + "required" : true, + "type" : "string" + }, + "role" : { + "description" : "Role of the message. The role is used to identify the speaker in the chat.", + "required" : true, + "type" : "string" + } + } + } + }, + "min-chunks-per-message" : { + "description" : "Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available.\\nThe chunks are sent in the order they are received from the AI Service.\\nTo improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value.", + "required" : false, + "type" : "integer", + "defaultValue" : "20" + }, + "model" : { + "description" : "The model to use for chat completions. The model must be available in the AI Service.", + "required" : true, + "type" : "string" + }, + "presence-penalty" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "stop" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "string" + } + }, + "stream" : { + "description" : "Enable streaming of the results. Use in conjunction with the stream-to-topic parameter.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "stream-response-completion-field" : { + "description" : "Field to use to store the completion results in the stream-to-topic topic. Use \\"value\\" to write the result without a structured schema. Use \\"value.\\" to write the result in a specific field.", + "required" : false, + "type" : "string" + }, + "stream-to-topic" : { + "description" : "Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks. The entire messages will be sent to the output topic instead.", + "required" : false, + "type" : "string" + }, + "temperature" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "top-p" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "user" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "ai-text-completions" : { + "name" : "Compute text completions", + "description" : "Sends the text to the AI Service to compute text completions. The result is stored in the specified field.", + "properties" : { + "ai-service" : { + "description" : "In case of multiple AI services configured, specify the id of the AI service to use.", + "required" : false, + "type" : "string" + }, + "completion-field" : { + "description" : "Field to use to store the completion results in the output topic. Use \\"value\\" to write the result without a structured schema. Use \\"value.\\" to write the result in a specific field.", + "required" : false, + "type" : "string" + }, + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "frequency-penalty" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "log-field" : { + "description" : "Field to use to store the log of the completion results in the output topic. Use \\"value\\" to write the result without a structured schema. Use \\"value.\\" to write the result in a specific field.\\nThe log contains useful information for debugging the completion prompts.", + "required" : false, + "type" : "string" + }, + "logit-bias" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "object" + }, + "max-tokens" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "integer" + }, + "min-chunks-per-message" : { + "description" : "Minimum number of chunks to send to the stream-to-topic topic. The chunks are sent as soon as they are available.\\nThe chunks are sent in the order they are received from the AI Service.\\nTo improve the TTFB (Time-To-First-Byte), the chunk size starts from 1 and doubles until it reaches the max-chunks-per-message value.", + "required" : false, + "type" : "integer", + "defaultValue" : "20" + }, + "model" : { + "description" : "The model to use for text completions. The model must be available in the AI Service.", + "required" : true, + "type" : "string" + }, + "presence-penalty" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "prompt" : { + "description" : "Prompt to use for text completions. You can use the Mustache syntax.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Prompt to use for text completions. You can use the Mustache syntax.", + "required" : true, + "type" : "string" + } + }, + "stop" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "string" + } + }, + "stream" : { + "description" : "Enable streaming of the results. Use in conjunction with the stream-to-topic parameter.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "stream-response-completion-field" : { + "description" : "Field to use to store the completion results in the stream-to-topic topic. Use \\"value\\" to write the result without a structured schema. Use \\"value.\\" to write the result in a specific field.", + "required" : false, + "type" : "string" + }, + "stream-to-topic" : { + "description" : "Enable streaming of the results. If enabled, the results are streamed to the specified topic in small chunks. The entire messages will be sent to the output topic instead.", + "required" : false, + "type" : "string" + }, + "temperature" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "top-p" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "number" + }, + "user" : { + "description" : "Parameter for the completion request. The parameters are passed to the AI Service as is.", + "required" : false, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "cast" : { + "name" : "Cast record to another schema", + "description" : "Transforms the data to a target compatible schema.\\nSome step operations like cast or compute involve conversions from a type to another. When this happens the rules are:\\n - timestamp, date and time related object conversions assume UTC time zone if it is not explicit.\\n - date and time related object conversions to/from STRING use the RFC3339 format.\\n - timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z).\\n - date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01).\\n - time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00).", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "part" : { + "description" : "When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value.", + "required" : false, + "type" : "string" + }, + "schema-type" : { + "description" : "The target schema type.", + "required" : true, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "compute" : { + "name" : "Compute values from the record", + "description" : "Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "fields" : { + "description" : "An array of objects describing how to calculate the field values", + "required" : true, + "type" : "array", + "items" : { + "description" : "An array of objects describing how to calculate the field values", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "It is evaluated at runtime and the result of the evaluation is assigned to the field.\\nRefer to the language expression documentation for more information on the expression syntax.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "The name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message.\\nIn addition, you can compute values on the following message headers [destinationTopic, messageKey, properties.].\\nPlease note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0.", + "required" : true, + "type" : "string" + }, + "optional" : { + "description" : "If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression.", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + }, + "type" : { + "description" : "The type of the computed field. This\\n will translate to the schema type of the new field in the transformed message.\\n The following types are currently supported :STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT.\\n The type field is not required for the message headers [destinationTopic, messageKey, properties.] and STRING will be used.\\n For the value and key, if it is not provided, then the type will be inferred from the result of the expression evaluation.", + "required" : true, + "type" : "string" + } + } + } + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "compute-ai-embeddings" : { + "name" : "Compute embeddings of the record", + "description" : "Compute embeddings of the record. The embeddings are stored in the record under a specific field.", + "properties" : { + "ai-service" : { + "description" : "In case of multiple AI services configured, specify the id of the AI service to use.", + "required" : false, + "type" : "string" + }, + "batch-size" : { + "description" : "Batch size for submitting the embeddings requests.", + "required" : false, + "type" : "integer", + "defaultValue" : "10" + }, + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "concurrency" : { + "description" : "Max number of concurrent requests to the AI Service.", + "required" : false, + "type" : "integer", + "defaultValue" : "4" + }, + "embeddings-field" : { + "description" : "Field where to store the embeddings.", + "required" : true, + "type" : "string" + }, + "flush-interval" : { + "description" : "Flushing is disabled by default in order to avoid latency spikes.\\nYou should enable this feature in the case of background processing.", + "required" : false, + "type" : "integer", + "defaultValue" : "0" + }, + "model" : { + "description" : "Model to use for the embeddings. The model must be available in the configured AI Service.", + "required" : false, + "type" : "string", + "defaultValue" : "text-embedding-ada-002" + }, + "text" : { + "description" : "Text to create embeddings from. You can use Mustache syntax to compose multiple fields into a single text. Example:\\ntext: \\"{{{ value.field1 }}} {{{ value.field2 }}}\\"", + "required" : true, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, "drop" : { + "name" : "Drop the record", + "description" : "Drops the record from further processing. Use in conjunction with when to selectively drop records.", "properties" : { "composable" : { "description" : "Whether this step can be composed with other steps.", @@ -149,7 +519,8 @@ public void testStepsDoc() { } }, "drop-fields" : { - "name" : "Drop fields from the input record", + "name" : "Drop fields", + "description" : "Drops the record fields.", "properties" : { "composable" : { "description" : "Whether this step can be composed with other steps.", @@ -179,8 +550,37 @@ public void testStepsDoc() { } } }, - "flatten" : { }, + "flatten" : { + "name" : "Flatten record fields", + "description" : "Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names.", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "delimiter" : { + "description" : "The delimiter to use when concatenating the field names.", + "required" : false, + "type" : "string", + "defaultValue" : "_" + }, + "part" : { + "description" : "When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value.", + "required" : false, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, "merge-key-value" : { + "name" : "Merge key-value format", + "description" : "Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Only AVRO and JSON are supported.", "properties" : { "composable" : { "description" : "Whether this step can be composed with other steps.", @@ -195,8 +595,77 @@ public void testStepsDoc() { } } }, - "query" : { }, - "unwrap-key-value" : { } + "query" : { + "name" : "Query", + "description" : "Perform a vector search or simple query against a datasource.", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "datasource" : { + "description" : "Reference to a datasource id configured in the application.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields of the record to use as input parameters for the query.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Fields of the record to use as input parameters for the query.", + "required" : false, + "type" : "string" + } + }, + "only-first" : { + "description" : "If true, only the first result of the query is stored in the output field.", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + }, + "output-field" : { + "description" : "The name of the field to use to store the query result.", + "required" : true, + "type" : "string" + }, + "query" : { + "description" : "The query to use to extract the data.", + "required" : true, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "unwrap-key-value" : { + "name" : "Unwrap key-value format", + "description" : "If the record value is in KeyValue format, extracts the KeyValue's key or value and make it the record value.", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "unwrapKey" : { + "description" : "Whether to unwrap the key instead of the value.", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + } }""", SerializationUtil.prettyPrintJson(model)); }