diff --git a/flow/utils/signals.go b/flow/utils/signals.go new file mode 100644 index 0000000000..94eddd6289 --- /dev/null +++ b/flow/utils/signals.go @@ -0,0 +1,27 @@ +package util + +import ( + "github.com/PeerDB-io/peer-flow/shared" + "go.temporal.io/sdk/log" +) + +func FlowSignalHandler(activeSignal shared.CDCFlowSignal, + v shared.CDCFlowSignal, logger log.Logger) shared.CDCFlowSignal { + if v == shared.ShutdownSignal { + logger.Info("received shutdown signal") + return v + } else if v == shared.PauseSignal { + logger.Info("received pause signal") + if activeSignal == shared.NoopSignal { + logger.Info("workflow was running, pausing it") + return v + } + } else if v == shared.NoopSignal { + logger.Info("received resume signal") + if activeSignal == shared.PauseSignal { + logger.Info("workflow was paused, resuming it") + return v + } + } + return activeSignal +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 1ea18dd9eb..59d58c8803 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -148,7 +148,7 @@ func GetChildWorkflowID( // CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. type CDCFlowWorkflowResult = CDCFlowWorkflowState -func (w *CDCFlowWorkflowExecution) receiveAndHandleSignal(ctx workflow.Context, state *CDCFlowWorkflowState) { +func (w *CDCFlowWorkflowExecution) receiveAndHandleSignalAsync(ctx workflow.Context, state *CDCFlowWorkflowState) { signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -276,7 +276,7 @@ func CDCFlowWorkflowWithConfig( for { // check and act on signals before a fresh flow starts. - w.receiveAndHandleSignal(ctx, state) + w.receiveAndHandleSignalAsync(ctx, state) if state.ActiveSignal == shared.PauseSignal { startTime := time.Now() diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index b195b55b79..7ff248ec55 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -343,7 +343,7 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta return nil } -func (q *QRepFlowExecution) receiveAndHandleSignal(ctx workflow.Context) { +func (q *QRepFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) { signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) var signalVal shared.CDCFlowSignal @@ -452,7 +452,7 @@ func QRepFlowWorkflow( // here, we handle signals after the end of the flow because a new workflow does not inherit the signals // and the chance of missing a signal is much higher if the check is before the time consuming parts run - q.receiveAndHandleSignal(ctx) + q.receiveAndHandleSignalAsync(ctx) if q.activeSignal == shared.PauseSignal { startTime := time.Now() signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) @@ -472,30 +472,6 @@ func QRepFlowWorkflow( return nil } - // here, we handle signals after the end of the flow because a new workflow does not inherit the signals - // and the chance of missing a signal is much higher if the check is before the time consuming parts run - q.receiveAndHandleSignal(ctx) - if q.activeSignal == shared.ShutdownSignal { - q.logger.Info("terminating workflow - ", config.FlowJobName) - return nil - } - if q.activeSignal == shared.PauseSignal { - startTime := time.Now() - for q.activeSignal == shared.PauseSignal { - err = workflow.Sleep(ctx, 1*time.Minute) - if err != nil { - return err - } - q.logger.Info("mirror has been paused for ", time.Since(startTime)) - q.receiveAndHandleSignal(ctx) - } - if q.activeSignal == shared.ShutdownSignal { - // handling going from paused to shutdown - q.logger.Info("terminating workflow - ", config.FlowJobName) - return nil - } - } - // Continue the workflow with new state return workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state) }