diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2086aa8cfd..b34ccf6f48 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -261,7 +261,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, startTime := time.Now() flowName := input.FlowConnectionConfigs.FlowJobName errGroup.Go(func() error { - return srcConn.PullRecords(ctx, a.CatalogPool, &model.PullRecordsRequest{ + return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: flowName, SrcTableIDNameMapping: input.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, @@ -302,38 +302,44 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }, nil } - syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) - if err != nil && config.Destination.Type != protos.DBType_EVENTHUB { - return nil, err - } - syncBatchID += 1 + var syncStartTime time.Time + var res *model.SyncResponse + errGroup.Go(func() error { + syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) + if err != nil && config.Destination.Type != protos.DBType_EVENTHUB { + return err + } + syncBatchID += 1 + + err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, + monitoring.CDCBatchInfo{ + BatchID: syncBatchID, + RowsInBatch: 0, + BatchEndlSN: 0, + StartTime: startTime, + }) + if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) + return err + } - err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, - monitoring.CDCBatchInfo{ - BatchID: syncBatchID, - RowsInBatch: 0, - BatchEndlSN: 0, - StartTime: startTime, + syncStartTime = time.Now() + res, err = dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{ + SyncBatchID: syncBatchID, + Records: recordBatch, + FlowJobName: input.FlowConnectionConfigs.FlowJobName, + TableMappings: input.SyncFlowOptions.TableMappings, + StagingPath: input.FlowConnectionConfigs.CdcStagingPath, }) - if err != nil { - a.Alerter.LogFlowError(ctx, flowName, err) - return nil, err - } + if err != nil { + logger.Warn("failed to push records", slog.Any("error", err)) + a.Alerter.LogFlowError(ctx, flowName, err) + return fmt.Errorf("failed to push records: %w", err) + } + res.RelationMessageMapping = input.RelationMessageMapping - syncStartTime := time.Now() - res, err := dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{ - SyncBatchID: syncBatchID, - Records: recordBatch, - FlowJobName: input.FlowConnectionConfigs.FlowJobName, - TableMappings: input.SyncFlowOptions.TableMappings, - StagingPath: input.FlowConnectionConfigs.CdcStagingPath, + return nil }) - if err != nil { - logger.Warn("failed to push records", slog.Any("error", err)) - a.Alerter.LogFlowError(ctx, flowName, err) - return nil, fmt.Errorf("failed to push records: %w", err) - } - res.RelationMessageMapping = input.RelationMessageMapping err = errGroup.Wait() if err != nil {