diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 28a3227d42..a8fe5f0091 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -5,45 +5,13 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" ) -type NormalizeFlowState struct { - CDCFlowName string - Progress []string -} - -type NormalizeFlowExecution struct { - NormalizeFlowState - executionID string - logger log.Logger -} - -func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution { - return &NormalizeFlowExecution{ - NormalizeFlowState: *state, - executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), - } -} - func NormalizeFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, -) model.NormalizeFlowResponse { - s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{ - CDCFlowName: config.FlowJobName, - Progress: []string{}, - }) - - return s.executeNormalizeFlow(ctx, config) -} - -func (s *NormalizeFlowExecution) executeNormalizeFlow( - ctx workflow.Context, - config *protos.FlowConnectionConfigs, -) model.NormalizeFlowResponse { - s.logger.Info("executing normalize flow - ", s.CDCFlowName) +) (*model.NormalizeFlowResponse, error) { + logger := workflow.GetLogger(ctx) normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 7 * 24 * time.Hour, @@ -58,6 +26,7 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( needSync := true for { if needSync { + logger.Info("executing normalize - ", config.FlowJobName) startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, } @@ -91,8 +60,8 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow( } } - return model.NormalizeFlowResponse{ + return &model.NormalizeFlowResponse{ Results: results, Errors: errors, - } + }, nil }