Skip to content

Commit

Permalink
did not commit the new file oops
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 8, 2023
1 parent 02c56bc commit ae233c7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
27 changes: 27 additions & 0 deletions flow/utils/signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package util

import (
"github.com/PeerDB-io/peer-flow/shared"
"go.temporal.io/sdk/log"
)

func FlowSignalHandler(activeSignal shared.CDCFlowSignal,
v shared.CDCFlowSignal, logger log.Logger) shared.CDCFlowSignal {
if v == shared.ShutdownSignal {
logger.Info("received shutdown signal")
return v
} else if v == shared.PauseSignal {
logger.Info("received pause signal")
if activeSignal == shared.NoopSignal {
logger.Info("workflow was running, pausing it")
return v
}
} else if v == shared.NoopSignal {
logger.Info("received resume signal")
if activeSignal == shared.PauseSignal {
logger.Info("workflow was paused, resuming it")
return v
}
}
return activeSignal
}
4 changes: 2 additions & 2 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func GetChildWorkflowID(
// CDCFlowWorkflowResult is the result of the PeerFlowWorkflow.
type CDCFlowWorkflowResult = CDCFlowWorkflowState

func (w *CDCFlowWorkflowExecution) receiveAndHandleSignal(ctx workflow.Context, state *CDCFlowWorkflowState) {
func (w *CDCFlowWorkflowExecution) receiveAndHandleSignalAsync(ctx workflow.Context, state *CDCFlowWorkflowState) {
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

var signalVal shared.CDCFlowSignal
Expand Down Expand Up @@ -276,7 +276,7 @@ func CDCFlowWorkflowWithConfig(

for {
// check and act on signals before a fresh flow starts.
w.receiveAndHandleSignal(ctx, state)
w.receiveAndHandleSignalAsync(ctx, state)

if state.ActiveSignal == shared.PauseSignal {
startTime := time.Now()
Expand Down
28 changes: 2 additions & 26 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
return nil
}

func (q *QRepFlowExecution) receiveAndHandleSignal(ctx workflow.Context) {
func (q *QRepFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) {
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

var signalVal shared.CDCFlowSignal
Expand Down Expand Up @@ -452,7 +452,7 @@ func QRepFlowWorkflow(

// here, we handle signals after the end of the flow because a new workflow does not inherit the signals
// and the chance of missing a signal is much higher if the check is before the time consuming parts run
q.receiveAndHandleSignal(ctx)
q.receiveAndHandleSignalAsync(ctx)
if q.activeSignal == shared.PauseSignal {
startTime := time.Now()
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
Expand All @@ -472,30 +472,6 @@ func QRepFlowWorkflow(
return nil
}

// here, we handle signals after the end of the flow because a new workflow does not inherit the signals
// and the chance of missing a signal is much higher if the check is before the time consuming parts run
q.receiveAndHandleSignal(ctx)
if q.activeSignal == shared.ShutdownSignal {
q.logger.Info("terminating workflow - ", config.FlowJobName)
return nil
}
if q.activeSignal == shared.PauseSignal {
startTime := time.Now()
for q.activeSignal == shared.PauseSignal {
err = workflow.Sleep(ctx, 1*time.Minute)
if err != nil {
return err
}
q.logger.Info("mirror has been paused for ", time.Since(startTime))
q.receiveAndHandleSignal(ctx)
}
if q.activeSignal == shared.ShutdownSignal {
// handling going from paused to shutdown
q.logger.Info("terminating workflow - ", config.FlowJobName)
return nil
}
}

// Continue the workflow with new state
return workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state)
}
Expand Down

0 comments on commit ae233c7

Please sign in to comment.