Add support for topic schemas in Pulsar
cbornet committed Oct 18, 2023
1 parent fb7b797 commit 72861c2
Showing 32 changed files with 561 additions and 478 deletions.
@@ -54,7 +54,7 @@
@Slf4j
class KafkaProducerWrapper implements TopicProducer {

-     final Map<Class<?>, Serializer<?>> BASE_SERIALIZERS =
+     static final Map<Class<?>, Serializer<?>> BASE_SERIALIZERS =
Map.ofEntries(
entry(String.class, new StringSerializer()),
entry(Boolean.class, new BooleanSerializer()),
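Note on the hunk above: making BASE_SERIALIZERS static turns a per-instance map into a single shared constant, which is safe because Map.ofEntries produces an immutable map and Kafka's stock serializers are stateless. A minimal sketch of how such a lookup table is typically used; serializerFor is a hypothetical helper, not part of this commit:

    // Hypothetical helper (not in this commit): resolve a serializer by the
    // value's runtime class, failing loudly when no mapping exists.
    static Serializer<?> serializerFor(Object value) {
        Serializer<?> serializer = BASE_SERIALIZERS.get(value.getClass());
        if (serializer == null) {
            throw new IllegalArgumentException(
                    "No built-in serializer for " + value.getClass().getName());
        }
        return serializer;
    }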
@@ -532,7 +532,21 @@ public PulsarTopicProducer(Map<String, Object> configuration) {
@SneakyThrows
public void start() {
String topic = (String) configuration.remove("topic");
-         schema = (Schema<K>) configuration.remove("schema");
+         if (configuration.containsKey("valueSchema")) {
+             SchemaDefinition valueSchemaDefinition =
+                     mapper.convertValue(
+                             configuration.remove("valueSchema"), SchemaDefinition.class);
+             Schema<?> valueSchema = Schema.getSchema(getSchemaInfo(valueSchemaDefinition));
+             if (configuration.containsKey("keySchema")) {
+                 SchemaDefinition keySchemaDefinition =
+                         mapper.convertValue(
+                                 configuration.remove("keySchema"), SchemaDefinition.class);
+                 Schema<?> keySchema = Schema.getSchema(getSchemaInfo(keySchemaDefinition));
+                 schema = (Schema<K>) Schema.KeyValue(keySchema, valueSchema);
+             } else {
+                 schema = (Schema<K>) valueSchema;
+             }
+         }
if (schema == null) {
schema = (Schema) Schema.STRING;
}
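The hunk above replaces the old pass-through "schema" setting: the producer now builds its Pulsar schema from a "valueSchema" definition, wrapping it in a KeyValue schema when a "keySchema" is also configured, and still falls back to Schema.STRING when neither is set. A self-contained sketch of the Pulsar Schema API this relies on (the payload values are illustrative, not taken from the patch):

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;

    public class KeyValueSchemaSketch {
        public static void main(String[] args) {
            // Combine a key schema and a value schema, as the patch does when
            // both "keySchema" and "valueSchema" are present in the configuration.
            Schema<KeyValue<String, byte[]>> schema =
                    Schema.KeyValue(Schema.STRING, Schema.BYTES);
            KeyValue<String, byte[]> kv = new KeyValue<>("key", new byte[] {1, 2, 3});
            byte[] encoded = schema.encode(kv);
            System.out.println("Encoded key/value pair into " + encoded.length + " bytes");
        }
    }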
@@ -75,9 +75,15 @@ public Map<String, Object> createProducerConfiguration(
PulsarTopic pulsarTopic = (PulsarTopic) outputConnectionImplementation;

Map<String, Object> configuration = new HashMap<>();
-         // TODO: handle other configurations and schema
+         // TODO: handle other configurations

configuration.put("topic", pulsarTopic.name().toPulsarName());
+         if (pulsarTopic.keySchema() != null) {
+             configuration.put("keySchema", pulsarTopic.keySchema());
+         }
+         if (pulsarTopic.valueSchema() != null) {
+             configuration.put("valueSchema", pulsarTopic.valueSchema());
+         }
return configuration;
}

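The map assembled here is the same configuration that PulsarTopicProducer.start() consumes in the previous file. A sketch of its shape; the nested schema maps are hypothetical, since the patch only shows that they deserialize into SchemaDefinition:

    // Illustrative only: the keys match the patch, the values are invented.
    static Map<String, Object> exampleProducerConfiguration() {
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("topic", "persistent://public/default/my-topic");
        configuration.put("keySchema", Map.of("type", "string"));
        configuration.put("valueSchema", Map.of("type", "avro", "schema", "{...}"));
        return configuration;
    }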
@@ -15,10 +15,6 @@
*/
package ai.langstream;

- import static org.junit.jupiter.api.Assertions.assertArrayEquals;
- import static org.junit.jupiter.api.Assertions.assertEquals;
- import static org.junit.jupiter.api.Assertions.assertFalse;
- import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import ai.langstream.api.model.Application;
@@ -32,16 +28,13 @@
import ai.langstream.impl.k8s.tests.KubeTestServer;
import ai.langstream.impl.nar.NarFileHandler;
import ai.langstream.impl.parser.ModelBuilder;
- import ai.langstream.kafka.extensions.KafkaContainerExtension;
import ai.langstream.runtime.agent.AgentRunner;
import ai.langstream.runtime.agent.api.AgentInfo;
import ai.langstream.runtime.api.agent.RuntimePodConfiguration;
import io.fabric8.kubernetes.api.model.Secret;
import java.nio.file.Path;
import java.nio.file.Paths;
- import java.time.Duration;
import java.util.ArrayList;
- import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -52,15 +45,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.admin.AdminClient;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.header.Header;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -78,9 +63,6 @@ public abstract class AbstractApplicationRunner {

@RegisterExtension protected static final KubeTestServer kubeServer = new KubeTestServer();

-     @RegisterExtension
-     protected static final KafkaContainerExtension kafkaContainer = new KafkaContainerExtension();
-
protected static ApplicationDeployer applicationDeployer;
private static NarFileHandler narFileHandler;
private static TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry;
@@ -156,28 +138,6 @@ protected ApplicationRuntime deployApplicationWithSecrets(
return new ApplicationRuntime(tenant, appId, applicationInstance, implementation, secrets);
}

-     protected String buildInstanceYaml() {
-         String inputTopic = "input-topic-" + UUID.randomUUID();
-         String outputTopic = "output-topic-" + UUID.randomUUID();
-         String streamTopic = "stream-topic-" + UUID.randomUUID();
-         return """
-                instance:
-                  globals:
-                    input-topic: %s
-                    output-topic: %s
-                    stream-topic: %s
-                  streamingCluster:
-                    type: "kafka"
-                    configuration:
-                      admin:
-                        bootstrap.servers: "%s"
-                  computeCluster:
-                    type: "kubernetes"
-                """
-                 .formatted(
-                         inputTopic, outputTopic, streamTopic, kafkaContainer.getBootstrapServers());
-     }
-
@BeforeAll
public static void setup() throws Exception {
Path codeDirectory = Paths.get("target/test-jdbc-drivers");
@@ -200,121 +160,6 @@ public static void setup() throws Exception {
.build();
}

-     protected KafkaProducer<String, String> createProducer() {
-         return new KafkaProducer<>(
-                 Map.of(
-                         "bootstrap.servers",
-                         kafkaContainer.getBootstrapServers(),
-                         "key.serializer",
-                         "org.apache.kafka.common.serialization.StringSerializer",
-                         "value.serializer",
-                         "org.apache.kafka.common.serialization.StringSerializer"));
-     }
-
-     protected void sendMessage(String topic, Object content, KafkaProducer producer)
-             throws Exception {
-         sendMessage(topic, content, List.of(), producer);
-     }
-
-     protected void sendMessage(
-             String topic, Object content, List<Header> headers, KafkaProducer producer)
-             throws Exception {
-         sendMessage(topic, "key", content, headers, producer);
-     }
-
-     protected void sendMessage(
-             String topic, Object key, Object content, List<Header> headers, KafkaProducer producer)
-             throws Exception {
-         producer.send(
-                         new ProducerRecord<>(
-                                 topic, null, System.currentTimeMillis(), key, content, headers))
-                 .get();
-         producer.flush();
-     }
-
-     protected List<ConsumerRecord> waitForMessages(KafkaConsumer consumer, List<?> expected) {
-         List<ConsumerRecord> result = new ArrayList<>();
-         List<Object> received = new ArrayList<>();
-
-         Awaitility.await()
-                 .atMost(30, TimeUnit.SECONDS)
-                 .untilAsserted(
-                         () -> {
-                             ConsumerRecords<String, String> poll =
-                                     consumer.poll(Duration.ofSeconds(2));
-                             for (ConsumerRecord record : poll) {
-                                 log.info("Received message {}", record);
-                                 received.add(record.value());
-                                 result.add(record);
-                             }
-                             log.info("Result: {}", received);
-                             received.forEach(r -> log.info("Received |{}|", r));
-
-                             assertEquals(expected.size(), received.size());
-                             for (int i = 0; i < expected.size(); i++) {
-                                 Object expectedValue = expected.get(i);
-                                 Object actualValue = received.get(i);
-                                 if (expectedValue instanceof Consumer fn) {
-                                     fn.accept(actualValue);
-                                 } else if (expectedValue instanceof byte[]) {
-                                     assertArrayEquals((byte[]) expectedValue, (byte[]) actualValue);
-                                 } else {
-                                     log.info("expected: {}", expectedValue);
-                                     log.info("got: {}", actualValue);
-                                     assertEquals(expectedValue, actualValue);
-                                 }
-                             }
-                         });
-
-         return result;
-     }
-
-     protected List<ConsumerRecord> waitForMessagesInAnyOrder(
-             KafkaConsumer consumer, Collection<String> expected) {
-         List<ConsumerRecord> result = new ArrayList<>();
-         List<Object> received = new ArrayList<>();
-
-         Awaitility.await()
-                 .atMost(30, TimeUnit.SECONDS)
-                 .untilAsserted(
-                         () -> {
-                             ConsumerRecords<String, String> poll =
-                                     consumer.poll(Duration.ofSeconds(2));
-                             for (ConsumerRecord record : poll) {
-                                 log.info("Received message {}", record);
-                                 received.add(record.value());
-                                 result.add(record);
-                             }
-                             log.info("Result: {}", received);
-                             received.forEach(r -> log.info("Received |{}|", r));
-
-                             assertEquals(expected.size(), received.size());
-                             for (Object expectedValue : expected) {
-                                 // this doesn't work for byte[]
-                                 assertFalse(expectedValue instanceof byte[]);
-                                 assertTrue(
-                                         received.contains(expectedValue),
-                                         "Expected value "
-                                                 + expectedValue
-                                                 + " not found in "
-                                                 + received);
-                             }
-
-                             for (Object receivedValue : received) {
-                                 // this doesn't work for byte[]
-                                 assertFalse(receivedValue instanceof byte[]);
-                                 assertTrue(
-                                         expected.contains(receivedValue),
-                                         "Received value "
-                                                 + receivedValue
-                                                 + " not found in "
-                                                 + expected);
-                             }
-                         });
-
-         return result;
-     }
-
public record AgentRunResult(Map<String, AgentInfo> info) {}

protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws Exception {
@@ -341,7 +186,7 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws
});
// execute all the pods
ExecutorService executorService = Executors.newCachedThreadPool();
-         List<CompletableFuture> futures = new ArrayList<>();
+         List<CompletableFuture<?>> futures = new ArrayList<>();
for (RuntimePodConfiguration podConfiguration : pods) {
CompletableFuture<?> handle = new CompletableFuture<>();
futures.add(handle);
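The only change in the hunk above is replacing the raw List<CompletableFuture> with List<CompletableFuture<?>>, which removes raw-type warnings without changing behavior. A minimal sketch of the pattern, not taken from the patch:

    // With the wildcard element type, heterogeneous futures can be collected
    // and awaited together without raw-type warnings.
    List<CompletableFuture<?>> futures = new ArrayList<>();
    futures.add(CompletableFuture.completedFuture("done"));
    futures.add(CompletableFuture.completedFuture(42));
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();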
@@ -419,84 +264,7 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws
return new AgentRunResult(allAgentsInfo);
}

-     private volatile boolean validateConsumerOffsets = true;
-
-     public boolean isValidateConsumerOffsets() {
-         return validateConsumerOffsets;
-     }
-
-     public void setValidateConsumerOffsets(boolean validateConsumerOffsets) {
-         this.validateConsumerOffsets = validateConsumerOffsets;
-     }
-
-     private void validateAgentInfoBeforeStop(AgentInfo agentInfo) {
-         if (!validateConsumerOffsets) {
-             return;
-         }
-         agentInfo
-                 .serveWorkerStatus()
-                 .forEach(
-                         workerStatus -> {
-                             String agentType = workerStatus.getAgentType();
-                             log.info("Checking Agent type {}", agentType);
-                             switch (agentType) {
-                                 case "topic-source":
-                                     Map<String, Object> info = workerStatus.getInfo();
-                                     log.info("Topic source info {}", info);
-                                     Map<String, Object> consumerInfo =
-                                             (Map<String, Object>) info.get("consumer");
-                                     if (consumerInfo != null) {
-                                         Map<String, Object> committedOffsets =
-                                                 (Map<String, Object>)
-                                                         consumerInfo.get("committedOffsets");
-                                         log.info("Committed offsets {}", committedOffsets);
-                                         committedOffsets.forEach(
-                                                 (topic, offset) -> {
-                                                     assertNotNull(offset);
-                                                     assertTrue(((Number) offset).intValue() >= 0);
-                                                 });
-                                         Map<String, Object> uncommittedOffsets =
-                                                 (Map<String, Object>)
-                                                         consumerInfo.get("uncommittedOffsets");
-                                         log.info("Uncommitted offsets {}", uncommittedOffsets);
-                                         uncommittedOffsets.forEach(
-                                                 (topic, number) -> {
-                                                     assertNotNull(number);
-                                                     assertTrue(
-                                                             ((Number) number).intValue() <= 0,
-                                                             "for topic "
-                                                                     + topic
-                                                                     + " we have some uncommitted offsets: "
-                                                                     + number);
-                                                 });
-                                     }
-                                 default:
-                                     // ignore
-                             }
-                         });
-     }
-
-     protected KafkaConsumer<String, String> createConsumer(String topic) {
-         KafkaConsumer<String, String> consumer =
-                 new KafkaConsumer<>(
-                         Map.of(
-                                 "bootstrap.servers",
-                                 kafkaContainer.getBootstrapServers(),
-                                 "key.deserializer",
-                                 "org.apache.kafka.common.serialization.StringDeserializer",
-                                 "value.deserializer",
-                                 "org.apache.kafka.common.serialization.StringDeserializer",
-                                 "group.id",
-                                 "testgroup-" + UUID.randomUUID(),
-                                 "auto.offset.reset",
-                                 "earliest"));
-         consumer.subscribe(List.of(topic));
-         return consumer;
-     }
-
-     protected static AdminClient getKafkaAdmin() {
-         return kafkaContainer.getAdmin();
-     }
+     protected void validateAgentInfoBeforeStop(AgentInfo agentInfo) {}

@AfterAll
public static void teardown() {
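The Kafka container extension, producer/consumer helpers, and consumer-offset validation removed above do not disappear from the test suite: the next file now imports ai.langstream.kafka.AbstractKafkaApplicationRunner, which suggests they moved into a Kafka-specific subclass. A hedged skeleton of that split; only the class name and the extends relationship are confirmed by this diff, the members are assumptions:

    // Assumed shape, inferred from the import change below; not shown in this diff.
    public abstract class AbstractKafkaApplicationRunner extends AbstractApplicationRunner {
        // presumably hosts the KafkaContainerExtension, buildInstanceYaml(),
        // createProducer()/createConsumer(), sendMessage(...), the
        // waitForMessages helpers, and the consumer-offset validation
        // removed from AbstractApplicationRunner above
    }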
@@ -17,17 +17,17 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

- import ai.langstream.AbstractApplicationRunner;
import ai.langstream.api.model.AssetDefinition;
import ai.langstream.api.runtime.ExecutionPlan;
+ import ai.langstream.kafka.AbstractKafkaApplicationRunner;
import ai.langstream.mockagents.MockAssetManagerCodeProvider;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

@Slf4j
- class DeployAssetsTest extends AbstractApplicationRunner {
+ class DeployAssetsTest extends AbstractKafkaApplicationRunner {

@Test
public void testDeployAsset() throws Exception {
