This application can send, receive and optionally forward messages to an Artemis (Red Hat AMQ 7) message broker.
The app is based on Apache Camel v2 (Red Hat Fuse v7) running on Spring Boot v2. Build it with Maven v3.8 and Java 1.8 or Java 11:
mvn clean install
Run as:
java -jar target/*.jar
Alternatively, we can build and run the application in one command using spring-boot:run. For example:
mvn clean spring-boot:run
By default, parameters are taken from application.properties, but usually we have different application-myprofile.properties files and pick one like:
- java -Dspring.profiles.active=myprofile -jar target/*.jar
- mvn clean spring-boot:run -Dspring-boot.run.profiles=myprofile
The app supports the CORE, AMQP, OPENWIRE and MQTT protocols to connect to the broker. The connection string must be set to match the selected protocol.
CORE example:
# https://access.redhat.com/documentation/en-us/red_hat_amq_clients/2.11/html-single/using_the_amq_core_protocol_jms_client/index#configuration_options
connection.type=CORE
connection.remoteUrl=tcp://localhost:61616?retryInterval=100;retryIntervalMultiplier=1.0;reconnectAttempts=30;consumerWindowSize=4000
AMQP example:
# https://qpid.apache.org/releases/qpid-jms-0.61.0/docs/index.html#connection-uri
connection.type=AMQP
connection.remoteUrl=amqp://localhost:5672?jms.prefetchPolicy.all=10
OPENWIRE example:
# https://activemq.apache.org/connection-configuration-uri
connection.type=OPENWIRE
connection.remoteUrl=failover:(tcp://localhost:61616)?maxReconnectDelay=10000
MQTT example:
connection.type=MQTT
connection.remoteUrl=tcp://localhost:1883
Connection pooling is enabled using org.messaginghub.pooled.jms.JmsPoolConnectionFactory, with the pool size set by connection.maxConnections.
Alternatively, set connection.useCachingConnectionFactory=true to use Spring’s CachingConnectionFactory instead.
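As a minimal sketch of the two options above, the fragment below shows how they might appear in a profile's properties file. The pool size of 5 is an arbitrary illustrative value, not a documented default.

```properties
# Option 1: pooled connections via JmsPoolConnectionFactory.
# 5 is an arbitrary example value, not a documented default.
connection.maxConnections=5

# Option 2: use Spring's CachingConnectionFactory instead (uncomment to enable).
#connection.useCachingConnectionFactory=true
```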
We can send messages to queue "myqueue" by creating an application-send.properties file:
send.enabled=true
send.endpoint=jms:queue:myqueue
send.message.length=1000
send.headers.count=2
send.headers.length=40
send.count=10
send.shutdownEnabled=true
connection.type=CORE
connection.remoteUrl=tcp://localhost:61616
connection.username=admin
connection.password=admin
This will send 10 messages, each with a 1000-character body and two 40-character JMS properties, and then shut down.
Run the application using the "send" Spring profile:
java -Dspring.profiles.active=send -jar target/*.jar
The message body (text) can be set directly, or it can be generated with a given length:
# Message body to use
send.message=Hello World!
# Alternatively generate message body with given length (chars) greater than 0.
send.message.length=1000
Message headers (JMS properties) can also be set directly or generated with a given length (named "extra0", "extra1", ...):
# Headers to add
send.headers={header1: "value1", header2: "value2"}
# Alternatively add generated headers with given length (chars)
send.headers.count=2
send.headers.length=40
The generated messages are useful for large message testing or during performance tests, when the message content is not important.
With the send.threads parameter we can also have multiple threads sending messages (send.count per each), to increase load on the broker.
The total number of sent messages and the send rate are logged every second.
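To illustrate how send.threads scales the load, here is a sketch of a load-test profile. The thread and message counts are illustrative assumptions, and the connection settings are left out (they would be the same as in the send example above). Since send.count applies per thread, this configuration would be expected to send 4 x 2500 = 10000 messages in total.

```properties
send.enabled=true
send.endpoint=jms:queue:myqueue
# Generated 1000-char body; the content does not matter for a load test
send.message.length=1000
# 4 sender threads (example value), each sending send.count messages:
# 4 * 2500 = 10000 messages in total
send.threads=4
send.count=2500
send.shutdownEnabled=true
```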
We can receive messages from a queue or topic. These messages are not saved or printed anywhere; the goal is to test message flow and the rate of message consumption.
For example, create an application-receive.properties file:
receive.enabled=true
receive.concurrentConsumers=2
receive.transacted=false
receive.cacheLevelName=CACHE_CONSUMER
receive.destination=q1
# Instead of "receive.destination" and parameters above we could set the whole JMS component endpoint "receive.endpoint" directly
#receive.endpoint=jms:queue:q1?concurrentConsumers=2&transacted=false&cacheLevelName=CACHE_CONSUMER
connection.type=CORE
connection.remoteUrl=tcp://localhost:61616
connection.username=admin
connection.password=admin
Run java -Dspring.profiles.active=receive -jar target/*.jar to receive messages from queue "q1" using two consumer threads.
Add logging.level.bszeti.camelspringboot.jmstest=TRACE to log message IDs and when messages are "received" and "processed". We can use the parameter receive.delay=100 to simulate 100 ms of message processing time.
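Both settings could be added to application-receive.properties, for example as in the sketch below.

```properties
# Log message IDs and the "received" / "processed" events
logging.level.bszeti.camelspringboot.jmstest=TRACE
# Simulate 100 ms of processing time per message
receive.delay=100
```

If the delay is applied per message on each consumer thread (an assumption about the implementation, not something stated here), then with receive.concurrentConsumers=2 the consumption rate would be capped near 2 x (1000 ms / 100 ms) = 20 messages per second.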
By default the application doesn’t terminate, and we have to stop the process manually (Ctrl+C) or kill it. To stop the app after receiving messages, use one of these parameters:
- receive.shutdownMessageCount: Stop the process after receiving a number of messages.
- receive.shutdownIdleSec: Stop after not receiving any more messages for the given time.
- shutdownSec: Stop the app after the given time from startup (applies to send and receive too).
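A receive profile that terminates on its own might look like the following sketch. All values are illustrative assumptions, and the time unit is presumably seconds, judging only by the "Sec" suffix of the parameter names. Only one shutdown parameter is active here; whether they can be combined is not covered above.

```properties
receive.enabled=true
receive.destination=q1
# Stop after 10000 messages have been received (example value)
receive.shutdownMessageCount=10000
# Or: stop once no message has arrived for 30 (presumably seconds)
#receive.shutdownIdleSec=30
# Or: stop 300 (presumably seconds) after startup, regardless of traffic
#shutdownSec=300
```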
The total number of received messages and the receive rate are logged every second.
Optionally, the received messages can be forwarded to another queue. This advanced feature was built to test receiving and sending a message within the same transaction. See the code for details.
A complete scenario with sending, receiving and forwarding 10k messages would look like this:
send.enabled=true
send.endpoint=jms:queue:q1
send.message.length=1000
send.count=10000
receive.enabled=true
receive.endpoint=jms:queue:q1?concurrentConsumers=1&transacted=true&cacheLevelName=CACHE_CONSUMER
receive.forward.enabled=true
receive.forward.endpoint=jms:queue:q1.forward
receive.forward.propagation=PROPAGATION_REQUIRED
connection.type=OPENWIRE
connection.remoteUrl=failover:(tcp://localhost:61616)?maxReconnectDelay=1000
Ideally this would end up with 10k messages on the "q1.forward" queue.
Endpoint examples for MQTT, using the paho component:
send.endpoint=paho:q1?qos=2
receive.endpoint=paho:q1
Some JMS-specific parameters (e.g. receive.concurrentConsumers, receive.transacted=true, receive.cacheLevelName, connection.maxConnections) are ignored with paho endpoints.
See the related endpoint-jms-default and endpoint-paho-default parameters in application.properties.
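Putting the MQTT connection settings and the paho endpoints together, a profile for a small send-and-receive test over MQTT might look like the sketch below. Combining them in one file is an assumption modelled on the JMS scenario above, and the file name application-mqtt.properties is hypothetical.

```properties
# Hypothetical application-mqtt.properties
connection.type=MQTT
connection.remoteUrl=tcp://localhost:1883

# Send 10 messages to "q1" with QoS 2 (exactly once)
send.enabled=true
send.endpoint=paho:q1?qos=2
send.count=10

# Receive from the same destination
receive.enabled=true
receive.endpoint=paho:q1
```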
The app can be packaged in a container using the Dockerfile or with the OpenShift S2I BuildConfig in yamls/buildconfig.yaml.
A built image is available as quay.io/bszeti/fuse-artemis-test:latest.
We can put "application.properties" in a ConfigMap like yamls/configmap.yaml and then create a Kubernetes Job to run the container image, as in yamls/job.yaml.
An example ActiveMQArtemis CR for the AMQ Broker Operator can be found in yamls/activemqartemis.yaml