From 5a82e0e03c20cad02a84325907c951e9ffca9f3a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Jun 2022 08:48:38 +0200 Subject: [PATCH] [Feature] Add support for leveraging Pulsar Schema Registry while consuming (#42) --- .../pulsar/jms/PulsarConnectionFactory.java | 41 ++-- .../oss/pulsar/jms/PulsarMessage.java | 113 +++++++-- .../oss/pulsar/jms/PulsarMessageConsumer.java | 27 ++- .../oss/pulsar/jms/PulsarQueueBrowser.java | 2 +- .../oss/pulsar/jms/PulsarSession.java | 2 +- .../pulsar/jms/messages/PulsarMapMessage.java | 11 +- .../oss/pulsar/jms/ConfigurationTest.java | 2 +- .../oss/pulsar/jms/PulsarInteropTest.java | 216 ++++++++++++++++++ .../oss/pulsar/jms/TransactionsTest.java | 4 +- 9 files changed, 363 insertions(+), 55 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 91cf3e03..ae70074b 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; @@ -67,12 +68,13 @@ public class PulsarConnectionFactory private final Map> producers = new ConcurrentHashMap<>(); private final Set connections = Collections.synchronizedSet(new HashSet<>()); - private final List> consumers = new CopyOnWriteArrayList<>(); - private final List> readers = new CopyOnWriteArrayList<>(); + private final List> consumers = new CopyOnWriteArrayList<>(); + private final List> readers = new CopyOnWriteArrayList<>(); private PulsarClient pulsarClient; private PulsarAdmin pulsarAdmin; private Map producerConfiguration; private Map consumerConfiguration; + private Schema consumerSchema; private String systemNamespace = "public/default"; private String defaultClientId = null; private boolean enableTransaction = false; @@ -175,6 +177,10 @@ private synchronized Map getConsumerConfiguration() { return consumerConfiguration; } + private synchronized Schema getConsumerSchema() { + return consumerSchema; + } + private synchronized Map getProducerConfiguration() { return producerConfiguration; } @@ -215,10 +221,16 @@ private synchronized void ensureInitialized() throws JMSException { this.producerConfiguration = Collections.emptyMap(); } + this.consumerSchema = Schema.BYTES; Map consumerConfigurationM = (Map) configuration.remove("consumerConfig"); if (consumerConfigurationM != null) { this.consumerConfiguration = new HashMap(consumerConfigurationM); + boolean useSchema = + Boolean.parseBoolean(getAndRemoveString("useSchema", "false", consumerConfiguration)); + if (useSchema) { + consumerSchema = Schema.AUTO_CONSUME(); + } } else { this.consumerConfiguration = Collections.emptyMap(); } @@ -926,7 +938,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc } } - public Consumer createConsumer( + public Consumer createConsumer( PulsarDestination destination, String consumerName, int sessionMode, @@ -1018,12 +1030,14 @@ public Consumer createConsumer( } } - ConsumerBuilder builder = + Schema schema = getConsumerSchema(); + Map consumerConfiguration = getConsumerConfiguration(); + ConsumerBuilder builder = pulsarClient - .newConsumer() + .newConsumer(schema) // these properties can be overridden by the configuration .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) - .loadConf(getConsumerConfiguration()) + .loadConf(consumerConfiguration) .properties(consumerMetadata) // these properties cannot be overwritten by the configuration .subscriptionInitialPosition(initialPosition) @@ -1032,7 +1046,7 @@ public Consumer createConsumer( .subscriptionType(subscriptionType) .subscriptionName(subscriptionName) .topic(fullQualifiedTopicName); - Consumer newConsumer = builder.subscribe(); + Consumer newConsumer = builder.subscribe(); consumers.add(newConsumer); return newConsumer; @@ -1041,7 +1055,7 @@ public Consumer createConsumer( } } - public Reader createReaderForBrowser(PulsarQueue destination) throws JMSException { + public Reader createReaderForBrowser(PulsarQueue destination) throws JMSException { String fullQualifiedTopicName = getPulsarTopicName(destination); try { List> messages = @@ -1059,9 +1073,10 @@ public Reader createReaderForBrowser(PulsarQueue destination) throws JMS if (log.isDebugEnabled()) { log.debug("createBrowser {} at {}", fullQualifiedTopicName, seekMessageId); } - ReaderBuilder builder = + Schema schema = getConsumerSchema(); + ReaderBuilder builder = pulsarClient - .newReader() + .newReader(schema) // these properties can be overridden by the configuration .loadConf(getConsumerConfiguration()) // these properties cannot be overwritten by the configuration @@ -1069,7 +1084,7 @@ public Reader createReaderForBrowser(PulsarQueue destination) throws JMS .startMessageId(seekMessageId) .startMessageIdInclusive() .topic(fullQualifiedTopicName); - Reader newReader = builder.create(); + Reader newReader = builder.create(); readers.add(newReader); return newReader; } catch (PulsarClientException | PulsarAdminException err) { @@ -1077,11 +1092,11 @@ public Reader createReaderForBrowser(PulsarQueue destination) throws JMS } } - public void removeConsumer(Consumer consumer) { + public void removeConsumer(Consumer consumer) { consumers.remove(consumer); } - public void removeReader(Reader reader) { + public void removeReader(Reader reader) { readers.remove(reader); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java index 8f5f21da..8d04eeda 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java @@ -23,11 +23,14 @@ import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.EOFException; +import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -44,9 +47,14 @@ import javax.jms.MessageProducer; import javax.jms.Session; import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.schema.KeyValue; @Slf4j public abstract class PulsarMessage implements Message { @@ -66,7 +74,7 @@ public abstract class PulsarMessage implements Message { protected final Map properties = new HashMap<>(); private PulsarMessageConsumer consumer; private boolean negativeAcked; - private org.apache.pulsar.client.api.Message receivedPulsarMessage; + private org.apache.pulsar.client.api.Message receivedPulsarMessage; /** * Gets the message ID. @@ -1231,34 +1239,97 @@ final void send( protected abstract void prepareForSend(TypedMessageBuilder producer) throws JMSException; static PulsarMessage decode( - PulsarMessageConsumer consumer, org.apache.pulsar.client.api.Message msg) + PulsarMessageConsumer consumer, org.apache.pulsar.client.api.Message msg) throws JMSException { if (msg == null) { return null; } - String type = msg.getProperty("JMSPulsarMessageType"); - if (type == null) { - type = "bytes"; // non JMS clients + Object value = msg.getValue(); + if (value instanceof byte[] || value == null) { + String type = msg.getProperty("JMSPulsarMessageType"); + if (type == null) { + type = "bytes"; // non JMS clients + } + byte[] valueAsArray = (byte[]) value; + switch (type) { + case "map": + return new PulsarMapMessage(valueAsArray).applyMessage(msg, consumer); + case "object": + return new PulsarObjectMessage(valueAsArray).applyMessage(msg, consumer); + case "stream": + return new PulsarStreamMessage(valueAsArray).applyMessage(msg, consumer); + case "bytes": + return new PulsarBytesMessage(valueAsArray).applyMessage(msg, consumer); + case "text": + return new PulsarTextMessage(valueAsArray).applyMessage(msg, consumer); + default: + return new PulsarSimpleMessage().applyMessage(msg, consumer); + } + } else if (value instanceof GenericObject) { + GenericObject genericObject = (GenericObject) value; + Object nativeObject = genericObject.getNativeObject(); + Object unwrapped = unwrapNativeObject(nativeObject); + if (unwrapped instanceof String) { + return new PulsarTextMessage((String) unwrapped).applyMessage(msg, consumer); + } else if (unwrapped instanceof Map) { + return new PulsarMapMessage((Map) unwrapped, false).applyMessage(msg, consumer); + } else { + return new PulsarObjectMessage((Serializable) unwrapped).applyMessage(msg, consumer); + } + } else { + throw new IllegalStateException("Cannot decode message, payload type is " + value.getClass()); + } + } + + private static Object unwrapNativeObject(Object nativeObject) { + if (nativeObject instanceof KeyValue) { + KeyValue keyValue = (KeyValue) nativeObject; + Object keyPart = unwrapNativeObject(keyValue.getKey()); + Object valuePart = unwrapNativeObject(keyValue.getValue()); + Map result = new HashMap<>(); + result.put("key", keyPart); + result.put("value", valuePart); + return result; + } + if (nativeObject instanceof GenericObject) { + return unwrapNativeObject(((GenericObject) nativeObject).getNativeObject()); + } + if (nativeObject instanceof GenericRecord) { + return genericRecordToMap((GenericRecord) nativeObject); + } + if (nativeObject instanceof GenericArray) { + return genericArrayToList((GenericArray) nativeObject); } - byte[] value = msg.getValue(); - switch (type) { - case "map": - return new PulsarMapMessage(value).applyMessage(msg, consumer); - case "object": - return new PulsarObjectMessage(value).applyMessage(msg, consumer); - case "stream": - return new PulsarStreamMessage(value).applyMessage(msg, consumer); - case "bytes": - return new PulsarBytesMessage(value).applyMessage(msg, consumer); - case "text": - return new PulsarTextMessage(value).applyMessage(msg, consumer); - default: - return new PulsarSimpleMessage().applyMessage(msg, consumer); + if (nativeObject instanceof Utf8) { + return nativeObject.toString(); } + return nativeObject; + } + + private static List genericArrayToList(GenericArray genericArray) { + List res = new ArrayList<>(); + genericArray.forEach( + fieldValue -> { + res.add(unwrapNativeObject(fieldValue)); + }); + return res; + } + + private static Map genericRecordToMap(GenericRecord genericRecord) { + Map asMap = new HashMap<>(); + genericRecord + .getSchema() + .getFields() + .forEach( + f -> { + Object fieldValue = unwrapNativeObject(genericRecord.get(f.name())); + asMap.put(f.name(), fieldValue); + }); + return asMap; } protected PulsarMessage applyMessage( - org.apache.pulsar.client.api.Message msg, PulsarMessageConsumer consumer) { + org.apache.pulsar.client.api.Message msg, PulsarMessageConsumer consumer) { this.writable = false; this.properties.putAll(msg.getProperties()); if (consumer != null) { @@ -1387,7 +1458,7 @@ public CompletableFuture acknowledgeInternalInTransaction(Transaction transac .acknowledgeAsync(receivedPulsarMessage.getMessageId(), transaction); } - public org.apache.pulsar.client.api.Message getReceivedPulsarMessage() { + public org.apache.pulsar.client.api.Message getReceivedPulsarMessage() { return receivedPulsarMessage; } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java index f663d934..26a144fc 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java @@ -51,7 +51,7 @@ public class PulsarMessageConsumer implements MessageConsumer, TopicSubscriber, private SelectorSupport selectorSupport; private SelectorSupport selectorSupportOnSubscription; private final boolean noLocal; - private Consumer consumer; + private Consumer consumer; private MessageListener listener; private final SubscriptionMode subscriptionMode; private final SubscriptionType subscriptionType; @@ -124,7 +124,7 @@ public PulsarMessageConsumer subscribe() throws JMSException { } // Visible for testing - synchronized Consumer getConsumer() throws JMSException { + synchronized Consumer getConsumer() throws JMSException { if (closed) { throw new IllegalStateException("Consumer is closed"); } @@ -304,8 +304,8 @@ private synchronized Message receiveWithTimeoutAndValidateType(long timeout, Cla session.executeCriticalOperation( () -> { try { - Consumer consumer = getConsumer(); - org.apache.pulsar.client.api.Message message = + Consumer consumer = getConsumer(); + org.apache.pulsar.client.api.Message message = consumer.receive(stepTimeout, TimeUnit.MILLISECONDS); if (message == null) { return null; @@ -338,8 +338,7 @@ public Message receiveNoWait() throws JMSException { return receive(1); } - private void skipMessage(org.apache.pulsar.client.api.Message message) - throws JMSException { + private void skipMessage(org.apache.pulsar.client.api.Message message) throws JMSException { skippedMessages.incrementAndGet(); if (subscriptionType == SubscriptionType.Exclusive || session.getFactory().isAcknowledgeRejectedMessages()) { @@ -359,7 +358,7 @@ private void skipMessage(org.apache.pulsar.client.api.Message message) } private PulsarMessage handleReceivedMessage( - org.apache.pulsar.client.api.Message message, + org.apache.pulsar.client.api.Message message, Class expectedType, java.util.function.Consumer listenerCode, boolean noLocalFilter) @@ -368,7 +367,7 @@ private PulsarMessage handleReceivedMessage( receivedMessages.incrementAndGet(); PulsarMessage result = PulsarMessage.decode(this, message); - Consumer consumer = getConsumer(); + Consumer consumer = getConsumer(); if (expectedType != null && !result.isBodyAssignableTo(expectedType)) { if (log.isDebugEnabled()) { log.debug( @@ -589,9 +588,9 @@ public JMSConsumer asJMSConsumer() { } synchronized void acknowledge( - org.apache.pulsar.client.api.Message receivedPulsarMessage, PulsarMessage message) + org.apache.pulsar.client.api.Message receivedPulsarMessage, PulsarMessage message) throws JMSException { - Consumer consumer = getConsumer(); + Consumer consumer = getConsumer(); try { consumer.acknowledge(receivedPulsarMessage); session.unregisterUnacknowledgedMessage(message); @@ -614,8 +613,8 @@ synchronized void runListener(int timeout) { return; } try { - Consumer consumer = getConsumer(); - org.apache.pulsar.client.api.Message message = + Consumer consumer = getConsumer(); + org.apache.pulsar.client.api.Message message = consumer.receive(timeout, TimeUnit.MILLISECONDS); if (message == null) { return; @@ -666,7 +665,7 @@ public synchronized boolean isClosedWhileActiveTransaction() { return closedWhileActiveTransaction; } - public void negativeAck(org.apache.pulsar.client.api.Message message) { + public void negativeAck(org.apache.pulsar.client.api.Message message) { if (consumer != null) { consumer.negativeAcknowledge(message); } @@ -690,7 +689,7 @@ public synchronized SelectorSupport getSelectorSupportOnSubscription() { return selectorSupportOnSubscription; } - Consumer getInternalConsumer() { + Consumer getInternalConsumer() { return consumer; } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueueBrowser.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueueBrowser.java index db275b49..d936667c 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueueBrowser.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarQueueBrowser.java @@ -30,7 +30,7 @@ final class PulsarQueueBrowser implements QueueBrowser { private final PulsarSession session; private final PulsarQueue queue; - private final Reader reader; + private final Reader reader; private final SelectorSupport selectorSupport; public PulsarQueueBrowser(PulsarSession session, Queue queue, String selector) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java index 1f17a20f..576ae488 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java @@ -1591,7 +1591,7 @@ public void unregisterUnacknowledgedMessage(PulsarMessage result) { } public void removeConsumer(PulsarMessageConsumer consumer) { - Consumer pulsarConsumer = consumer.getInternalConsumer(); + Consumer pulsarConsumer = consumer.getInternalConsumer(); if (pulsarConsumer != null) { consumers.remove(consumer); getFactory().removeConsumer(pulsarConsumer); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java index 0da9fc6b..0fddf659 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarMapMessage.java @@ -40,11 +40,18 @@ public PulsarMapMessage() { } public PulsarMapMessage(Map body) throws MessageFormatException { + this(body, true); + } + + public PulsarMapMessage(Map body, boolean validate) + throws MessageFormatException { this(); if (body != null) { map.putAll(body); - for (Object value : body.values()) { - validateWritableObject(value); + if (validate) { + for (Object value : body.values()) { + validateWritableObject(value); + } } } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java index 746bd259..bb39673a 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConfigurationTest.java @@ -75,7 +75,7 @@ public void customizeConsumerTest() throws Exception { PulsarSession session = connection.createSession()) { Queue queue = session.createQueue("test" + UUID.randomUUID()); try (PulsarMessageConsumer consumer = session.createConsumer(queue); ) { - Consumer pulsarConsumer = consumer.getConsumer(); + Consumer pulsarConsumer = consumer.getConsumer(); assertEquals("the-consumer-name", pulsarConsumer.getConsumerName()); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java index 83126067..cd0fd066 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarInteropTest.java @@ -21,7 +21,9 @@ import com.datastax.oss.pulsar.jms.utils.PulsarCluster; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import javax.jms.BytesMessage; @@ -29,14 +31,19 @@ import javax.jms.Destination; import javax.jms.JMSConsumer; import javax.jms.JMSContext; +import javax.jms.MapMessage; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; +import lombok.Data; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -125,4 +132,213 @@ public void sendFromPulsarClientReceiveWithJMS() throws Exception { } } } + + @Test + public void stringSchemaTest() throws Exception { + + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + Map consumerConfig = new HashMap<>(); + properties.put("consumerConfig", consumerConfig); + consumerConfig.put("useSchema", true); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (JMSContext context = factory.createContext()) { + + String topic = "persistent://public/default/test-" + UUID.randomUUID(); + Destination destination = context.createTopic(topic); + + PulsarClient client = + cluster + .getService() + .getClient(); // do not close this client, it is internal to the broker + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer producer = + client.newProducer(Schema.STRING).topic(topic).create(); ) { + producer.newMessage().value("foo").key("bar").send(); + + // the JMS client reads Schema String as TextMessage + TextMessage message = (TextMessage) consumer.receive(); + assertEquals("foo", message.getText()); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + } + } + } + } + } + + @Test + public void longSchemaTest() throws Exception { + + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + Map consumerConfig = new HashMap<>(); + properties.put("consumerConfig", consumerConfig); + consumerConfig.put("useSchema", true); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (JMSContext context = factory.createContext()) { + + String topic = "persistent://public/default/test-" + UUID.randomUUID(); + Destination destination = context.createTopic(topic); + + PulsarClient client = + cluster + .getService() + .getClient(); // do not close this client, it is internal to the broker + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer producer = + client.newProducer(Schema.INT64).topic(topic).create(); ) { + producer.newMessage().value(23432424L).key("bar").send(); + + // the JMS client reads Schema INT64 as ObjectMessage + ObjectMessage message = (ObjectMessage) consumer.receive(); + assertEquals(23432424L, message.getObject()); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + } + } + } + } + } + + @Data + static final class Nested { + int age; + Pojo pojo; + } + + @Data + static final class Pojo { + String name; + Nested nested; + List nestedList; + } + + @Test + public void avroSchemaTest() throws Exception { + + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + Map consumerConfig = new HashMap<>(); + properties.put("consumerConfig", consumerConfig); + consumerConfig.put("useSchema", true); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (JMSContext context = factory.createContext()) { + + String topic = "persistent://public/default/test-" + UUID.randomUUID(); + Destination destination = context.createTopic(topic); + + PulsarClient client = + cluster + .getService() + .getClient(); // do not close this client, it is internal to the broker + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer producer = + client.newProducer(Schema.AVRO(Pojo.class)).topic(topic).create(); ) { + Pojo pojo = new Pojo(); + pojo.setName("foo"); + Nested nested = new Nested(); + nested.setAge(23); + + Pojo pojo2 = new Pojo(); + pojo2.setName("foo2"); + nested.setPojo(pojo2); + + pojo.setNested(nested); + pojo.setNestedList(Arrays.asList(nested)); + producer.newMessage().value(pojo).key("bar").send(); + + // the JMS client reads Schema AVRO as TextMessage + MapMessage message = (MapMessage) consumer.receive(); + assertEquals("foo", message.getString("name")); + Map nestedValue = (Map) message.getObject("nested"); + assertEquals(23, nestedValue.get("age")); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + Map nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + + List> nestedValueList = + (List>) message.getObject("nestedList"); + nestedValue = nestedValueList.get(0); + assertEquals(23, nestedValue.get("age")); + assertEquals("bar", message.getStringProperty("JMSXGroupID")); + nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + } + } + } + } + } + + @Test + public void avroKeyValueSchemaTest() throws Exception { + + Map properties = new HashMap<>(); + properties.put("webServiceUrl", cluster.getAddress()); + Map consumerConfig = new HashMap<>(); + properties.put("consumerConfig", consumerConfig); + consumerConfig.put("useSchema", true); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (JMSContext context = factory.createContext()) { + + String topic = "persistent://public/default/test-" + UUID.randomUUID(); + Destination destination = context.createTopic(topic); + + PulsarClient client = + cluster + .getService() + .getClient(); // do not close this client, it is internal to the broker + try (JMSConsumer consumer = context.createConsumer(destination)) { + try (Producer> producer = + client + .newProducer( + Schema.KeyValue( + Schema.AVRO(Nested.class), + Schema.AVRO(Pojo.class), + KeyValueEncodingType.INLINE)) + .topic(topic) + .create(); ) { + Pojo pojo = new Pojo(); + pojo.setName("foo"); + Nested nested = new Nested(); + nested.setAge(23); + + Pojo pojo2 = new Pojo(); + pojo2.setName("foo2"); + nested.setPojo(pojo2); + + pojo.setNested(nested); + pojo.setNestedList(Arrays.asList(nested)); + + KeyValue keyValue = new KeyValue<>(nested, pojo); + + producer.newMessage().value(keyValue).send(); + + // the JMS client reads Schema AVRO as TextMessage + MapMessage message = (MapMessage) consumer.receive(); + + Map key = (Map) message.getObject("key"); + assertEquals(23, key.get("age")); + + Map value = (Map) message.getObject("value"); + + assertEquals("foo", value.get("name")); + Map nestedValue = (Map) value.get("nested"); + assertEquals(23, nestedValue.get("age")); + Map nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + + List> nestedValueList = + (List>) value.get("nestedList"); + nestedValue = nestedValueList.get(0); + assertEquals(23, nestedValue.get("age")); + nestedPojo = (Map) nestedValue.get("pojo"); + assertEquals("foo2", nestedPojo.get("name")); + } + } + } + } + } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index f3659597..b93cbee2 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -672,13 +672,13 @@ public void onException(Message message, Exception e) {} // verify that the two messages are part of the same batch PulsarMessage message1 = (PulsarMessage) consumer.receive(); - org.apache.pulsar.client.api.Message receivedPulsarMessage1 = + org.apache.pulsar.client.api.Message receivedPulsarMessage1 = message1.getReceivedPulsarMessage(); BatchMessageIdImpl messageId1 = (BatchMessageIdImpl) receivedPulsarMessage1.getMessageId(); PulsarMessage message2 = (PulsarMessage) consumer.receive(); - org.apache.pulsar.client.api.Message receivedPulsarMessage2 = + org.apache.pulsar.client.api.Message receivedPulsarMessage2 = message2.getReceivedPulsarMessage(); BatchMessageIdImpl messageId2 = (BatchMessageIdImpl) receivedPulsarMessage2.getMessageId();