-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PAUSE MIRROR support #605
PAUSE MIRROR support #605
Conversation
4bbc07c
to
359fc73
Compare
flow/workflows/cdc_flow.go
Outdated
if v == shared.ShutdownSignal { | ||
w.logger.Info("received shutdown signal") | ||
state.ActiveSignal = v | ||
} else if v == shared.PauseSignal { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So PauseSignal triggers flipping pause state? Seems like a source of flakiness compared to having explicit Pause/Unpause signal. But then you'd have to consider what to do when both signals are set. Are signals not able to carry boolean data?
Guess it also doesn't matter since handler manages sequencing, so you handle pause/unpause in order received
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the current logic is a little flaky, but at the same time I can't think of an obvious way it will fail, since we only read a signal at a time and we only maintain one state variable. Temporal Signals are able to carry serializable data, but carrying a boolean value on the Pause signal isn't very clear IMO.
We could also use explicit Pause and Unpause signals and ignore an illegal one [pausing when already paused] but this would require two RPC endpoints exposed. Another consideration is that the UI can prevent illegal state changes by reading the current state of the workflow, but our query layer has no such ability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the minute while it's asleep one can imagine someone sending redundant pause signals
flow/workflows/qrep_flow.go
Outdated
@@ -254,6 +279,33 @@ func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition * | |||
return nil | |||
} | |||
|
|||
func (q *QRepFlowExecution) signalHandler(v shared.CDCFlowSignal) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there some way to share this shutdown/pause logic?
Also seems like activeSignal
shouldn't be switched from ShutdownSignal
to PauseSignal
. So should be checking activeSignal to prevent that sort of state transition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The data structures associated with CDCFlow
and QRepFlow
are different enough for sharing the logic to be not trivial.
I don't think there's a way to transition from ShutdownSignal
to PauseSignal
, as we only process one signal at a time and the return from shutdown happens immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could have utility function in shared that takes signal value + active signal value & returns new active signal value
ae54c7d
to
72a9ce0
Compare
1f76f3e
to
345f342
Compare
PAUSE MIRROR
supports both CDC and QRep mirrors. This is implemented by signalling the Temporal workflow. A PauseSignal delivered to the workflow should pause it, while a NoopSignal delivered to the workflow should resume it. Signals that don't make sense for the current workflow state are ignored. Currently supported only via the query layer, not from the UI. The following caveats exist for now: