Dozer App State Machine #763
Why must each source maintain a I do not think we need to keep the whole history. It should be enough to maintain all the operations with a Let's take an example. Let's say cache is at position 5 and |
Dozer App State Machine
We only consider the single sink case now.
We intentionally don't distinguish between `CacheSink` and `Cache` because that should be an implementation detail irrelevant to this design.
State
The state of a node (source, processor, or sink) is a `HashMap` from source node handles to `(u64, u64)`, where the first `u64` is the transaction id, the second `u64` the sequence number.
We say a state is `None` if the `HashMap` is empty, `Some` otherwise.
State is persisted to disk on commit. A node's state is persisted after all its parent nodes' states are persisted, so if a node has no persisted state, it's guaranteed that all its children have no persisted state.
Persisted states have the following invariants:
- [...] `Some`.
- [...] `1` and the key is the source node handle.
Consistency
A (sub)DAG is said to be consistent if and only if, for every child node, its state is the union of its parent nodes' states.
We're going to test the consistency of the sub DAG that's generated by removing all the source nodes from the pipeline DAG. Let's call it the sourceless DAG.
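The consistency definition can be sketched as a check over parent/child edges. This is an illustration under stated assumptions, not the Dozer implementation: `is_consistent`, `union_of`, and the edge-list representation are hypothetical, a node missing from `states` is treated as having the empty (`None`) state, and two parents that disagree on the same source handle are treated as inconsistent. For the sourceless DAG, the caller would pass only the edges that do not start at a source node.

```rust
use std::collections::HashMap;

type NodeHandle = String;
type State = HashMap<NodeHandle, (u64, u64)>;

/// Union of the parent states. Returns `None` when two parents disagree on
/// the position of the same source (assumption: this counts as inconsistent).
fn union_of(parents: &[&State]) -> Option<State> {
    let mut union = State::new();
    for parent in parents {
        for (source, position) in parent.iter() {
            if let Some(existing) = union.get(source) {
                if existing != position {
                    return None;
                }
            }
            union.insert(source.clone(), *position);
        }
    }
    Some(union)
}

/// `edges` lists the (parent, child) pairs of the (sub)DAG being checked.
fn is_consistent(
    states: &HashMap<NodeHandle, State>,
    edges: &[(NodeHandle, NodeHandle)],
) -> bool {
    // A node without a persisted state has the empty (`None`) state.
    let empty = State::new();

    // Group parent states by child node.
    let mut parents_of: HashMap<&NodeHandle, Vec<&State>> = HashMap::new();
    for (parent, child) in edges {
        let parent_state = states.get(parent).unwrap_or(&empty);
        parents_of.entry(child).or_default().push(parent_state);
    }

    // Every child state must equal the union of its parents' states.
    parents_of.iter().all(|(child, parents)| {
        let child_state = states.get(*child).unwrap_or(&empty);
        union_of(parents).map_or(false, |union| *child_state == union)
    })
}
```

Nodes with no incoming edge in the checked (sub)DAG impose no constraint, which is why removing the source nodes relaxes the check for processors fed directly by sources.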
States
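The Dozer app has the following states:
- `Bootstrapping`
- `Snapshotting`
- `PopulatingProcessor`
- `PopulatingPipeline`
- `Streaming`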
When the app starts, it's always `Bootstrapping`.
State Transfer
Bootstrapping
Read the states of all nodes. If the persisted state can't be found, the node's state is `None`.
If any of the source states is `None`, transfer to `Snapshotting`.
At this point all source states are `Some`.
If the DAG is consistent, transfer to `Streaming`.
If the sourceless DAG is consistent, transfer to `PopulatingPipeline`.
Otherwise transfer to `PopulatingProcessor`.
Snapshotting
For every source node, if its state is `None`, run its `snapshot` method and persist the state.
`snapshot` does what the current source's `start` does when `from_seq` is `None`, but returns when the snapshotting is done.
Transfer to `Bootstrapping`.
PopulatingProcessor
Precondition: All source states are `Some`.
Remove all processors' states.
If the sink state is `None`, the sourceless DAG is consistent now. Transfer to `PopulatingPipeline`.
Otherwise, construct the execution DAG by creating source replay nodes and connecting them to the processor nodes, leaving the sink node disconnected.
The replay `from` is set to `None`, and `to` is set to the sink state's corresponding value.
Run this execution DAG.
Postcondition: Sourceless DAG is consistent.
Transfer to `PopulatingPipeline`.
PopulatingPipeline
Precondition: All source states are `Some`. Sourceless DAG is consistent.
Construct the execution DAG by creating source replay nodes and connecting them to the processor and sink nodes.
The replay `from` is set to the sink state's corresponding value, and `to` is set to the source state.
Run this execution DAG.
Postcondition: DAG is consistent.
Transfer to `Streaming`.
Streaming
Precondition: All source states are `Some`. DAG is consistent.
Construct the execution DAG by creating source stream nodes and connecting them to the processor and sink nodes.
The stream `from` is set to the value of the source state.
Run this execution DAG until stopped.
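The transitions described in this section can be summarized in a small sketch. The enum variants are the state names from the post; `BootstrapFacts`, `after_bootstrapping`, and `next_state` are hypothetical names introduced for illustration, and the three booleans stand in for the checks that bootstrapping performs on the persisted states.

```rust
/// The five app states named in this design.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppState {
    Bootstrapping,
    Snapshotting,
    PopulatingProcessor,
    PopulatingPipeline,
    Streaming,
}

/// Hypothetical summary of what bootstrapping learned from the persisted states.
#[derive(Debug, Clone, Copy)]
pub struct BootstrapFacts {
    pub all_sources_some: bool,
    pub dag_consistent: bool,
    pub sourceless_dag_consistent: bool,
}

/// Decision made at the end of `Bootstrapping`, in the order given in the post.
pub fn after_bootstrapping(facts: BootstrapFacts) -> AppState {
    if !facts.all_sources_some {
        AppState::Snapshotting
    } else if facts.dag_consistent {
        AppState::Streaming
    } else if facts.sourceless_dag_consistent {
        AppState::PopulatingPipeline
    } else {
        AppState::PopulatingProcessor
    }
}

/// Next state once the current state's work is done.
/// `None` means there is no further transition: streaming runs until stopped.
pub fn next_state(current: AppState, facts: BootstrapFacts) -> Option<AppState> {
    match current {
        AppState::Bootstrapping => Some(after_bootstrapping(facts)),
        // Snapshotting persists source states, then re-reads everything.
        AppState::Snapshotting => Some(AppState::Bootstrapping),
        // Both paths out of PopulatingProcessor lead to PopulatingPipeline.
        AppState::PopulatingProcessor => Some(AppState::PopulatingPipeline),
        AppState::PopulatingPipeline => Some(AppState::Streaming),
        AppState::Streaming => None,
    }
}
```

Every path ends in `Streaming`, and only `Snapshotting` loops back to `Bootstrapping`, so a fresh app visits `Bootstrapping` at most twice before it starts populating or streaming.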
Source Replay
Transaction Log
Every source node maintains a `Vec<TransactionLogEntry>` as its state.
The persisted source state's only value is the last entry's `to` value.
The transaction log entries have the following invariants:
- If the sink state is `Some`, there exists an entry such that `entry.to` equals the sink state's corresponding value.
Replay
Using the transaction log, we can replay the source node's output operations between states. A source replay node in an execution DAG is defined as follows:
When started, the node emits the operations in the transaction log entries one by one.
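One way the transaction log and a replay over it might look is sketched below. Only the `to` field of `TransactionLogEntry` is named in the post; the `operations` field, the generic `Op` type, and the `replay` function are assumptions made for illustration. The sketch also assumes that a replay from `from` to `to` covers the entries with `from < entry.to <= to`, and that `from == None` means "from the beginning of the log".

```rust
/// Position within a source: (transaction id, sequence number).
type Position = (u64, u64);

/// Hypothetical entry layout; only `to` is named in the design.
#[derive(Debug, Clone)]
struct TransactionLogEntry<Op> {
    /// Source position reached once `operations` have been emitted.
    to: Position,
    /// Operations emitted by the source in this entry (assumed field).
    operations: Vec<Op>,
}

/// Collects, in log order, the operations of every entry whose `to` lies in
/// the half-open range (`from`, `to`]. Tuples compare lexicographically, so
/// the transaction id is compared first, then the sequence number.
fn replay<Op: Clone>(
    log: &[TransactionLogEntry<Op>],
    from: Option<Position>,
    to: Position,
) -> Vec<Op> {
    log.iter()
        .filter(|entry| from.map_or(true, |start| entry.to > start) && entry.to <= to)
        .flat_map(|entry| entry.operations.iter().cloned())
        .collect()
}
```

Under these assumptions, `PopulatingProcessor` corresponds to `replay(log, None, sink_position)` and `PopulatingPipeline` to `replay(log, Some(sink_position), source_position)`, so the two replays together cover the log exactly once.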
Source Stream
What the current source's `start` does when `from_seq` is `Some`.
@v3g42 @mediuminvader @karolisg @snork-alt