Skip to content

Commit

Permalink
[Feature] Add support for leveraging Pulsar Schema Registry while con…
Browse files Browse the repository at this point in the history
…suming (#42)
  • Loading branch information
eolivelli authored Jun 7, 2022
1 parent d38e2b0 commit 5a82e0e
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -67,12 +68,13 @@ public class PulsarConnectionFactory

private final Map<String, Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final Set<PulsarConnection> connections = Collections.synchronizedSet(new HashSet<>());
private final List<Consumer<byte[]>> consumers = new CopyOnWriteArrayList<>();
private final List<Reader<byte[]>> readers = new CopyOnWriteArrayList<>();
private final List<Consumer<?>> consumers = new CopyOnWriteArrayList<>();
private final List<Reader<?>> readers = new CopyOnWriteArrayList<>();
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private Map<String, Object> producerConfiguration;
private Map<String, Object> consumerConfiguration;
private Schema<?> consumerSchema;
private String systemNamespace = "public/default";
private String defaultClientId = null;
private boolean enableTransaction = false;
Expand Down Expand Up @@ -175,6 +177,10 @@ private synchronized Map<String, Object> getConsumerConfiguration() {
return consumerConfiguration;
}

private synchronized Schema<?> getConsumerSchema() {
return consumerSchema;
}

private synchronized Map<String, Object> getProducerConfiguration() {
return producerConfiguration;
}
Expand Down Expand Up @@ -215,10 +221,16 @@ private synchronized void ensureInitialized() throws JMSException {
this.producerConfiguration = Collections.emptyMap();
}

this.consumerSchema = Schema.BYTES;
Map<String, Object> consumerConfigurationM =
(Map<String, Object>) configuration.remove("consumerConfig");
if (consumerConfigurationM != null) {
this.consumerConfiguration = new HashMap(consumerConfigurationM);
boolean useSchema =
Boolean.parseBoolean(getAndRemoveString("useSchema", "false", consumerConfiguration));
if (useSchema) {
consumerSchema = Schema.AUTO_CONSUME();
}
} else {
this.consumerConfiguration = Collections.emptyMap();
}
Expand Down Expand Up @@ -926,7 +938,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc
}
}

