Skip to content

Latest commit

 

History

History
794 lines (590 loc) · 27.6 KB

README.adoc

File metadata and controls

794 lines (590 loc) · 27.6 KB

camel-ospl User Guide

The camel-ospl Endpoint configuration

An Apache Camel endpoint using Vortex OpenSplice is configured using following URI format:

dds:topicName:domainID/[topicClass][?options]

Where topicName is the DDS topic name to publish or subscribe, domainID is the DDS domain identifier, and topicClass is the generated class for the topic type (optional in the case of Camel Messages mode; see Camel Messages mode).

Options is a list of optional parameters, separated by '&' characters; for example:

?option1=value1&option2=value2&...

The following options are accepted:

Name Type Description

readMode

Boolean

Only applies to a consumer endpoint (from("dds:…​')).

By default the DDS DataReader of a consumer endpoint uses the take() operation. Setting this option to true makes it to use the `read()`operation instead.

The advantage is that in case of a DISPOSE or WRITEDISPOSE message the data will be valid ( i.e. the latest-read data). The drawback is that the DataReader consumes more memory to keepthe latest data in a cache.

ignoreLocalPublishers

Boolean

Only applies to a consumer endpoint (from("dds:…​')).

Indicate if a consumer must ignore the data published by a local producer (i.e. in the same JVM).

By default this option is set to true, avoiding loop for bridge patterns using 2 routes in opposite directions. For instance with a DDS/JMS bridge, you configure 2 routes:

  • a route from JMS to DDS

  • a route from DDS to JMS

If this option is set to false, the 2nd route will receive the publications from the 1st route, sending back to JMS data which are coming from JMS.

partition

String

Corresponds to DDS PartitionQosPolicy.name.

A list of DDS partitions, separated by , characters.

contentFilter

String

Corresponds to a DDS filter expression to be used on topic (actually a ContentFilteredTopic is created by the endpoint). This is a SQL-like expression as defined in the DDS specification.

WARNING : the expression must use percent-encoding for URI reserved and unsafe characters (e.g. = = %3D , > = %3E, ..)

deadlinePeriod

Double

Corresponds to DDS DeadlineQosPolicy.period.

A float representing a time in seconds.

destinationOrder

String

Corresponds to DDS DestinationOrderQosPolicy.kind.

Possible values:

  • BY_RECEPTION_TIMESTAMP

  • BY_SOURCE_TIMESTAMP

durabilityKind

String

Corresponds to DDS DurabilityQosPolicy.kind.

Possible values:

  • VOLATILE

  • TRANSIENT

  • TRANSIENT_LOCAL

  • PERSISTENT

durabilityServiceCleanupDelay

Double

Corresponds to DDS DurabilityServiceQosPolicy.service_cleanup_delay

A float representing a time in seconds.

durabilityServiceDepth

Int

Corresponds to DDS DurabilityServiceQosPolicy.history_depth

An integer representing the history depth.

durabilityServiceKind

String

Corresponds to DDS DurabilityServiceQosPolicy.history_kind

Possible values:

  • KEEP_ALL

  • KEEP_LAST

durabilityServiceMaxInstances

Int

Corresponds to DDS DurabilityServiceQosPolicy.max_instances

An integer representing the maximum number of instances.

durabilityServiceMaxSamples

Int

Corresponds to DDS DurabilityServiceQosPolicy.max_samples

An integer representing the maximum number of samples.

durabilityServiceMaxSamplesPerInstance

Int

Corresponds to DDS DurabilityServiceQosPolicy.max_samples_per_instance

An integer representing the history depth.

historyDepth

String

Corresponds to DDS HistoryQosPolicy.depth.

An integer representing the maximum number of samples per instance.

historyKind

String

Corresponds to DDS HistoryQosPolicy.kind.

Possible values:

  • KEEP_ALL

  • KEEP_LAST

latencyDuration

Double

Corresponds to DDS LatencyBudgetQosPolicy.duration.

A float representing a time in seconds.

lifespanDuration

Double

Corresponds to DDS LifespanQosPolicy.duration.

A float representing a time in seconds.

livelinessDuration

Double

Corresponds to DDS LivelinessQosPolicy.lease_duration.

A float representing a time in seconds.

livelinessKind

String

Corresponds to DDS LivelinessQosPolicy.kind.

Possible values:

  • AUTOMATIC

  • MANUAL_BY_PARTICIPANT

  • MANUAL_BY_TOPIC

ownershipKind

String

Corresponds to DDS OwnershipQosPolicy.kind.

Possible values:

  • SHARED

  • EXCLUSIVE

ownershipStrength

String

Corresponds to DDS OwnershipStrengthQosPolicy.value.

An integer representing the ownership strength.

presentationAccessScope

String

Corresponds to DDS PresentationQosPolicy.access_scope.

Possible values:

  • INSTANCE

  • TOPIC

  • GROUP

presentationCoherentAccess

Boolean

Corresponds to DDS PresentationQosPolicy.coherent_access

presentationOrderedAccess

Boolean

Corresponds to DDS PresentationQosPolicy.ordered_access

readerDataLifecycleAutopurgeDisposedDelay

Double

Corresponds to DDS ReaderDataLifecycleQosPolicy.autopurge_disposed_samples_delay

A float representing a time in seconds.

readerDataLifecycleAutopurgeNowriterDelay

Double

Corresponds to DDS ReaderDataLifecycleQosPolicy.autopurge_nowriter_samples_delay

A float representing a time in seconds.

reliabilityBlockingTime

Double

Corresponds to DDS ReliabilityQosPolicy.max_blocking_time

A float representing a time in seconds.

reliabilityKind

String

Corresponds to DDS ReliabilityQosPolicy.kind.

Possible values:

  • RELIABLE

  • BEST_EFFORT

resourceLimitsMaxInstances

Int

Corresponds to DDS ResourceLimitsQosPolicy.max_instances

An integer representing the maximum number of instances.

resourceLimitsMaxSamples

Int

Corresponds to DDS ResourceLimitsQosPolicy.max_samples.

An integer representing the maximum number of samples.

resourceLimitsMaxSamplesPerInstance

Int

Corresponds to DDS ResourceLimitsQosPolicy.max_samples_per_instance

An integer representing the maximum number of samples per instance.

timebasedFilter

Double

Corresponds to DDS TimeBasedFilterQosPolicy.minimum_separation

A float representing a time in seconds.

transportPriority

Int

Corresponds to DDS TransportPriorityQosPolicy.value.

An integer representing the transport priority.

writerDataLifecycleAutodispose

Boolean

Corresponds to DDS WriterDataLifecycleQosPolicy.autodispose_unregistered_instances

consumer.exceptionHandler

String

The reference to an ExceptionHandler bean that will handle exceptions produced by the consumer. See Error Handling for more details.

consumer.bridgeErrorHandler

Boolean

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while reading on DDS will now be processed as a message and handled by the routing Error Handler. See Error Handling for more details.

Content of Camel Exchanges

Each Camel Exchange produced by the DDS endpoint contains the data as body of the IN message. The data has for its type the Java class representing the Topic Type.

Note that this data might be 'invalid' ( i.e. with some members not set, or with dummy values), if the instance state changes. See the Vortex OpenSplice reference guide for more details (ReaderDataLifecycleQos chapter). This can be checked with the SampleInfo.valid_data boolean (see below).

The IN message contains the following information as headers:

Name Type Description

DDS_SAMPLE_INFO

DDS.SampleInfo

The SampleInfo object read by the DDS Reader with the data.

DDS_DISPOSE

com.adlinktech.gateway.camelospl.DdsDisposeHeader

This header is present only if the instance was disposed. In such a case it’s set either to DISPOSE if the body of the message contains invalid data, or to WRITEDISPOSE if the body of the message contains the last valid data sent by the publisher before the disposal.

Example of route

Below is an example of a simple Camel route definition from a DDS endpoint to a Processor displaying the data:

final String fromURI =
   "dds:ChatMessage:0/Chat.ChatMessage?ReliabilityKind=RELIABLE&Partition=ChatRoom";
final CamelContext ctx = new DefaultCamelContext();
ctx.addRoutes(new RouteBuilder() {
  public void configure() {
    // from DDS endpoint to a Processor displaying the received message.
    from(fromURI).process(new Processor() {
       public void process(Exchange e) {
          Chat.ChatMessage msg = (Chat.ChatMessage) e.getIn().getBody();
          String content = msg.content;
          System.out.println(content);
       }
    });
  }
});

ctx.start();

In this example the DDS endpoint subscribes to the 'ChatMessage' topic in DDS Domain '0' using DDS partition 'ChatRoom'. The topic Reliability QoS is set to 'RELIABLE'.

Error Handling

Errors in Producer (i.e. on DDS DataWriter usage)

If an error occurs in camel-ospl Producer when it writes on DDS, a com.prismtech.gateway.camelospl.DdsException is thrown internally and caught by the Camel routing engine. The engine store this exception in current Exchange’s properties and continue with routing.

You can handle such error in the usual Camel way, as described here: http://camel.apache.org/error-handling-in-camel.html

The com.prismtech.gateway.camelospl.DdsException class offers the following method to get more details on the error:

  • public int getRetcode() : to get the DDS "return code" that identify the reason of error.

  • public String getMessage() : to get the OpenSplice’s error message.

Example of Producer’s error handling on a route:

from("...")                            // from any endpoint
   .onException(DdsException.class)     // catch error and process it
      .process(new Processor()
          {
             @Override
             public void process(Exchange exchange) throws Exception
             {
                // the caused by exception is stored in a property on the exchange
                DdsException exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
                      DdsException.class);
               System.out.println("Error writing to DDS: (code="
                  + ddsException.getRetcode() + ") "
                  + ddsException.getMessage());
             }
          })
      .end()
  .to("dds:...")     // to a DDS endpoint

Errors in Consumer (i.e. on DDS DataReader usage)

As explained in Apache Camel FAQ, the errors occurring in a Consumer before the Exchange is created are caught by an ErrorHandler which by default is the Camel’s LoggingExceptionHandler. This handler will only log the exceptions and then ignore them.

Similarly to the Camel’s File component, the camel-ospl component offers 2 options to handle errors that may occur in Consumer (i.e. on DDS DataReader usage):

  • consumer.exceptionHandler option allows to configure a custom ExceptionHandler.

    Example of usage:

    // A custom handler.
    // Note that it must be registered as a bean in Registry to be referred in URI.
    private static class MyExceptionHandler
       implements ExceptionHandler
    {
       @Override
       public void handleException(Throwable exception)
       {
          handleException(exception.getMessage(), exception);
       }
    
       @Override
       public void handleException(String message, Throwable exception)
       {
          handleException(message, null, exception);
       }
    
       @Override
       public void handleException(String message, Exchange exchange, Throwable exception)
       {
           // ... handle exception ...
       }
    }
    
    //  Route definition, referring the custom handler bean
    from("dds:ChatMessage:0/Chat.ChatMessage?consumer.exceptionHandler=#myExceptionHandler")
       .to("...");
  • consumer.bridgeErrorHandler option, when set to true, instructs Camel to handle any error in Consumer and to create and route an Exchange containing the exception.

    Example of usage:

    from("dds:ChatMessage:0/Chat.ChatMessage?consumer.bridgeErrorHandler=true")
       .onException(DdsException.class)
          .process(new Processor()
          {
             @Override
             public void process(Exchange exchange) throws Exception
             {
                // the caused by exception is stored in a property on the exchange
                DdsException exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
                      DdsException.class);
                // ... handle exception ...
             }
          })
          .stop()
      .end()
      .to("...");

Dynamic DDS QoS changes

The camel-ospl component offers the possibility to change at runtime some of the DDS QoS which are used by the Vortex OpenSplice entities (Publisher, Subscriber, DataWriter or DataReader).

Publisher / DataWriter changeable QoS

The Publisher and DataWriter DDS entities are managed by a com.adlinktech.gateway.camelospl.DdsProducer class which implements the org.apache.camel.Producer interface.

The following operations can be used at runtime to change some of the DDS QoS used for subsequent Camel exchanges:

  • public void changeDeadlinePeriod(double period) throws DdsException;

  • public void changeLatencyDuration(double duration) throws DdsException;

  • public void changeOwnershipStrength(int strength) throws DdsException;

  • public void changeTransportPriority(int priority) throws DdsException;

  • public void changeLifespanDuration(double duration) throws DdsException;

  • public void changePartition(String partitionStr) throws DdsException;

Example of usage:

final String toURI =
   "dds:ChatMessage:0/Chat.ChatMessage?Partition=ChatRoom-1";
final CamelContext ctx = new DefaultCamelContext();
final DdsProducer toProducer =
(DdsProducer) ctx.getEndpoint(toURI).createProducer();
ctx.addRoutes(new RouteBuilder() {
   public void configure() {
      from("...") // from any endpoint
      .process(toProducer); // equivalent to: .to(toUri);
   }
});

// ... later, change the partition used by the DdsProducer's entities
toProducer.changePartition("ChatRoom-2");

Subscriber / DataReader changeable QoS

The Subscriber and DataReader DDS entities are managed by the class com.adlinktech.gateway.camelospl.DdsConsumer which implements the org.apache.camel.Consumer interface.

The following operations can be used at runtime to change some of the DDS QoS used for subsequent Camel exchanges:

  • public void changeDeadlinePeriod(double period) throws DdsException;

  • public void changeLatencyDuration(double duration) throws DdsException;

  • public void changeTimebasedFilter(double timebasedFilter) throws DdsException;

  • public void changePartition(String partitionStr) throws DdsException;

Example of usage:

final String toURI =
   "dds:ChatMessage:0/Chat.ChatMessage?Partition=ChatRoom-1";
final CamelContext ctx = new DefaultCamelContext();
ctx.addRoutes(new RouteBuilder() {
   public void configure() {
      from(fromURI)
      .routeId("ChatRoute") //give an ID to the Route for later retrieval
      .process(new Processor() {
         public void process(Exchange e) {
            Chat.ChatMessage msg = (Chat.ChatMessage)e.getIn().getBody();
            String content = msg.content;
            System.out.println(content);
         }
      });
   }
});

// ... later, change the partition for the DdsConsumer's entities
DdsConsumer ddsConsumer =
   (DdsConsumer) ctx.getRoute("ChatRoute").getConsumer();
ddsConsumer.changePartition("ChatRoom-2");

Per-message changes

It is also possible to change some of the Producer QoS on-the-fly for a single message. To do this, add the following headers to the Camel message sent to a DDS Producer:

Header name Header value

dds.ownershipStrength

An integer representing the ownershipStrength value to be used for this message.

dds.transportPriority

An integer representing the transportPriority value to be used for this message.

dds.lifespanDuration

A float representing the lifespanDuration value (in seconds) to be used for this message.

Example of usage:

// A route publishing Alarms to DDS and changing the
// transportPriority QoS depending the alarm level
from("...")
.choice()
   .when().groovy("request.body.alarmLevel == 'LOW'")
      .setHeader("dds.transportPriority", constant(1))
   .when().groovy("request.body.alarmLevel == 'HIGH'")
      .setHeader("dds.transportPriority", constant(3))
.endChoice()
.to("dds:Alarms:0/com.adlinktech.demo.AlarmType");

Camel Messages mode

This mode allows Camel Messages to be exchanged via DDS. This includes the Message’s headers, attachments and body, providing they contain only Serializable Java objects.

This mode is activated when you don’t specify a topicClass in the endpoint URI.

Below is an example of Camel Messages mode usage, with a Java String as the message body:

final String fromURI = "dds:ExampleStrTopic:0/?target=stringTarget";
final CamelContext ctx = new DefaultCamelContext();
Endpoint endpoint = ctx.getEndpoint(fromURI);
// Define the route from DDS endpoint to a Processor
// displaying the received data.
ctx.addRoutes(new RouteBuilder() {
   public void configure() {
      from(fromURI)
      .process(new Processor() {
         public void process(Exchange e) {
            System.out.println((String)e.getIn().getBody());
         }
      });
   }
});

// create a ProducerTemplate
ProducerTemplate template = ctx.createProducerTemplate();

// use ProducerTemplate to send Exchange to DDS endpoint
template.send(endpoint, new Processor() {
   public void process(Exchange exchange) {
      exchange.getIn().setBody("Hello World!");
   }
});

The 'target' option used in the URI is specific to the Camel Messages mode. This extra value is sent through DDS with the Camel message and allows consuming routes to filter messages according to this 'target' value.

DDS Polling Consumer

The Vortex OpenSplice endpoint provides a DDSPollingConsumer implementation allowing it to wait for incoming DDS data. Hence the caller can poll for messages when it is ready.

The DDSPollingConsumer implements the three polling methods of the Camel PollingConsumer interface in addition to three other polling methods that allow the caller to specify some DDS specific options in order to select the set of data to be read.

Method Name Description

receive()

Waits until a message is available

receive(Map<String, Object> options)

blocking forever.

receive(long timeout)

Waits up to the given timeout and

receive(Map<String, Object> options, long timeout)

within the given timeout.

receiveNoWait()

Reads a message immediately without

receiveNoWait(Map<String, Object> options)

is received.

The 'options' parameter can express the states of the DDS messages to be read as well as a condition that allows it to read the messages with content-based filtering. The options map keys correspond to the headers names described in the table in DynamicPollEnricher.

Note that the DDSPollingConsumer performs a DDS read() operation and not a take() operation. It means that the data is kept in the DDS middleware and can be read again if necessary.

Below is an example of how to use the DDS PollingConsumer.

   // Getting the DDS topicA endpoint
   Endpoint ddsEndpointA = camelContext.getEndpoint("dds:topicA:0/TypeA");

   // Create PollingConsumer to poll data from topicA
   DynamicPollingConsumer pollConsumer =
      (DynamicPollingConsumer) ddsEndpointA.createPollingConsumer();

   // Setting the polling options : only not read and alive samples

   Map<String, Object> conditions = new HashMap<String, Object>();
   conditions.put("DDS_SAMPLE_STATE", NOT_READ_SAMPLE_STATE.value);
   conditions.put("DDS_INSTANCE_STATE", ALIVE_INSTANCE_STATE.value);

   long wait_timeout = 3000; // milliseconds

   // Polling for topicA messages with the given options
   Exchange e = pollConsumer.receive(conditions, wait_timeout);
   if (e != null) {
      // to process the message
   } else {
      // TIMEOUT
   }

Additional Apache Camel Processors

DynamicPollEnricher

Apache Camel already provides a pollEnrich pattern (see http://camel.apache.org/content-enricher.html) allowing in a Camel route to enrich an incoming message with data from a PollingConsumer.

For Camel versions 2.15 and older, the pollEnrich pattern is not able to extract data from current exchange to influence the polling ( e.g. to poll data with the same key as in the received message). See the note in http://camel.apache.org/content-enricher.html.

From Camel 2.16 onwards both enrich and pollEnrich supports dynamic endpoints that uses an Expression to compute the uri, which allows to use data from the current Exchange. See more details at Message Endpoint http://camel.apache.org/message-endpoint.html. However, the usage of dynamic endpoints is not appropriate in our case, because it would force the creation of a DataWriter for each polling query.

The Camel DDS component have therefore introduced a new class com.adlinktech.gateway.camelext.DynamicPollEnricher which can be use as a Camel Processor. This Processor implements a missing feature in pollEnricher, passing incoming messages' headers to a specific implementation of PollingConsumer for DDS endpoint creating a DDSQueryCondition according to the headers' options and calling DataReader.read_w_condition() .

Note that the DynamicPollEnricher uses a non blocking operation of the PollingConsumer to receive the messages. If no message is available yet, it returns immediately with a null exchange.

By default the DynamicPollEnricher class uses the reply from PollingConsumer as outgoing message. But another AggregationStrategy can be set to merge the reply with original incoming message. Use the operation PollingConsumer.setAggregationStrategy() to set another strategy.

The accepted header options are:

Name Type

Description

DDS_SAMPLE_STATE

int

A mask of matching SampleStates.

Default value is DDS.ANY_SAMPLE_STATE.value

DDS_VIEW_STATE

int

A mask of matching ViewStates.

Default value is DDS.ANY_VIEW_STATE.value

DDS_INSTANCE_STATE

int

A mask of matching InstanceStates.

Default value is DDS.ANY_INSTANCE_STATE_value

DDS_QUERY_EXPRESSION

String

A query expression.

Default value is null (meaning a ReadCondition is used instead of a QueryCondition ).

DDS_QUERY_PARAMETERS

String[]

An array of query parameters.

Default value is an empty array.

Below is an example of the DynamicPollEnricher class:

// Some DDS endpoints. Messages come from topicA and have to be enriched
// by matching data from topicB.
Endpoint ddsEndpointA = camelContext.getEndpoint("dds:topicA:0/TypeA");
Endpoint ddsEndpointB = camelContext.getEndpoint("dds:topicB:0/TypeB");

// Create a DynamicPollEnricher polling from topicB
DynamicPollEnricher pollEnricher = new
   DynamicPollEnricher("dds:topicB:0/TypeB");

// Set custom strategy, creating a TypeC from a TypeA + TypeB
pollEnricher.setAggregationStrategy(new AggregationStrategy() {
   @Override
   public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
   {
      // get TypeA and TypeB
      TypeA a = (TypeA) oldExchange.getIn().getBody();
      TypeB b = (TypeB) newExchange.getIn().getBody();
      // create TypeC
      TypeC c = new TypeC(a,b);
      // replace oldExchange's body with TypeC and return it
      oldExchange.getIn().setBody(c);
      return oldExchange;
   }
});

// Route from TopicA topic to the PollEnricher getting a TopicB sample
// with TopicA.index = TopicB.index and aggregating both as a NamedMessage.
from(ddsEndpointA)
// Query expression to get TopicB with index = first param
.setHeader("DDS\_QUERY\_EXPRESSION", constant("index=%0"))
// Query param: use Groovy to get TopicA.index and create String array
.setHeader("DDS\_QUERY\_PARAMETERS")
.groovy("[request.body.index.toString()]")
// send message with headers to pollEnricher
.process(pollEnricher)
// now display resulting message (TypeC is expected)
.process(new Processor() {
   public void process(Exchange e) {
      System.out.println((TypeC)e.getIn().getBody());
   }
});

For another example of DynamicPollEnricher usage, look at Vortex Gateway’s examples/camel-example-dds code. It contains an example of a route from a Circle topic, polling correlated data from a Square topic of the same color and computes the average position between Circle and Square and publishes a Triangle topic in that position.