Skip to content

Commit

Permalink
Document Kafka Connect, text-processing, vector, re-rank agents (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Oct 2, 2023
1 parent 34c00ae commit 8b9a63d
Show file tree
Hide file tree
Showing 23 changed files with 952 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private void initTextSplitter(Map<String, Object> configuration) {
newLengthFunction = String::length;
break;
default:
newLengthFunction = new TikTokLengthFunction(lengthFunctionName);
newLengthFunction = new TiktokenLengthFunction(lengthFunctionName);
}
newTextSplitter =
new RecursiveCharacterTextSplitter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import com.knuddels.jtokkit.api.EncodingRegistry;
import com.knuddels.jtokkit.api.EncodingType;

public class TikTokLengthFunction implements LengthFunction {
/** Java implementation of <a href="https://github.com/openai/tiktoken">tiktoken</a>. */
public class TiktokenLengthFunction implements LengthFunction {

private static final EncodingRegistry REGISTRY = Encodings.newDefaultEncodingRegistry();
private final EncodingType encodingType;

public TikTokLengthFunction(String encoding) {
public TiktokenLengthFunction(String encoding) {
encodingType =
EncodingType.fromName(encoding)
.orElseThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.alibaba.fastjson.JSONObject;
import com.datastax.oss.streaming.ai.TransformContext;
import com.datastax.oss.streaming.ai.jstl.JstlEvaluator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.CollectionSchema;
import io.milvus.grpc.DescribeCollectionResponse;
Expand All @@ -45,9 +43,6 @@
@Slf4j
public class MilvusWriter implements VectorDatabaseWriterProvider {

private static final ObjectMapper MAPPER =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

@Override
public boolean supports(Map<String, Object> dataSourceConfig) {
return "milvus".equals(dataSourceConfig.get("service"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ protected Class getAgentConfigModelClass(String type) {
return null;
}

protected boolean isAgentConfigModelAllowUnknownProperties(String type) {
return false;
}

protected AgentNodeMetadata computeAgentMetadata(
AgentConfiguration agentConfiguration,
ExecutionPlan physicalApplicationInstance,
Expand All @@ -133,7 +137,11 @@ protected Map<String, Object> computeAgentConfiguration(
final String type = agentConfiguration.getType();
final Class modelClass = getAgentConfigModelClass(type);
if (modelClass != null) {
ClassConfigValidator.validateAgentModelFromClass(agentConfiguration, modelClass);
ClassConfigValidator.validateAgentModelFromClass(
agentConfiguration,
modelClass,
agentConfiguration.getConfiguration(),
isAgentConfigModelAllowUnknownProperties(type));
}
return new HashMap<>(agentConfiguration.getConfiguration());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package ai.langstream.impl.noop;

import ai.langstream.api.doc.AgentConfigurationModel;
import ai.langstream.api.model.AgentConfiguration;
import ai.langstream.api.runtime.ComponentType;
import ai.langstream.impl.common.AbstractAgentProvider;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NoOpAgentNodeProvider extends AbstractAgentProvider {
Expand All @@ -31,4 +33,9 @@ public NoOpAgentNodeProvider() {
protected ComponentType getComponentType(AgentConfiguration agentConfiguration) {
return ComponentType.PROCESSOR;
}

@Override
public Map<String, AgentConfigurationModel> generateSupportedTypesDocumentation() {
return Map.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,26 @@ public static void validateAgentModelFromClass(
@SneakyThrows
public static void validateAgentModelFromClass(
AgentConfiguration agentConfiguration, Class modelClazz, Map<String, Object> asMap) {
validateAgentModelFromClass(agentConfiguration, modelClazz, asMap, false);
}

@SneakyThrows
public static void validateAgentModelFromClass(
AgentConfiguration agentConfiguration,
Class modelClazz,
Map<String, Object> asMap,
boolean allowUnknownProperties) {
asMap = validatorMapper.readValue(validatorMapper.writeValueAsBytes(asMap), Map.class);

final AgentConfigurationModel agentConfigurationModel =
generateAgentModelFromClass(modelClazz);

validateProperties(
agentConfiguration, null, asMap, agentConfigurationModel.getProperties());
agentConfiguration,
null,
asMap,
agentConfigurationModel.getProperties(),
allowUnknownProperties);

try {
validatorMapper.convertValue(asMap, modelClazz);
Expand Down Expand Up @@ -129,8 +142,9 @@ private static void validateProperties(
AgentConfiguration agentConfiguration,
String parentProp,
Map<String, Object> asMap,
Map<String, AgentConfigurationModel.AgentConfigurationProperty> properties) {
if (asMap != null) {
Map<String, AgentConfigurationModel.AgentConfigurationProperty> properties,
boolean allowUnknownProperties) {
if (!allowUnknownProperties && asMap != null) {
for (String key : asMap.keySet()) {
if (!properties.containsKey(key)) {
final String fullPropertyKey =
Expand Down Expand Up @@ -176,7 +190,8 @@ private static void validateProperty(
agentConfiguration,
fullPropertyKey,
actualValue == null ? null : (Map<String, Object>) actualValue,
propertyValue.getProperties());
propertyValue.getProperties(),
false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime.CLUSTER_TYPE;

import ai.langstream.api.doc.AgentConfig;
import ai.langstream.api.model.AgentConfiguration;
import ai.langstream.api.runtime.ComponentType;
import ai.langstream.impl.agents.AbstractComposableAgentProvider;
Expand All @@ -38,4 +39,16 @@ public IdentityAgentProvider() {
protected ComponentType getComponentType(AgentConfiguration agentConfiguration) {
return ComponentType.PROCESSOR;
}

@Override
protected Class getAgentConfigModelClass(String type) {
return IdentityAgentConfig.class;
}

@AgentConfig(
name = "Identity function",
description =
"Simple agent to move data from the input to the output. "
+ "Could be used for testing or sample applications.")
public static class IdentityAgentConfig {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@
*/
package ai.langstream.runtime.impl.k8s.agents;

import ai.langstream.api.doc.AgentConfig;
import ai.langstream.api.doc.ConfigProperty;
import ai.langstream.api.model.AgentConfiguration;
import ai.langstream.api.runtime.ComponentType;
import ai.langstream.impl.common.AbstractAgentProvider;
import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider;
import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Set;

public class KafkaConnectAgentsProvider extends AbstractAgentProvider {

public KafkaConnectAgentsProvider() {
super(Set.of("sink", "source"), List.of(KubernetesClusterRuntime.CLUSTER_TYPE));
super(
Set.of("sink", "source"),
List.of(
KubernetesClusterRuntime.CLUSTER_TYPE,
NoOpComputeClusterRuntimeProvider.CLUSTER_TYPE));
}

@Override
Expand All @@ -36,4 +44,54 @@ protected ComponentType getComponentType(AgentConfiguration agentConfiguration)
default -> throw new IllegalStateException();
};
}

@Override
protected boolean isAgentConfigModelAllowUnknownProperties(String type) {
return true;
}

@Override
protected Class getAgentConfigModelClass(String type) {
return switch (type) {
case "sink" -> KafkaSinkConnectAgentConfig.class;
case "source" -> KafkaSourceConnectAgentConfig.class;
default -> throw new IllegalStateException();
};
}

@AgentConfig(
name = "Kafka Connect Sink agent",
description =
"""
Run any Kafka Connect Sink.
All the configuration properties are passed to the Kafka Connect Sink.
""")
public static class KafkaSinkConnectAgentConfig {
@ConfigProperty(
description =
"""
Java main class for the Kafka Sink connector.
""",
required = true)
@JsonProperty("connector.class")
private String connectorClass;
}

@AgentConfig(
name = "Kafka Connect Source agent",
description =
"""
Run any Kafka Connect Source.
All the configuration properties are passed to the Kafka Connect Source.
""")
public static class KafkaSourceConnectAgentConfig {
@ConfigProperty(
description =
"""
Java main class for the Kafka Source connector.
""",
required = true)
@JsonProperty("connector.class")
private String connectorClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package ai.langstream.runtime.impl.k8s.agents;

import ai.langstream.api.doc.AgentConfigurationModel;
import ai.langstream.impl.agents.AbstractCompositeAgentProvider;
import ai.langstream.impl.noop.NoOpComputeClusterRuntimeProvider;
import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime;
import java.util.List;
import java.util.Map;

public class KubernetesCompositeAgentProvider extends AbstractCompositeAgentProvider {
public KubernetesCompositeAgentProvider() {
Expand All @@ -27,4 +29,9 @@ public KubernetesCompositeAgentProvider() {
KubernetesClusterRuntime.CLUSTER_TYPE,
NoOpComputeClusterRuntimeProvider.CLUSTER_TYPE));
}

@Override
public Map<String, AgentConfigurationModel> generateSupportedTypesDocumentation() {
return Map.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package ai.langstream.runtime.impl.k8s.agents;

import ai.langstream.api.doc.AgentConfig;
import ai.langstream.api.doc.ConfigProperty;
import ai.langstream.api.model.AgentConfiguration;
import ai.langstream.api.model.Application;
import ai.langstream.api.model.Module;
Expand All @@ -25,26 +27,31 @@
import ai.langstream.api.runtime.ExecutionPlan;
import ai.langstream.api.runtime.PluginsRegistry;
import ai.langstream.impl.agents.AbstractComposableAgentProvider;
import ai.langstream.impl.agents.ai.steps.QueryConfiguration;
import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class QueryVectorDBAgentProvider extends AbstractComposableAgentProvider {

protected static final String QUERY_VECTOR_DB = "query-vector-db";
protected static final String VECTOR_DB_SINK = "vector-db-sink";

public QueryVectorDBAgentProvider() {
super(
Set.of("query-vector-db", "vector-db-sink"),
List.of(KubernetesClusterRuntime.CLUSTER_TYPE));
Set.of(QUERY_VECTOR_DB, VECTOR_DB_SINK),
List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none"));
}

@Override
protected ComponentType getComponentType(AgentConfiguration agentConfiguration) {
return switch (agentConfiguration.getType()) {
case "query-vector-db" -> ComponentType.PROCESSOR;
case "vector-db-sink" -> ComponentType.SINK;
case QUERY_VECTOR_DB -> ComponentType.PROCESSOR;
case VECTOR_DB_SINK -> ComponentType.SINK;
default -> throw new IllegalStateException();
};
}
Expand All @@ -68,14 +75,9 @@ protected Map<String, Object> computeAgentConfiguration(

// get the datasource configuration and inject it into the agent configuration
String resourceId = (String) originalConfiguration.remove("datasource");
if (resourceId == null || resourceId.isEmpty()) {
throw new IllegalArgumentException(
"Missing required field 'datasource' in agent definition, type="
+ agentConfiguration.getType()
+ ", name="
+ agentConfiguration.getName()
+ ", id="
+ agentConfiguration.getId());
if (resourceId == null) {
throw new IllegalStateException(
"datasource is required but this exception should have been raised before ?");
}
generateDataSourceConfiguration(
resourceId,
Expand All @@ -102,16 +104,61 @@ private void generateDataSourceConfiguration(
if (!resource.type().equals("datasource")
&& !resource.type().equals("vector-database")) {
throw new IllegalArgumentException(
"Resource "
"Resource '"
+ resourceId
+ " is not type=datasource or type=vector-database");
+ "' is not type=datasource or type=vector-database");
}
if (configuration.containsKey("datasource")) {
throw new IllegalArgumentException("Only one datasource is supported");
}
configuration.put("datasource", resourceImplementation);
} else {
throw new IllegalArgumentException("Resource " + resourceId + " not found");
throw new IllegalArgumentException("Resource '" + resourceId + "' not found");
}
}

@Override
protected Class getAgentConfigModelClass(String type) {
return switch (type) {
case QUERY_VECTOR_DB -> QueryVectorDBConfig.class;
case VECTOR_DB_SINK -> VectorDBSinkConfig.class;
default -> throw new IllegalStateException(type);
};
}

@Override
protected boolean isAgentConfigModelAllowUnknownProperties(String type) {
return switch (type) {
case QUERY_VECTOR_DB -> false;
case VECTOR_DB_SINK -> true;
default -> throw new IllegalStateException(type);
};
}

@AgentConfig(
name = "Query a vector database",
description =
"""
Query a vector database using Vector Search capabilities.
""")
@Data
public static class QueryVectorDBConfig extends QueryConfiguration {}

@AgentConfig(
name = "Vector database sink",
description =
"""
Store vectors in a vector database.
Configuration properties depends on the vector database implementation, specified by the "datasource" property.
""")
@Data
public static class VectorDBSinkConfig {
@ConfigProperty(
description =
"""
The defined datasource ID to use to store the vectors.
""",
required = true)
private String datasource;
}
}
Loading

0 comments on commit 8b9a63d

Please sign in to comment.