From 81a7c1f8c70d4b5de757e60a7090b603e6340739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 20:50:39 +0000 Subject: [PATCH] cdc: also run SyncRecords in error group Main thread waits on errGroup.Wait sooner, so that if an error happens it'll be the original error, instead of reporting context canceled from SyncRecords after PullRecords error Also causes context cancelation to be two way: if SyncRecords fails, PullRecords will now be canceled --- flow/activities/flowable.go | 64 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 2086aa8cfd..b34ccf6f48 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -261,7 +261,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, startTime := time.Now() flowName := input.FlowConnectionConfigs.FlowJobName errGroup.Go(func() error { - return srcConn.PullRecords(ctx, a.CatalogPool, &model.PullRecordsRequest{ + return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: flowName, SrcTableIDNameMapping: input.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, @@ -302,38 +302,44 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }, nil } - syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) - if err != nil && config.Destination.Type != protos.DBType_EVENTHUB { - return nil, err - } - syncBatchID += 1 + var syncStartTime time.Time + var res *model.SyncResponse + errGroup.Go(func() error { + syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) + if err != nil && config.Destination.Type != protos.DBType_EVENTHUB { + return err + } + syncBatchID += 1 + + err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, + monitoring.CDCBatchInfo{ + BatchID: syncBatchID, + RowsInBatch: 0, + BatchEndlSN: 0, + StartTime: startTime, + }) + if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) + return err + } - err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName, - monitoring.CDCBatchInfo{ - BatchID: syncBatchID, - RowsInBatch: 0, - BatchEndlSN: 0, - StartTime: startTime, + syncStartTime = time.Now() + res, err = dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{ + SyncBatchID: syncBatchID, + Records: recordBatch, + FlowJobName: input.FlowConnectionConfigs.FlowJobName, + TableMappings: input.SyncFlowOptions.TableMappings, + StagingPath: input.FlowConnectionConfigs.CdcStagingPath, }) - if err != nil { - a.Alerter.LogFlowError(ctx, flowName, err) - return nil, err - } + if err != nil { + logger.Warn("failed to push records", slog.Any("error", err)) + a.Alerter.LogFlowError(ctx, flowName, err) + return fmt.Errorf("failed to push records: %w", err) + } + res.RelationMessageMapping = input.RelationMessageMapping - syncStartTime := time.Now() - res, err := dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{ - SyncBatchID: syncBatchID, - Records: recordBatch, - FlowJobName: input.FlowConnectionConfigs.FlowJobName, - TableMappings: input.SyncFlowOptions.TableMappings, - StagingPath: input.FlowConnectionConfigs.CdcStagingPath, + return nil }) - if err != nil { - logger.Warn("failed to push records", slog.Any("error", err)) - a.Alerter.LogFlowError(ctx, flowName, err) - return nil, fmt.Errorf("failed to push records: %w", err) - } - res.RelationMessageMapping = input.RelationMessageMapping err = errGroup.Wait() if err != nil {