Persist and restore pipeline state #1834
Closed
chubei started this conversation in Feature Requests
-
I'm assuming that when the record store is loaded, records need to be looked up by a primary key or something similar. How does this work? Since the record store is append only, it will need compaction at some point, and we will have to store every operation. Is my assumption right?
-
Persist and restore pipeline state
Pipeline state includes two parts:
Record store
Record store will be persisted in an incremental fashion. To enable this, we require the record store to be append only.
The record store is essentially an `Arc<RwLock<Vec<Record>>>`. We serialize a slice of the `Vec` on every commit and upload it to S3.
During restore, we download all the slices and concatenate them.
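The incremental scheme can be sketched roughly as follows. This is a minimal illustration, not the actual implementation: the `Record` type, the `RecordStore` wrapper, and the `persisted_len` cursor are hypothetical, and serialization plus the S3 upload are replaced by returning the slice in memory.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the real `Record` type.
#[derive(Clone, Debug, PartialEq)]
pub struct Record(pub u64);

// Hypothetical wrapper around the shared, append-only vector.
pub struct RecordStore {
    records: Arc<RwLock<Vec<Record>>>,
    // Number of records already covered by previously persisted slices.
    persisted_len: usize,
}

impl RecordStore {
    pub fn new() -> Self {
        Self {
            records: Arc::new(RwLock::new(Vec::new())),
            persisted_len: 0,
        }
    }

    // Appending is the only mutation, which keeps earlier slices valid.
    pub fn append(&self, record: Record) {
        self.records.write().unwrap().push(record);
    }

    // Returns only the records appended since the previous call.
    // In the proposal this slice would be serialized and uploaded to S3.
    pub fn persist_slice(&mut self) -> Vec<Record> {
        let slice = {
            let records = self.records.read().unwrap();
            records[self.persisted_len..].to_vec()
        };
        self.persisted_len += slice.len();
        slice
    }

    // Restores a store by concatenating the slices in the order they were persisted.
    pub fn restore(slices: Vec<Vec<Record>>) -> Self {
        let records: Vec<Record> = slices.into_iter().flatten().collect();
        let persisted_len = records.len();
        Self {
            records: Arc::new(RwLock::new(records)),
            persisted_len,
        }
    }

    pub fn len(&self) -> usize {
        self.records.read().unwrap().len()
    }
}
```

Because each slice starts where the previous one ended, the cost of a commit is proportional to the number of new records rather than to the size of the whole store.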
Processor state
Processor state can be arbitrary data and is usually not append only, so we persist it fully on every commit.
Every processor is responsible for serializing and deserializing its own state.
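One way to express this responsibility is a small trait that each processor implements. The trait name `ProcessorState`, its method signatures, and the `CountProcessor` example are assumptions made for illustration; the proposal does not specify an interface or an encoding.

```rust
// Hypothetical trait: each processor owns the encoding of its own state.
pub trait ProcessorState: Sized {
    // Produces the full state; it is written out whole on every persist.
    fn serialize(&self) -> Vec<u8>;
    // Rebuilds the state during restore; `None` signals malformed bytes.
    fn deserialize(bytes: &[u8]) -> Option<Self>;
}

// Hypothetical processor whose entire state is a running count.
#[derive(Debug, PartialEq)]
pub struct CountProcessor {
    pub count: u64,
}

impl ProcessorState for CountProcessor {
    fn serialize(&self) -> Vec<u8> {
        // Fixed-width little-endian encoding of the counter.
        self.count.to_le_bytes().to_vec()
    }

    fn deserialize(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 8 {
            return None;
        }
        let mut buf = [0u8; 8];
        buf.copy_from_slice(bytes);
        Some(Self {
            count: u64::from_le_bytes(buf),
        })
    }
}
```

Since the state is not append only, nothing incremental is attempted here: the whole value is encoded each time.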
Persisting order
Because the record store is shared between all processors, and any processor can append records to the store, we need to make sure that the record store is persisted after all processors.
To achieve this, we'll add a `finalize_epoch` method to the `EpochManager`. All sinks will call this method on every commit. The `EpochManager` will persist the record store after all sinks have called `finalize_epoch`.
It doesn't matter if records belonging to the next epoch are appended before the current epoch is finalized, because it does no harm if the record store has more records than necessary.
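The ordering rule could look something like the sketch below. Only the names `EpochManager` and `finalize_epoch` come from the proposal; the signature, the `num_sinks` field, and the `persisted_epochs` log (which stands in for the actual record store upload) are assumptions. The sketch also assumes each sink calls `finalize_epoch` exactly once per epoch and that no sink finalizes the next epoch before all sinks have finalized the current one.

```rust
use std::sync::Mutex;

struct EpochState {
    // How many sinks have finalized the current epoch so far.
    finalized: usize,
    // Epochs for which the record store was persisted (stand-in for the upload).
    persisted_epochs: Vec<u64>,
}

pub struct EpochManager {
    num_sinks: usize,
    state: Mutex<EpochState>,
}

impl EpochManager {
    pub fn new(num_sinks: usize) -> Self {
        Self {
            num_sinks,
            state: Mutex::new(EpochState {
                finalized: 0,
                persisted_epochs: Vec::new(),
            }),
        }
    }

    // Called by every sink on commit. Returns true when this call was the
    // last one for the epoch and therefore triggered record store persistence.
    pub fn finalize_epoch(&self, epoch_id: u64) -> bool {
        let mut state = self.state.lock().unwrap();
        state.finalized += 1;
        if state.finalized < self.num_sinks {
            return false;
        }
        // All sinks are done, so every processor state for this epoch is
        // already persisted; the record store can now be persisted safely.
        state.finalized = 0;
        state.persisted_epochs.push(epoch_id);
        true
    }

    pub fn persisted_epochs(&self) -> Vec<u64> {
        self.state.lock().unwrap().persisted_epochs.clone()
    }
}
```

With two sinks, the first `finalize_epoch(0)` call only increments the counter, and the second one persists the record store for epoch 0.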
Organizing checkpoints
The checkpoints of a pipeline look like the following:
Because we always persist processor states before the record store, all `.bin` files must exist if the corresponding `.slice` file exists.
Additional tweaking
Persisting doesn't need to happen on every commit. We can let `EpochManager` decide if persisting should happen during this commit by checking the record store size and the duration since last persisting.
@mediuminvader @snork-alt @v3g42
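As a rough illustration of the persistence decision under "Additional tweaking", the check could combine a size threshold with a time threshold. The `PersistPolicy` type, its field names, and the choice to measure size as the number of not-yet-persisted records are all assumptions; the proposal only names the two inputs.

```rust
use std::time::Duration;

// Hypothetical policy consulted by `EpochManager` on each commit.
pub struct PersistPolicy {
    // Persist once at least this many records are waiting to be persisted.
    pub max_unpersisted_records: usize,
    // Persist once at least this much time has passed since the last persist.
    pub max_interval: Duration,
}

impl PersistPolicy {
    // Returns true if either threshold has been reached.
    pub fn should_persist(
        &self,
        unpersisted_records: usize,
        since_last_persist: Duration,
    ) -> bool {
        unpersisted_records >= self.max_unpersisted_records
            || since_last_persist >= self.max_interval
    }
}
```

Using "either threshold" bounds both the amount of data that a crash can force the pipeline to reprocess and the time between checkpoints on a quiet pipeline.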