An Apache Camel endpoint using Vortex OpenSplice is configured using the following URI format:
dds:topicName:domainID/[topicClass][?options]
Where topicName is the name of the DDS topic to publish or subscribe to, domainID is the DDS domain identifier, and topicClass is the generated class for the topic type (optional in the case of Camel Messages mode; see Camel Messages mode).
Options is a list of optional parameters, separated by '&' characters; for example:
?option1=value1&option2=value2&...
The following options are accepted:
Name | Type | Description |
---|---|---|
readMode | Boolean | Only applies to a consumer endpoint (from("dds:…")). By default the DDS DataReader of a consumer endpoint uses the The advantage is that in case of a DISPOSE or WRITEDISPOSE message the data will be valid (i.e. the latest-read data). The drawback is that the DataReader consumes more memory to keep the latest data in a cache. |
ignoreLocalPublishers | Boolean | Only applies to a consumer endpoint (from("dds:…")). Indicates whether a consumer must ignore the data published by a local producer (i.e. in the same JVM). By default this option is set to true, avoiding loops in bridge patterns that use 2 routes in opposite directions. For instance with a DDS/JMS bridge, you configure 2 routes: If this option is set to false, the 2nd route will receive the publications from the 1st route, sending back to JMS data which are coming from JMS. |
partition | String | Corresponds to DDS A list of DDS partitions, separated by |
contentFilter | String | Corresponds to a DDS filter expression to be used on topic (actually a ContentFilteredTopic is created by the endpoint). This is a SQL-like expression as defined in the DDS specification. WARNING: the expression must use percent-encoding for URI reserved and unsafe characters (e.g. |
deadlinePeriod | Double | Corresponds to DDS A float representing a time in seconds. |
destinationOrder | String | Corresponds to DDS Possible values: |
durabilityKind | String | Corresponds to DDS Possible values: |
durabilityServiceCleanupDelay | Double | Corresponds to DDS A float representing a time in seconds. |
durabilityServiceDepth | Int | Corresponds to DDS An integer representing the history depth. |
durabilityServiceKind | String | Corresponds to DDS Possible values: |
durabilityServiceMaxInstances | Int | Corresponds to DDS An integer representing the maximum number of instances. |
durabilityServiceMaxSamples | Int | Corresponds to DDS An integer representing the maximum number of samples. |
durabilityServiceMaxSamplesPerInstance | Int | Corresponds to DDS An integer representing the maximum number of samples per instance. |
historyDepth | String | Corresponds to DDS An integer representing the maximum number of samples per instance. |
historyKind | String | Corresponds to DDS Possible values: |
latencyDuration | Double | Corresponds to DDS A float representing a time in seconds. |
lifespanDuration | Double | Corresponds to DDS A float representing a time in seconds. |
livelinessDuration | Double | Corresponds to DDS A float representing a time in seconds. |
livelinessKind | String | Corresponds to DDS Possible values: |
ownershipKind | String | Corresponds to DDS Possible values: |
ownershipStrength | String | Corresponds to DDS An integer representing the ownership strength. |
presentationAccessScope | String | Corresponds to DDS Possible values: |
presentationCoherentAccess | Boolean | Corresponds to DDS |
presentationOrderedAccess | Boolean | Corresponds to DDS |
readerDataLifecycleAutopurgeDisposedDelay | Double | Corresponds to DDS A float representing a time in seconds. |
readerDataLifecycleAutopurgeNowriterDelay | Double | Corresponds to DDS A float representing a time in seconds. |
reliabilityBlockingTime | Double | Corresponds to DDS A float representing a time in seconds. |
reliabilityKind | String | Corresponds to DDS Possible values: |
resourceLimitsMaxInstances | Int | Corresponds to DDS An integer representing the maximum number of instances. |
resourceLimitsMaxSamples | Int | Corresponds to DDS An integer representing the maximum number of samples. |
resourceLimitsMaxSamplesPerInstance | Int | Corresponds to DDS An integer representing the maximum number of samples per instance. |
timebasedFilter | Double | Corresponds to DDS A float representing a time in seconds. |
transportPriority | Int | Corresponds to DDS An integer representing the transport priority. |
writerDataLifecycleAutodispose | Boolean | Corresponds to DDS WriterDataLifecycleQosPolicy.autodispose_unregistered_instances |
consumer.exceptionHandler | String | The reference to an ExceptionHandler bean that will handle exceptions produced by the consumer. See Error Handling for more details. |
consumer.bridgeErrorHandler | Boolean | Allows for bridging the consumer to the Camel routing Error Handler, which means that any exception that occurs while reading from DDS is processed as a message and handled by the routing Error Handler. See Error Handling for more details. |
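Because option values such as a contentFilter expression must be percent-encoded, it can help to assemble the endpoint URI programmatically. The following is a minimal sketch only: DdsUriBuilder is a hypothetical helper that is not part of camel-ospl, and the filter expression "index > 10" assumes a topic with an 'index' member. It uses nothing but the JDK.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of camel-ospl: builds a URI of the form
// dds:topicName:domainID/[topicClass][?options].
final class DdsUriBuilder {

    // Percent-encodes one option value. URLEncoder emits form encoding,
    // so its '+' (an encoded space) is rewritten as %20.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // topicClass may be null (Camel Messages mode); options may be empty.
    static String build(String topicName, int domainId, String topicClass,
                        Map<String, String> options) {
        StringBuilder uri = new StringBuilder("dds:")
                .append(topicName).append(':').append(domainId).append('/');
        if (topicClass != null) {
            uri.append(topicClass);
        }
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            for (Map.Entry<String, String> option : options.entrySet()) {
                query.add(option.getKey() + "=" + encode(option.getValue()));
            }
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("reliabilityKind", "RELIABLE");
        options.put("contentFilter", "index > 10"); // assumed member name
        // prints dds:ChatMessage:0/Chat.ChatMessage?reliabilityKind=RELIABLE&contentFilter=index%20%3E%2010
        System.out.println(build("ChatMessage", 0, "Chat.ChatMessage", options));
    }
}
```

A LinkedHashMap keeps the options in insertion order, which makes the resulting URI predictable in logs and tests.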
Each Camel Exchange produced by the DDS endpoint contains the data as the body of the IN message. The type of the body is the Java class representing the Topic Type.
Note that this data might be 'invalid' (i.e. with some members not set, or with dummy values) if the instance state changes. See the Vortex OpenSplice reference guide for more details (ReaderDataLifecycleQos chapter). Validity can be checked with the SampleInfo.valid_data boolean (see below).
The IN message contains the following information as headers:
Name | Type | Description |
---|---|---|
DDS_SAMPLE_INFO | DDS.SampleInfo | The SampleInfo object read by the DDS Reader with the data. |
DDS_DISPOSE | com.adlinktech.gateway.camelospl.DdsDisposeHeader | This header is present only if the instance was disposed. In such a case it’s set either to DISPOSE if the body of the message contains invalid data, or to WRITEDISPOSE if the body of the message contains the last valid data sent by the publisher before the disposal. |
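A consuming Processor would typically branch on the DDS_DISPOSE header before touching the body. The sketch below illustrates that decision without any Camel or OpenSplice dependency: a plain Map stands in for the message headers, and the DisposeKind enum is a hypothetical stand-in for DdsDisposeHeader, whose actual API is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free sketch of the header check described in the table above.
final class DisposeHeaderSketch {

    // Hypothetical stand-in for com.adlinktech.gateway.camelospl.DdsDisposeHeader.
    enum DisposeKind { DISPOSE, WRITEDISPOSE }

    // Describes how the body should be treated, based on the DDS_DISPOSE header.
    static String classify(Map<String, Object> headers) {
        Object dispose = headers.get("DDS_DISPOSE");
        if (dispose == null) {
            // Header absent: the instance was not disposed.
            return "instance not disposed";
        }
        if (dispose == DisposeKind.WRITEDISPOSE) {
            return "instance disposed, body holds the last valid data";
        }
        return "instance disposed, body holds invalid data";
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(classify(headers));
        headers.put("DDS_DISPOSE", DisposeKind.DISPOSE);
        System.out.println(classify(headers));
    }
}
```

Even when the header is absent, the SampleInfo.valid_data flag remains the authoritative check for whether the body can be used.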
Below is an example of a simple Camel route definition from a DDS endpoint to a Processor displaying the data:
final String fromURI =
    "dds:ChatMessage:0/Chat.ChatMessage?ReliabilityKind=RELIABLE&Partition=ChatRoom";
final CamelContext ctx = new DefaultCamelContext();
ctx.addRoutes(new RouteBuilder() {
    public void configure() {
        // from DDS endpoint to a Processor displaying the received message.
        from(fromURI).process(new Processor() {
            public void process(Exchange e) {
                Chat.ChatMessage msg = (Chat.ChatMessage) e.getIn().getBody();
                String content = msg.content;
                System.out.println(content);
            }
        });
    }
});
ctx.start();
In this example the DDS endpoint subscribes to the 'ChatMessage' topic in DDS Domain '0' using DDS partition 'ChatRoom'. The topic Reliability QoS is set to 'RELIABLE'.
If an error occurs in the camel-ospl Producer when it writes to DDS, a com.prismtech.gateway.camelospl.DdsException is thrown internally and caught by the Camel routing engine. The engine stores this exception in the current Exchange’s properties and continues routing.
You can handle such errors in the usual Camel way, as described here: http://camel.apache.org/error-handling-in-camel.html
The com.prismtech.gateway.camelospl.DdsException class offers the following methods to get more details on the error:
- public int getRetcode(): returns the DDS "return code" that identifies the reason for the error.
- public String getMessage(): returns the OpenSplice error message.
Example of Producer’s error handling on a route:
from("...") // from any endpoint
    .onException(DdsException.class) // catch error and process it
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                // the exception that caused the failure is stored in a property on the exchange
                DdsException ddsException = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
                                                                 DdsException.class);
                System.out.println("Error writing to DDS: (code="
                    + ddsException.getRetcode() + ") "
                    + ddsException.getMessage());
            }
        })
    .end()
    .to("dds:..."); // to a DDS endpoint
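When logging such errors, a symbolic name is easier to read than the bare integer. The sketch below maps a return code to its name, assuming that getRetcode() returns the standard ReturnCode_t values of the DDS specification (0 for RETCODE_OK through 12 for RETCODE_ILLEGAL_OPERATION). DdsRetcodeNames is a hypothetical helper, not a class of the component.

```java
// Hypothetical helper: translates a DDS return code into its symbolic name.
// The table follows the ReturnCode_t constants of the DDS specification;
// that getRetcode() uses exactly these values is an assumption.
final class DdsRetcodeNames {

    private static final String[] NAMES = {
        "RETCODE_OK",                   // 0
        "RETCODE_ERROR",                // 1
        "RETCODE_UNSUPPORTED",          // 2
        "RETCODE_BAD_PARAMETER",        // 3
        "RETCODE_PRECONDITION_NOT_MET", // 4
        "RETCODE_OUT_OF_RESOURCES",     // 5
        "RETCODE_NOT_ENABLED",          // 6
        "RETCODE_IMMUTABLE_POLICY",     // 7
        "RETCODE_INCONSISTENT_POLICY",  // 8
        "RETCODE_ALREADY_DELETED",      // 9
        "RETCODE_TIMEOUT",              // 10
        "RETCODE_NO_DATA",              // 11
        "RETCODE_ILLEGAL_OPERATION"     // 12
    };

    // Returns the symbolic name, or UNKNOWN(n) for a code outside the table.
    static String nameOf(int retcode) {
        if (retcode >= 0 && retcode < NAMES.length) {
            return NAMES[retcode];
        }
        return "UNKNOWN(" + retcode + ")";
    }

    public static void main(String[] args) {
        System.out.println(nameOf(10)); // prints RETCODE_TIMEOUT
        System.out.println(nameOf(99)); // prints UNKNOWN(99)
    }
}
```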
As explained in the Apache Camel FAQ, errors occurring in a Consumer before the Exchange is created are caught by an ExceptionHandler, which by default is Camel’s LoggingExceptionHandler. This handler only logs the exceptions and then ignores them.
Similarly to Camel’s File component, the camel-ospl component offers 2 options to handle errors that may occur in a Consumer (i.e. on DDS DataReader usage):
- The consumer.exceptionHandler option allows you to configure a custom ExceptionHandler.
Example of usage:
// A custom handler.
// Note that it must be registered as a bean in the Registry to be referred to in the URI.
private static class MyExceptionHandler implements ExceptionHandler {
    @Override
    public void handleException(Throwable exception) {
        handleException(exception.getMessage(), exception);
    }
    @Override
    public void handleException(String message, Throwable exception) {
        handleException(message, null, exception);
    }
    @Override
    public void handleException(String message, Exchange exchange, Throwable exception) {
        // ... handle exception ...
    }
}
// Route definition, referring to the custom handler bean
from("dds:ChatMessage:0/Chat.ChatMessage?consumer.exceptionHandler=#myExceptionHandler")
    .to("...");
- The consumer.bridgeErrorHandler option, when set to true, instructs Camel to handle any error in the Consumer and to create and route an Exchange containing the exception.
Example of usage:
from("dds:ChatMessage:0/Chat.ChatMessage?consumer.bridgeErrorHandler=true")
    .onException(DdsException.class)
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                // the exception that caused the failure is stored in a property on the exchange
                DdsException exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
                                                              DdsException.class);
                // ... handle exception ...
            }
        })
        .stop()
    .end()
    .to("...");
The camel-ospl component offers the possibility to change at runtime some of the DDS QoS which are used by the Vortex OpenSplice entities (Publisher, Subscriber, DataWriter or DataReader).
The Publisher and DataWriter DDS entities are managed by a com.adlinktech.gateway.camelospl.DdsProducer class which implements the org.apache.camel.Producer interface.
The following operations can be used at runtime to change some of the DDS QoS used for subsequent Camel exchanges:
- public void changeDeadlinePeriod(double period) throws DdsException;
- public void changeLatencyDuration(double duration) throws DdsException;
- public void changeOwnershipStrength(int strength) throws DdsException;
- public void changeTransportPriority(int priority) throws DdsException;
- public void changeLifespanDuration(double duration) throws DdsException;
- public void changePartition(String partitionStr) throws DdsException;
Example of usage:
final String toURI =
    "dds:ChatMessage:0/Chat.ChatMessage?Partition=ChatRoom-1";
final CamelContext ctx = new DefaultCamelContext();
final DdsProducer toProducer =
    (DdsProducer) ctx.getEndpoint(toURI).createProducer();
ctx.addRoutes(new RouteBuilder() {
    public void configure() {
        from("...") // from any endpoint
            .process(toProducer); // equivalent to: .to(toURI);
    }
});
// ... later, change the partition used by the DdsProducer's entities
toProducer.changePartition("ChatRoom-2");
The Subscriber and DataReader DDS entities are managed by the class com.adlinktech.gateway.camelospl.DdsConsumer which implements the org.apache.camel.Consumer interface.
The following operations can be used at runtime to change some of the DDS QoS used for subsequent Camel exchanges:
- public void changeDeadlinePeriod(double period) throws DdsException;
- public void changeLatencyDuration(double duration) throws DdsException;
- public void changeTimebasedFilter(double timebasedFilter) throws DdsException;
- public void changePartition(String partitionStr) throws DdsException;
Example of usage:
final String fromURI =
    "dds:ChatMessage:0/Chat.ChatMessage?Partition=ChatRoom-1";
final CamelContext ctx = new DefaultCamelContext();
ctx.addRoutes(new RouteBuilder() {
    public void configure() {
        from(fromURI)
            .routeId("ChatRoute") // give an ID to the Route for later retrieval
            .process(new Processor() {
                public void process(Exchange e) {
                    Chat.ChatMessage msg = (Chat.ChatMessage) e.getIn().getBody();
                    String content = msg.content;
                    System.out.println(content);
                }
            });
    }
});
// ... later, change the partition for the DdsConsumer's entities
DdsConsumer ddsConsumer =
    (DdsConsumer) ctx.getRoute("ChatRoute").getConsumer();
ddsConsumer.changePartition("ChatRoom-2");
It is also possible to change some of the Producer QoS on-the-fly for a single message. To do this, add the following headers to the Camel message sent to a DDS Producer:
Header name | Header value |
---|---|
| | An integer representing the ownershipStrength value to be used for this message. |
dds.transportPriority | An integer representing the transportPriority value to be used for this message. |
| | A float representing the lifespanDuration value (in seconds) to be used for this message. |
Example of usage:
// A route publishing Alarms to DDS and changing the
// transportPriority QoS depending on the alarm level
from("...")
    .choice()
        .when().groovy("request.body.alarmLevel == 'LOW'")
            .setHeader("dds.transportPriority", constant(1))
        .when().groovy("request.body.alarmLevel == 'HIGH'")
            .setHeader("dds.transportPriority", constant(3))
    .end() // close the choice so that .to() applies to every message
    .to("dds:Alarms:0/com.adlinktech.demo.AlarmType");
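The same decision can be expressed in plain Java, for instance inside a Processor, when Groovy is not available on the classpath. The sketch below only computes the headers to add; AlarmPrioritySketch and its method are hypothetical names, and the alarm levels and priorities are those of the route above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the choice() block above without Groovy.
final class AlarmPrioritySketch {

    // Returns the headers to set for one message. An empty map means that
    // the transportPriority configured on the endpoint stays in effect.
    static Map<String, Object> headersFor(String alarmLevel) {
        Map<String, Object> headers = new HashMap<>();
        if ("LOW".equals(alarmLevel)) {
            headers.put("dds.transportPriority", 1);
        } else if ("HIGH".equals(alarmLevel)) {
            headers.put("dds.transportPriority", 3);
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(headersFor("HIGH")); // prints {dds.transportPriority=3}
        System.out.println(headersFor("INFO")); // prints {}
    }
}
```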
This mode allows Camel Messages to be exchanged via DDS. This includes the Message’s headers, attachments and body, provided they contain only Serializable Java objects.
This mode is activated when you don’t specify a topicClass in the endpoint URI.
Below is an example of Camel Messages mode usage, with a Java String as the message body:
final String fromURI = "dds:ExampleStrTopic:0/?target=stringTarget";
final CamelContext ctx = new DefaultCamelContext();
Endpoint endpoint = ctx.getEndpoint(fromURI);
// Define the route from DDS endpoint to a Processor
// displaying the received data.
ctx.addRoutes(new RouteBuilder() {
    public void configure() {
        from(fromURI)
            .process(new Processor() {
                public void process(Exchange e) {
                    System.out.println((String) e.getIn().getBody());
                }
            });
    }
});
// create a ProducerTemplate
ProducerTemplate template = ctx.createProducerTemplate();
// use ProducerTemplate to send Exchange to DDS endpoint
template.send(endpoint, new Processor() {
    public void process(Exchange exchange) {
        exchange.getIn().setBody("Hello World!");
    }
});
The 'target' option used in the URI is specific to the Camel Messages mode. This extra value is sent through DDS with the Camel message and allows consuming routes to filter messages according to this 'target' value.
The Vortex OpenSplice endpoint provides a DDSPollingConsumer implementation allowing it to wait for incoming DDS data. Hence the caller can poll for messages when it is ready.
The DDSPollingConsumer implements the three polling methods of the Camel PollingConsumer interface in addition to three other polling methods that allow the caller to specify some DDS specific options in order to select the set of data to be read.
Method Name | Description |
---|---|
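receive(), with or without the 'options' map | Waits until a message is available, blocking forever if none arrives. |
receive(timeout), with or without the 'options' map | Waits up to the given timeout and returns null if no message is received within the given timeout. |
receiveNoWait(), with or without the 'options' map | Reads a message immediately without waiting and returns null if no message is received. |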
The 'options' parameter can express the states of the DDS messages to be read as well as a condition that allows it to read the messages with content-based filtering. The options map keys correspond to the headers names described in the table in DynamicPollEnricher.
Note that the DDSPollingConsumer performs a DDS read() operation and not a take() operation. It means that the data is kept in the DDS middleware and can be read again if necessary.
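The difference between the two operations can be pictured with a toy cache. This is only an illustrative model written against the JDK, not the DDS API: ReaderCacheSketch is a hypothetical class, and it ignores sample, view and instance states.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a reader cache, used only to contrast read() and take().
final class ReaderCacheSketch<T> {

    private final List<T> samples = new ArrayList<>();

    // Simulates the middleware delivering a sample to the reader.
    void deliver(T sample) {
        samples.add(sample);
    }

    // read(): returns a copy of the cached samples and leaves them in the cache.
    List<T> read() {
        return new ArrayList<>(samples);
    }

    // take(): returns the cached samples and removes them from the cache.
    List<T> take() {
        List<T> taken = new ArrayList<>(samples);
        samples.clear();
        return taken;
    }

    public static void main(String[] args) {
        ReaderCacheSketch<String> cache = new ReaderCacheSketch<>();
        cache.deliver("hello");
        System.out.println(cache.read()); // prints [hello]
        System.out.println(cache.read()); // prints [hello] again: read() did not consume it
        System.out.println(cache.take()); // prints [hello]
        System.out.println(cache.read()); // prints []: take() emptied the cache
    }
}
```

Since a read sample stays in the cache, repeated polls return it again unless the poll options restrict the sample state, for example to samples that have not been read yet.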
Below is an example of how to use the DDS PollingConsumer.
// Getting the DDS topicA endpoint
Endpoint ddsEndpointA = camelContext.getEndpoint("dds:topicA:0/TypeA");
// Create PollingConsumer to poll data from topicA
DynamicPollingConsumer pollConsumer =
(DynamicPollingConsumer) ddsEndpointA.createPollingConsumer();
// Setting the polling options : only not read and alive samples
Map<String, Object> conditions = new HashMap<String, Object>();
conditions.put("DDS_SAMPLE_STATE", NOT_READ_SAMPLE_STATE.value);
conditions.put("DDS_INSTANCE_STATE", ALIVE_INSTANCE_STATE.value);
long wait_timeout = 3000; // milliseconds
// Polling for topicA messages with the given options
Exchange e = pollConsumer.receive(conditions, wait_timeout);
if (e != null) {
    // process the message
} else {
    // TIMEOUT
}
Apache Camel already provides a pollEnrich pattern (see http://camel.apache.org/content-enricher.html), which enriches an incoming message in a Camel route with data from a PollingConsumer.
For Camel versions 2.15 and older, the pollEnrich pattern cannot extract data from the current exchange to influence the polling (e.g. to poll data with the same key as in the received message). See the note in http://camel.apache.org/content-enricher.html.
From Camel 2.16 onwards, both enrich and pollEnrich support dynamic endpoints that use an Expression to compute the URI, which makes it possible to use data from the current Exchange. See more details at Message Endpoint http://camel.apache.org/message-endpoint.html. However, dynamic endpoints are not appropriate in our case, because they would force the creation of a DataWriter for each polling query.
The Camel DDS component has therefore introduced a new class, com.adlinktech.gateway.camelext.DynamicPollEnricher, which can be used as a Camel Processor. This Processor implements the feature missing from pollEnrich: it passes the incoming message's headers to a DDS-specific implementation of PollingConsumer, which creates a DDS QueryCondition according to the header options and calls DataReader.read_w_condition().
Note that the DynamicPollEnricher uses a non-blocking operation of the PollingConsumer to receive messages. If no message is available yet, it returns immediately with a null exchange.
By default the DynamicPollEnricher class uses the reply from the PollingConsumer as the outgoing message, but another AggregationStrategy can be set to merge the reply with the original incoming message. Use the operation DynamicPollEnricher.setAggregationStrategy() to set another strategy.
The accepted header options are:
Name | Type | Description |
---|---|---|
DDS_SAMPLE_STATE | int | A mask of matching SampleStates. Default value is DDS.ANY_SAMPLE_STATE.value |
DDS_VIEW_STATE | int | A mask of matching ViewStates. Default value is DDS.ANY_VIEW_STATE.value |
DDS_INSTANCE_STATE | int | A mask of matching InstanceStates. Default value is DDS.ANY_INSTANCE_STATE.value |
DDS_QUERY_EXPRESSION | String | A query expression. Default value is null (meaning a ReadCondition is used instead of a QueryCondition). |
DDS_QUERY_PARAMETERS | String[] | An array of query parameters. Default value is an empty array. |
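Since the three state options are masks, several states can be combined with a bitwise OR. The sketch below builds an options map selecting unread samples of instances that are either disposed or without writers. The constants are local stand-ins carrying the bit values given in the DDS specification; real code would use the generated DDS.*.value constants instead, and PollOptionsSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of combining DDS state masks for the polling options.
final class PollOptionsSketch {

    // Stand-ins for the DDS constants (bit values as in the DDS specification).
    static final int NOT_READ_SAMPLE_STATE = 1 << 1;
    static final int NOT_ALIVE_DISPOSED_INSTANCE_STATE = 1 << 1;
    static final int NOT_ALIVE_NO_WRITERS_INSTANCE_STATE = 1 << 2;

    // Options for: samples not read yet, from instances that are no longer alive.
    static Map<String, Object> unreadFromNotAliveInstances() {
        Map<String, Object> options = new HashMap<>();
        options.put("DDS_SAMPLE_STATE", NOT_READ_SAMPLE_STATE);
        options.put("DDS_INSTANCE_STATE",
                NOT_ALIVE_DISPOSED_INSTANCE_STATE | NOT_ALIVE_NO_WRITERS_INSTANCE_STATE);
        // DDS_VIEW_STATE is left out, so its default (any view state) applies.
        return options;
    }

    public static void main(String[] args) {
        Map<String, Object> options = unreadFromNotAliveInstances();
        System.out.println(options.get("DDS_SAMPLE_STATE"));   // prints 2
        System.out.println(options.get("DDS_INSTANCE_STATE")); // prints 6
    }
}
```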
Below is an example of DynamicPollEnricher usage:
// Some DDS endpoints. Messages come from topicA and have to be enriched
// by matching data from topicB.
Endpoint ddsEndpointA = camelContext.getEndpoint("dds:topicA:0/TypeA");
Endpoint ddsEndpointB = camelContext.getEndpoint("dds:topicB:0/TypeB");
// Create a DynamicPollEnricher polling from topicB
DynamicPollEnricher pollEnricher =
    new DynamicPollEnricher("dds:topicB:0/TypeB");
// Set custom strategy, creating a TypeC from a TypeA + TypeB
pollEnricher.setAggregationStrategy(new AggregationStrategy() {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // get TypeA and TypeB
        TypeA a = (TypeA) oldExchange.getIn().getBody();
        TypeB b = (TypeB) newExchange.getIn().getBody();
        // create TypeC
        TypeC c = new TypeC(a, b);
        // replace oldExchange's body with TypeC and return it
        oldExchange.getIn().setBody(c);
        return oldExchange;
    }
});
// Route from TopicA topic to the PollEnricher getting a TopicB sample
// with TopicA.index = TopicB.index and aggregating both as a TypeC.
from(ddsEndpointA)
    // Query expression to get TopicB with index = first param
    .setHeader("DDS_QUERY_EXPRESSION", constant("index=%0"))
    // Query param: use Groovy to get TopicA.index and create String array
    .setHeader("DDS_QUERY_PARAMETERS")
        .groovy("[request.body.index.toString()]")
    // send message with headers to pollEnricher
    .process(pollEnricher)
    // now display resulting message (TypeC is expected)
    .process(new Processor() {
        public void process(Exchange e) {
            System.out.println((TypeC) e.getIn().getBody());
        }
    });
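In the query expression, %0 refers to the first entry of the DDS_QUERY_PARAMETERS array, %1 to the second, and so on. The middleware binds these parameters itself; the toy function below merely substitutes them textually to show which value ends up where. QueryParamSketch is a hypothetical class and the expressions are illustrative.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy illustration of positional query parameters (%0, %1, ...).
// It is not how the middleware evaluates a QueryCondition.
final class QueryParamSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("%(\\d{1,2})");

    // Replaces each %n in the expression with parameters[n].
    static String bind(String expression, String[] parameters) {
        Matcher matcher = PLACEHOLDER.matcher(expression);
        StringBuffer bound = new StringBuffer();
        while (matcher.find()) {
            int index = Integer.parseInt(matcher.group(1));
            if (index >= parameters.length) {
                throw new IllegalArgumentException("No parameter for %" + index);
            }
            matcher.appendReplacement(bound, Matcher.quoteReplacement(parameters[index]));
        }
        matcher.appendTail(bound);
        return bound.toString();
    }

    public static void main(String[] args) {
        // prints index=42
        System.out.println(bind("index=%0", new String[] {"42"}));
    }
}
```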
For another example of DynamicPollEnricher usage, look at Vortex Gateway’s examples/camel-example-dds code. It contains a route from a Circle topic that polls correlated data from a Square topic of the same color, computes the average position between the Circle and the Square, and publishes a Triangle topic at that position.