refactor: Change OpIdentifier to RestartableState #2263
Conversation
I'm worried about the memory use for connectors that need to keep a lot of state. For example, for object_store we'd have to keep track of all the files we've already ingested from (and how many rows from each, if we want to support appending to files). That's ~32 bytes per file name; with, say, 1000 files that adds a 32 KB allocation per message. Messages/records themselves are typically much smaller than that, so that's potentially a lot of overhead. What we could do instead is, for example, make the ingestor aware of when to checkpoint and add a closure parameter, e.g. ingestor.handle_message(IngestionMessage::OperationEvent { 0, Insert }, || -> State { state.clone() }), or make the state parameter a reference.
The worry is legit, and the current implementation is not good. But I don't like using a closure either; adding a closure parameter makes the code more difficult to read. I'm leaning towards changing the …
A pull-based model would make connectors act like streams (async iterators), which could be nice. Downside: connectors would have to keep track of more non-persistent state (they need to keep a "current" batch and iterate over that), instead of just dumping the entire batch into a sink and being rate-limited by back-pressure on the channel. We could also allow the pull to receive a batch instead of only a single item, if that's how the connector works. Something like:

    trait Connector {
        async fn start(&self, state: State) -> impl IngestionStream;
    }

    enum ConnectorState {
        Snapshotting,
        Streaming(State),
    }

    trait IngestionStream {
        async fn next(&self) -> IngestionBatch;
        async fn state(&self) -> ConnectorState;
    }
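To make the semantics of that pull-based API concrete, here is a self-contained sketch of how the runtime might drive such a stream. It deviates slightly from the snippet above (next takes &mut self and returns an Option so the loop can terminate), and none of the names are dozer's actual types.

```rust
// Illustrative driver for a pull-based connector: keep pulling batches and
// ask the stream for its restartable state whenever a checkpoint is due.
struct State(Vec<u8>);
struct IngestionBatch;

enum ConnectorState {
    Snapshotting,
    Streaming(State),
}

trait IngestionStream {
    async fn next(&mut self) -> Option<IngestionBatch>;
    async fn state(&self) -> ConnectorState;
}

async fn drive(mut stream: impl IngestionStream) {
    while let Some(batch) = stream.next().await {
        forward_to_pipeline(batch);
        if checkpoint_due() {
            // Snapshotting carries no state here, so nothing is persisted
            // until the connector has moved on to streaming.
            if let ConnectorState::Streaming(state) = stream.state().await {
                persist_state(state);
            }
        }
    }
}

// Placeholders standing in for the real pipeline and checkpoint plumbing.
fn forward_to_pipeline(_batch: IngestionBatch) {}
fn checkpoint_due() -> bool { false }
fn persist_state(_state: State) {}
```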
Regarding the state management inside connectors: keeping a batch index and an offset within the batch is what they do now anyway. That's why …
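For illustration, that in-memory bookkeeping amounts to something like the following cursor; the field names are made up for the example.

```rust
// Illustrative only: the non-persistent cursor a connector keeps while
// iterating over batches, i.e. which batch it is in and how far into it.
struct SnapshotCursor {
    batch_index: usize,     // index of the current batch
    offset_in_batch: usize, // next record within that batch
}
```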
@chubei @Jesse-Bakker Can we get this merged? For postgres to continue ingestion I need to know the replication slot name (right now it changes on every dozer start). I see the problem that this implementation adds unnecessary overhead, but maybe we can solve that in a different PR?
I'll merge it to unblock progress. @karolisg can you approve it?
As discussed in #2212, we allow the connector to use arbitrary data to record its "state". The state can be sent along with CDC events, and the connector should respect the state by starting after it.
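A minimal sketch of that idea, assuming the state is an opaque byte blob that each connector interprets for itself; the actual RestartableState and IngestionMessage definitions in this PR may differ.

```rust
// Sketch only: connector-defined, opaque restart state attached to CDC
// events. For example, a postgres connector could encode its replication
// slot name and position; object_store could encode the files (and row
// counts) already ingested.
pub struct RestartableState(pub Vec<u8>);

pub enum IngestionMessage {
    OperationEvent {
        // op: Operation, ...
        /// State needed to resume ingestion after this event, if any.
        state: Option<RestartableState>,
    },
    // ... other message kinds ...
}
```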