Skip to content

Commit

Permalink
StartFlow: pass errCtx to dstConn to cancel on srcConn error (#1249)
Browse files Browse the repository at this point in the history
Also extend e2e workflow error check to print stacktrace for panics
  • Loading branch information
serprex authored Feb 11, 2024
1 parent 3b91bce commit c5fbb7e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
14 changes: 7 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, config.Source)
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
Expand All @@ -193,14 +192,15 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
defer shutdown()

errGroup, errCtx := errgroup.WithContext(ctx)
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
Expand Down Expand Up @@ -257,13 +257,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}, nil
}

syncBatchID, err := dstConn.GetLastSyncBatchID(ctx, flowName)
syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}
syncBatchID += 1

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, flowName,
err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID,
RowsInBatch: 0,
Expand All @@ -276,7 +276,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(ctx, &model.SyncRecordsRequest{
res, err := dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
Expand Down
6 changes: 5 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,13 @@ func EnvWaitForEqualTablesWithNames(
func RequireEnvCanceled(t *testing.T, env *testsuite.TestWorkflowEnvironment) {
t.Helper()
err := env.GetWorkflowError()
var panicErr *temporal.PanicError
var canceledErr *temporal.CanceledError
if err == nil {
t.Fatal("Expected workflow to be canceled, not completed")
} else if _, ok := errors.Unwrap(err).(*temporal.CanceledError); !ok {
} else if errors.As(err, &panicErr) {
t.Fatalf("Workflow panic: %s %s", panicErr.Error(), panicErr.StackTrace())
} else if !errors.As(err, &canceledErr) {
t.Fatalf("Expected workflow to be canceled, not %v", err)
}
}
Expand Down

0 comments on commit c5fbb7e

Please sign in to comment.