From e73e323186473c3a0ede96e9569cbf5d46db3fa6 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Tue, 30 Jan 2024 18:54:15 +0530
Subject: [PATCH 1/9] documentation in progress
---
.../quarkus-solace-extension-common.adoc | 2 +-
.../quarkus-solace-extension-incoming.adoc | 2 +-
.../quarkus-solace-extension-outgoing.adoc | 2 +-
docs/modules/ROOT/pages/index.adoc | 347 +++++++++++++++++-
4 files changed, 347 insertions(+), 6 deletions(-)
diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc
index 481c091..bac3aba 100644
--- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc
+++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc
@@ -1,6 +1,6 @@
:summaryTableId: quarkus-solace-extension-common
-Common configuration for Solace Quarkus Extension Incoming and Outgoing channels
+Common configuration for Quarkus Solace Messaging Connector Incoming and Outgoing channels
[.configuration-reference.searchable, cols="80,.^10,.^10"]
|===
diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc
index 084c5e1..50a6b4c 100644
--- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc
+++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc
@@ -1,6 +1,6 @@
:summaryTableId: quarkus-solace-extension-incoming
-Incoming configuration for Solace Quarkus Extension
+Incoming configuration for Quarkus Solace Messaging Connector
[.configuration-reference.searchable, cols="80,.^10,.^10"]
|===
diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc
index aa01412..cb04a28 100644
--- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc
+++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc
@@ -1,6 +1,6 @@
:summaryTableId: quarkus-solace-extension-outgoing
-Outgoing configuration for Solace Quarkus Extension
+Outgoing configuration for Quarkus Solace Messaging Connector
[.configuration-reference.searchable, cols="80,.^10,.^10"]
|===
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 0e8b029..e6c4d64 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -2,10 +2,19 @@
include::./includes/attributes.adoc[]
-TIP: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh.
+== Introduction
+The https://solace.com/products/platform/[Solace PubSub+ Platform]'s https://solace.com/products/event-broker/software/[software event broker] efficiently streams event-driven information between applications, IoT devices and user interfaces running in the cloud, on-premises, and hybrid environments using open APIs and protocols like AMQP, JMS, MQTT, REST and WebSocket. It can be installed into a variety of public and private clouds, PaaS, and on-premises environments, and brokers in multiple locations can be linked together in an https://solace.com/what-is-an-event-mesh/[event mesh] to dynamically share events across the distributed enterprise.
-== Installation
+== Quarkus Extension for Solace
+
+Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh.
+
+Users have the choice to use the extension in two ways
+
+{empty}1. `com.solace.quarkus:quarkus-solace-client`
+
+This extension provides only Solace Java Messaging API and users need to have their own implementation and configuration to interact with Solace PubSub+ broker.
If you want to use this extension, you need to add the `com.solace.quarkus:quarkus-solace-client` extension first to your build file.
@@ -20,6 +29,23 @@ For instance, with Maven, add the following dependency to your POM file:
----
+{empty}2. `com.solace.quarkus:quarkus-solace-messaging-connector`
+
+This extension is based on reactive messaging framework and provides pre-defined configurations for incoming and outgoing channels.
+
+If you want to use this extension, you need to add the `com.solace.quarkus:quarkus-solace-messaging-connector` extension first to your build file.
+
+For instance, with Maven, add the following dependency to your POM file:
+
+[source,xml,subs=attributes+]
+----
+
+ com.solace.quarkus
+ quarkus-solace-messaging-connector
+ {project-version}
+
+----
+
[[extension-configuration-reference]]
== Extension Configuration Reference
@@ -38,4 +64,319 @@ include::includes/quarkus-solace-extension-outgoing.adoc[leveloffset=+1, opts=op
[[extension-common-configuration-reference]]
== Common Configuration Reference
-include::includes/quarkus-solace-extension-common.adoc[leveloffset=+1, opts=optional]
\ No newline at end of file
+include::includes/quarkus-solace-extension-common.adoc[leveloffset=+1, opts=optional]
+
+[[configuring-quarkus-solace-messaging-connector]]
+== Configuring Quarkus Solace Messaging Connector
+
+Reactive Messaging framework supports different messaging backends it employs a generic vocabulary:
+
+* Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Solace connector, a message corresponds to Inbound or Outbound Message.
+
+* Messages transit on channels. Application components connect to channels to publish and consume messages. The Solace connector maps channels to Solace queues and topics.
+
+* Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Solace is named `quarkus-solace`.
+
+A minimal configuration for the Solace connector with an incoming channel looks like the following:
+
+The following lines of configuration assumes that a exclusive queue is already provisioned on the broker
+[source,properties]
+----
+quarkus.solace.host=tcp://localhost:55555
+quarkus.solace.vpn=default
+quarkus.solace.authentication.basic.username=basic
+quarkus.solace.authentication.basic.password=basic
+
+mp.messaging.incoming.in.connector=quarkus-solace
+mp.messaging.incoming.in.consumer.queue.name=temperatures
+----
+
+The extension also supports provisioning queues and subscriptions on broker given that the user has role access to create queues with subscriptions. Configuration is as follows
+
+[source,properties]
+----
+quarkus.solace.host=tcp://localhost:55555
+quarkus.solace.vpn=default
+quarkus.solace.authentication.basic.username=basic
+quarkus.solace.authentication.basic.password=basic
+
+mp.messaging.incoming.temperatures.connector=quarkus-solace
+mp.messaging.incoming.temperatures.consumer.queue.missing-resource-creation-strategy=create-on-start
+mp.messaging.incoming.temperatures.consumer.queue.add-additional-subscriptions=true
+mp.messaging.incoming.temperatures.consumer.queue.subscriptions=hello/foobar
+----
+
+1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services.
+
+2. If `consumer.queue.name` property is not specified, channel name will be used as queue name.
+
+[[receiving-messages-from-solace]]
+== Receiving messages from Solace
+
+Using the previous configuration, Quarkus application can receive message in several possible ways.
+
+__Direct Payload__
+[source,java]
+----
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import jakarta.enterprise.context.ApplicationScoped;
+@ApplicationScoped
+public class TemperaturesConsumer {
+ @Incoming("temperatures")
+ public void consume(Double temperature) {
+ // process.
+ }
+}
+----
+
+__Message__
+[source,java]
+----
+@ApplicationScoped
+public class TemperaturesConsumer {
+ @Incoming("temperatures")
+ public CompletionStage consume(Message msg) {
+ // access record metadata
+ SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
+ // process the message payload.
+ Double temperature = msg.getPayload();
+ // Acknowledge the incoming message
+ return msg.ack();
+ }
+}
+----
+
+__SolaceInboundMessage__ This is a wrapper to incoming Inbound Message from Solace Messaging API
+[source,java]
+----
+@ApplicationScoped
+public class TemperaturesConsumer {
+ @Incoming("temperatures")
+ public void consume(SolaceInboundMessage solaceInboundMessage) {
+ // get actual inbound message
+ InboundMessage inboundMessage = solaceInboundMessage.getMessage();
+ // process the message payload.
+ Double temperature = solaceInboundMessage.getPayload();
+ // access record metadata
+ SolaceInboundMetadata metadata = solaceInboundMessage.getMetadata();
+ // ...
+ solaceInboundMessage.ack();
+ }
+}
+----
+
+__@Channel__
+
+Alternatively, your application can inject a Multi in your bean and subscribe to its events as the following example:
+
+[source,java]
+----
+import io.smallrye.mutiny.Multi;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+import org.jboss.resteasy.reactive.RestStreamElementType;
+
+@Path("/temperatures")
+public class TemperaturesResource {
+ @Inject
+ @Channel("temperatures")
+ Multi temperatures;
+
+ @GET
+ @Path("/temperatures")
+ @RestStreamElementType(MediaType.TEXT_PLAIN)
+ public Multi stream() {
+ return temperatures;
+ }
+}
+----
+
+[[acknowledgement-handling]]
+== Acknowledgment Handling
+
+By default, acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing.
+
+[source,java]
+----
+@ApplicationScoped
+public class TemperaturesConsumer {
+ @Incoming("temperatures")
+ public CompletionStage consume(Message msg) {
+ // access record metadata
+ SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
+ // process the message payload.
+ Double temperature = msg.getPayload();
+ // Acknowledge the incoming message
+ return msg.ack();
+ }
+}
+----
+
+[[failure-strategies]]
+== Failure Strategies
+
+If a message is nacked, a failure strategy is applied. Refer to `consumer.queue.failure-strategy` in <>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension.
+
+`ignore` - Mark the message as IGNORED, will continue processing with next message. It TTL and DMQ are configured on the queue message will be moved to DMQ once TTL is reached. If no DMQ is configured but TTL is set message will be lost.
+
+`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.
+
+`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.
+
+`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue.
+
+[[sending-messages-to-solace]]
+== Sending messages to Solace
+
+Outgoing channel configuration to publish messages to Solace.
+
+[source,properties]
+----
+quarkus.solace.host=tcp://localhost:55555
+quarkus.solace.vpn=default
+quarkus.solace.authentication.basic.username=basic
+quarkus.solace.authentication.basic.password=basic
+
+mp.messaging.incoming.temperatures-out.connector=quarkus-solace
+mp.messaging.incoming.temperatures-out.producer.topic=temperatures
+----
+
+1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services.
+
+2. If `producer.topic` property is not specified, channel name will be used as topic name.
+
+Using the previous configuration Quarkus application can publish messages as follows
+
+[source,java]
+----
+import io.smallrye.mutiny.Multi;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.util.Random;
+
+@ApplicationScoped
+public class TemperaturesProducer {
+
+ private final Random random = new Random();
+
+ @Outgoing("temperatures-out")
+ public Multi generate() {
+ // Emit 1000 records
+ return Multi.createFrom().range(0, 1000)
+ .map(x -> random.nextDouble());
+ }
+
+}
+----
+
+You can also generate a `org.eclipse.microprofile.reactive.messaging.Message` with required metadata and publish to Solace.
+
+[source,java]
+----
+@ApplicationScoped
+public class TemperaturesProducer {
+ private final Random random = new Random();
+
+ @Outgoing("temperatures-out")
+ Multi> publishTemperatures() {
+ return Multi.createFrom().range(0, 1000)
+ .map(i -> {
+ SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
+ .setApplicationMessageId(Integer.toString(i)).createPubSubOutboundMetadata();
+ return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
+ });
+ }
+}
+----
+
+*SolaceOutboundMetadata* allows to configure metadata for the message. It supports all the headers supported by Solace and custom user properties. In addition to this it also supports configuring dynamic topic which overrides the default topic in application configuration file.
+
+Generating `org.eclipse.microprofile.reactive.messaging.Message` with dynamic topic and publish to Solace.
+
+[source,java]
+----
+@ApplicationScoped
+public class TemperaturesProducer {
+ private final Random random = new Random();
+
+ @Outgoing("temperatures-out")
+ Multi> publishTemperatures() {
+ return Multi.createFrom().range(0, 1000)
+ .map(i -> {
+ SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
+ .setApplicationMessageId(Integer.toString(i))
+ .setDynamicDestination("device/" + Integer.toString(i) + "/temperature").createPubSubOutboundMetadata();
+ return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
+ });
+ }
+}
+----
+
+Sending messages with __@Emitter__
+
+[source,java]
+----
+@Path("/temperatures")
+public class PublisherResource {
+
+ @Channel("temperatures-out")
+ MutinyEmitter temperatureEmitter;
+
+ @POST
+ @Path("/publish")
+ public Uni publish(Double temperature) {
+ return temperatureEmitter.send(person);
+ }
+}
+----
+
+[[processing-messages]]
+== Processing Messages
+
+Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the *@Incoming* and *@Outgoing* annotations:
+
+[source,java]
+----
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class TemperaturesProcessor {
+
+ @Incoming("temperatures-in")
+ @Outgoing("temperatures-out")
+ public double process(double temperature) {
+ return (temperature - 32) * 5 / 9;
+ }
+
+}
+----
+
+[[health-checks]]
+== Health Checks
+
+Quarkus provides several health checks for Solace. These checks are used in combination with the *quarkus-smallrye-health* extension.
+
+=== Reactive Messaging Health Checks
+
+When using Reactive Messaging and the Quarkus Solace Messaging Connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks.
+
+The startup check verifies that the communication with Solace Broker is established.
+
+The liveness check captures any unrecoverable failure happening during the communication with Solace.
+
+The readiness check verifies that the Quarkus Solace Messaging Connector is ready to consume/produce messages to the configured Solace queues/topics.
+
+[[metrics]]
+== Metrics
+
+Quarkus Solace Messaging Connector exposes different metrics provided by Solace Java Messaging API. The metrics are enabled by default and can be accessed at `http://localhost:8080/q/dev-ui/io.quarkus.quarkus-micrometer/prometheus`
+
From 788600e8b5382f6793372fe4136ef153efc418bb Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 31 Jan 2024 13:07:02 +0530
Subject: [PATCH 2/9] Documentation update
---
docs/modules/ROOT/pages/index.adoc | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index e6c4d64..03c5fd8 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -87,8 +87,8 @@ quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
-mp.messaging.incoming.in.connector=quarkus-solace
-mp.messaging.incoming.in.consumer.queue.name=temperatures
+mp.messaging.incoming.temperatures.connector=quarkus-solace
+mp.messaging.incoming.temperatures.consumer.queue.name=temperatures
----
The extension also supports provisioning queues and subscriptions on broker given that the user has role access to create queues with subscriptions. Configuration is as follows
@@ -375,6 +375,11 @@ The liveness check captures any unrecoverable failure happening during the commu
The readiness check verifies that the Quarkus Solace Messaging Connector is ready to consume/produce messages to the configured Solace queues/topics.
+[[dev-services]]
+Dev Services
+
+Solace Dev Services for Quarkus will spin up latest version of Solace PubSub standard with label `solace` when running tests or in dev mode. Solace Dev Services are enabled by default and will check for any existing containers with same label to reuse. If none is present a new container is started.
+
[[metrics]]
== Metrics
From 296b8d765dad0d73fbee650e1f9d9fbeb670671d Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 31 Jan 2024 14:03:11 +0530
Subject: [PATCH 3/9] added producer back-pressure strategies
---
.../quarkus-solace-extension-outgoing.adoc | 2 +-
docs/modules/ROOT/pages/index.adoc | 14 +++++++++++++-
.../solace/quarkus/messaging/SolaceConnector.java | 2 +-
.../messaging/outgoing/SolaceOutgoingChannel.java | 6 +++---
4 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc
index cb04a28..73ccbed 100644
--- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc
+++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc
@@ -111,7 +111,7 @@ Supported strategies `reject`, `elastic`, `wait`. Refer to `https://docs.solace.
// Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SERVICE_NAME+++`
// endif::add-copy-button-to-env-var[]
--|string
-|`wait`
+|`elastic`
a| [[quarkus-solace_quarkus.producer.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.producer.back-pressure.buffer-capacity[producer.back-pressure.buffer-capacity]`
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 03c5fd8..3dce051 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -219,7 +219,7 @@ public class TemperaturesConsumer {
[[failure-strategies]]
== Failure Strategies
-If a message is nacked, a failure strategy is applied. Refer to `consumer.queue.failure-strategy` in <>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension.
+If a message is nacked, a failure strategy is applied. Refer to <><>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension.
`ignore` - Mark the message as IGNORED, will continue processing with next message. It TTL and DMQ are configured on the queue message will be moved to DMQ once TTL is reached. If no DMQ is configured but TTL is set message will be lost.
@@ -336,6 +336,18 @@ public class PublisherResource {
}
----
+== Producer Back-Pressure strategies
+
+Quarkus Solace Messaging connector provides three different strategies to handle back-pressure when publishing messages
+
+{empty}1.Reject - Publisher will start rejecting messages once specified limit is reached
+
+{empty}2.Wait - Publisher is throttled when a specified limit is reached
+
+{empty}3.Elastic - Use an unlimited internal buffer (default)
+
+Refer to <><> and <><> on how to configure back-pressure for producer.
+
[[processing-messages]]
== Processing Messages
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java
index f9386cf..6badc57 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java
@@ -59,7 +59,7 @@
@ConnectorAttribute(name = "producer.waitForPublishReceipt", type = "boolean", direction = OUTGOING, description = "Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message", defaultValue = "true")
@ConnectorAttribute(name = "producer.delivery.ack.timeout", type = "int", direction = OUTGOING, description = "Timeout to receive the publish receipt from broker.")
@ConnectorAttribute(name = "producer.delivery.ack.window.size", type = "int", direction = OUTGOING, description = "Publish Window will determine the maximum number of messages the application can send before the Solace API must receive an acknowledgment from the Solace.")
-@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "wait")
+@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "elastic")
@ConnectorAttribute(name = "producer.back-pressure.buffer-capacity", type = "int", direction = OUTGOING, description = "Outgoing messages backpressure buffer capacity", defaultValue = "1024")
public class SolaceConnector implements InboundConnector, OutboundConnector, HealthReporter {
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java
index 36de3cc..96d9e1c 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java
@@ -54,14 +54,14 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
this.channel = oc.getChannel();
PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder();
switch (oc.getProducerBackPressureStrategy()) {
- case "elastic":
- builder.onBackPressureElastic();
+ case "wait":
+ builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
break;
case "reject":
builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity());
break;
default:
- builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
+ builder.onBackPressureElastic();
break;
}
this.gracefulShutdown = oc.getClientGracefulShutdown();
From 712c51f1ffb5c59455beef808959327d038cfbc3 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 31 Jan 2024 14:39:38 +0530
Subject: [PATCH 4/9] Updated samples to show usage of various ways to consume
messages as mentioned in docs
---
docs/modules/ROOT/pages/index.adoc | 30 --------------
.../solace/quarkus/samples/HelloConsumer.java | 40 +++++++++++++++----
.../src/main/resources/application.properties | 10 ++++-
3 files changed, 42 insertions(+), 38 deletions(-)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 3dce051..1c080bd 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -165,36 +165,6 @@ public class TemperaturesConsumer {
}
----
-__@Channel__
-
-Alternatively, your application can inject a Multi in your bean and subscribe to its events as the following example:
-
-[source,java]
-----
-import io.smallrye.mutiny.Multi;
-import org.eclipse.microprofile.reactive.messaging.Channel;
-import jakarta.inject.Inject;
-import jakarta.ws.rs.GET;
-import jakarta.ws.rs.Path;
-import jakarta.ws.rs.Produces;
-import jakarta.ws.rs.core.MediaType;
-import org.jboss.resteasy.reactive.RestStreamElementType;
-
-@Path("/temperatures")
-public class TemperaturesResource {
- @Inject
- @Channel("temperatures")
- Multi temperatures;
-
- @GET
- @Path("/temperatures")
- @RestStreamElementType(MediaType.TEXT_PLAIN)
- public Multi stream() {
- return temperatures;
- }
-}
-----
-
[[acknowledgement-handling]]
== Acknowledgment Handling
diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java
index a6b7675..0a9bd9f 100644
--- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java
+++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java
@@ -8,6 +8,7 @@
import org.eclipse.microprofile.reactive.messaging.*;
import com.solace.quarkus.messaging.incoming.SolaceInboundMessage;
+import com.solace.quarkus.messaging.incoming.SolaceInboundMetadata;
import com.solace.quarkus.messaging.outgoing.SolaceOutboundMetadata;
import io.quarkus.logging.Log;
@@ -19,14 +20,18 @@ public class HelloConsumer {
/**
* Publishes message to topic hello/foobar which is subscribed by queue.foobar
*
- * @see #consumeMessage(SolaceInboundMessage)
+ * @see #consumeInboundMessage(SolaceInboundMessage)
+ * @see #consumePayload(String)
+ * @see #consumeMessage(Message)
* @return
*/
@Outgoing("hello-out")
Multi> publishMessage() {
- SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
- .setApplicationMessageId("1").createPubSubOutboundMetadata();
- return Multi.createFrom().items("1").map(m -> Message.of(m, Metadata.of(outboundMetadata)));
+ return Multi.createFrom().items("1", "2", "3", "4").map(m -> {
+ SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
+ .setApplicationMessageId(m).createPubSubOutboundMetadata();
+ return Message.of(m, Metadata.of(outboundMetadata));
+ });
}
/**
@@ -35,13 +40,34 @@ Multi> publishMessage() {
* @param p
*/
@Incoming("hello-in")
- @Acknowledgment(Acknowledgment.Strategy.MANUAL)
- CompletionStage consumeMessage(SolaceInboundMessage> p) {
- Log.infof("Received message: %s from topic: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8),
+ CompletionStage consumeInboundMessage(SolaceInboundMessage p) {
+ Log.infof("Received message: %s from topic: %s", p.getPayload(),
p.getMessage().getDestinationName());
return p.ack();
}
+ /**
+ * Receives message from queue - queue.foobar
+ *
+ * @param p
+ */
+ @Incoming("hello-plain-message-in")
+ void consumePayload(String p) {
+ Log.infof("Received message: %s", p);
+ }
+
+ /**
+ * Receives message from queue - queue.foobar
+ *
+ * @param p
+ */
+ @Incoming("hello-reactive-message-in")
+ CompletionStage consumeMessage(Message p) {
+ Log.infof("Received message: %s from topic: %s", p.getPayload(),
+ p.getMetadata(SolaceInboundMetadata.class).get().getDestinationName());
+ return p.ack();
+ }
+
/**
* Receives message from queue - queue.dynamic.topic and overwrites the topic configured in outgoing channel
* dynamic-destination-out
diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties
index c0d8f33..15600f0 100644
--- a/samples/hello-connector-solace/src/main/resources/application.properties
+++ b/samples/hello-connector-solace/src/main/resources/application.properties
@@ -11,10 +11,18 @@ mp.messaging.incoming.hello-in.connector=quarkus-solace
mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true
mp.messaging.incoming.hello-in.consumer.queue.name=queue.foobar
mp.messaging.incoming.hello-in.consumer.queue.missing-resource-creation-strategy=create-on-start
-mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive
+mp.messaging.incoming.hello-in.consumer.queue.type=durable-non-exclusive
mp.messaging.incoming.hello-in.consumer.queue.add-additional-subscriptions=true
mp.messaging.incoming.hello-in.consumer.queue.subscriptions=hello/foobar
+mp.messaging.incoming.hello-plain-message-in.connector=quarkus-solace
+mp.messaging.incoming.hello-plain-message-in.consumer.queue.supports-nacks=true
+mp.messaging.incoming.hello-plain-message-in.consumer.queue.name=queue.foobar
+
+mp.messaging.incoming.hello-reactive-message-in.connector=quarkus-solace
+mp.messaging.incoming.hello-reactive-message-in.consumer.queue.supports-nacks=true
+mp.messaging.incoming.hello-reactive-message-in.consumer.queue.name=queue.foobar
+
mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace
mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.dynamic.topic
From f621449f14f51276babd6d04a4f7404e7b954084 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 31 Jan 2024 14:45:59 +0530
Subject: [PATCH 5/9] Updated acknowledgement handling section
---
docs/modules/ROOT/pages/index.adoc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 1c080bd..6d0cd78 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -175,7 +175,7 @@ By default, acknowledgement strategy is set to client acknowledgement. This give
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
- public CompletionStage consume(Message msg) {
+ public CompletionStage consume(SolaceInboundMessage msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
From 8b103b1c877aa64263d198a0fa54c7331ff7163f Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 31 Jan 2024 14:50:31 +0530
Subject: [PATCH 6/9] added caution for back-pressure strategy reject in
documentation
---
docs/modules/ROOT/pages/index.adoc | 2 ++
1 file changed, 2 insertions(+)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 6d0cd78..5b19521 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -316,6 +316,8 @@ Quarkus Solace Messaging connector provides three different strategies to handle
{empty}3.Elastic - Use an unlimited internal buffer (default)
+CAUTION: In the current version we don't recommend to use back-pressure strategy `Reject` as it is in evolving phase.
+
Refer to <><> and <><> on how to configure back-pressure for producer.
[[processing-messages]]
From b331e5fba600cf558ad1af76958d781151f79e09 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Mon, 5 Feb 2024 12:41:32 +0530
Subject: [PATCH 7/9] addressed comments
---
docs/modules/ROOT/pages/index.adoc | 33 ++++++++-----------
.../solace/quarkus/samples/HelloConsumer.java | 2 ++
.../quarkus/samples/PublisherResource.java | 2 +-
.../src/main/resources/application.properties | 4 ++-
4 files changed, 20 insertions(+), 21 deletions(-)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 5b19521..2d975ad 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -123,7 +123,7 @@ import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
- public void consume(Double temperature) {
+ public void consume(byte[] temperature) {
// process.
}
}
@@ -135,11 +135,11 @@ __Message__
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
- public CompletionStage consume(Message msg) {
+ public CompletionStage consume(Message msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
- Double temperature = msg.getPayload();
+ double temperature = ByteBuffer.wrap(msg.getPayload()).getDouble();
// Acknowledge the incoming message
return msg.ack();
}
@@ -147,20 +147,15 @@ public class TemperaturesConsumer {
----
__SolaceInboundMessage__ This is a wrapper to incoming Inbound Message from Solace Messaging API
+
[source,java]
----
-@ApplicationScoped
+import com.solace.messaging.receiver.InboundMessage;@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
- public void consume(SolaceInboundMessage solaceInboundMessage) {
- // get actual inbound message
- InboundMessage inboundMessage = solaceInboundMessage.getMessage();
+ public void consume(InboundMessage inboundMessage) {
// process the message payload.
- Double temperature = solaceInboundMessage.getPayload();
- // access record metadata
- SolaceInboundMetadata metadata = solaceInboundMessage.getMetadata();
- // ...
- solaceInboundMessage.ack();
+ String temperature = inboundMessage.getPayloadAsString();
}
}
----
@@ -175,11 +170,11 @@ By default, acknowledgement strategy is set to client acknowledgement. This give
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
- public CompletionStage consume(SolaceInboundMessage msg) {
+ public CompletionStage consume(Message msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
- Double temperature = msg.getPayload();
+ double temperature = ByteBuffer.wrap(msg.getPayload()).getDouble();
// Acknowledge the incoming message
return msg.ack();
}
@@ -296,12 +291,12 @@ Sending messages with __@Emitter__
public class PublisherResource {
@Channel("temperatures-out")
- MutinyEmitter temperatureEmitter;
+ MutinyEmitter temperatureEmitter;
@POST
@Path("/publish")
- public Uni publish(Double temperature) {
- return temperatureEmitter.send(person);
+ public Uni publish(Temperature temperature) {
+ return temperatureEmitter.send(temperature);
}
}
----
@@ -337,8 +332,8 @@ public class TemperaturesProcessor {
@Incoming("temperatures-in")
@Outgoing("temperatures-out")
- public double process(double temperature) {
- return (temperature - 32) * 5 / 9;
+ public double process(byte[] temperature) {
+ return (ByteBuffer.wrap(temperature).getDouble() - 32) * 5 / 9;
}
}
diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java
index 68ebc8f..03ddc10 100644
--- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java
+++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java
@@ -1,5 +1,7 @@
package com.solace.quarkus.samples;
+import java.util.concurrent.CompletionStage;
+
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.*;
diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java
index 601ce63..69bbd3f 100644
--- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java
+++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java
@@ -15,7 +15,7 @@
@Path("/hello")
public class PublisherResource {
- @Channel("hello-out")
+ @Channel("publisher-out")
MutinyEmitter foobar;
/**
diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties
index 15600f0..fa3c93a 100644
--- a/samples/hello-connector-solace/src/main/resources/application.properties
+++ b/samples/hello-connector-solace/src/main/resources/application.properties
@@ -3,9 +3,11 @@
#quarkus.solace.authentication.basic.username=
#quarkus.solace.authentication.basic.password=
+mp.messaging.outgoing.publisher-out.connector=quarkus-solace
+mp.messaging.outgoing.publisher-out.producer.topic=hello/person
+
mp.messaging.outgoing.hello-out.connector=quarkus-solace
mp.messaging.outgoing.hello-out.producer.topic=hello/foobar
-mp.messaging.outgoing.hello-out.merge=true
mp.messaging.incoming.hello-in.connector=quarkus-solace
mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true
From 3ebef38cd7191cb39937572f5696a21c5cdbd676 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Mon, 5 Feb 2024 12:46:48 +0530
Subject: [PATCH 8/9] Added partition key in documentation
---
docs/modules/ROOT/pages/index.adoc | 25 +++++++++++++++++++++++++
1 file changed, 25 insertions(+)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index 2d975ad..a4827a7 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -283,6 +283,31 @@ public class TemperaturesProducer {
}
----
+Generating `org.eclipse.microprofile.reactive.messaging.Message` with partition key and publish to Solace.
+
+[source,java]
+----
+@ApplicationScoped
+public class TemperaturesProducer {
+ private final Random random = new Random();
+
+ @Outgoing("temperatures-out")
+ Multi> publishTemperatures() {
+ return Multi.createFrom().range(0, 1000)
+ .map(i -> {
+ String partitionKey = "Group-1";
+ if(i % 2 == 0) {
+ partitionKey = "Group-2";
+ }
+ SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
+ .setApplicationMessageId(Integer.toString(i))
+ .setPartitionKey(partitionKey).createPubSubOutboundMetadata();
+ return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
+ });
+ }
+}
+----
+
Sending messages with __@Emitter__
[source,java]
From 90a7a8a3573187f299099ab9b7169d776d19f28f Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Mon, 5 Feb 2024 13:07:41 +0530
Subject: [PATCH 9/9] Corrected parse to double
---
docs/modules/ROOT/pages/index.adoc | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc
index a4827a7..fd22a75 100644
--- a/docs/modules/ROOT/pages/index.adoc
+++ b/docs/modules/ROOT/pages/index.adoc
@@ -139,7 +139,7 @@ public class TemperaturesConsumer {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
- double temperature = ByteBuffer.wrap(msg.getPayload()).getDouble();
+ double temperature = Double.parseDouble(new String(p.getPayload()));
// Acknowledge the incoming message
return msg.ack();
}
@@ -174,7 +174,7 @@ public class TemperaturesConsumer {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
- double temperature = ByteBuffer.wrap(msg.getPayload()).getDouble();
+ double temperature = Double.parseDouble(new String(p.getPayload()));
// Acknowledge the incoming message
return msg.ack();
}
@@ -358,7 +358,7 @@ public class TemperaturesProcessor {
@Incoming("temperatures-in")
@Outgoing("temperatures-out")
public double process(byte[] temperature) {
- return (ByteBuffer.wrap(temperature).getDouble() - 32) * 5 / 9;
+ return (Double.parseDouble(new String(p.getPayload())) - 32) * 5 / 9;
}
}