diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 84f6ba73cb..0662ce6c3c 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -6,36 +6,12 @@ import ( "time" "github.com/google/uuid" - "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) -type XminFlowExecution struct { - config *protos.QRepConfig - flowExecutionID string - logger log.Logger - runUUID string - // being tracked for future workflow signalling - childPartitionWorkflows []workflow.ChildWorkflowFuture - // Current signalled state of the peer flow. - activeSignal shared.CDCFlowSignal -} - -// NewXminFlowExecution creates a new instance of XminFlowExecution. -func NewXminFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUUID string) *XminFlowExecution { - return &XminFlowExecution{ - config: config, - flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), - runUUID: runUUID, - childPartitionWorkflows: nil, - activeSignal: shared.NoopSignal, - } -} - func XminFlowWorkflow( ctx workflow.Context, config *protos.QRepConfig, @@ -57,9 +33,8 @@ func XminFlowWorkflow( return fmt.Errorf("failed to get run uuid: %w", err) } - x := NewXminFlowExecution(ctx, config, runUUID) - q := NewQRepFlowExecution(ctx, config, runUUID) + err = q.SetupWatermarkTableOnDestination(ctx) if err != nil { return fmt.Errorf("failed to setup watermark table: %w", err) @@ -69,7 +44,7 @@ func XminFlowWorkflow( if err != nil { return fmt.Errorf("failed to setup metadata tables: %w", err) } - x.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) + q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) err = q.handleTableCreationForResync(ctx, state) if err != nil { @@ -84,9 +59,9 @@ func XminFlowWorkflow( err = workflow.ExecuteActivity( replicateXminPartitionCtx, flowable.ReplicateXminPartition, - x.config, + q.config, state.LastPartition, - x.runUUID, + q.runUUID, ).Get(ctx, &lastPartition) if err != nil { return fmt.Errorf("xmin replication failed: %w", err) @@ -97,7 +72,7 @@ func XminFlowWorkflow( } if config.InitialCopyOnly { - x.logger.Info("initial copy completed for peer flow - ", config.FlowJobName) + q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName) return nil } @@ -107,7 +82,7 @@ func XminFlowWorkflow( } state.LastPartition = &protos.QRepPartition{ - PartitionId: x.runUUID, + PartitionId: q.runUUID, Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } @@ -119,17 +94,17 @@ func XminFlowWorkflow( // and the chance of missing a signal is much higher if the check is before the time consuming parts run signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) q.receiveAndHandleSignalAsync(signalChan) - if x.activeSignal == shared.PauseSignal { + if q.activeSignal == shared.PauseSignal { startTime := time.Now() state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED var signalVal shared.CDCFlowSignal - for x.activeSignal == shared.PauseSignal { - x.logger.Info("mirror has been paused for ", time.Since(startTime)) + for q.activeSignal == shared.PauseSignal { + q.logger.Info("mirror has been paused for ", time.Since(startTime)) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { - x.activeSignal = shared.FlowSignalHandler(x.activeSignal, signalVal, x.logger) + q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) } } }