refactor: Change OpIdentifier to RestartableState #2263

Merged
merged 3 commits into from
Jan 3, 2024

Conversation

chubei
Contributor

@chubei chubei commented Dec 22, 2023

As discussed in #2212, we allow the connector to use arbitrary data to record its "state". The state can be sent along with CDC events, and when restarting, the connector should resume from right after that state.
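To illustrate the idea of an opaque state, here is a minimal sketch in which the old two-`u64` identifier is just one possible encoding of the new state. The shape of `RestartableState` (a byte vector) and the field names `txid` and `seq_in_tx` are assumptions made for this example, not taken from the PR diff.

```rust
use std::convert::{TryFrom, TryInto};

/// Opaque connector state as raw bytes (assumed shape for this sketch).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestartableState(pub Vec<u8>);

/// The old fixed-shape identifier: two u64s (field names are assumptions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpIdentifier {
    pub txid: u64,
    pub seq_in_tx: u64,
}

impl From<OpIdentifier> for RestartableState {
    /// Encode both fields as 16 big-endian bytes.
    fn from(id: OpIdentifier) -> Self {
        let mut bytes = Vec::with_capacity(16);
        bytes.extend_from_slice(&id.txid.to_be_bytes());
        bytes.extend_from_slice(&id.seq_in_tx.to_be_bytes());
        RestartableState(bytes)
    }
}

impl TryFrom<&RestartableState> for OpIdentifier {
    type Error = String;

    /// Decode the 16-byte layout; any other length is rejected.
    fn try_from(state: &RestartableState) -> Result<Self, Self::Error> {
        let bytes: [u8; 16] = state
            .0
            .as_slice()
            .try_into()
            .map_err(|_| format!("expected 16 bytes, got {}", state.0.len()))?;
        let (txid, seq_in_tx) = bytes.split_at(8);
        Ok(OpIdentifier {
            txid: u64::from_be_bytes(txid.try_into().unwrap()),
            seq_in_tx: u64::from_be_bytes(seq_in_tx.try_into().unwrap()),
        })
    }
}
```

A connector that needs something else, such as a replication slot name or a list of ingested files, would pick its own encoding behind the same byte-vector type.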

@Jesse-Bakker
Contributor

I'm worried about the memory use for connectors that need to keep a lot of state. For example, for object_store, we'd have to keep track of all the files we've already ingested from (and how many rows from each, if we want to support appending to files). That's ~32 bytes per file name, let's say 1000 files, which adds a 32 KB allocation per message. Messages/records themselves are typically much smaller than that, so that's potentially a lot of overhead.

What we could do instead is, for example, make the ingestor aware of when to checkpoint and add a closure parameter to handle_message() to get the state at that point in time:

ingestor.handle_message(
    IngestionMessage::OperationEvent { 0, Insert },
    || -> State { state.clone() },
)

Or make the state parameter a reference to an impl Serialize, so it's only serialized (and cloned) when needed and discarded otherwise.
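A minimal sketch of the closure variant, assuming a hypothetical `Ingestor` that decides on its own when to checkpoint. The `State` struct, the `checkpoint_every` policy, and the `&str` message placeholder are all invented for illustration; the point is only that the state is cloned when a checkpoint is taken, not on every message.

```rust
/// Hypothetical connector state: one entry per ingested file.
#[derive(Debug, Clone, PartialEq)]
pub struct State {
    /// (file name, rows ingested so far)
    pub files: Vec<(String, u64)>,
}

/// Hypothetical ingestor that checkpoints every `checkpoint_every` messages.
pub struct Ingestor {
    checkpoint_every: usize,
    seen: usize,
    pub last_checkpoint: Option<State>,
}

impl Ingestor {
    pub fn new(checkpoint_every: usize) -> Self {
        assert!(checkpoint_every > 0, "checkpoint_every must be positive");
        Ingestor {
            checkpoint_every,
            seen: 0,
            last_checkpoint: None,
        }
    }

    /// Handles one message. `get_state` is only called when the ingestor
    /// decides to checkpoint, so the state is not cloned for other messages.
    /// Returns true if a checkpoint was taken.
    pub fn handle_message<F>(&mut self, _message: &str, get_state: F) -> bool
    where
        F: FnOnce() -> State,
    {
        self.seen += 1;
        if self.seen % self.checkpoint_every == 0 {
            self.last_checkpoint = Some(get_state());
            true
        } else {
            false
        }
    }
}
```

With `checkpoint_every` set to 3, six calls to `handle_message` invoke the closure twice, so a large state is cloned twice instead of six times.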

@chubei
Contributor Author

chubei commented Dec 22, 2023

The worry is legit. The current implementation is not good.

But I don't like using a closure either. Adding a closure parameter makes code more difficult to read.

I'm leaning towards changing the Connector to a pull model, i.e., instead of outputting to Ingestor in Connector::start, we call Connector::next_message to fetch a message, and call Connector::get_restartable_state_opt to checkpoint its state.
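A rough, synchronous sketch of what such a pull model could look like. Only the method names `next_message` and `get_restartable_state_opt` come from the comment above; their signatures, the `RestartableState` shape, the toy `CountingConnector`, and the `pull` driver are assumptions for illustration.

```rust
use std::convert::TryInto;

/// Opaque connector state as raw bytes (assumed shape for this sketch).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestartableState(pub Vec<u8>);

/// Pull-style connector; signatures are guesses, not the real dozer API.
pub trait Connector {
    type Message;
    /// Fetch the next message, or None when the source is exhausted.
    fn next_message(&mut self) -> Option<Self::Message>;
    /// State to resume after, or None if nothing has been emitted yet.
    fn get_restartable_state_opt(&self) -> Option<RestartableState>;
}

/// Toy connector that emits the integers next..end.
pub struct CountingConnector {
    next: u64,
    end: u64,
}

impl CountingConnector {
    /// Start from scratch, or resume right after the number stored in `state`.
    pub fn resume(state: Option<&RestartableState>, end: u64) -> Self {
        let next = match state {
            Some(s) => {
                let bytes: [u8; 8] =
                    s.0.as_slice().try_into().expect("state must be 8 bytes");
                u64::from_be_bytes(bytes) + 1
            }
            None => 0,
        };
        CountingConnector { next, end }
    }
}

impl Connector for CountingConnector {
    type Message = u64;

    fn next_message(&mut self) -> Option<u64> {
        if self.next >= self.end {
            return None;
        }
        let message = self.next;
        self.next += 1;
        Some(message)
    }

    fn get_restartable_state_opt(&self) -> Option<RestartableState> {
        // The state is the last emitted number, encoded as 8 big-endian bytes.
        self.next
            .checked_sub(1)
            .map(|last| RestartableState(last.to_be_bytes().to_vec()))
    }
}

/// Driver: pulls up to `max` messages and asks for the state only every
/// `checkpoint_every` messages. Returns the messages and the last checkpoint.
pub fn pull<C: Connector>(
    connector: &mut C,
    max: usize,
    checkpoint_every: usize,
) -> (Vec<C::Message>, Option<RestartableState>) {
    let mut messages = Vec::new();
    let mut checkpoint = None;
    while messages.len() < max {
        match connector.next_message() {
            Some(message) => messages.push(message),
            None => break,
        }
        if messages.len() % checkpoint_every == 0 {
            checkpoint = connector.get_restartable_state_opt();
        }
    }
    (messages, checkpoint)
}
```

In this sketch the caller decides when a checkpoint is taken, so no state travels with individual messages. A message pulled after the last checkpoint is replayed on resume, which gives at-least-once delivery.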

@Jesse-Bakker
Contributor

A pull-based model would make connectors act like streams (async iterators), which could be nice. Downside: connectors would have to keep track of more non-persistent state (they need to keep a "current" batch and iterate over that), instead of just dumping the entire batch into a sink and being rate-limited using back-pressure on the channel. We could also allow the pull to receive a batch instead of only a single item, if that's how the connector works.

Something like:

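// Sketch only: `State` and `IngestionBatch` are placeholders.
trait Connector {
    // Start (or resume) ingestion from the given state.
    async fn start(&self, state: State) -> impl IngestionStream;
}

enum ConnectorState {
    // Still taking the initial snapshot.
    Snapshotting,
    // Streaming changes; carries the state to resume from.
    Streaming(State),
}

trait IngestionStream {
    // Pull the next batch of messages.
    async fn next(&self) -> IngestionBatch;
    // Report the current state for checkpointing.
    async fn state(&self) -> ConnectorState;
}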

@chubei
Contributor Author

chubei commented Dec 22, 2023

Regarding the state management inside connectors, keeping batch index and offset within batch is what they do now anyway. That's why OpIdentifier was two u64s.
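As a small illustration of resuming from a batch index plus an offset within the batch, here is a sketch that skips everything up to and including the last recorded position. The `Position` type and `records_after` helper are hypothetical names for this example and do not reflect how any particular connector is written.

```rust
/// Hypothetical resume position: which batch, and which record inside it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Position {
    pub batch: u64,
    pub offset: u64,
}

/// Returns every record strictly after `last`, paired with its position.
/// With `last == None`, all records are returned.
pub fn records_after<T: Clone>(
    batches: &[Vec<T>],
    last: Option<Position>,
) -> Vec<(Position, T)> {
    let mut out = Vec::new();
    for (b, batch) in batches.iter().enumerate() {
        for (o, record) in batch.iter().enumerate() {
            let pos = Position {
                batch: b as u64,
                offset: o as u64,
            };
            // Positions are ordered by batch first, then by offset.
            let already_seen = match last {
                Some(l) => (pos.batch, pos.offset) <= (l.batch, l.offset),
                None => false,
            };
            if !already_seen {
                out.push((pos, record.clone()));
            }
        }
    }
    out
}
```

Resuming from `Position { batch: 1, offset: 0 }` over three batches yields only the records of the third batch, since the single record of the second batch was the last one processed.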

@karolisg
Contributor

karolisg commented Jan 3, 2024

@chubei @Jesse-Bakker Can we get this merged? For postgres to continue ingestion, I need to know the replication slot name (right now it changes on every dozer start). I see that the problem with this implementation is the unnecessary overhead, but maybe we can solve that in a different PR?

@chubei
Contributor Author

chubei commented Jan 3, 2024

> @chubei @Jesse-Bakker Can we get this merged? For postgres to continue ingestion, I need to know the replication slot name (right now it changes on every dozer start). I see that the problem with this implementation is the unnecessary overhead, but maybe we can solve that in a different PR?

I'll merge it to unblock progress. @karolisg can you approve it?

@chubei chubei enabled auto-merge January 3, 2024 07:25
@chubei chubei added this pull request to the merge queue Jan 3, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 3, 2024
@chubei chubei enabled auto-merge January 3, 2024 07:51
@chubei chubei added this pull request to the merge queue Jan 3, 2024
Merged via the queue into getdozer:main with commit 0455c07 Jan 3, 2024
4 checks passed
@chubei chubei deleted the refactor/state branch January 3, 2024 08:29
3 participants