This document describes the high-level protocol and handshaking that is used by Chronicle Queue Enterprise to replicate data using TCP/IP from queues that are on different machines. Chronicle Queue Enterprise is a licenced product; if you are interested in obtaining more information about this please contact [email protected]
Chronicle Queue Enterprise sends its messages in binary wire format (see net.openhft.chronicle.wire.BinaryWire
);
this is a binary form of YAML
.
In order to make the messages human-readable in this document, the messages are shown in text wire format (see
net.openhft.chronicle.wire.TextWire
).
Chronicle Queue Enterprise replication uses an UberHandler. UberHandlers act as message routers, They are serialised locally, and then sent to the remote host using TCP/IP. The serialised form of UberHandler is shown below:
--- !!data #binary
handler: !net.openhft.chronicle.network.cluster.handlers.UberHandler {
remoteIdentifier: 1,
localIdentifier: 2,
wireType: !WireType BINARY_LIGHT,
clusterName: global
}
When the Uberhandler is loaded by the remote machine, its onInitialize()
method is automatically called (see
net.openhft.chronicle.network.cluster.handlers.UberHandler#onInitialize
).
Note
|
When sending the Uberhandler, only its data is serialised. The Java byte code of the Uberhandler must already
exist on the remote machine. Only the data of the Uberhandler instance is sent, not the byte code for the Uberhandler
class itself. This is because automatically running any arbitrary code using the onInitialize() method remotely could
lead to security concerns.This is also true for any SubHandler (see net.openhft.chronicle.network.api.session.SubHandler ) which also has an
onInitialize() method. This is covered in more detail below.
|
The Uberhandler is used to route messages to SubHandlers based upon the csp
, or the cid
which is just an alias to the csp
.
The reason that we set up a cid
is that a cid
is numeric, and as such is limited (using BinaryWire) to 8 bytes (long
type).
A csp
is a string field which can be much larger. Once the relationship between the csp
and its alias cid
is established, then only the more compact cid
will be sent.
To summarise;
-
first, the Uberhandler is sent, which is used to route the subHandler messages.
-
then, any number of SubHandlers are sent. There is a relationship that is maintained by the Uberhandler, between the SubHandler and its
cip
/cid
used for routing purposes.
The message shown below sets up the following relationship between the csp: /
also aliased as cid: !int 1671070996
and its heartbeat handler`, so that any message sent with cid: !int 1671070996
will be received by this heartbeat handler instance running on the remote machine.
--- !!meta-data #binary
csp: /
cid: !int 1671070996
--- !!meta-data #binary
handler: !net.openhft.chronicle.network.cluster.handlers.HeartbeatHandler {
heartbeatTimeoutMs: !int 40000,
heartbeatIntervalMs: !short 30000
}
Heartbeat handlers are set up on both the source, and the sink machines. Periodically (set using the heartbeatIntervalMs
field) they send a heartbeat message to their corresponding remote host. If the heartbeatTimeoutMs
has passed and no heartbeat message has been received, then the heartbeat handler will close the socket connection and attempt to reconnect.
Chronicle Queues are replicated using the SourceReplicationHandler
, which sends its messages to the SinkReplicationHandler
. Both the SourceReplicationHandler
and SinkReplicationHandler
are SubHandlers.
However, handshaking messages are sent back from the SinkReplicationHandler
to the SourceReplicationHandler
. One such message is the acknowledgement message which the sink sends the source. By inspecting the idx: 0x451600000000
(see example message below) this allows you to add code on your source machine to ensure that the message was received by the remote machine, and therefore successfully replicated.
--- !!meta-data #binary
cid: 292323922173575
--- !!data #binary
idx: 0x451600000000
ns: 10849029994071
The Sink and Source handlers are as follows:
from |
to |
msg sent |
source |
sink |
|
sink |
source |
|
The source-side of the connection sends a SinkReplicationHandler
to be run on the remote sink host.
--- !!meta-data #binary
csp: /replication/out-1/2
cid: 292323922173575
--- !!data #binary
handler: !SinkReplicationHandler {
queueName: out-1,
wireType: BINARY_LIGHT,
acknowledgement: true,
nextIndexRequired: 0x451600000001,
sourceId: !short 1002
}
The sink-side of the connection will respond by setting up a SourceReplicationHandler
to be run on the remote source host.
--- !!meta-data #binary
csp: /replication/out-1/2
cid: 292323922173575
--- !!data #binary
handler: !SourceReplicationHandler {
queueName: out-1,
wireType: BINARY_LIGHT,
acknowledgement: true,
nextIndexRequired: 0x0,
sourceId: !short 1002
}
Whenever your application appends data to the source queue, the SourceReplicationHandler
will read this queue using a queue tailer, and then immediately stream any new data to the remote host.
Chronicle Queue Enterprise establishes a stream rather than a polling protocol. If the network buffers are full, then data will not be sent by the SourceReplicationHandler
. Therefore, it is not strictly reactive, but rather, it is sensitive to push back.
Chronicle Queue Enterprise uses queues which page data to disk, rather than holding it all in memory. Therefor Chronicle Queue will not get saturated by a slow consumer, because the data is not paged into memory from the queue until the TCP/IP buffers have sufficient free space.
Before the sink replication handler starts to read messages from the source machine, it first copies back messages from the sink machine, to the source machine; this is called the back copy.
Although rare, it is useful for example, if the source machine was replicating to two (or more) sinks, and the source suffered a power outage.
Chronicle Queue Enterprise will fail-over to one of the remaining sinks, and therefore we need to ensure that, whichever sink is chosen, it has the latest messages.
Therefore, in the event that one of the sinks has more messages than the other, we will first copy any messages from the other sink before we establish this sink as our new source.
When the ServiceSinkReplicationHandler
starts, it calls software.chronicle.enterprise.queue.replication.SinkReplicationHandler#onInitialize
.
When all the data has been replicated, an END_OF_STREAM
message is sent to notify the SourceReplicationHandler
that the back copy is complete.
--- !!meta-data #binary
cid: 573798926109737
--- !!data #binary
eos: !!null "" # END_OF_STREAM
The sink replication handler then receives new messages from the SourceReplicationHandler. When it receives these new messages it uses a Chronicle Queue appender to write them to a Chronicle Queue.
Note
|
When the messages are written to the Chronicle Queue by the SinkReplicationHandler , we write the message and take account of the source index, to guarantee that the order of messages from the source exactly matches the order of messages on the sink. In addition, using the index also ensures that the message has always been written to the correct queue file. This is even if events such has roll-over have occurred. This would typically cause a normal appender to write the message to the next queue file. This is not what we want, because we create an exact copy of messages, from the source to the sink.
|
The SourceReplicationHandler
sends messages to the SinkReplicationHandler
. The SourceReplicationHandler
uses a Chronicle tailer to read new messages from your Chronicle Queue. The messages will be written to the queue by your application logic. When the SourceReplicationHandler
comes to read the contents of this Chronicle queue, it does not de-serialize the message in any way, it treats the message as a blob of bytes, and writes the bytes to the replication event. This is also known as the re
in the message below:
--- !!meta-data #binary
cid: 292323922173575
--- !!data #binary
re: < replication-event> # see below
The bytes that make up the replication-event
follow the following format:
public void writeMarshallable(@NotNull WireOut wire) {
@NotNull ValueOut out = wire.getValueOut();
out.int64_0x(index);
out.bytesLiteral(payload);
// nano-timestamp create with the timestamp from the source machine
out.int64(nanoTimeStamp = System.nanoTime());
}
When the message is received by the sink, it sends an acknowledgement to the source:
--- !!meta-data #binary
cid: 292323922173575
--- !!data #binary
idx: 0x451600000000
ns: 10849029994071