diff --git a/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarConsumer.java b/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarConsumer.java index e183463..e5c60a0 100644 --- a/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarConsumer.java +++ b/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarConsumer.java @@ -100,7 +100,7 @@ public void received(final Consumer consumer, final Message mess final Map data = new HashMap(); data.put("callback", new Callback(message.getMessageId())); data.put("properties", message.getProperties()); - data.put("message", new String(message.getData())); + data.put("message", new String(message.getData(), ENCODING_CHARSET)); this.getGateway().handleMessage(data); } catch (Throwable t) diff --git a/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarExchanger.java b/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarExchanger.java index a327899..c2cfca9 100644 --- a/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarExchanger.java +++ b/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarExchanger.java @@ -15,6 +15,8 @@ */ package org.primeoservices.cfgateway.pulsar; +import java.nio.charset.Charset; + import org.apache.pulsar.client.api.PulsarClientException; /** @@ -24,6 +26,8 @@ */ public abstract class PulsarExchanger { + protected static final Charset ENCODING_CHARSET = Charset.forName("UTF-8"); + private PulsarGateway gateway; private Logger log; diff --git a/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarProducer.java b/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarProducer.java index 70f1f37..9e84709 100644 --- a/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarProducer.java +++ b/src/main/java/org/primeoservices/cfgateway/pulsar/PulsarProducer.java @@ -96,7 +96,7 @@ public void postMessage(final Map data) throws Exception final TypedMessageBuilder builder = this.producer.newMessage(); final Object properties = data.get(PROPERTIES_KEY); PulsarUtils.setProperties(builder, (Map) properties); - builder.value(((String) message).getBytes()); + builder.value(((String) message).getBytes(ENCODING_CHARSET)); builder.send(); } }