-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Xmin rep #747
Xmin rep #747
Conversation
flow/workflows/snapshot_flow.go
Outdated
@@ -185,6 +185,7 @@ func (s *SnapshotFlowExecution) cloneTable( | |||
}, | |||
} | |||
|
|||
// TODO handle xmin? if yes, maybe move xmin redirection to QRepFlowWorkflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO I don't know what snapshot_flow
is for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is initial load for CDC. XMIN shouldn't be relevant here
flow/activities/flowable.go
Outdated
bufferSize := shared.FetchAndChannelSize | ||
var wg sync.WaitGroup | ||
|
||
var goroutineErr error = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code in question seems like it shouldn't progress when an error is caught earlier on
@serprex should'nt this be |
Wrote this all up while still pretty jet lagged so not sure what's here & what more needs to be done & haven't tested at all
…ng, since test is failing on destination table not being created
37d85ba
to
e51974d
Compare
Yes, which is what's in the code. Fixed description |
Potential followups:
|
flow/workflows/xmin_flow.go
Outdated
@@ -239,7 +238,10 @@ func XminFlowWorkflow( | |||
return fmt.Errorf("xmin replication failed: %w", err) | |||
} | |||
|
|||
state.LastPartition = &protos.QRepPartition{PartitionId: strconv.FormatInt(lastPartition&0xffffffff, 10)} | |||
state.LastPartition = &protos.QRepPartition{ | |||
PartitionId: uuid.New().String(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generating a uuid in a workflow is not repeatable and causes state issues, you have to do it as a side-effect, see https://github.com/PeerDB-io/peerdb/blob/main/flow/workflows/cdc_flow.go#L126 for an example side effect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went with reusing q.runUUID which is generated a bit earlier in this code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol, i just fixed it. feel free to force overwrite.
…e when pull fails
return totalRecordsFetched, err | ||
} | ||
|
||
func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider renaming the method
QRep based xmin replication has a fault: xmin isn't monotonic & has wraparound issues
Wraparound workaround: grab all records with
0 < age(xmin) <= age(last snapshot xmin)
which'll include some records from before wraparound but so it goesLogic is copied from qrep code