
PersistentQueue design

Dave Moten edited this page Feb 17, 2017 · 29 revisions

Similar to the operator onBackpressureBufferToFile, I want to provide an operator, persist, used as below (the real API would use a builder, but this gives the idea). The delivery guarantee is at least once.

This operator is being written because I've often wanted fault-tolerant, efficient, single-consumer messaging that doesn't involve other processes or network calls. Instead of setting up JMS/Kafka infrastructure, the logic can be self-contained with no setup overhead.

Flowable<T> flowable = ...;
DataSerializer<T> serializer = ...;
File directory = ...;
String prefix = ...;
long pageSizeBytes = ...;
Callable<Long> sinceTime = ...;
long lifetimeMs = ...;
long forceEvery = 0; // 0 = use memory-mapping unforced (never force writes to disk)
Scheduler scheduler = ...;

Flowable<T> persistedFlowable = flowable.compose(
  FlowableTransformers.persist(
    serializer,
    directory,
    prefix,
    pageSizeBytes,
    sinceTime,
    forceEvery,
    lifetimeMs,
    scheduler));

All emissions from flowable would be persisted to a file-based queue, and on subscription all records persisted with a timestamp at or after sinceTime.call() would be emitted.

  • The queue would survive application restarts and crashes reliably (memory-mapped files would not be used because file corruption could occur if the machine crashed during the OS persistence phase).
  • Queues are accessed in the style of Kafka. When you read from a queue you don't mark items as read; instead you always start reading from a timestamp that guarantees you won't miss any messages. This implies at-least-once delivery, as opposed to the exactly-once delivery that would be expected from a JMS queue.
  • Queues are local (not designed for access over network)
  • Only one subscription should act on a file queue at a time (but you can serialize multiple producers yourself, and fan out to multiple subscribers yourself, around that single subscription).
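To make the at-least-once point concrete, here is a small sketch that uses a hypothetical in-memory list in place of the file queue (`ReplayDemo` and `Entry` are illustrative names, not part of the library). A consumer that restarts from its last saved timestamp sees some messages a second time:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the file queue, used only to show why
// reading from a timestamp gives at-least-once (not exactly-once) delivery.
final class ReplayDemo {

    // One queued record: arrival timestamp plus payload.
    static final class Entry {
        final long arrivalTime;
        final String message;

        Entry(long arrivalTime, String message) {
            this.arrivalTime = arrivalTime;
            this.message = message;
        }
    }

    // Returns every message whose arrival time is >= sinceTime, in queue order.
    static List<String> readSince(List<Entry> queue, long sinceTime) {
        List<String> result = new ArrayList<>();
        for (Entry e : queue) {
            if (e.arrivalTime >= sinceTime) {
                result.add(e.message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> queue = List.of(
                new Entry(100, "a"), new Entry(200, "b"), new Entry(300, "c"));
        // The consumer saved sinceTime = 200 after handling "b", then handled "c"
        // but crashed before saving 300. Restarting from 200 replays "b" and "c".
        System.out.println(readSince(queue, 200)); // prints [b, c]
    }
}
```

Consumers therefore need to tolerate duplicates, for example by making their processing idempotent.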

Design

Actions are:

  • load the write page number and position on startup
  • establish the read page number and position (using binary search in the index files)
  • start reading from sinceTime
  • read the next message from the queue
  • write to the queue
  • recover space for reuse based on lifetimeMs

The messages on the queue are stored in pages, which are fixed-size files named as below:

<prefix>-1.page
<prefix>-2.page
<prefix>-3.page
...

Each page has a corresponding index file of fixed-length 12-byte records, each containing the arrival timestamp (8 bytes) and position (4 bytes) of one message in the page file:

<prefix>-1.idx
<prefix>-2.idx
... 
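A minimal sketch of the file naming and of the 12-byte index record layout, assuming big-endian encoding via `java.nio.ByteBuffer` (its default byte order). The class and method names are hypothetical:

```java
import java.io.File;
import java.nio.ByteBuffer;

// Hypothetical helpers for <prefix>-N.page / <prefix>-N.idx and the index record layout:
// [8-byte arrival timestamp][4-byte position in the page file].
final class QueueFiles {

    static final int INDEX_RECORD_BYTES = 12;

    static File pageFile(File directory, String prefix, int pageNumber) {
        return new File(directory, prefix + "-" + pageNumber + ".page");
    }

    static File indexFile(File directory, String prefix, int pageNumber) {
        return new File(directory, prefix + "-" + pageNumber + ".idx");
    }

    // Writes index record number recordNumber using absolute (position-independent) puts.
    static void putIndexRecord(ByteBuffer index, int recordNumber, long arrivalTime, int position) {
        int offset = recordNumber * INDEX_RECORD_BYTES;
        index.putLong(offset, arrivalTime);
        index.putInt(offset + 8, position);
    }

    static long timestampAt(ByteBuffer index, int recordNumber) {
        return index.getLong(recordNumber * INDEX_RECORD_BYTES);
    }

    static int positionAt(ByteBuffer index, int recordNumber) {
        return index.getInt(recordNumber * INDEX_RECORD_BYTES + 8);
    }
}
```

Fixed-length records are what make the binary searches in read-since possible: record `k` always starts at byte `12 * k`.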

Queue metadata is captured as follows:

<prefix>-writeMeta for the queue is a small file that records

  • position (int)
  • pageNumber (int)

<prefix>-readMeta for the queue is another small file that records

  • firstReadPageNumber (int)

<prefix>-allMeta for the queue is another small file read on startup only that records

  • pageSizeBytes (int)
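One possible encoding of the three metadata files, sketched with `java.nio.file.Files` and assuming each field is a big-endian 4-byte int stored in the order listed above. The helper names are hypothetical, and the sketch does not force anything to disk (`Files.write` does not fsync):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helpers for <prefix>-writeMeta, <prefix>-readMeta and <prefix>-allMeta.
// Each field is a big-endian 4-byte int; durability (forcing) is NOT handled here.
final class QueueMeta {

    private static Path file(Path dir, String prefix, String suffix) {
        return dir.resolve(prefix + "-" + suffix);
    }

    // writeMeta: position (int) then pageNumber (int).
    static void writeWriteMeta(Path dir, String prefix, int position, int pageNumber) throws IOException {
        byte[] bytes = ByteBuffer.allocate(8).putInt(position).putInt(pageNumber).array();
        Files.write(file(dir, prefix, "writeMeta"), bytes);
    }

    // Returns {position, pageNumber}.
    static int[] readWriteMeta(Path dir, String prefix) throws IOException {
        ByteBuffer bb = ByteBuffer.wrap(Files.readAllBytes(file(dir, prefix, "writeMeta")));
        int position = bb.getInt();
        int pageNumber = bb.getInt();
        return new int[] {position, pageNumber};
    }

    // readMeta: firstReadPageNumber (int).
    static void writeReadMeta(Path dir, String prefix, int firstReadPageNumber) throws IOException {
        Files.write(file(dir, prefix, "readMeta"), ByteBuffer.allocate(4).putInt(firstReadPageNumber).array());
    }

    static int readReadMeta(Path dir, String prefix) throws IOException {
        return ByteBuffer.wrap(Files.readAllBytes(file(dir, prefix, "readMeta"))).getInt();
    }

    // allMeta: pageSizeBytes (int), written only if the file does not exist yet.
    static void createAllMetaIfAbsent(Path dir, String prefix, int pageSizeBytes) throws IOException {
        Path allMeta = file(dir, prefix, "allMeta");
        if (Files.notExists(allMeta)) {
            Files.write(allMeta, ByteBuffer.allocate(4).putInt(pageSizeBytes).array());
        }
    }

    static int readPageSize(Path dir, String prefix) throws IOException {
        return ByteBuffer.wrap(Files.readAllBytes(file(dir, prefix, "allMeta"))).getInt();
    }
}
```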

on startup

Set up for writing (do this before preparing the read and write positions):

if allMeta does not exist then create it with the current value of `pageSizeBytes`

writePosition = writeMeta.position

while the 4-byte length at writePosition > 0
    writePosition = nextWritePosition();
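The startup scan can be sketched over an in-memory page as below. It assumes each message is stored as a 4-byte length followed by its content (the layout the read step uses) and treats the hypothetical `nextWritePosition()` as `writePosition + 4 + length`. Presumably the scan is needed because writeMeta.position is only forced to disk every `forceEvery` writes, so the persisted value can lag behind the last complete message:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the startup scan over one page.
// Layout per message (assumed): [4-byte length][content].
final class StartupScan {

    static int recoverWritePosition(ByteBuffer page, int metaPosition) {
        int writePosition = metaPosition;
        // Stop when there is no room left for another 4-byte length field.
        while (writePosition + 4 <= page.capacity()) {
            int length = page.getInt(writePosition);
            if (length <= 0) {
                break; // zero length: no complete message starts here
            }
            writePosition = writePosition + 4 + length; // nextWritePosition()
        }
        return writePosition;
    }
}
```

Starting from a stale position such as 0 or 7 in a page holding two complete messages, the scan lands on the same first free position.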

write

messageStart = writePosition;

arrivalTime = currentTime();

write zero length at messageStart; (ordered write so it can't be reordered with the next write)

write content after the 4-byte length field;

write length at messageStart (volatile write for read visibility);

position = messageStart + 4 + length;

writeMeta.position = position;

append (arrivalTime, messageStart) to index;

if (forceEvery > 0 and --counter == 0) {
  writeMeta.force();
  counter = forceEvery;
}
writePosition = position;
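A single-page sketch of the write steps. Assumptions, all labelled: the class is hypothetical; `VarHandle.storeStoreFence()` is used to approximate the "ordered" and "volatile" writes on a plain `ByteBuffer`; the arrival time is passed in instead of calling `currentTime()` so the result is testable; page rollover, the writeMeta update and forcing are left out:

```java
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;

// Hypothetical single-page writer following the write steps above.
// Page layout per message: [4-byte length][content].
// Index layout per message: [8-byte arrivalTime][4-byte messageStart].
final class PageWriter {
    private static final int INDEX_RECORD_BYTES = 12;

    private final ByteBuffer page;
    private final ByteBuffer index;
    private int writePosition;
    private int indexRecords;

    PageWriter(ByteBuffer page, ByteBuffer index, int writePosition) {
        this.page = page;
        this.index = index;
        this.writePosition = writePosition;
    }

    // Returns false when the message does not fit in the rest of this page.
    boolean write(byte[] content, long arrivalTime) {
        if (content.length == 0) {
            // A zero length means "no message yet", so empty payloads are rejected.
            throw new IllegalArgumentException("content must not be empty");
        }
        int messageStart = writePosition;
        if (messageStart + 4 + content.length > page.capacity()) {
            return false; // the design would move on to the next page here
        }
        page.putInt(messageStart, 0);                   // write zero length
        VarHandle.storeStoreFence();                    // zero ordered before the content
        for (int i = 0; i < content.length; i++) {
            page.put(messageStart + 4 + i, content[i]); // write content
        }
        VarHandle.storeStoreFence();                    // content ordered before the length
        page.putInt(messageStart, content.length);      // publish length at messageStart
        int position = messageStart + 4 + content.length;
        int offset = indexRecords * INDEX_RECORD_BYTES;
        index.putLong(offset, arrivalTime);             // append (arrivalTime, messageStart)
        index.putInt(offset + 8, messageStart);
        indexRecords++;
        writePosition = position;
        return true;
    }

    int writePosition() {
        return writePosition;
    }

    int indexRecords() {
        return indexRecords;
    }
}
```

Rejecting empty payloads is a choice made for this sketch: with a length-prefix layout where 0 means "nothing here", an empty message would be indistinguishable from an unwritten slot.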

read-since

find the index (page) number between firstReadPageNumber and writePageNumber whose first record timestamp
    is the latest one less than or equal to `sinceTime` (use binary search);

if the last timestamp in that index is less than `sinceTime` then
    use the next page and its index;

find the first timestamp in the index greater than or equal
    to `sinceTime` using binary search;

start reading from the page at the position found in the index;
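The two binary searches in read-since, sketched over plain `long[]` arrays for clarity (in the design these values live in the .idx files). Both assume timestamps are non-decreasing, which holds only if `currentTime()` never goes backwards; the class and method names are hypothetical:

```java
// Hypothetical sketch of the read-since searches.
final class ReadSince {

    // firstTimestamps[i] is the first timestamp of page (firstReadPageNumber + i).
    // Returns the offset of the last page whose first timestamp <= sinceTime,
    // or 0 if every page starts after sinceTime (an assumption: start at the oldest page).
    static int latestPageStartingAtOrBefore(long[] firstTimestamps, long sinceTime) {
        int lo = 0;
        int hi = firstTimestamps.length - 1;
        int result = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (firstTimestamps[mid] <= sinceTime) {
                result = mid;   // candidate; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return result;
    }

    // Returns the first record whose timestamp >= sinceTime, or timestamps.length
    // if there is none (the "use the next page and its index" case).
    static int firstAtOrAfter(long[] timestamps, long sinceTime) {
        int lo = 0;
        int hi = timestamps.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < sinceTime) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```

The second search is a lower bound, so when several messages share the timestamp `sinceTime`, reading starts at the first of them and none are skipped.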

read

if readPosition == pageSizeBytes then
    if readPage < writePage
        readPage = nextPage
        readPosition = 0
    else
        return NO_MESSAGE

length = read 4 bytes at readPosition
if length == 0
    return NO_MESSAGE
else
    read content (length bytes after the length field)
    readPosition = readPosition + 4 + length
    return content;
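A sketch of the read step over a list of in-memory pages, where `null` plays the role of NO_MESSAGE. Assumptions: the class is hypothetical; `pages.get(n)` is page `n`; the end-of-page test is widened to "no room for another 4-byte length", which includes the `readPosition == pageSizeBytes` case; `VarHandle.acquireFence()` pairs with the writer's ordered length write:

```java
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical reader following the read steps above.
// Layout per message (assumed): [4-byte length][content].
final class PageReader {
    private final List<ByteBuffer> pages;
    private final int pageSizeBytes;
    private int readPage;
    private int readPosition;

    PageReader(List<ByteBuffer> pages, int pageSizeBytes, int readPage, int readPosition) {
        this.pages = pages;
        this.pageSizeBytes = pageSizeBytes;
        this.readPage = readPage;
        this.readPosition = readPosition;
    }

    // writePage is the page number the writer is currently on; null means NO_MESSAGE.
    byte[] readNext(int writePage) {
        if (readPosition + 4 > pageSizeBytes) {
            if (readPage < writePage) {
                readPage++;          // move to the next page
                readPosition = 0;
            } else {
                return null;         // caught up with the writer
            }
        }
        ByteBuffer page = pages.get(readPage);
        int length = page.getInt(readPosition);
        VarHandle.acquireFence();    // read the length before the content it guards
        if (length == 0) {
            return null;             // nothing published here yet
        }
        byte[] content = new byte[length];
        for (int i = 0; i < length; i++) {
            content[i] = page.get(readPosition + 4 + i);
        }
        readPosition += 4 + length;  // advance past length field and content
        return content;
    }
}
```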

Failure scenarios and verification

The primary failure scenario to consider is an error or a crash during a write (of either the process or the whole machine).
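A process crash part-way through a write can be simulated in memory as "stop after some of the content bytes". Because the zero length at messageStart is only replaced in the last step, a reader still sees NO_MESSAGE there, and the startup scan stops at messageStart, so the next write starts again at that position. This sketch (hypothetical class, layout assumed to be [4-byte length][content]) does not model a machine crash, where unforced page-cache data may reach disk in a different order than it was written:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical simulation of a process crash during the write steps.
final class CrashDuringWrite {

    // Runs the write steps but "crashes" after copying bytesBeforeCrash content bytes.
    static void writeUntilCrash(ByteBuffer page, int messageStart, byte[] content, int bytesBeforeCrash) {
        page.putInt(messageStart, 0);                        // step 1: zero length
        int copied = Math.min(bytesBeforeCrash, content.length);
        for (int i = 0; i < copied; i++) {
            page.put(messageStart + 4 + i, content[i]);      // step 2: content, maybe partial
        }
        if (copied == content.length) {
            page.putInt(messageStart, content.length);       // step 3: publish the length
        }
        // otherwise the simulated crash happened before step 3
    }

    // What the read step sees at position; null means NO_MESSAGE.
    static String readAt(ByteBuffer page, int position) {
        int length = page.getInt(position);
        if (length == 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = page.get(position + 4 + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```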
