additionalConfiguration)
+ throws Exception {
+ String model = (String) additionalConfiguration.get("model");
+ String apiUrl = (String) this.providerConfiguration.get("api-url");
+ log.info(
+ "Creating Voyage embeddings service for model {} with API URL {}",
+ model,
+ apiUrl);
+ VoyageEmbeddingService.VoyageApiConfig.VoyageApiConfigBuilder apiBuilder =
+ VoyageEmbeddingService.VoyageApiConfig.builder()
+ .accessKey((String) this.providerConfiguration.get("access-key"))
+ .model(model);
+ if (apiUrl != null && !apiUrl.isEmpty()) {
+ apiBuilder.vgUrl(apiUrl);
+ }
+ return new VoyageEmbeddingService(apiBuilder.build());
+ }
+
+ public void close() {} // no-op: the body is empty
+ }
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java
new file mode 100644
index 000000000..cefc3e04c
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingService.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.embeddings;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * EmbeddingsService implementation using the Voyage REST API.
+ *
+ * <p>The requested model should be trained for the "sentence similarity" task.
+ */
+@Slf4j
+public class VoyageEmbeddingService implements EmbeddingsService {
+ // https://docs.voyageai.com/reference/embeddings-api
+ @Data
+ @Builder
+ public static class VoyageApiConfig { // connection and request settings read by the service
+ public String accessKey; // sent as the Bearer token of each request
+ // Endpoint the requests are POSTed to; VG_URL is used when the builder does not set it.
+ @Builder.Default public String vgUrl = VG_URL;
+ // Model name placed in the request payload.
+ @Builder.Default public String model = "voyage-2";
+ // Optional request settings: each is added to the payload only when non-null.
+ public String input_type;
+ public String truncation; // parsed with Boolean.parseBoolean before being sent
+ public String encoding_format;
+ }
+
+ private static final String VG_URL = "https://api.voyageai.com/v1/embeddings";
+ // Shared JSON mapper, used both to serialize requests and to parse responses.
+ private static final ObjectMapper om = EmbeddingsService.createObjectMapper();
+ // Per-instance state; every field below is assigned once in the constructor.
+ private final VoyageApiConfig conf;
+ private final String model;
+ private final String token;
+ private final HttpClient httpClient;
+ // Parsed form of conf.vgUrl; converted to a URI when a request is built.
+ private final URL modelUrl;
+
+    /** JSON body of an embeddings request; serialized by computeEmbeddings for every call. */
+    @Data
+    @Builder
+    public static class VoyagePojo {
+        @JsonAlias("input")
+        public List<String> input;
+
+        @JsonAlias("model")
+        public String model;
+        // @JsonAlias is ignored on write, so camelCase fields need @JsonProperty for the wire name.
+        @com.fasterxml.jackson.annotation.JsonProperty("input_type")
+        public String inputType;
+
+        @JsonAlias("truncation")
+        public Boolean truncation;
+        @com.fasterxml.jackson.annotation.JsonProperty("encoding_format")
+        public String encodingFormat;
+    }
+
+    /**
+     * Binds the service to the endpoint, model and access key carried by {@code conf}.
+     */
+    public VoyageEmbeddingService(VoyageApiConfig conf) throws MalformedURLException {
+        this.conf = conf;
+        this.token = conf.accessKey;
+        this.model = conf.model;
+        this.modelUrl = new URL(conf.vgUrl); // throws if vgUrl is not a valid URL
+        // Force HTTP/1.1 for all requests issued through this client.
+        this.httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
+    }
+
+    /**
+     * Requests embeddings for {@code texts} from the configured endpoint.
+     *
+     * <p>Unset optional settings are left out; request errors are returned as a failed future.
+     *
+     * @param texts the inputs to embed, sent as the {@code input} field of the request
+     * @return a future holding one vector per {@code data} item that has an embedding array
+     */
+    @Override
+    public CompletableFuture<List<List<Double>>> computeEmbeddings(List<String> texts) {
+        VoyagePojo.VoyagePojoBuilder pojoBuilder =
+                VoyagePojo.builder().input(texts).model(this.model);
+        if (this.conf.input_type != null) {
+            pojoBuilder.inputType(this.conf.input_type);
+        }
+        if (this.conf.truncation != null) {
+            pojoBuilder.truncation(Boolean.parseBoolean(this.conf.truncation));
+        }
+        if (this.conf.encoding_format != null) {
+            pojoBuilder.encodingFormat(this.conf.encoding_format);
+        }
+        try {
+            String jsonContent = om.writeValueAsString(pojoBuilder.build());
+            return query(jsonContent).thenApply(this::parseEmbeddings);
+        } catch (Exception e) {
+            log.error("Failed to send or serialize request", e);
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    /** Collects every {@code data[*].embedding} array of a response body as a list of doubles. */
+    private List<List<Double>> parseEmbeddings(String body) {
+        log.info("Got a query response from model {}", model);
+        try {
+            JsonNode dataNode = om.readTree(body).path("data");
+            List<List<Double>> embeddings = new ArrayList<>();
+            if (dataNode.isArray()) { // a missing or non-array "data" yields an empty result
+                for (JsonNode dataItem : dataNode) {
+                    JsonNode embeddingNode = dataItem.path("embedding");
+                    if (embeddingNode.isArray()) { // items without an embedding array are skipped
+                        List<Double> embedding = new ArrayList<>(embeddingNode.size());
+                        embeddingNode.forEach(value -> embedding.add(value.asDouble()));
+                        embeddings.add(embedding);
+                    }
+                }
+            }
+            return embeddings;
+        } catch (Exception e) {
+            log.error("Error processing JSON", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * POSTs {@code jsonPayload} to the configured endpoint with the access key as a Bearer token.
+     *
+     * @param jsonPayload the serialized request body
+     * @return a future with the response body; it completes exceptionally on a non-200 status
+     * @throws Exception if the endpoint URL cannot be converted to a URI
+     */
+    private CompletableFuture<String> query(String jsonPayload) throws Exception {
+        HttpRequest request =
+                HttpRequest.newBuilder()
+                        .uri(modelUrl.toURI())
+                        .header("Authorization", "Bearer " + token)
+                        // The body is JSON; HttpClient does not add a Content-Type on its own.
+                        .header("Content-Type", "application/json")
+                        .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
+                        .build();
+        CompletableFuture<HttpResponse<String>> responseHandle =
+                httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
+        return responseHandle.thenApply(
+                response -> {
+                    String body = response.body();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Model {} query response is {} {}", model, response, body);
+                    }
+                    if (response.statusCode() != 200) {
+                        log.warn("Model {} query failed with {} {}", model, response, body);
+                        throw new RuntimeException(
+                                "Model "
+                                        + model
+                                        + " query failed with status "
+                                        + response.statusCode());
+                    }
+                    return body;
+                });
+    }
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java
index 88e3befbd..df757d853 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java
@@ -31,6 +31,8 @@ public class TransformStepConfig {
@JsonProperty private BedrockConfig bedrock;
+ @JsonProperty private VoyageConfig voyage;
+
@JsonProperty private OllamaConfig ollama;
@JsonProperty private Map datasource;
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/VoyageConfig.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/VoyageConfig.java
new file mode 100644
index 000000000..3aa6474d0
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/VoyageConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.model.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+
+@Getter
+public class VoyageConfig {
+ // URL of the embeddings API; the field initializer below is what supplies the default.
+ @JsonProperty(value = "api-url", defaultValue = "https://api.voyageai.com/v1/embeddings")
+ private String apiUrl = "https://api.voyageai.com/v1/embeddings";
+ // Credential for the API, bound to the "access-key" property.
+ @JsonProperty(value = "access-key")
+ private String accessKey;
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider
index bdde5f0e7..4d24019eb 100644
--- a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider
+++ b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider
@@ -2,4 +2,5 @@ ai.langstream.ai.agents.services.impl.OpenAIServiceProvider
ai.langstream.ai.agents.services.impl.VertexAIProvider
ai.langstream.ai.agents.services.impl.HuggingFaceProvider
ai.langstream.ai.agents.services.impl.BedrockServiceProvider
-ai.langstream.ai.agents.services.impl.OllamaProvider
\ No newline at end of file
+ai.langstream.ai.agents.services.impl.OllamaProvider
+ai.langstream.ai.agents.services.impl.VoyageProvider
\ No newline at end of file
diff --git a/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/VoyageProviderTest.java b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/VoyageProviderTest.java
new file mode 100644
index 000000000..326f36b54
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/VoyageProviderTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.ai.agents.services.impl;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.ok;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import ai.langstream.api.runner.code.MetricsReporter;
+import com.datastax.oss.streaming.ai.embeddings.EmbeddingsService;
+import com.datastax.oss.streaming.ai.services.ServiceProvider;
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import com.github.tomakehurst.wiremock.junit5.WireMockTest;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+@WireMockTest
+class VoyageProviderTest {
+    /** Runs the provider against a WireMock stub and checks the embedding parsed from its reply. */
+    @Test
+    void testEmbeddings(WireMockRuntimeInfo wmRuntimeInfo) throws Exception {
+        stubFor(
+                post("/")
+                        .willReturn(
+                                ok(
+                                        """
+                                        {
+                                        "data": [
+                                        {
+                                        "object": "embedding",
+                                        "embedding": [
+                                        -0.9004754424095154,
+                                        1.2847540378570557,
+                                        1.1102418899536133,
+                                        -0.18884147703647614
+                                        ]
+                                        }
+                                        ]
+                                        }
+                                        """)));
+        // Point the provider at the local stub server rather than the default API URL.
+        VoyageProvider provider = new VoyageProvider();
+        ServiceProvider implementation =
+                provider.createImplementation(
+                        Map.of("voyage", Map.of("api-url", wmRuntimeInfo.getHttpBaseUrl())),
+                        MetricsReporter.DISABLED);
+
+        EmbeddingsService service =
+                implementation.getEmbeddingsService(Map.of("model", "voyage-large-2"));
+
+        List<List<Double>> result = service.computeEmbeddings(List.of("test")).get();
+        log.info("result: {}", result);
+        assertEquals(
+                List.of(
+                        List.of(
+                                -0.9004754424095154,
+                                1.2847540378570557,
+                                1.1102418899536133,
+                                -0.18884147703647614)),
+                result);
+    }
+}
diff --git a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingServiceTest.java b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingServiceTest.java
new file mode 100644
index 000000000..1c5117e70
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/embeddings/VoyageEmbeddingServiceTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.embeddings;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+// disabled, just for experiments/usage demo
+@Slf4j
+public class VoyageEmbeddingServiceTest {
+    private String voyageApiKey = "xxxxxxxxxx"; // replace with your own API key
+    // Neither test overrides vgUrl, so both would call the default Voyage endpoint when enabled.
+    @Disabled
+    @Test
+    public void testVoyage2() throws Exception {
+        VoyageEmbeddingService.VoyageApiConfig conf =
+                VoyageEmbeddingService.VoyageApiConfig.builder()
+                        .accessKey(voyageApiKey)
+                        .model("voyage-2")
+                        .build();
+        try (EmbeddingsService service = new VoyageEmbeddingService(conf)) {
+            List<List<Double>> result =
+                    service.computeEmbeddings(List.of("hello world", "stranger things")).get();
+            result.forEach(embedding -> log.info("{}", embedding));
+            // one embedding is expected per input text; a real assertion also runs without -ea
+            org.junit.jupiter.api.Assertions.assertEquals(2, result.size());
+        }
+    }
+
+    @Disabled
+    @Test
+    public void testVoyageLarge2() throws Exception {
+        VoyageEmbeddingService.VoyageApiConfig conf =
+                VoyageEmbeddingService.VoyageApiConfig.builder()
+                        .accessKey(voyageApiKey)
+                        .model("voyage-large-2")
+                        .build();
+        try (EmbeddingsService service = new VoyageEmbeddingService(conf)) {
+            List<List<Double>> result =
+                    service.computeEmbeddings(List.of("hello world", "stranger things")).get();
+            result.forEach(embedding -> log.info("{}", embedding));
+            // one embedding is expected per input text; a real assertion also runs without -ea
+            org.junit.jupiter.api.Assertions.assertEquals(2, result.size());
+        }
+    }
+}
diff --git a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java
index a9503c958..044d60d35 100644
--- a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java
+++ b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/streaming/ai/jstl/JstlFunctionsTest.java
@@ -119,6 +119,13 @@ void testConcatNull() {
assertEquals("", JstlFunctions.concat(null, null));
}
+    @Test
+    void testSha256() {
+        // Expected value: the SHA-256 digest of "hello" as lowercase hex.
+        String expectedHash = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824";
+        assertEquals(expectedHash, JstlFunctions.sha256("hello"));
+    }
+
@Test
void testNow() {
Instant fixedInstant = Instant.now();
diff --git a/langstream-agents/langstream-vector-agents/pom.xml b/langstream-agents/langstream-vector-agents/pom.xml
index 4cbf4224d..b856a96f8 100644
--- a/langstream-agents/langstream-vector-agents/pom.xml
+++ b/langstream-agents/langstream-vector-agents/pom.xml
@@ -75,7 +75,7 @@
<groupId>io.pinecone</groupId>
<artifactId>pinecone-client</artifactId>
- <version>0.2.3</version>
+ <version>0.8.0</version>