The native protocol version 4 introduces a new feature called Custom Payloads.
According to the protocol V4 specification, custom payloads are generic key-value maps
where keys are strings and each value is an arbitrary sequence of bytes. Currently, payloads
can be sent by clients along with all QUERY, PREPARE, EXECUTE and BATCH requests.
Custom payloads can be used to convey user-specific information from clients to servers and vice versa. Please note that this is an advanced feature that requires specific server-side configuration (see below).
What the server is supposed to do with a payload depends on the QueryHandler used server-side
to decode requests. Unsurprisingly, the default QueryHandler implementation used by Cassandra
simply ignores all payloads. However, users can replace it with their own implementation,
and thus process user-specific payloads; the payload format, as well as the serialization
and deserialization logic, is entirely left for clients to implement.
Needless to say, the same encoding and decoding logic must be applied on both the client
and the server endpoints.
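Because the format is entirely up to you, a small shared codec class is one way to keep both endpoints in agreement. The sketch below is hypothetical: PayloadCodec and its methods are illustrative names, and the choice of UTF-8 for strings and 8-byte big-endian longs is just one possible convention, not something the driver or Cassandra prescribes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec: one possible convention for payload values.
// The custom QueryHandler on the server would need the same (or an equivalent)
// implementation so that both endpoints agree on the byte format.
final class PayloadCodec {

    private PayloadCodec() {}

    // Encodes a string as UTF-8 bytes.
    static ByteBuffer encodeString(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes UTF-8 bytes; reads from a duplicate so the caller's buffer position is untouched.
    static String decodeString(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order).
    static ByteBuffer encodeLong(long value) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(value);
        buffer.flip();
        return buffer;
    }

    // Decodes 8 big-endian bytes using an absolute read, so the position does not move.
    static long decodeLong(ByteBuffer buffer) {
        if (buffer.remaining() != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + buffer.remaining());
        }
        return buffer.getLong(buffer.position());
    }
}
```

Client-side, a value could then be added with something like payload.put("tenant", PayloadCodec.encodeString("acme")), where the "tenant" key is purely illustrative; the server-side QueryHandler would call the matching decode method.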
This section is a small how-to to help driver users get started with custom payload processing server-side. For more detailed information, please refer to the Cassandra documentation itself.
To enable a custom QueryHandler for a Cassandra node, its JVM should be started with the
system property cassandra.custom_query_handler_class set to the fully qualified name of a
class implementing org.apache.cassandra.cql3.QueryHandler.
This class must, of course, be available on the node's classpath.
This can be achieved with the following steps:
- Add your implementation jar to the CLASSPATH environment variable;
- Add the following to $CASSANDRA_CONF/cassandra-env.sh:
JVM_EXTRA_OPTS="-Dcassandra.custom_query_handler_class=fully.qualified.name.of.MyQueryHandler"
Of course, all nodes in the cluster must be started with the same QueryHandler, otherwise payloads sent by the driver could get lost.
Payloads in the Java driver are represented as Map<String,ByteBuffer> instances.
It is safe to use any Map implementation, including unsynchronized implementations
such as java.util.HashMap; the driver will create defensive, thread-safe copies of
user-supplied maps. However, ByteBuffer instances are inherently mutable,
so callers should take care not to modify the ByteBuffer values in a payload map after
submitting it to the driver, as doing so could lead to unexpected results.
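If a buffer may still be reused after the payload is submitted, one option is to hand the driver a private copy. The sketch below is hypothetical (PayloadBuffers.deepCopy is not a driver method); it copies the remaining bytes into a new heap buffer, so later writes to the source buffer cannot affect the payload.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: copies a buffer's content so the payload value does not
// share memory with a buffer the caller may keep modifying.
final class PayloadBuffers {

    private PayloadBuffers() {}

    // Copies the remaining bytes of src into a fresh buffer, leaving src's position unchanged.
    static ByteBuffer deepCopy(ByteBuffer src) {
        ByteBuffer copy = ByteBuffer.allocate(src.remaining());
        copy.put(src.duplicate()); // duplicate() so that src's own position is not advanced
        copy.flip();               // make the copied bytes readable from position 0
        return copy;
    }
}
```

The copy costs one allocation per value, which is usually negligible for small payloads; callers that never touch a buffer again after submitting it do not need it.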
Note that, for thread safety reasons, the Java driver does not permit null keys or null
values in a payload map; including a null in your payload will result in a
NullPointerException being thrown immediately.
However, the protocol specification does allow null values. If you need to include
a null value in your payload map, this can be achieved with the Java driver
by using the special value Statement.NULL_PAYLOAD_VALUE.
Payload maps are also subject to the following size limits:
- Payload maps cannot have more than 65,535 entries.
- Payload map values are limited to a maximum length of Integer.MAX_VALUE (2^31 − 1) bytes each.
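Both limits match the way the v4 specification encodes a payload on the wire, as a [bytes map]: an unsigned 2-byte entry count, then for each entry a [string] key (unsigned 2-byte length plus UTF-8 bytes) and a [bytes] value (signed 4-byte length plus the bytes, where a negative length stands for null). The sketch below spells out that layout; PayloadWireFormat is a hypothetical name, and this illustrates the specification rather than reproducing the driver's own encoder. Unlike the driver, which rejects Java null values, it writes a null value directly as length -1.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch of the protocol v4 [bytes map] layout used for custom payloads.
// Illustrative only: not the driver's internal encoder.
final class PayloadWireFormat {

    private static final int MAX_UNSIGNED_SHORT = 0xFFFF; // 65,535

    private PayloadWireFormat() {}

    static byte[] encode(Map<String, ByteBuffer> payload) {
        if (payload.size() > MAX_UNSIGNED_SHORT) {
            throw new IllegalArgumentException("too many entries: " + payload.size());
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(payload.size());                 // [short] n, big-endian
            for (Map.Entry<String, ByteBuffer> e : payload.entrySet()) {
                byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
                if (key.length > MAX_UNSIGNED_SHORT) {
                    throw new IllegalArgumentException("key too long: " + key.length + " bytes");
                }
                out.writeShort(key.length);                 // [string]: [short] length...
                out.write(key);                             // ...followed by UTF-8 bytes
                ByteBuffer value = e.getValue();
                if (value == null) {
                    out.writeInt(-1);                       // [bytes]: negative length means null
                } else {
                    byte[] v = new byte[value.remaining()];
                    value.duplicate().get(v);               // read without moving value's position
                    out.writeInt(v.length);                 // [bytes]: [int] length...
                    out.write(v);                           // ...followed by the bytes
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }
}
```

For example, a single entry "k" mapped to the two bytes 1, 2 encodes to 11 bytes: 0 1 | 0 1 'k' | 0 0 0 2 | 1 2.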
A custom payload can be attached to any Statement via Statement.setOutgoingPayload().
Once the payload is set, you can either prepare the statement or execute it;
in both cases, the payload will be sent along with the PREPARE or QUERY
request, respectively:
SimpleStatement statement = new SimpleStatement("SELECT foo FROM bar WHERE qix = 1"); // Session.prepare() expects a RegularStatement
statement.setOutgoingPayload(myCustomPayload);
session.execute(statement); // myCustomPayload will be sent with QUERY request
session.prepare(statement); // myCustomPayload will be sent with PREPARE request
Naturally, you can also set a payload when using the QueryBuilder API:
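import com.datastax.driver.core.querybuilder.QueryBuilder;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
Statement statement = QueryBuilder.select("c2").from("t1").where(eq("c1", 1)).setOutgoingPayload(myCustomPayload);
session.execute(statement); // built statements are not prepared: myCustomPayload will be sent with QUERY request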
When sending payloads with batches, please note that payloads must be attached to the
BatchStatement itself, not to its inner statements:
Statement statement = new SimpleStatement("INSERT INTO t1 (c1, c2) values ('foo', 'bar')");
// correct
Statement batch = new BatchStatement().add(statement);
batch.setOutgoingPayload(myCustomPayload);
session.execute(batch); // myCustomPayload will be sent with BATCH request
// wrong
statement.setOutgoingPayload(myCustomPayload);
Statement wrongBatch = new BatchStatement().add(statement);
session.execute(wrongBatch); // myCustomPayload will NOT be sent with BATCH request
One important note about paging requests: if your query needs to paginate, any outgoing payload set on the executed statement will be transparently sent over and over for every new page request.
Statement statement = new SimpleStatement("...");
statement.setOutgoingPayload(myCustomPayload);
ResultSet rows = session.execute(statement); // myCustomPayload will be sent with first QUERY request
for (Row row : rows) {
// if additional QUERY requests are sent, all of them will send the same payload
}
And finally, note that custom payloads can only be used with protocol versions >= 4; trying to set a payload under lower protocol versions will result in an UnsupportedFeatureException (wrapped in a NoHostAvailableException) when the request is encoded.
If you want to defensively protect your code against these errors, you can either:
- Force the protocol version when creating your Cluster instance:
Cluster cluster = Cluster.builder()
.addContactPoint("...")
.withProtocolVersion(ProtocolVersion.V4)
.build();
- Inspect the currently negotiated protocol version, and only send payloads if it is equal to or greater than V4:
ProtocolVersion myCurrentVersion = cluster.getConfiguration()
.getProtocolOptions()
.getProtocolVersion();
if (myCurrentVersion.compareTo(ProtocolVersion.V4) >= 0) {
// only send custom payloads if current protocol version supports it
statement.setOutgoingPayload(myCustomPayload);
}
Custom payloads sent back by the server can be retrieved in the following ways:
- From a ResultSet: use the ExecutionInfo class to retrieve the payload sent by the server.
Statement statement = new SimpleStatement("...");
ResultSet rows = session.execute(statement);
// last payload sent by the server
Map<String,ByteBuffer> serverPayload = rows.getExecutionInfo().getIncomingPayload();
If your query required pagination (multiple QUERY requests), the above method only
returns the last payload sent by the server with its last RESULT response. If you need
to retrieve the payloads for all RESULT responses, use the following method instead:
Statement statement = new SimpleStatement("...");
ResultSet rows = session.execute(statement);
// all payloads sent by the server
for (ExecutionInfo info : rows.getAllExecutionInfo()) {
Map<String,ByteBuffer> serverPayload = info.getIncomingPayload();
}
- From a PreparedStatement: to retrieve the payload that the server sent back with its PREPARED
response, use the following method:
PreparedStatement ps = session.prepare(...);
Map<String,ByteBuffer> serverPayload = ps.getIncomingPayload();
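When a query spans several pages, the payloads collected through getAllExecutionInfo() often need some post-processing. The helper below is a hypothetical sketch: it takes one payload map per page, in page order, and returns every value sent for a given key. It assumes that a page whose response carried no payload shows up as a null map, which is worth checking against the driver version in use.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collects, in page order, the values the server sent for one key.
final class PagedPayloads {

    private PagedPayloads() {}

    static List<ByteBuffer> valuesFor(String key, List<Map<String, ByteBuffer>> pagePayloads) {
        List<ByteBuffer> values = new ArrayList<>();
        for (Map<String, ByteBuffer> payload : pagePayloads) {
            // Assumption: a page without a payload is represented by a null map; skip it.
            if (payload != null && payload.containsKey(key)) {
                values.add(payload.get(key));
            }
        }
        return values;
    }
}
```

In application code, the input list could be built by calling getIncomingPayload() on each element of rows.getAllExecutionInfo().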
If you use either Session.prepare(RegularStatement) or Session.prepareAsync(RegularStatement)
to prepare a statement, its outgoing payload, if any, will become the default outgoing payload
for the prepared statement, as with all other properties of the given statement.
SimpleStatement statement = new SimpleStatement("...");
statement.setOutgoingPayload(myCustomPayload);
PreparedStatement ps = session.prepare(statement); // myCustomPayload will be sent with PREPARE request
// AND become the default outgoing payload
Naturally, you can also set a default outgoing payload on the PreparedStatement instance itself.
The code below is roughly equivalent to the one above, except that the payload is not sent
with the PREPARE request:
SimpleStatement statement = new SimpleStatement("...");
PreparedStatement ps = session.prepare(statement); // no payload sent with PREPARE request
ps.setOutgoingPayload(myCustomPayload); // default outgoing payload is now myCustomPayload
Note, however, that PreparedStatement also has a getIncomingPayload() method which, as shown
earlier, can be used to retrieve the payload sent back by the server that prepared the statement.
When binding a statement with either PreparedStatement.bind() or PreparedStatement.bind(Object...),
the resulting BoundStatement instances will also inherit payloads.
To determine which payload is going to be used as outgoing payload for a bound statement, the following rules apply:
- If the prepared statement has a non-null outgoing payload, all instances of BoundStatement created out of it will inherit that payload as their outgoing payload;
- Otherwise, if the prepared statement has a non-null incoming payload, all instances of BoundStatement created out of it will inherit that payload as their outgoing payload;
- Otherwise, all instances of BoundStatement created out of it won't have any outgoing payload set by default; users can, however, set an outgoing payload on individual BoundStatement instances by calling their setOutgoingPayload() method, as in the example below:
SimpleStatement statement = new SimpleStatement("...");
PreparedStatement ps = session.prepare(statement); // no payload sent with PREPARE request
BoundStatement bs = ps.bind(1); // no outgoing payload by default
bs.setOutgoingPayload(myCustomPayload); // outgoing payload for this specific statement
session.execute(bs); // myCustomPayload will be sent with EXECUTE request for this bound statement only
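The three inheritance rules above boil down to a simple precedence. The following hypothetical sketch restates them as plain Java over payload maps; BoundPayloadRule is an illustrative name, it does not call the driver, and it only makes the order of the checks explicit.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical model of the inheritance rules: which payload a new BoundStatement starts with.
final class BoundPayloadRule {

    private BoundPayloadRule() {}

    static Map<String, ByteBuffer> defaultOutgoingPayload(
            Map<String, ByteBuffer> preparedOutgoing,
            Map<String, ByteBuffer> preparedIncoming) {
        if (preparedOutgoing != null) {
            return preparedOutgoing;   // rule 1: the prepared statement's outgoing payload
        }
        if (preparedIncoming != null) {
            return preparedIncoming;   // rule 2: the payload the server sent back on PREPARE
        }
        return null;                   // rule 3: no default; set one per BoundStatement if needed
    }
}
```

One consequence of rule 2 worth keeping in mind: if the server answers PREPARE with a payload, bound statements will echo it back by default unless an outgoing payload is set explicitly.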
When debugging custom payloads, set the com.datastax.driver.core.Message logger to the TRACE
level, e.g. with Log4J:
<logger name="com.datastax.driver.core.Message">
<level value="TRACE"/>
</logger>
This will log a message for every request and every response that contains a non-null payload. The log message contains a pretty-printed version of the payload itself, and its total length in bytes.
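In application code, a similar human-readable dump can be handy when logging payloads returned by getIncomingPayload(). The helper below is a hypothetical sketch; its output format is made up, it counts only value bytes, and it does not reproduce what the driver's Message logger prints.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical application-side formatter: keys sorted, values as hex, plus total value bytes.
final class PayloadFormatter {

    private PayloadFormatter() {}

    static String format(Map<String, ByteBuffer> payload) {
        if (payload == null) {
            return "(no payload)";
        }
        StringBuilder sb = new StringBuilder("{");
        int total = 0;
        for (Map.Entry<String, ByteBuffer> e : new TreeMap<>(payload).entrySet()) {
            ByteBuffer v = e.getValue().duplicate(); // don't disturb the caller's buffer position
            total += v.remaining();
            if (sb.length() > 1) {
                sb.append(", ");
            }
            sb.append(e.getKey()).append("=0x");
            while (v.hasRemaining()) {
                sb.append(String.format("%02x", v.get() & 0xFF));
            }
        }
        return sb.append("} (").append(total).append(" value bytes)").toString();
    }
}
```

For a payload mapping "b" to 0x0a and "a" to 0xff 0x01, this returns {a=0xff01, b=0x0a} (3 value bytes).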