Skip to content

Commit

Permalink
Remove redundancy between CDC state & sync flow options
Browse files Browse the repository at this point in the history
Only pass config & options to StartFlow, removing StartFlowInput

In fact, while we're at it, rename StartFlow to SyncFlow,
the name doesn't really make sense anymore,
& it'll make less sense after #1211
  • Loading branch information
serprex committed Feb 21, 2024
1 parent 37d88f3 commit 2f0597c
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 116 deletions.
59 changes: 27 additions & 32 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,24 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
func (a *FlowableActivity) SyncFlow(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
) (*model.SyncResponse, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "starting flow...")
config := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
for _, v := range input.SyncFlowOptions.TableMappings {
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

Expand All @@ -230,45 +232,43 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
return fmt.Sprintf("transferring records for job - %s", flowName)
})
defer shutdown()

batchSize := input.SyncFlowOptions.BatchSize
batchSize := options.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
}

lastOffset, err := dstConn.GetLastOffset(ctx, input.FlowConnectionConfigs.FlowJobName)
lastOffset, err := dstConn.GetLastOffset(ctx, config.FlowJobName)
if err != nil {
return nil, err
}

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
if input.RelationMessageMapping == nil {
input.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
if options.RelationMessageMapping == nil {
options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
}

return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: lastOffset,
MaxBatchSize: batchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
int(options.IdleTimeoutSeconds),
),
TableNameSchemaMapping: input.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
TableNameSchemaMapping: options.TableNameSchemaMapping,
OverridePublicationName: config.PublicationName,
OverrideReplicationSlotName: config.ReplicationSlotName,
RelationMessageMapping: options.RelationMessageMapping,
RecordStream: recordBatch,
})
})
Expand All @@ -293,7 +293,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
RelationMessageMapping: input.RelationMessageMapping,
RelationMessageMapping: options.RelationMessageMapping,
}, nil
}

Expand Down Expand Up @@ -322,16 +322,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
res, err = dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.SyncFlowOptions.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
})
if err != nil {
logger.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = input.RelationMessageMapping
res.RelationMessageMapping = options.RelationMessageMapping

return nil
})
Expand All @@ -352,7 +352,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
flowName,
res.CurrentSyncBatchID,
uint32(numRecords),
lastCheckpoint,
Expand All @@ -362,18 +362,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, err
}

err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
lastCheckpoint,
)
err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
if res.TableNameRowsMapping != nil {
err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, flowName,
res.CurrentSyncBatchID, res.TableNameRowsMapping)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
if state.SyncFlowOptions != nil {
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.TableMappings
config.TableMappings = state.SyncFlowOptions.TableMappings
}

var initialCopyStatus *protos.SnapshotStatus
Expand Down
12 changes: 6 additions & 6 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,9 +1245,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
workflowState := getWorkFlowState()
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 1)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so at least 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

Expand Down Expand Up @@ -1291,9 +1291,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
workflowState = getWorkFlowState()
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 2)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
Expand Down
Loading

0 comments on commit 2f0597c

Please sign in to comment.