From 5b6e6bdc2646efd05bb2b0f8806505cfd7fc5f6b Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Wed, 18 Oct 2023 15:22:11 +0200 Subject: [PATCH] Add support for topic schemas in Pulsar (#608) --- .../kafka/runner/KafkaProducerWrapper.java | 2 +- ...PulsarTopicConnectionsRuntimeProvider.java | 83 ++++- .../pulsar/PulsarStreamingClusterRuntime.java | 8 +- .../langstream/AbstractApplicationRunner.java | 236 +------------ .../langstream/assets/DeployAssetsTest.java | 4 +- .../kafka/AbstractKafkaApplicationRunner.java | 268 +++++++++++++++ .../kafka/AstraDBAssetQueryWriteIT.java | 3 +- .../langstream/kafka/AsyncProcessingIT.java | 3 +- .../kafka/BedrockCompletionsIT.java | 3 +- .../kafka/CassandraAssetQueryWriteIT.java | 3 +- .../CassandraVectorAssetQueryWriteIT.java | 3 +- .../langstream/kafka/ChatCompletionsIT.java | 3 +- .../langstream/kafka/ComputeEmbeddingsIT.java | 3 +- .../langstream/kafka/ErrorHandlingTest.java | 15 +- .../kafka/FlareControllerAgentRunnerIT.java | 3 +- .../langstream/kafka/FlowControlRunnerIT.java | 3 +- .../langstream/kafka/GenIAgentsRunnerIT.java | 3 +- .../kafka/HttpRequestAgentRunnerIT.java | 3 +- .../ai/langstream/kafka/JdbcDatabaseIT.java | 3 +- .../kafka/KafkaConnectSinkRunnerIT.java | 3 +- .../kafka/KafkaConnectSourceRunnerIT.java | 3 +- .../kafka/KafkaRunnerDockerTest.java | 7 +- .../ai/langstream/kafka/KafkaSchemaTest.java | 3 +- .../kafka/MilvusVectorAssetQueryWriteIT.java | 3 +- .../langstream/kafka/OpenSearchVectorIT.java | 3 +- .../kafka/PlaceholderEndToEndTest.java | 3 +- .../ai/langstream/kafka/QueryPineconeIT.java | 3 +- .../langstream/kafka/RerankAgentRunnerIT.java | 3 +- .../kafka/SolrAssetQueryWriteIT.java | 3 +- .../langstream/kafka/TextCompletionsIT.java | 3 +- .../kafka/TextProcessingAgentsRunnerIT.java | 3 +- .../pulsar/PulsarContainerExtension.java | 102 ++++++ .../pulsar/PulsarRunnerDockerTest.java | 316 ++++++++---------- 33 files changed, 625 insertions(+), 485 deletions(-) create mode 100644 langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java create mode 100644 langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java diff --git a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaProducerWrapper.java b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaProducerWrapper.java index d069eec66..511e6e060 100644 --- a/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaProducerWrapper.java +++ b/langstream-kafka-runtime/src/main/java/ai/langstream/kafka/runner/KafkaProducerWrapper.java @@ -54,7 +54,7 @@ @Slf4j class KafkaProducerWrapper implements TopicProducer { - final Map, Serializer> BASE_SERIALIZERS = + static final Map, Serializer> BASE_SERIALIZERS = Map.ofEntries( entry(String.class, new StringSerializer()), entry(Boolean.class, new BooleanSerializer()), diff --git a/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java b/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java index 56145d38f..67583e787 100644 --- a/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java +++ b/langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java @@ -16,6 +16,7 @@ package ai.langstream.pulsar.runner; import static ai.langstream.pulsar.PulsarClientUtils.buildPulsarAdmin; +import static java.util.Map.entry; import ai.langstream.api.model.Application; import ai.langstream.api.model.SchemaDefinition; @@ -37,8 +38,16 @@ import ai.langstream.pulsar.PulsarTopic; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -524,6 +533,26 @@ private class PulsarTopicProducer implements TopicProducer { Producer producer; Schema schema; + static final Map, Schema> BASE_SCHEMAS = + Map.ofEntries( + entry(String.class, Schema.STRING), + entry(Boolean.class, Schema.BOOL), + entry(Byte.class, Schema.INT8), + entry(Short.class, Schema.INT16), + entry(Integer.class, Schema.INT32), + entry(Long.class, Schema.INT64), + entry(Float.class, Schema.FLOAT), + entry(Double.class, Schema.DOUBLE), + entry(byte[].class, Schema.BYTES), + entry(Date.class, Schema.DATE), + entry(Timestamp.class, Schema.TIMESTAMP), + entry(Time.class, Schema.TIME), + entry(LocalDate.class, Schema.LOCAL_DATE), + entry(LocalTime.class, Schema.LOCAL_TIME), + entry(LocalDateTime.class, Schema.LOCAL_DATE_TIME), + entry(Instant.class, Schema.INSTANT), + entry(ByteBuffer.class, Schema.BYTEBUFFER)); + public PulsarTopicProducer(Map configuration) { this.configuration = configuration; } @@ -531,12 +560,26 @@ public PulsarTopicProducer(Map configuration) { @Override @SneakyThrows public void start() { - String topic = (String) configuration.remove("topic"); - schema = (Schema) configuration.remove("schema"); - if (schema == null) { - schema = (Schema) Schema.STRING; + if (configuration.containsKey("valueSchema")) { + SchemaDefinition valueSchemaDefinition = + mapper.convertValue( + configuration.remove("valueSchema"), SchemaDefinition.class); + Schema valueSchema = Schema.getSchema(getSchemaInfo(valueSchemaDefinition)); + if (configuration.containsKey("keySchema")) { + SchemaDefinition keySchemaDefinition = + mapper.convertValue( + configuration.remove("keySchema"), SchemaDefinition.class); + Schema keySchema = Schema.getSchema(getSchemaInfo(keySchemaDefinition)); + schema = (Schema) Schema.KeyValue(keySchema, valueSchema); + } else { + schema = (Schema) valueSchema; + } + producer = + client.newProducer(schema) + .topic((String) configuration.remove("topic")) + .loadConf(configuration) + .create(); } - producer = client.newProducer(schema).topic(topic).loadConf(configuration).create(); } @Override @@ -552,9 +595,39 @@ public void close() { } } + private Schema getSchema(Class klass) { + Schema schema = BASE_SCHEMAS.get(klass); + if (schema == null) { + throw new IllegalArgumentException("Cannot infer schema for " + klass); + } + return schema; + } + @Override public CompletableFuture write(Record r) { totalIn.addAndGet(1); + if (schema == null) { + try { + if (r.value() == null) { + throw new IllegalStateException( + "Cannot infer schema because value is null"); + } + Schema valueSchema = getSchema(r.value().getClass()); + if (r.key() != null) { + Schema keySchema = getSchema(r.key().getClass()); + schema = (Schema) Schema.KeyValue(keySchema, valueSchema); + } else { + schema = (Schema) valueSchema; + } + producer = + client.newProducer(schema) + .topic((String) configuration.remove("topic")) + .loadConf(configuration) + .create(); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } log.info("Writing message {}", r); // TODO: handle KV diff --git a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java index 9f946f714..44cf95b60 100644 --- a/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java +++ b/langstream-pulsar/src/main/java/ai/langstream/pulsar/PulsarStreamingClusterRuntime.java @@ -75,9 +75,15 @@ public Map createProducerConfiguration( PulsarTopic pulsarTopic = (PulsarTopic) outputConnectionImplementation; Map configuration = new HashMap<>(); - // TODO: handle other configurations and schema + // TODO: handle other configurations configuration.put("topic", pulsarTopic.name().toPulsarName()); + if (pulsarTopic.keySchema() != null) { + configuration.put("keySchema", pulsarTopic.keySchema()); + } + if (pulsarTopic.valueSchema() != null) { + configuration.put("valueSchema", pulsarTopic.valueSchema()); + } return configuration; } diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java index 95b18decb..0a0c33732 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java @@ -15,10 +15,6 @@ */ package ai.langstream; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import ai.langstream.api.model.Application; @@ -32,16 +28,13 @@ import ai.langstream.impl.k8s.tests.KubeTestServer; import ai.langstream.impl.nar.NarFileHandler; import ai.langstream.impl.parser.ModelBuilder; -import ai.langstream.kafka.extensions.KafkaContainerExtension; import ai.langstream.runtime.agent.AgentRunner; import ai.langstream.runtime.agent.api.AgentInfo; import ai.langstream.runtime.api.agent.RuntimePodConfiguration; import io.fabric8.kubernetes.api.model.Secret; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; @@ -52,15 +45,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -78,9 +63,6 @@ public abstract class AbstractApplicationRunner { @RegisterExtension protected static final KubeTestServer kubeServer = new KubeTestServer(); - @RegisterExtension - protected static final KafkaContainerExtension kafkaContainer = new KafkaContainerExtension(); - protected static ApplicationDeployer applicationDeployer; private static NarFileHandler narFileHandler; private static TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry; @@ -156,28 +138,6 @@ protected ApplicationRuntime deployApplicationWithSecrets( return new ApplicationRuntime(tenant, appId, applicationInstance, implementation, secrets); } - protected String buildInstanceYaml() { - String inputTopic = "input-topic-" + UUID.randomUUID(); - String outputTopic = "output-topic-" + UUID.randomUUID(); - String streamTopic = "stream-topic-" + UUID.randomUUID(); - return """ - instance: - globals: - input-topic: %s - output-topic: %s - stream-topic: %s - streamingCluster: - type: "kafka" - configuration: - admin: - bootstrap.servers: "%s" - computeCluster: - type: "kubernetes" - """ - .formatted( - inputTopic, outputTopic, streamTopic, kafkaContainer.getBootstrapServers()); - } - @BeforeAll public static void setup() throws Exception { Path codeDirectory = Paths.get("target/test-jdbc-drivers"); @@ -200,121 +160,6 @@ public static void setup() throws Exception { .build(); } - protected KafkaProducer createProducer() { - return new KafkaProducer<>( - Map.of( - "bootstrap.servers", - kafkaContainer.getBootstrapServers(), - "key.serializer", - "org.apache.kafka.common.serialization.StringSerializer", - "value.serializer", - "org.apache.kafka.common.serialization.StringSerializer")); - } - - protected void sendMessage(String topic, Object content, KafkaProducer producer) - throws Exception { - sendMessage(topic, content, List.of(), producer); - } - - protected void sendMessage( - String topic, Object content, List
headers, KafkaProducer producer) - throws Exception { - sendMessage(topic, "key", content, headers, producer); - } - - protected void sendMessage( - String topic, Object key, Object content, List
headers, KafkaProducer producer) - throws Exception { - producer.send( - new ProducerRecord<>( - topic, null, System.currentTimeMillis(), key, content, headers)) - .get(); - producer.flush(); - } - - protected List waitForMessages(KafkaConsumer consumer, List expected) { - List result = new ArrayList<>(); - List received = new ArrayList<>(); - - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .untilAsserted( - () -> { - ConsumerRecords poll = - consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord record : poll) { - log.info("Received message {}", record); - received.add(record.value()); - result.add(record); - } - log.info("Result: {}", received); - received.forEach(r -> log.info("Received |{}|", r)); - - assertEquals(expected.size(), received.size()); - for (int i = 0; i < expected.size(); i++) { - Object expectedValue = expected.get(i); - Object actualValue = received.get(i); - if (expectedValue instanceof Consumer fn) { - fn.accept(actualValue); - } else if (expectedValue instanceof byte[]) { - assertArrayEquals((byte[]) expectedValue, (byte[]) actualValue); - } else { - log.info("expected: {}", expectedValue); - log.info("got: {}", actualValue); - assertEquals(expectedValue, actualValue); - } - } - }); - - return result; - } - - protected List waitForMessagesInAnyOrder( - KafkaConsumer consumer, Collection expected) { - List result = new ArrayList<>(); - List received = new ArrayList<>(); - - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .untilAsserted( - () -> { - ConsumerRecords poll = - consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord record : poll) { - log.info("Received message {}", record); - received.add(record.value()); - result.add(record); - } - log.info("Result: {}", received); - received.forEach(r -> log.info("Received |{}|", r)); - - assertEquals(expected.size(), received.size()); - for (Object expectedValue : expected) { - // this doesn't work for byte[] - assertFalse(expectedValue instanceof byte[]); - assertTrue( - received.contains(expectedValue), - "Expected value " - + expectedValue - + " not found in " - + received); - } - - for (Object receivedValue : received) { - // this doesn't work for byte[] - assertFalse(receivedValue instanceof byte[]); - assertTrue( - expected.contains(receivedValue), - "Received value " - + receivedValue - + " not found in " - + expected); - } - }); - - return result; - } - public record AgentRunResult(Map info) {} protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws Exception { @@ -341,7 +186,7 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws }); // execute all the pods ExecutorService executorService = Executors.newCachedThreadPool(); - List futures = new ArrayList<>(); + List> futures = new ArrayList<>(); for (RuntimePodConfiguration podConfiguration : pods) { CompletableFuture handle = new CompletableFuture<>(); futures.add(handle); @@ -419,84 +264,7 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws return new AgentRunResult(allAgentsInfo); } - private volatile boolean validateConsumerOffsets = true; - - public boolean isValidateConsumerOffsets() { - return validateConsumerOffsets; - } - - public void setValidateConsumerOffsets(boolean validateConsumerOffsets) { - this.validateConsumerOffsets = validateConsumerOffsets; - } - - private void validateAgentInfoBeforeStop(AgentInfo agentInfo) { - if (!validateConsumerOffsets) { - return; - } - agentInfo - .serveWorkerStatus() - .forEach( - workerStatus -> { - String agentType = workerStatus.getAgentType(); - log.info("Checking Agent type {}", agentType); - switch (agentType) { - case "topic-source": - Map info = workerStatus.getInfo(); - log.info("Topic source info {}", info); - Map consumerInfo = - (Map) info.get("consumer"); - if (consumerInfo != null) { - Map committedOffsets = - (Map) - consumerInfo.get("committedOffsets"); - log.info("Committed offsets {}", committedOffsets); - committedOffsets.forEach( - (topic, offset) -> { - assertNotNull(offset); - assertTrue(((Number) offset).intValue() >= 0); - }); - Map uncommittedOffsets = - (Map) - consumerInfo.get("uncommittedOffsets"); - log.info("Uncommitted offsets {}", uncommittedOffsets); - uncommittedOffsets.forEach( - (topic, number) -> { - assertNotNull(number); - assertTrue( - ((Number) number).intValue() <= 0, - "for topic " - + topic - + " we have some uncommitted offsets: " - + number); - }); - } - default: - // ignore - } - }); - } - - protected KafkaConsumer createConsumer(String topic) { - KafkaConsumer consumer = - new KafkaConsumer<>( - Map.of( - "bootstrap.servers", - kafkaContainer.getBootstrapServers(), - "key.deserializer", - "org.apache.kafka.common.serialization.StringDeserializer", - "value.deserializer", - "org.apache.kafka.common.serialization.StringDeserializer", - "group.id", - "testgroup-" + UUID.randomUUID(), - "auto.offset.reset", - "earliest")); - consumer.subscribe(List.of(topic)); - return consumer; - } - - protected static AdminClient getKafkaAdmin() { - return kafkaContainer.getAdmin(); - } + protected void validateAgentInfoBeforeStop(AgentInfo agentInfo) {} @AfterAll public static void teardown() { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/assets/DeployAssetsTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/assets/DeployAssetsTest.java index da085fa95..11807323a 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/assets/DeployAssetsTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/assets/DeployAssetsTest.java @@ -17,9 +17,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.api.model.AssetDefinition; import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.kafka.AbstractKafkaApplicationRunner; import ai.langstream.mockagents.MockAssetManagerCodeProvider; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class DeployAssetsTest extends AbstractApplicationRunner { +class DeployAssetsTest extends AbstractKafkaApplicationRunner { @Test public void testDeployAsset() throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java new file mode 100644 index 000000000..85b560c8c --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java @@ -0,0 +1,268 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.kafka; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import ai.langstream.AbstractApplicationRunner; +import ai.langstream.kafka.extensions.KafkaContainerExtension; +import ai.langstream.runtime.agent.api.AgentInfo; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.extension.RegisterExtension; + +@Slf4j +public abstract class AbstractKafkaApplicationRunner extends AbstractApplicationRunner { + + @RegisterExtension + protected static final KafkaContainerExtension kafkaContainer = new KafkaContainerExtension(); + + private volatile boolean validateConsumerOffsets = true; + + public boolean isValidateConsumerOffsets() { + return validateConsumerOffsets; + } + + public void setValidateConsumerOffsets(boolean validateConsumerOffsets) { + this.validateConsumerOffsets = validateConsumerOffsets; + } + + protected String buildInstanceYaml() { + String inputTopic = "input-topic-" + UUID.randomUUID(); + String outputTopic = "output-topic-" + UUID.randomUUID(); + String streamTopic = "stream-topic-" + UUID.randomUUID(); + return """ + instance: + globals: + input-topic: %s + output-topic: %s + stream-topic: %s + streamingCluster: + type: "kafka" + configuration: + admin: + bootstrap.servers: "%s" + computeCluster: + type: "kubernetes" + """ + .formatted( + inputTopic, outputTopic, streamTopic, kafkaContainer.getBootstrapServers()); + } + + protected KafkaProducer createProducer() { + return new KafkaProducer<>( + Map.of( + "bootstrap.servers", + kafkaContainer.getBootstrapServers(), + "key.serializer", + "org.apache.kafka.common.serialization.StringSerializer", + "value.serializer", + "org.apache.kafka.common.serialization.StringSerializer")); + } + + protected void sendMessage(String topic, Object content, KafkaProducer producer) + throws Exception { + sendMessage(topic, content, List.of(), producer); + } + + protected void sendMessage( + String topic, Object content, List
headers, KafkaProducer producer) + throws Exception { + sendMessage(topic, "key", content, headers, producer); + } + + protected void sendMessage( + String topic, Object key, Object content, List
headers, KafkaProducer producer) + throws Exception { + producer.send( + new ProducerRecord<>( + topic, null, System.currentTimeMillis(), key, content, headers)) + .get(); + producer.flush(); + } + + protected List waitForMessages(KafkaConsumer consumer, List expected) { + List result = new ArrayList<>(); + List received = new ArrayList<>(); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords poll = + consumer.poll(Duration.ofSeconds(2)); + for (ConsumerRecord record : poll) { + log.info("Received message {}", record); + received.add(record.value()); + result.add(record); + } + log.info("Result: {}", received); + received.forEach(r -> log.info("Received |{}|", r)); + + assertEquals(expected.size(), received.size()); + for (int i = 0; i < expected.size(); i++) { + Object expectedValue = expected.get(i); + Object actualValue = received.get(i); + if (expectedValue instanceof Consumer fn) { + fn.accept(actualValue); + } else if (expectedValue instanceof byte[]) { + assertArrayEquals((byte[]) expectedValue, (byte[]) actualValue); + } else { + log.info("expected: {}", expectedValue); + log.info("got: {}", actualValue); + assertEquals(expectedValue, actualValue); + } + } + }); + + return result; + } + + protected List waitForMessagesInAnyOrder( + KafkaConsumer consumer, Collection expected) { + List result = new ArrayList<>(); + List received = new ArrayList<>(); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted( + () -> { + ConsumerRecords poll = + consumer.poll(Duration.ofSeconds(2)); + for (ConsumerRecord record : poll) { + log.info("Received message {}", record); + received.add(record.value()); + result.add(record); + } + log.info("Result: {}", received); + received.forEach(r -> log.info("Received |{}|", r)); + + assertEquals(expected.size(), received.size()); + for (Object expectedValue : expected) { + // this doesn't work for byte[] + assertFalse(expectedValue instanceof byte[]); + assertTrue( + received.contains(expectedValue), + "Expected value " + + expectedValue + + " not found in " + + received); + } + + for (Object receivedValue : received) { + // this doesn't work for byte[] + assertFalse(receivedValue instanceof byte[]); + assertTrue( + expected.contains(receivedValue), + "Received value " + + receivedValue + + " not found in " + + expected); + } + }); + + return result; + } + + protected KafkaConsumer createConsumer(String topic) { + KafkaConsumer consumer = + new KafkaConsumer<>( + Map.of( + "bootstrap.servers", + kafkaContainer.getBootstrapServers(), + "key.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer", + "value.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer", + "group.id", + "testgroup-" + UUID.randomUUID(), + "auto.offset.reset", + "earliest")); + consumer.subscribe(List.of(topic)); + return consumer; + } + + protected static AdminClient getKafkaAdmin() { + return kafkaContainer.getAdmin(); + } + + @Override + protected void validateAgentInfoBeforeStop(AgentInfo agentInfo) { + if (!validateConsumerOffsets) { + return; + } + agentInfo + .serveWorkerStatus() + .forEach( + workerStatus -> { + String agentType = workerStatus.getAgentType(); + log.info("Checking Agent type {}", agentType); + switch (agentType) { + case "topic-source": + Map info = workerStatus.getInfo(); + log.info("Topic source info {}", info); + Map consumerInfo = + (Map) info.get("consumer"); + if (consumerInfo != null) { + Map committedOffsets = + (Map) + consumerInfo.get("committedOffsets"); + log.info("Committed offsets {}", committedOffsets); + committedOffsets.forEach( + (topic, offset) -> { + assertNotNull(offset); + assertTrue(((Number) offset).intValue() >= 0); + }); + Map uncommittedOffsets = + (Map) + consumerInfo.get("uncommittedOffsets"); + log.info("Uncommitted offsets {}", uncommittedOffsets); + uncommittedOffsets.forEach( + (topic, number) -> { + assertNotNull(number); + assertTrue( + ((Number) number).intValue() <= 0, + "for topic " + + topic + + " we have some uncommitted offsets: " + + number); + }); + } + default: + // ignore + } + }); + } +} diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraDBAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraDBAssetQueryWriteIT.java index 1d6056bf4..4892369df 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraDBAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AstraDBAssetQueryWriteIT.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; @@ -28,7 +27,7 @@ @Slf4j @Disabled -class AstraDBAssetQueryWriteIT extends AbstractApplicationRunner { +class AstraDBAssetQueryWriteIT extends AbstractKafkaApplicationRunner { static final String SECRETS_PATH = ""; diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AsyncProcessingIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AsyncProcessingIT.java index a1bf1efb4..b5a1d643d 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AsyncProcessingIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AsyncProcessingIT.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.mockagents.MockProcessorAgentsCodeProvider; import ai.langstream.runtime.agent.AgentRunner; import java.util.HashSet; @@ -35,7 +34,7 @@ import org.junit.jupiter.params.provider.ValueSource; @Slf4j -class AsyncProcessingIT extends AbstractApplicationRunner { +class AsyncProcessingIT extends AbstractKafkaApplicationRunner { @Test public void testProcessMultiThreadOutOfOrder() throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/BedrockCompletionsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/BedrockCompletionsIT.java index 905b2189a..988445313 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/BedrockCompletionsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/BedrockCompletionsIT.java @@ -21,7 +21,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import ai.langstream.AbstractApplicationRunner; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import com.github.tomakehurst.wiremock.matching.MultiValuePattern; @@ -39,7 +38,7 @@ @Slf4j @WireMockTest -class BedrockCompletionsIT extends AbstractApplicationRunner { +class BedrockCompletionsIT extends AbstractKafkaApplicationRunner { static WireMockRuntimeInfo wireMockRuntimeInfo; @BeforeAll diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java index 3bf1b1011..33f4d8d8f 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraAssetQueryWriteIT.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -import ai.langstream.AbstractApplicationRunner; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -39,7 +38,7 @@ @Slf4j @Testcontainers -class CassandraAssetQueryWriteIT extends AbstractApplicationRunner { +class CassandraAssetQueryWriteIT extends AbstractKafkaApplicationRunner { @Container private CassandraContainer cassandra = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java index 91ffa3277..6f462b5b3 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/CassandraVectorAssetQueryWriteIT.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; -import ai.langstream.AbstractApplicationRunner; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -41,7 +40,7 @@ @Slf4j @Testcontainers -class CassandraVectorAssetQueryWriteIT extends AbstractApplicationRunner { +class CassandraVectorAssetQueryWriteIT extends AbstractKafkaApplicationRunner { @Container private CassandraContainer cassandra = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java index 6e3dc10c0..d81d904db 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java @@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.api.model.Application; import ai.langstream.api.model.Connection; import ai.langstream.api.model.Module; @@ -48,7 +47,7 @@ @Slf4j @WireMockTest -class ChatCompletionsIT extends AbstractApplicationRunner { +class ChatCompletionsIT extends AbstractKafkaApplicationRunner { static WireMockRuntimeInfo wireMockRuntimeInfo; diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java index 47959c2f7..7e3f8e1d5 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java @@ -24,7 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.api.model.Application; import ai.langstream.api.model.Connection; import ai.langstream.api.model.Module; @@ -56,7 +55,7 @@ @Slf4j @WireMockTest -class ComputeEmbeddingsIT extends AbstractApplicationRunner { +class ComputeEmbeddingsIT extends AbstractKafkaApplicationRunner { @AllArgsConstructor private static class EmbeddingsConfig { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ErrorHandlingTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ErrorHandlingTest.java index 2e2452ad5..ec95c7b7a 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ErrorHandlingTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ErrorHandlingTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.mockagents.MockProcessorAgentsCodeProvider; import ai.langstream.runtime.agent.AgentRunner; import java.util.ArrayList; @@ -32,7 +31,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class ErrorHandlingTest extends AbstractApplicationRunner { +class ErrorHandlingTest extends AbstractKafkaApplicationRunner { @Test public void testDiscardErrors() throws Exception { @@ -71,7 +70,7 @@ public void testDiscardErrors() throws Exception { fail-on-content: "fail-me" """ .formatted(inputTopic, outputTopic, inputTopic, outputTopic)); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { try (KafkaProducer producer = createProducer(); @@ -117,7 +116,7 @@ public void testDeadLetter() throws Exception { fail-on-content: "fail-me" """ .formatted(inputTopic, outputTopic, inputTopic, outputTopic)); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { try (KafkaProducer producer = createProducer(); @@ -176,7 +175,7 @@ public void testFailOnErrors() throws Exception { fail-on-content: "fail-me" """ .formatted(inputTopic, outputTopic, inputTopic, outputTopic)); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { try (KafkaProducer producer = createProducer()) { @@ -245,7 +244,7 @@ public void testDiscardErrorsOnSink() throws Exception { fail-on-content: "fail-me" """ .formatted(inputTopic, inputTopic)); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { try (KafkaProducer producer = createProducer(); ) { @@ -301,7 +300,7 @@ public void testFailOnErrorsOnSink() throws Exception { fail-on-content: "fail-me" """ .formatted(inputTopic, inputTopic)); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { try (KafkaProducer producer = createProducer()) { @@ -357,7 +356,7 @@ public void testDeadLetterOnSink() throws Exception { fail-on-content: "fail-me" """ .formatted(inputTopic, inputTopic)); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { try (KafkaProducer producer = createProducer(); diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlareControllerAgentRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlareControllerAgentRunnerIT.java index e01fae2c7..91933ba44 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlareControllerAgentRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlareControllerAgentRunnerIT.java @@ -19,7 +19,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import ai.langstream.AbstractApplicationRunner; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import java.util.List; @@ -38,7 +37,7 @@ @Slf4j @Testcontainers @WireMockTest -class FlareControllerAgentRunnerIT extends AbstractApplicationRunner { +class FlareControllerAgentRunnerIT extends AbstractKafkaApplicationRunner { @Container static GenericContainer database = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlowControlRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlowControlRunnerIT.java index 620adae67..5da32bacd 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlowControlRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/FlowControlRunnerIT.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -27,7 +26,7 @@ @Slf4j @Testcontainers -class FlowControlRunnerIT extends AbstractApplicationRunner { +class FlowControlRunnerIT extends AbstractKafkaApplicationRunner { @Test public void testSimpleFlowControl() throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/GenIAgentsRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/GenIAgentsRunnerIT.java index 000c23c0a..02a993669 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/GenIAgentsRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/GenIAgentsRunnerIT.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.api.runner.code.AgentStatusResponse; import java.nio.charset.StandardCharsets; import java.util.List; @@ -31,7 +30,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class GenIAgentsRunnerIT extends AbstractApplicationRunner { +class GenIAgentsRunnerIT extends AbstractKafkaApplicationRunner { @Test public void testRunAITools() throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java index f48fdd9bf..b3ac58999 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/HttpRequestAgentRunnerIT.java @@ -22,7 +22,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import ai.langstream.AbstractApplicationRunner; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import java.util.List; @@ -35,7 +34,7 @@ @Slf4j @WireMockTest -class HttpRequestAgentRunnerIT extends AbstractApplicationRunner { +class HttpRequestAgentRunnerIT extends AbstractKafkaApplicationRunner { static WireMockRuntimeInfo wireMockRuntimeInfo; diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/JdbcDatabaseIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/JdbcDatabaseIT.java index 243920290..5a4911b87 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/JdbcDatabaseIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/JdbcDatabaseIT.java @@ -17,7 +17,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import ai.langstream.AbstractApplicationRunner; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.List; @@ -37,7 +36,7 @@ @Slf4j @Testcontainers -class JdbcDatabaseIT extends AbstractApplicationRunner { +class JdbcDatabaseIT extends AbstractKafkaApplicationRunner { static final ObjectMapper MAPPER = new ObjectMapper(); diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSinkRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSinkRunnerIT.java index 305e35463..634ff41fa 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSinkRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSinkRunnerIT.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -import ai.langstream.AbstractApplicationRunner; import com.google.common.base.Strings; import java.util.Collection; import java.util.List; @@ -35,7 +34,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class KafkaConnectSinkRunnerIT extends AbstractApplicationRunner { +class KafkaConnectSinkRunnerIT extends AbstractKafkaApplicationRunner { @Test @Disabled diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSourceRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSourceRunnerIT.java index a1a1def8e..cf4be5cb6 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSourceRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaConnectSourceRunnerIT.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; @@ -30,7 +29,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class KafkaConnectSourceRunnerIT extends AbstractApplicationRunner { +class KafkaConnectSourceRunnerIT extends AbstractKafkaApplicationRunner { @Test public void testRunKafkaConnectSource() throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaRunnerDockerTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaRunnerDockerTest.java index 7883d9501..dac0ae0c2 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaRunnerDockerTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaRunnerDockerTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import ai.langstream.AbstractApplicationRunner; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,7 +31,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class KafkaRunnerDockerTest extends AbstractApplicationRunner { +class KafkaRunnerDockerTest extends AbstractKafkaApplicationRunner { @Test public void testConnectToTopics() throws Exception { @@ -57,7 +56,7 @@ public void testConnectToTopics() throws Exception { output: "output-topic" """); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { Set topics = getKafkaAdmin().listTopics().names().get(); @@ -97,7 +96,7 @@ public void testApplyRetention() throws Exception { input: "input-topic-with-retention" """); - try (AbstractApplicationRunner.ApplicationRuntime applicationRuntime = + try (AbstractKafkaApplicationRunner.ApplicationRuntime applicationRuntime = deployApplication( tenant, "app", application, buildInstanceYaml(), expectedAgents)) { Set topics = getKafkaAdmin().listTopics().names().get(); diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaSchemaTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaSchemaTest.java index cd6ed3e07..f126728b6 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaSchemaTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/KafkaSchemaTest.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.kafka.extensions.KafkaRegistryContainerExtension; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -34,7 +33,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j -class KafkaSchemaTest extends AbstractApplicationRunner { +class KafkaSchemaTest extends AbstractKafkaApplicationRunner { @RegisterExtension static KafkaRegistryContainerExtension registry = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java index 0bcf7b9c0..52fde0b36 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -25,7 +24,7 @@ import org.junit.jupiter.api.Test; @Slf4j -class MilvusVectorAssetQueryWriteIT extends AbstractApplicationRunner { +class MilvusVectorAssetQueryWriteIT extends AbstractKafkaApplicationRunner { @Test @Disabled() // "This test requires a running Milvus instance" diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java index 744ff9344..7ac82180a 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/OpenSearchVectorIT.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -29,7 +28,7 @@ @Slf4j @Testcontainers -class OpenSearchVectorIT extends AbstractApplicationRunner { +class OpenSearchVectorIT extends AbstractKafkaApplicationRunner { @Container static OpensearchContainer OPENSEARCH = new OpensearchContainer(DockerImageName.parse("opensearchproject/opensearch:2")) diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/PlaceholderEndToEndTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/PlaceholderEndToEndTest.java index 234d94ae0..3d6942b8c 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/PlaceholderEndToEndTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/PlaceholderEndToEndTest.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.kafka.extensions.KafkaRegistryContainerExtension; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -34,7 +33,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; @Slf4j -class PlaceholderEndToEndTest extends AbstractApplicationRunner { +class PlaceholderEndToEndTest extends AbstractKafkaApplicationRunner { @RegisterExtension static KafkaRegistryContainerExtension registry = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/QueryPineconeIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/QueryPineconeIT.java index 8ab0df9ad..5261bcf72 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/QueryPineconeIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/QueryPineconeIT.java @@ -19,7 +19,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import ai.langstream.AbstractApplicationRunner; import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; import com.github.tomakehurst.wiremock.junit5.WireMockTest; import java.util.List; @@ -31,7 +30,7 @@ @Slf4j @WireMockTest -class QueryPineconeIT extends AbstractApplicationRunner { +class QueryPineconeIT extends AbstractKafkaApplicationRunner { @Test public void testQueryPinecone(WireMockRuntimeInfo vmRuntimeInfo) throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/RerankAgentRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/RerankAgentRunnerIT.java index 41ea936f9..07d7249af 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/RerankAgentRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/RerankAgentRunnerIT.java @@ -17,7 +17,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import ai.langstream.AbstractApplicationRunner; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; @@ -37,7 +36,7 @@ @Slf4j @Testcontainers -class RerankAgentRunnerIT extends AbstractApplicationRunner { +class RerankAgentRunnerIT extends AbstractKafkaApplicationRunner { @Container static GenericContainer database = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/SolrAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/SolrAssetQueryWriteIT.java index 09a8e42e1..338bd27b4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/SolrAssetQueryWriteIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/SolrAssetQueryWriteIT.java @@ -17,7 +17,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.ai.agents.commons.jstl.JstlFunctions; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; @@ -34,7 +33,7 @@ @Slf4j @Testcontainers -class SolrAssetQueryWriteIT extends AbstractApplicationRunner { +class SolrAssetQueryWriteIT extends AbstractKafkaApplicationRunner { @Container private GenericContainer solrCloudContainer = diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextCompletionsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextCompletionsIT.java index 564d0733e..b6497eb32 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextCompletionsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextCompletionsIT.java @@ -21,7 +21,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static org.junit.jupiter.api.Assertions.assertTrue; -import ai.langstream.AbstractApplicationRunner; import ai.langstream.api.model.Application; import ai.langstream.api.model.Connection; import ai.langstream.api.model.Module; @@ -42,7 +41,7 @@ @Slf4j @WireMockTest -class TextCompletionsIT extends AbstractApplicationRunner { +class TextCompletionsIT extends AbstractKafkaApplicationRunner { static WireMockRuntimeInfo wireMockRuntimeInfo; diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java index 7b99c8b7a..e5cd39140 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/TextProcessingAgentsRunnerIT.java @@ -15,7 +15,6 @@ */ package ai.langstream.kafka; -import ai.langstream.AbstractApplicationRunner; import java.util.List; import java.util.Map; import java.util.UUID; @@ -27,7 +26,7 @@ import org.junit.jupiter.params.provider.ValueSource; @Slf4j -class TextProcessingAgentsRunnerIT extends AbstractApplicationRunner { +class TextProcessingAgentsRunnerIT extends AbstractKafkaApplicationRunner { @Test public void testFullLanguageProcessingPipeline() throws Exception { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java new file mode 100644 index 000000000..7b35419b8 --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarContainerExtension.java @@ -0,0 +1,102 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.pulsar; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback { + private PulsarContainer pulsarContainer; + + private Network network; + + private PulsarAdmin admin; + private PulsarClient client; + + @Override + public void afterAll(ExtensionContext extensionContext) throws PulsarClientException { + if (client != null) { + client.close(); + } + if (admin != null) { + admin.close(); + } + if (pulsarContainer != null) { + pulsarContainer.close(); + } + if (network != null) { + network.close(); + } + } + + @Override + public void beforeAll(ExtensionContext extensionContext) + throws PulsarClientException, PulsarAdminException { + network = Network.newNetwork(); + pulsarContainer = + new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0")) + .withNetwork(network) + .withLogConsumer( + outputFrame -> + log.debug( + "pulsar> {}", outputFrame.getUtf8String().trim())); + // start Pulsar and wait for it to be ready to accept requests + pulsarContainer.start(); + + admin = + PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080)) + .build(); + + client = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build(); + + try { + admin.namespaces().createNamespace("public/default"); + } catch (PulsarAdminException.ConflictException exists) { + // ignore + } + } + + public String getBrokerUrl() { + return pulsarContainer.getPulsarBrokerUrl(); + } + + public String getHttpServiceUrl() { + return pulsarContainer.getHttpServiceUrl(); + } + + public PulsarContainer getPulsarContainer() { + return pulsarContainer; + } + + public PulsarAdmin getAdmin() { + return admin; + } + + public PulsarClient getClient() { + return client; + } +} diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarRunnerDockerTest.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarRunnerDockerTest.java index 5dbca084e..34b7d3dd7 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarRunnerDockerTest.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarRunnerDockerTest.java @@ -15,214 +15,162 @@ */ package ai.langstream.pulsar; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import ai.langstream.AbstractApplicationRunner; -import ai.langstream.api.model.Application; -import ai.langstream.api.model.Connection; -import ai.langstream.api.model.Module; -import ai.langstream.api.model.TopicDefinition; -import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; -import ai.langstream.api.runtime.ClusterRuntimeRegistry; -import ai.langstream.api.runtime.ExecutionPlan; -import ai.langstream.api.runtime.PluginsRegistry; -import ai.langstream.api.runtime.Topic; -import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; -import ai.langstream.impl.deploy.ApplicationDeployer; -import ai.langstream.impl.k8s.tests.KubeTestServer; -import ai.langstream.impl.nar.NarFileHandler; -import ai.langstream.impl.parser.ModelBuilder; -import ai.langstream.runtime.agent.AgentRunner; -import ai.langstream.runtime.agent.api.AgentInfo; -import ai.langstream.runtime.api.agent.RuntimePodConfiguration; -import io.fabric8.kubernetes.api.model.Secret; -import java.time.Duration; -import java.util.List; +import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.Cleanup; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.containers.PulsarContainer; -import org.testcontainers.utility.DockerImageName; @Slf4j -class PulsarRunnerDockerTest { +class PulsarRunnerDockerTest extends AbstractApplicationRunner { - private static final String IMAGE = "apachepulsar/pulsar:3.1.0"; - - private static PulsarContainer pulsarContainer; - - @RegisterExtension static final KubeTestServer kubeServer = new KubeTestServer(); - - private static PulsarAdmin admin; + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); @Test public void testRunAITools() throws Exception { - final String appId = "application"; - kubeServer.spyAgentCustomResources("tenant", appId + "-step1"); - final Map secrets = - kubeServer.spyAgentCustomResourcesSecrets("tenant", appId + "-step1"); - - Application applicationInstance = - ModelBuilder.buildApplicationInstance( - Map.of( - "module.yaml", - """ - module: "module-1" - id: "pipeline-1" - topics: - - name: "input-topic" - creation-mode: create-if-not-exists - - name: "output-topic" - creation-mode: create-if-not-exists - pipeline: - - name: "drop-description" - id: "step1" - type: "drop-fields" - input: "input-topic" - output: "output-topic" - configuration: - fields: - - "description" - """), - buildInstanceYaml(), - null) - .getApplication(); - - @Cleanup - NarFileHandler narFileHandler = - new NarFileHandler( - AbstractApplicationRunner.agentsDirectory, - List.of(), - Thread.currentThread().getContextClassLoader()); - narFileHandler.scan(); - - @Cleanup - ApplicationDeployer deployer = - ApplicationDeployer.builder() - .registry(new ClusterRuntimeRegistry()) - .topicConnectionsRuntimeRegistry( - new TopicConnectionsRuntimeRegistry() - .setPackageLoader(narFileHandler)) - .pluginsRegistry(new PluginsRegistry()) - .build(); - - Module module = applicationInstance.getModule("module-1"); - - ExecutionPlan implementation = deployer.createImplementation(appId, applicationInstance); - assertTrue( - implementation.getConnectionImplementation( - module, - Connection.fromTopic(TopicDefinition.fromName("input-topic"))) - instanceof Topic); - deployer.setup("tenant", implementation); - deployer.deploy("tenant", implementation, null); - assertEquals(1, secrets.size()); - final Secret secret = secrets.values().iterator().next(); - final RuntimePodConfiguration runtimePodConfiguration = - AgentResourcesFactory.readRuntimePodConfigurationFromSecret(secret); - - try (PulsarClient client = - PulsarClient.builder() - .serviceUrl(pulsarContainer.getPulsarBrokerUrl()) - .build(); - Producer producer = - client.newProducer(Schema.STRING).topic("input-topic").create(); - org.apache.pulsar.client.api.Consumer consumer = - client.newConsumer(Schema.STRING) - .topic("output-topic") - .subscriptionName("test") - .subscribe()) { - - // produce one message to the input-topic - producer.newMessage() - .value("{\"name\": \"some name\", \"description\": \"some description\"}") - .key("key") - .properties(Map.of("header-key", "header-value")) - .send(); - producer.flush(); - - AtomicInteger numLoops = new AtomicInteger(); - AgentRunner.runAgent( - runtimePodConfiguration, - null, - AbstractApplicationRunner.agentsDirectory, - new AgentInfo(), - () -> numLoops.incrementAndGet() <= 5, - null, - false, - narFileHandler); - - // receive one message from the output-topic (written by the PodJavaRuntime) - Message record = consumer.receive(); - assertEquals("{\"name\":\"some name\"}", record.getValue()); - assertEquals("header-value", record.getProperties().get("header-key")); + String tenant = "tenant"; + String[] expectedAgents = {"app-step1"}; + String inputTopic = "input-topic-" + UUID.randomUUID(); + String outputTopic = "output-topic-" + UUID.randomUUID(); + + Map application = + Map.of( + "module.yaml", + """ + module: "module-1" + id: "pipeline-1" + topics: + - name: "%s" + creation-mode: create-if-not-exists + - name: "%s" + creation-mode: create-if-not-exists + pipeline: + - name: "drop-description" + id: "step1" + type: "drop-fields" + input: "%s" + output: "%s" + configuration: + fields: + - "description" + """ + .formatted(inputTopic, outputTopic, inputTopic, outputTopic)); + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, "app", application, buildInstanceYaml(), expectedAgents)) { + try (Producer producer = createProducer(inputTopic); + Consumer consumer = createConsumer(outputTopic)) { + + producer.newMessage() + .value("{\"name\": \"some name\", \"description\": \"some description\"}") + .property("header-key", "header-value") + .send(); + producer.flush(); + + executeAgentRunners(applicationRuntime); + + Message record = consumer.receive(30, TimeUnit.SECONDS); + assertEquals("{\"name\":\"some name\"}", record.getValue().getNativeObject()); + assertEquals("header-value", record.getProperties().get("header-key")); + } } } - private static String buildInstanceYaml() { - return """ - instance: - computeCluster: - type: "kubernetes" - streamingCluster: - type: "pulsar" + @Test + public void testTopicSchema() throws Exception { + String tenant = "topic-schema"; + String[] expectedAgents = {"app-step1"}; + String inputTopic = "input-topic-" + UUID.randomUUID(); + String outputTopic = "output-topic-" + UUID.randomUUID(); + + Map application = + Map.of( + "module.yaml", + """ + module: "module-1" + id: "pipeline-1" + topics: + - name: "%s" + creation-mode: create-if-not-exists + - name: "%s" + creation-mode: create-if-not-exists + schema: + type: "bytes" + pipeline: + - name: "drop-description" + id: "step1" + type: "drop-fields" + input: "%s" + output: "%s" configuration: - admin: - serviceUrl: "%s" - service: - serviceUrl: "%s" - defaultTenant: "public" - defaultNamespace: "default" + fields: + - "description" """ - .formatted( - "http://localhost:" + pulsarContainer.getMappedPort(8080), - "pulsar://localhost:" + pulsarContainer.getMappedPort(6650)); + .formatted(inputTopic, outputTopic, inputTopic, outputTopic)); + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, "app", application, buildInstanceYaml(), expectedAgents)) { + try (Producer producer = createProducer(inputTopic); + Consumer consumer = createConsumer(outputTopic)) { + + producer.newMessage() + .value("{\"name\": \"some name\", \"description\": \"some description\"}") + .send(); + producer.flush(); + + executeAgentRunners(applicationRuntime); + + Message record = consumer.receive(30, TimeUnit.SECONDS); + assertArrayEquals( + "{\"name\":\"some name\"}".getBytes(StandardCharsets.UTF_8), + (byte[]) record.getValue().getNativeObject()); + } + } } - @BeforeAll - public static void setup() throws Exception { - pulsarContainer = - new PulsarContainer( - DockerImageName.parse(IMAGE) - .asCompatibleSubstituteFor("apachepulsar/pulsar")) - .withStartupTimeout( - Duration.ofSeconds(120)) // Mac M1 is slow with Intel docker images - .withLogConsumer( - outputFrame -> - log.info("pulsar> {}", outputFrame.getUtf8String().trim())); - // start Pulsar and wait for it to be ready to accept requests - pulsarContainer.start(); - admin = - PulsarAdmin.builder() - .serviceHttpUrl("http://localhost:" + pulsarContainer.getMappedPort(8080)) - .build(); - - try { - admin.namespaces().createNamespace("public/default"); - } catch (PulsarAdminException.ConflictException exists) { - // ignore - } + private String buildInstanceYaml() { + return """ + instance: + streamingCluster: + type: "pulsar" + configuration: + admin: + serviceUrl: "%s" + service: + serviceUrl: "%s" + defaultTenant: "public" + defaultNamespace: "default" + computeCluster: + type: "kubernetes" + """ + .formatted(pulsarContainer.getHttpServiceUrl(), pulsarContainer.getBrokerUrl()); } - @AfterAll - public static void teardown() { - if (admin != null) { - admin.close(); - } - if (pulsarContainer != null) { - pulsarContainer.close(); - } + protected Producer createProducer(String topic) throws PulsarClientException { + return pulsarContainer.getClient().newProducer(Schema.STRING).topic(topic).create(); + } + + protected Consumer createConsumer(String topic) throws PulsarClientException { + return pulsarContainer + .getClient() + .newConsumer(Schema.AUTO_CONSUME()) + .topic(topic) + .subscriptionName("test-subscription") + .subscribe(); } }