public Consumer<byte[]> createConsumer(
public Consumer<?> createConsumer(
PulsarDestination destination,
String consumerName,
int sessionMode,
Expand Down Expand Up @@ -1018,12 +1030,14 @@ public Consumer<byte[]> createConsumer(
}
}

ConsumerBuilder<byte[]> builder =
Schema<?> schema = getConsumerSchema();
Map<String, Object> consumerConfiguration = getConsumerConfiguration();
ConsumerBuilder<?> builder =
pulsarClient
.newConsumer()
.newConsumer(schema)
// these properties can be overridden by the configuration
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
.loadConf(getConsumerConfiguration())
.loadConf(consumerConfiguration)
.properties(consumerMetadata)
// these properties cannot be overwritten by the configuration
.subscriptionInitialPosition(initialPosition)
Expand All @@ -1032,7 +1046,7 @@ public Consumer<byte[]> createConsumer(
.subscriptionType(subscriptionType)
.subscriptionName(subscriptionName)
.topic(fullQualifiedTopicName);
Consumer<byte[]> newConsumer = builder.subscribe();
Consumer<?> newConsumer = builder.subscribe();
consumers.add(newConsumer);

return newConsumer;
Expand All @@ -1041,7 +1055,7 @@ public Consumer<byte[]> createConsumer(
}
}

public Reader<byte[]> createReaderForBrowser(PulsarQueue destination) throws JMSException {
public Reader<?> createReaderForBrowser(PulsarQueue destination) throws JMSException {
String fullQualifiedTopicName = getPulsarTopicName(destination);
try {
List<Message<byte[]>> messages =
Expand All @@ -1059,29 +1073,30 @@ public Reader<byte[]> createReaderForBrowser(PulsarQueue destination) throws JMS
if (log.isDebugEnabled()) {
log.debug("createBrowser {} at {}", fullQualifiedTopicName, seekMessageId);
}
ReaderBuilder<byte[]> builder =
Schema<?> schema = getConsumerSchema();
ReaderBuilder<?> builder =
pulsarClient
.newReader()
.newReader(schema)
// these properties can be overridden by the configuration
.loadConf(getConsumerConfiguration())
// these properties cannot be overwritten by the configuration
.readerName("jms-queue-browser-" + UUID.randomUUID())
.startMessageId(seekMessageId)
.startMessageIdInclusive()
.topic(fullQualifiedTopicName);
Reader<byte[]> newReader = builder.create();
Reader<?> newReader = builder.create();
readers.add(newReader);
return newReader;
} catch (PulsarClientException | PulsarAdminException err) {
throw Utils.handleException(err);
}
}

public void removeConsumer(Consumer<byte[]> consumer) {
public void removeConsumer(Consumer<?> consumer) {
consumers.remove(consumer);
}

public void removeReader(Reader<byte[]> reader) {
public void removeReader(Reader<?> reader) {
readers.remove(reader);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.EOFException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand All @@ -44,9 +47,14 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.KeyValue;

@Slf4j
public abstract class PulsarMessage implements Message {
Expand All @@ -66,7 +74,7 @@ public abstract class PulsarMessage implements Message {
protected final Map<String, String> properties = new HashMap<>();
private PulsarMessageConsumer consumer;
private boolean negativeAcked;
private org.apache.pulsar.client.api.Message<byte[]> receivedPulsarMessage;
private org.apache.pulsar.client.api.Message<?> receivedPulsarMessage;

/**
* Gets the message ID.
Expand Down Expand Up @@ -1231,34 +1239,97 @@ final void send(
protected abstract void prepareForSend(TypedMessageBuilder<byte[]> producer) throws JMSException;

static PulsarMessage decode(
PulsarMessageConsumer consumer, org.apache.pulsar.client.api.Message<byte[]> msg)
PulsarMessageConsumer consumer, org.apache.pulsar.client.api.Message<?> msg)
throws JMSException {
if (msg == null) {
return null;
}
String type = msg.getProperty("JMSPulsarMessageType");
if (type == null) {
type = "bytes"; // non JMS clients
Object value = msg.getValue();
if (value instanceof byte[] || value == null) {
String type = msg.getProperty("JMSPulsarMessageType");
if (type == null) {
type = "bytes"; // non JMS clients
}
byte[] valueAsArray = (byte[]) value;
switch (type) {
case "map":
return new PulsarMapMessage(valueAsArray).applyMessage(msg, consumer);
case "object":
return new PulsarObjectMessage(valueAsArray).applyMessage(msg, consumer);
case "stream":
return new PulsarStreamMessage(valueAsArray).applyMessage(msg, consumer);
case "bytes":
return new PulsarBytesMessage(valueAsArray).applyMessage(msg, consumer);
case "text":
return new PulsarTextMessage(valueAsArray).applyMessage(msg, consumer);
default:
return new PulsarSimpleMessage().applyMessage(msg, consumer);
}
} else if (value instanceof GenericObject) {
GenericObject genericObject = (GenericObject) value;
Object nativeObject = genericObject.getNativeObject();
Object unwrapped = unwrapNativeObject(nativeObject);
if (unwrapped instanceof String) {
return new PulsarTextMessage((String) unwrapped).applyMessage(msg, consumer);
} else if (unwrapped instanceof Map) {
return new PulsarMapMessage((Map) unwrapped, false).applyMessage(msg, consumer);
} else {
return new PulsarObjectMessage((Serializable) unwrapped).applyMessage(msg, consumer);
}
} else {
throw new IllegalStateException("Cannot decode message, payload type is " + value.getClass());
}
}

private static Object unwrapNativeObject(Object nativeObject) {
if (nativeObject instanceof KeyValue) {
KeyValue keyValue = (KeyValue) nativeObject;
Object keyPart = unwrapNativeObject(keyValue.getKey());
Object valuePart = unwrapNativeObject(keyValue.getValue());
Map<String, Object> result = new HashMap<>();
result.put("key", keyPart);
result.put("value", valuePart);
return result;
}
if (nativeObject instanceof GenericObject) {
return unwrapNativeObject(((GenericObject) nativeObject).getNativeObject());
}
if (nativeObject instanceof GenericRecord) {
return genericRecordToMap((GenericRecord) nativeObject);
}
if (nativeObject instanceof GenericArray) {
return genericArrayToList((GenericArray) nativeObject);
}
byte[] value = msg.getValue();
switch (type) {
case "map":
return new PulsarMapMessage(value).applyMessage(msg, consumer);
case "object":
return new PulsarObjectMessage(value).applyMessage(msg, consumer);
case "stream":
return new PulsarStreamMessage(value).applyMessage(msg, consumer);
case "bytes":
return new PulsarBytesMessage(value).applyMessage(msg, consumer);
case "text":
return new PulsarTextMessage(value).applyMessage(msg, consumer);
default:
return new PulsarSimpleMessage().applyMessage(msg, consumer);
if (nativeObject instanceof Utf8) {
return nativeObject.toString();
}
return nativeObject;
}

private static List<Object> genericArrayToList(GenericArray genericArray) {
List<Object> res = new ArrayList<>();
genericArray.forEach(
fieldValue -> {
res.add(unwrapNativeObject(fieldValue));
});
return res;
}

private static Map<String, Object> genericRecordToMap(GenericRecord genericRecord) {
Map<String, Object> asMap = new HashMap<>();
genericRecord
.getSchema()
.getFields()
.forEach(
f -> {
Object fieldValue = unwrapNativeObject(genericRecord.get(f.name()));
asMap.put(f.name(), fieldValue);
});
return asMap;
}

protected PulsarMessage applyMessage(
org.apache.pulsar.client.api.Message<byte[]> msg, PulsarMessageConsumer consumer) {
org.apache.pulsar.client.api.Message<?> msg, PulsarMessageConsumer consumer) {
this.writable = false;
this.properties.putAll(msg.getProperties());
if (consumer != null) {
Expand Down Expand Up @@ -1387,7 +1458,7 @@ public CompletableFuture<?> acknowledgeInternalInTransaction(Transaction transac
.acknowledgeAsync(receivedPulsarMessage.getMessageId(), transaction);
}

public org.apache.pulsar.client.api.Message<byte[]> getReceivedPulsarMessage() {
public org.apache.pulsar.client.api.Message<?> getReceivedPulsarMessage() {
return receivedPulsarMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class PulsarMessageConsumer implements MessageConsumer, TopicSubscriber,
private SelectorSupport selectorSupport;
private SelectorSupport selectorSupportOnSubscription;
private final boolean noLocal;
private Consumer<byte[]> consumer;
private Consumer<?> consumer;
private MessageListener listener;
private final SubscriptionMode subscriptionMode;
private final SubscriptionType subscriptionType;
Expand Down Expand Up @@ -124,7 +124,7 @@ public PulsarMessageConsumer subscribe() throws JMSException {
}

// Visible for testing
synchronized Consumer<byte[]> getConsumer() throws JMSException {
synchronized Consumer<?> getConsumer() throws JMSException {
if (closed) {
throw new IllegalStateException("Consumer is closed");
}
Expand Down Expand Up @@ -304,8 +304,8 @@ private synchronized Message receiveWithTimeoutAndValidateType(long timeout, Cla
session.executeCriticalOperation(
() -> {
try {
Consumer<byte[]> consumer = getConsumer();
org.apache.pulsar.client.api.Message<byte[]> message =
Consumer<?> consumer = getConsumer();
org.apache.pulsar.client.api.Message<?> message =
consumer.receive(stepTimeout, TimeUnit.MILLISECONDS);
if (message == null) {
return null;
Expand Down Expand Up @@ -338,8 +338,7 @@ public Message receiveNoWait() throws JMSException {
return receive(1);
}

private void skipMessage(org.apache.pulsar.client.api.Message<byte[]> message)
throws JMSException {
private void skipMessage(org.apache.pulsar.client.api.Message<?> message) throws JMSException {
skippedMessages.incrementAndGet();
if (subscriptionType == SubscriptionType.Exclusive
|| session.getFactory().isAcknowledgeRejectedMessages()) {
Expand All @@ -359,7 +358,7 @@ private void skipMessage(org.apache.pulsar.client.api.Message<byte[]> message)
}

private PulsarMessage handleReceivedMessage(
org.apache.pulsar.client.api.Message<byte[]> message,
org.apache.pulsar.client.api.Message<?> message,
Class expectedType,
java.util.function.Consumer<PulsarMessage> listenerCode,
boolean noLocalFilter)
Expand All @@ -368,7 +367,7 @@ private PulsarMessage handleReceivedMessage(
receivedMessages.incrementAndGet();

PulsarMessage result = PulsarMessage.decode(this, message);
Consumer<byte[]> consumer = getConsumer();
Consumer<?> consumer = getConsumer();
if (expectedType != null && !result.isBodyAssignableTo(expectedType)) {
if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -589,9 +588,9 @@ public JMSConsumer asJMSConsumer() {
}

synchronized void acknowledge(
org.apache.pulsar.client.api.Message<byte[]> receivedPulsarMessage, PulsarMessage message)
org.apache.pulsar.client.api.Message<?> receivedPulsarMessage, PulsarMessage message)
throws JMSException {
Consumer<byte[]> consumer = getConsumer();
Consumer<?> consumer = getConsumer();
try {
consumer.acknowledge(receivedPulsarMessage);
session.unregisterUnacknowledgedMessage(message);
Expand All @@ -614,8 +613,8 @@ synchronized void runListener(int timeout) {
return;
}
try {
Consumer<byte[]> consumer = getConsumer();
org.apache.pulsar.client.api.Message<byte[]> message =
Consumer<?> consumer = getConsumer();
org.apache.pulsar.client.api.Message<?> message =
consumer.receive(timeout, TimeUnit.MILLISECONDS);
if (message == null) {
return;
Expand Down Expand Up @@ -666,7 +665,7 @@ public synchronized boolean isClosedWhileActiveTransaction() {
return closedWhileActiveTransaction;
}

public void negativeAck(org.apache.pulsar.client.api.Message<byte[]> message) {
public void negativeAck(org.apache.pulsar.client.api.Message<?> message) {
if (consumer != null) {
consumer.negativeAcknowledge(message);
}
Expand All @@ -690,7 +689,7 @@ public synchronized SelectorSupport getSelectorSupportOnSubscription() {
return selectorSupportOnSubscription;
}

Consumer<byte[]> getInternalConsumer() {
Consumer<?> getInternalConsumer() {
return consumer;
}

Expand Down
Loading

0 comments on commit 5a82e0e

Please sign in to comment.