Skip to content

Commit

Permalink
Add support for topic schemas in Pulsar (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Oct 18, 2023
1 parent 14a2747 commit 5b6e6bd
Show file tree
Hide file tree
Showing 33 changed files with 625 additions and 485 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
@Slf4j
class KafkaProducerWrapper implements TopicProducer {

final Map<Class<?>, Serializer<?>> BASE_SERIALIZERS =
static final Map<Class<?>, Serializer<?>> BASE_SERIALIZERS =
Map.ofEntries(
entry(String.class, new StringSerializer()),
entry(Boolean.class, new BooleanSerializer()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ai.langstream.pulsar.runner;

import static ai.langstream.pulsar.PulsarClientUtils.buildPulsarAdmin;
import static java.util.Map.entry;

import ai.langstream.api.model.Application;
import ai.langstream.api.model.SchemaDefinition;
Expand All @@ -37,8 +38,16 @@
import ai.langstream.pulsar.PulsarTopic;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -524,19 +533,53 @@ private class PulsarTopicProducer<K> implements TopicProducer {
Producer<K> producer;
Schema<K> schema;

static final Map<Class<?>, Schema<?>> BASE_SCHEMAS =
Map.ofEntries(
entry(String.class, Schema.STRING),
entry(Boolean.class, Schema.BOOL),
entry(Byte.class, Schema.INT8),
entry(Short.class, Schema.INT16),
entry(Integer.class, Schema.INT32),
entry(Long.class, Schema.INT64),
entry(Float.class, Schema.FLOAT),
entry(Double.class, Schema.DOUBLE),
entry(byte[].class, Schema.BYTES),
entry(Date.class, Schema.DATE),
entry(Timestamp.class, Schema.TIMESTAMP),
entry(Time.class, Schema.TIME),
entry(LocalDate.class, Schema.LOCAL_DATE),
entry(LocalTime.class, Schema.LOCAL_TIME),
entry(LocalDateTime.class, Schema.LOCAL_DATE_TIME),
entry(Instant.class, Schema.INSTANT),
entry(ByteBuffer.class, Schema.BYTEBUFFER));

public PulsarTopicProducer(Map<String, Object> configuration) {
this.configuration = configuration;
}

@Override
@SneakyThrows
public void start() {
String topic = (String) configuration.remove("topic");
schema = (Schema<K>) configuration.remove("schema");
if (schema == null) {
schema = (Schema) Schema.STRING;
if (configuration.containsKey("valueSchema")) {
SchemaDefinition valueSchemaDefinition =
mapper.convertValue(
configuration.remove("valueSchema"), SchemaDefinition.class);
Schema<?> valueSchema = Schema.getSchema(getSchemaInfo(valueSchemaDefinition));
if (configuration.containsKey("keySchema")) {
SchemaDefinition keySchemaDefinition =
mapper.convertValue(
configuration.remove("keySchema"), SchemaDefinition.class);
Schema<?> keySchema = Schema.getSchema(getSchemaInfo(keySchemaDefinition));
schema = (Schema<K>) Schema.KeyValue(keySchema, valueSchema);
} else {
schema = (Schema<K>) valueSchema;
}
producer =
client.newProducer(schema)
.topic((String) configuration.remove("topic"))
.loadConf(configuration)
.create();
}
producer = client.newProducer(schema).topic(topic).loadConf(configuration).create();
}

@Override
Expand All @@ -552,9 +595,39 @@ public void close() {
}
}

private Schema<?> getSchema(Class<?> klass) {
Schema<?> schema = BASE_SCHEMAS.get(klass);
if (schema == null) {
throw new IllegalArgumentException("Cannot infer schema for " + klass);
}
return schema;
}

@Override
public CompletableFuture<?> write(Record r) {
totalIn.addAndGet(1);
if (schema == null) {
try {
if (r.value() == null) {
throw new IllegalStateException(
"Cannot infer schema because value is null");
}
Schema<?> valueSchema = getSchema(r.value().getClass());
if (r.key() != null) {
Schema<?> keySchema = getSchema(r.key().getClass());
schema = (Schema<K>) Schema.KeyValue(keySchema, valueSchema);
} else {
schema = (Schema<K>) valueSchema;
}
producer =
client.newProducer(schema)
.topic((String) configuration.remove("topic"))
.loadConf(configuration)
.create();
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

log.info("Writing message {}", r);
// TODO: handle KV
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,15 @@ public Map<String, Object> createProducerConfiguration(
PulsarTopic pulsarTopic = (PulsarTopic) outputConnectionImplementation;

Map<String, Object> configuration = new HashMap<>();
// TODO: handle other configurations and schema
// TODO: handle other configurations

configuration.put("topic", pulsarTopic.name().toPulsarName());
if (pulsarTopic.keySchema() != null) {
configuration.put("keySchema", pulsarTopic.keySchema());
}
if (pulsarTopic.valueSchema() != null) {
configuration.put("valueSchema", pulsarTopic.valueSchema());
}
return configuration;
}

Expand Down
Loading

0 comments on commit 5b6e6bd

Please sign in to comment.