Skip to content

Commit

Permalink
cdc: also run SyncRecords in error group (#1294)
Browse files Browse the repository at this point in the history
Main thread waits on errGroup.Wait sooner,
so that if an error happens it'll be the original error,
instead of reporting context canceled from SyncRecords after PullRecords error

Also causes context cancelation to be two way:
if SyncRecords fails, PullRecords will now be canceled
  • Loading branch information
serprex authored Feb 14, 2024
1 parent 758727a commit 5cac242
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName
errGroup.Go(func() error {
return srcConn.PullRecords(ctx, a.CatalogPool, &model.PullRecordsRequest{
return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
Expand Down Expand Up @@ -302,38 +302,44 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}, nil
}

syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}
syncBatchID += 1
var syncStartTime time.Time
var res *model.SyncResponse
errGroup.Go(func() error {
syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
return err
}
syncBatchID += 1

err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return err
}

err = monitoring.AddCDCBatchForFlow(errCtx, a.CatalogPool, flowName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID,
RowsInBatch: 0,
BatchEndlSN: 0,
StartTime: startTime,
syncStartTime = time.Now()
res, err = dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.SyncFlowOptions.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
if err != nil {
logger.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = input.RelationMessageMapping

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.SyncFlowOptions.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
return nil
})
if err != nil {
logger.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = input.RelationMessageMapping

err = errGroup.Wait()
if err != nil {
Expand Down

0 comments on commit 5cac242

Please sign in to comment.