Skip to content

Commit

Permalink
fix up result/options confusion
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 23, 2024
1 parent ab44726 commit d56fb3e
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func SyncFlowWorkflow(
parent := workflow.GetInfo(ctx).ParentWorkflowExecution
logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

result := &CDCFlowWorkflowState{}
sessionOptions := &workflow.SessionOptions{
CreationTimeout: 5 * time.Minute,
ExecutionTimeout: 144 * time.Hour,
Expand Down Expand Up @@ -70,7 +69,7 @@ func SyncFlowWorkflow(
})
sessionSelector.Select(ctx)
if sessionError != nil {
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, result)
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}

var waitChan model.TypedReceiveChannel[struct{}]
Expand Down Expand Up @@ -170,7 +169,12 @@ func SyncFlowWorkflow(
var getModifiedSchemaRes *protos.GetTableSchemaBatchOutput
if err := getModifiedSchemaFuture.Get(ctx, &getModifiedSchemaRes); err != nil {
logger.Error("failed to execute schema update at source: ", err)
result.SyncFlowErrors = append(result.SyncFlowErrors, err.Error())
model.SyncErrorSignal.SignalExternalWorkflow(
ctx,
parent.ID,
parent.RunID,
err.Error(),
)
} else {
for i, srcTable := range modifiedSrcTables {
dstTable := modifiedDstTables[i]
Expand Down Expand Up @@ -205,7 +209,7 @@ func SyncFlowWorkflow(
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}
if normalizeSignalError != nil {
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, result)
return workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, config, options)
}
if mustWait {
waitChan.Receive(ctx)
Expand Down

0 comments on commit d56fb3e

Please sign in to comment.