diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index cf4d2e373e..99961c42ba 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -25,7 +25,6 @@ func SyncFlowWorkflow( parent := workflow.GetInfo(ctx).ParentWorkflowExecution logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - result := &CDCFlowWorkflowState{} sessionOptions := &workflow.SessionOptions{ CreationTimeout: 5 * time.Minute, ExecutionTimeout: 144 * time.Hour, @@ -70,7 +69,7 @@ func SyncFlowWorkflow( }) sessionSelector.Select(ctx) if sessionError != nil { - return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, result) + return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options) } var waitChan model.TypedReceiveChannel[struct{}] @@ -170,7 +169,12 @@ func SyncFlowWorkflow( var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil { logger.Error("failed to execute schema update at source: ", err) - result.SyncFlowErrors = append(result.SyncFlowErrors, err.Error()) + model.SyncErrorSignal.SignalExternalWorkflow( + ctx, + parent.ID, + parent.RunID, + err.Error(), + ) } else { for i, srcTable := range modifiedSrcTables { dstTable := modifiedDstTables[i] @@ -205,7 +209,7 @@ func SyncFlowWorkflow( return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options) } if normalizeSignalError != nil { - return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, result) + return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options) } if mustWait { waitChan.Receive(ctx)