PersistentQueue design
Similar to the operator onBackpressureBufferToFile, I want to provide an operator persist, used like below (the real API would use a builder, but this gives the idea). The delivery principle used is at-least-once.
This operator is being written because I've often wanted fault-tolerant efficient single consumer messaging that doesn't involve other processes and network calls. Instead of setting up JMS/Kafka infrastructure the logic can be self contained with no setup overhead.
Flowable<T> flowable = ...;
DataSerializer<T> serializer = ...;
File directory = ...;
String prefix = ...;
long pageSizeBytes = ...;
Callable<Long> sinceTime = ...;
long lifetimeMs = ...;
long forceEvery = 0; // use memory-mapping unforced
Scheduler scheduler = ...;
Flowable<T> persistedFlowable = flowable.compose(
FlowableTransformers.persist(
serializer,
directory,
prefix,
pageSizeBytes,
sinceTime,
forceEvery,
lifetimeMs,
scheduler
));
All emissions from flowable would be persisted to a file-based queue, and on subscription all records that were persisted at a timestamp at or after sinceTime.call() would be returned.
- The queue would survive application restarts/crashes reliably (memory-mapped files would not be used because file corruption could occur if the box crashed during the OS's persistence phase)
- Queues are accessed in the style of Kafka. When you read from a queue you don't mark items as read with the queue but rather always start reading from the queue from a timestamp that guarantees for you that you won't miss any messages. This implies at least once delivery as opposed to exactly once delivery that would be expected from a JMS queue.
- Queues are local (not designed for access over network)
- Only one subscription should be acting on a file queue at a time (but you can serialize multiple producers yourself and fan-out to multiple subscribers yourself around that single subscription).
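The Kafka-style access pattern above can be illustrated with a small in-memory stand-in for the queue. This is only a sketch: `ReplayDemo`, `Record` and `replay` are hypothetical names, and the "at or after" comparison follows the read pseudocode further down the page rather than any confirmed API. It shows why resuming from a saved timestamp gives at-least-once delivery: the record at the checkpoint is delivered again, but a second record with the same timestamp is not lost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in for the file queue; not the operator itself.
final class ReplayDemo {

    static final class Record {
        final long arrivalTime;
        final String value;

        Record(long arrivalTime, String value) {
            this.arrivalTime = arrivalTime;
            this.value = value;
        }
    }

    // Returns every record that arrived at or after sinceTime, in arrival order.
    static List<String> replay(List<Record> log, long sinceTime) {
        List<String> out = new ArrayList<>();
        for (Record r : log) {
            if (r.arrivalTime >= sinceTime) {
                out.add(r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> log = Arrays.asList(
                new Record(10, "a"), new Record(20, "b"),
                new Record(20, "c"), new Record(30, "d"));
        // The consumer handled "a" and "b", saved checkpoint 20, then crashed.
        long checkpoint = 20;
        // On restart "b" is delivered again, but "c" (same timestamp) is not missed.
        System.out.println(replay(log, checkpoint)); // prints [b, c, d]
    }
}
```

In the operator the checkpoint would be supplied through sinceTime.call(), so the consumer is responsible for storing it and for tolerating the repeated records.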
Actions are:
- load write page number and position on startup
- establish read page number and position (using binary search in the index files)
- start read from sinceTime
- read next from queue
- write to queue
- recover space for reuse based on lifetimeMs
The messages on the queue are stored in pages which are fixed size files named as below:
<prefix>-1.page
<prefix>-2.page
<prefix>-3.page
...
Each page has a corresponding index file that has fixed length 12 byte records containing the arrival timestamp (8 bytes) and position (4 bytes) of messages in the page file:
<prefix>-1.idx
<prefix>-2.idx
...
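As a rough illustration of the 12-byte index record, the sketch below encodes and decodes one with `java.nio.ByteBuffer`. The class name `IndexRecord`, the field order (timestamp first) and the big-endian byte order are assumptions made for the example; the page only fixes the field sizes.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the library: one fixed-length index record.
final class IndexRecord {
    static final int SIZE = Long.BYTES + Integer.BYTES; // 8 + 4 = 12 bytes

    final long arrivalTime;
    final int position;

    IndexRecord(long arrivalTime, int position) {
        this.arrivalTime = arrivalTime;
        this.position = position;
    }

    // Appends the record at the buffer's current position (big-endian by default).
    void writeTo(ByteBuffer bb) {
        bb.putLong(arrivalTime);
        bb.putInt(position);
    }

    // Reads the record with the given zero-based record number using absolute gets.
    static IndexRecord read(ByteBuffer bb, int recordNumber) {
        int offset = recordNumber * SIZE;
        return new IndexRecord(bb.getLong(offset), bb.getInt(offset + Long.BYTES));
    }

    public static void main(String[] args) {
        ByteBuffer index = ByteBuffer.allocate(2 * SIZE);
        new IndexRecord(1000L, 0).writeTo(index);
        new IndexRecord(1005L, 37).writeTo(index);
        IndexRecord second = read(index, 1);
        System.out.println(second.arrivalTime + " @ " + second.position); // prints 1005 @ 37
    }
}
```

Because every record has the same length, record n always starts at byte n * 12, which is what makes binary search over the index possible.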
Queue metadata is captured as follows:
<prefix>-writeMeta for the queue is a small file that records
- position (int)
- pageNumber (int)
<prefix>-readMeta for the queue is another small file that records
- firstReadPageNumber (int)
<prefix>-allMeta for the queue is another small file, read on startup only, that records
- pageSizeBytes (int)
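A minimal sketch of how the writeMeta file could be stored and loaded is given below. It assumes an 8-byte layout (position, then pageNumber, both big-endian) and plain file writes through `java.nio.file.Files`; the class name `WriteMeta` and that layout are illustrative only, and a memory-mapped variant would look different.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: stores writeMeta as two big-endian ints, position then pageNumber.
final class WriteMeta {
    final int position;
    final int pageNumber;

    WriteMeta(int position, int pageNumber) {
        this.position = position;
        this.pageNumber = pageNumber;
    }

    // Overwrites the metadata file with the current 8-byte record.
    void save(Path file) throws IOException {
        ByteBuffer bb = ByteBuffer.allocate(2 * Integer.BYTES);
        bb.putInt(position).putInt(pageNumber);
        Files.write(file, bb.array());
    }

    // Reads the 8-byte record back in the same field order.
    static WriteMeta load(Path file) throws IOException {
        ByteBuffer bb = ByteBuffer.wrap(Files.readAllBytes(file));
        int position = bb.getInt();
        int pageNumber = bb.getInt();
        return new WriteMeta(position, pageNumber);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("queue-", "-writeMeta");
        new WriteMeta(128, 3).save(file);
        WriteMeta meta = load(file);
        System.out.println(meta.position + " " + meta.pageNumber); // prints 128 3
        Files.delete(file);
    }
}
```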
setup for write (do before preparing read and write positions):
  if allMeta does not exist then create it with the current value of `pageSizeBytes`
load write position on startup:
  writePosition = writeMeta.position
  while 4 bytes at writePosition > 0
    writePosition = nextWritePosition();
write to queue:
  messageStart = writePosition;
  arrivalTime = currentTime();
  write zero length at messageStart; (ordered write so can't be reordered with next write)
  write content;
  write length at messageStart (volatile write for read visibility);
  position = messageStart + 4 + length;
  writeMeta.position = position;
  append (arrivalTime, messageStart) to index;
  if (forceEvery > 0 and --counter == 0) {
    writeMeta.force();
    counter = forceEvery;
  }
  writePosition = position;
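The write steps above (zero length first, then content, then the real length) might look roughly like this in Java. This is a sketch under several assumptions: a heap `ByteBuffer` stands in for the page file, `PageWriter` is a hypothetical name, and plain buffer writes are used, so the ordered/volatile write semantics mentioned in the pseudocode are not reproduced here. The index append and writeMeta update are left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the page write; a heap buffer stands in for a page file.
final class PageWriter {
    static final int LENGTH_BYTES = Integer.BYTES;

    // Writes one message at messageStart and returns the next write position.
    static int write(ByteBuffer page, int messageStart, byte[] content) {
        if (content.length == 0) {
            // A zero length means "no message yet", so empty messages cannot be stored.
            throw new IllegalArgumentException("empty message");
        }
        if (messageStart + LENGTH_BYTES + content.length > page.capacity()) {
            throw new IllegalArgumentException("message does not fit in page");
        }
        page.putInt(messageStart, 0);               // 1. zero length: message incomplete
        ByteBuffer body = page.duplicate();         // shares bytes, independent position
        body.position(messageStart + LENGTH_BYTES);
        body.put(content);                          // 2. content after the length field
        page.putInt(messageStart, content.length);  // 3. length last, so readers see a whole message
        return messageStart + LENGTH_BYTES + content.length;
    }

    public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(64);
        int next = write(page, 0, "abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(next); // prints 7
    }
}
```

Writing the length last means a crash between steps 1 and 3 leaves a zero length at messageStart, which a reader treats as "no message" rather than as a partial message.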
establish read page and position:
  find index number between startPageNumber and writePageNumber with latest first record timestamp less than or equal to `sinceTime` (use binary search);
  if last timestamp in index less than `sinceTime` then
    use next page and its index;
  find first time in index greater than or equal to `sinceTime` using binary search;
  start reading from page at position found in index;
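The second binary search (within a single index) could be sketched as follows. It assumes the 12-byte record layout described earlier with the timestamp first, timestamps in non-decreasing order, and the index contents available in a `ByteBuffer`; `IndexSearch` and its method names are hypothetical. The search across pages for the right index is not shown.

```java
import java.nio.ByteBuffer;

// Hypothetical lower-bound search over fixed-length (timestamp, position) records.
final class IndexSearch {
    static final int RECORD_SIZE = 12;

    // Returns the number of the first record whose timestamp is >= sinceTime,
    // or recordCount if every record is older than sinceTime.
    static int firstAtOrAfter(ByteBuffer index, int recordCount, long sinceTime) {
        int lo = 0;
        int hi = recordCount;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            long timestamp = index.getLong(mid * RECORD_SIZE);
            if (timestamp < sinceTime) {
                lo = mid + 1; // answer lies strictly after mid
            } else {
                hi = mid;     // mid qualifies; look for an earlier match
            }
        }
        return lo;
    }

    // Returns the page position stored in the given record.
    static int positionOf(ByteBuffer index, int recordNumber) {
        return index.getInt(recordNumber * RECORD_SIZE + Long.BYTES);
    }

    public static void main(String[] args) {
        ByteBuffer index = ByteBuffer.allocate(3 * RECORD_SIZE);
        index.putLong(10).putInt(0).putLong(20).putInt(5).putLong(30).putInt(9);
        int record = firstAtOrAfter(index, 3, 15);
        System.out.println(positionOf(index, record)); // prints 5
    }
}
```

A return value equal to recordCount corresponds to the "use next page and its index" branch of the pseudocode.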
read next from queue:
  if readPosition == pageSizeBytes then
    if readPage < writePage
      readPage = nextPage
      readPosition = 0
    else
      return NO_MESSAGE
  length = read 4 bytes at readPosition
  if length == 0
    return NO_MESSAGE
  else
    read content
    advance read position
    return content;
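The read loop above can be sketched in Java as below. The assumptions are loud ones: pages are in-memory `ByteBuffer`s instead of files, `null` stands in for NO_MESSAGE, the last page in the list is the current write page, and, as in the pseudocode, a page is taken to be filled exactly to `pageSizeBytes` before the writer moves on. `QueueReader` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical reader over in-memory pages; null stands in for NO_MESSAGE.
final class QueueReader {
    private final List<ByteBuffer> pages; // oldest first; the last one is the write page
    private final int pageSizeBytes;
    private int readPage = 0;
    private int readPosition = 0;

    QueueReader(List<ByteBuffer> pages, int pageSizeBytes) {
        this.pages = pages;
        this.pageSizeBytes = pageSizeBytes;
    }

    // Returns the next message, or null if the reader has caught up with the writer.
    byte[] next() {
        if (readPosition == pageSizeBytes) {
            if (readPage < pages.size() - 1) {
                readPage++;          // move to the next page
                readPosition = 0;
            } else {
                return null;         // end of the write page
            }
        }
        ByteBuffer page = pages.get(readPage);
        int length = page.getInt(readPosition);
        if (length == 0) {
            return null;             // no complete message at this position yet
        }
        byte[] content = new byte[length];
        ByteBuffer body = page.duplicate();
        body.position(readPosition + Integer.BYTES);
        body.get(content);
        readPosition += Integer.BYTES + length;
        return content;
    }

    public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(16);
        page.putInt(2).put(new byte[] {7, 8});
        QueueReader reader = new QueueReader(Arrays.asList(page), 16);
        System.out.println(Arrays.toString(reader.next())); // prints [7, 8]
        System.out.println(reader.next() == null);          // prints true
    }
}
```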
The primary scenario to consider for failure is an error or a crash during write (either of the process or the whole machine).