Skip to content

Commit

Permalink
Remove redundancy between cdc state & sync flow options
Browse files Browse the repository at this point in the history
Only pass config & options to StartFlow, removing StartFlowInput

In fact, while we're at it, rename StartFlow to SyncFlow:
the name doesn't really make sense anymore,
and it'll make even less sense after #1211
  • Loading branch information
serprex committed Feb 21, 2024
1 parent 37d88f3 commit 8c14909
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 105 deletions.
59 changes: 27 additions & 32 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,24 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
func (a *FlowableActivity) SyncFlow(
ctx context.Context,
config *protos.FlowConnectionConfigs,
options *protos.SyncFlowOptions,
) (*model.SyncResponse, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "starting flow...")
config := input.FlowConnectionConfigs
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
for _, v := range input.SyncFlowOptions.TableMappings {
tblNameMapping := make(map[string]model.NameAndExclude, len(options.TableMappings))
for _, v := range options.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

Expand All @@ -230,45 +232,43 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
return fmt.Sprintf("transferring records for job - %s", flowName)
})
defer shutdown()

batchSize := input.SyncFlowOptions.BatchSize
batchSize := options.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
}

lastOffset, err := dstConn.GetLastOffset(ctx, input.FlowConnectionConfigs.FlowJobName)
lastOffset, err := dstConn.GetLastOffset(ctx, config.FlowJobName)
if err != nil {
return nil, err
}

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
if input.RelationMessageMapping == nil {
input.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
if options.RelationMessageMapping == nil {
options.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
}

return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: lastOffset,
MaxBatchSize: batchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
int(options.IdleTimeoutSeconds),
),
TableNameSchemaMapping: input.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
TableNameSchemaMapping: options.TableNameSchemaMapping,
OverridePublicationName: config.PublicationName,
OverrideReplicationSlotName: config.ReplicationSlotName,
RelationMessageMapping: options.RelationMessageMapping,
RecordStream: recordBatch,
})
})
Expand All @@ -293,7 +293,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return &model.SyncResponse{
CurrentSyncBatchID: -1,
TableSchemaDeltas: recordBatch.SchemaDeltas,
RelationMessageMapping: input.RelationMessageMapping,
RelationMessageMapping: options.RelationMessageMapping,
}, nil
}

Expand Down Expand Up @@ -322,16 +322,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
res, err = dstConn.SyncRecords(errCtx, &model.SyncRecordsRequest{
SyncBatchID: syncBatchID,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.SyncFlowOptions.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
})
if err != nil {
logger.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to push records: %w", err)
}
res.RelationMessageMapping = input.RelationMessageMapping
res.RelationMessageMapping = options.RelationMessageMapping

return nil
})
Expand All @@ -352,7 +352,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
flowName,
res.CurrentSyncBatchID,
uint32(numRecords),
lastCheckpoint,
Expand All @@ -362,18 +362,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, err
}

err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
lastCheckpoint,
)
err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
if res.TableNameRowsMapping != nil {
err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, flowName,
res.CurrentSyncBatchID, res.TableNameRowsMapping)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,8 +1246,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 1)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 1)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 1)
// we have limited batch size to 6, so at least 3 syncs needed
assert.GreaterOrEqual(s.t, len(workflowState.SyncFlowStatuses), 3)

Expand Down Expand Up @@ -1292,8 +1292,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.TableMappings, 2)
assert.Len(s.t, workflowState.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.TableNameSchemaMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.SrcTableIdNameMapping, 2)
assert.Len(s.t, workflowState.SyncFlowOptions.TableNameSchemaMapping, 2)
// 3 from first insert of 18 rows in 1 table
// 1 from pre-pause
// 3 from second insert of 18 rows in 2 tables, batch size updated
Expand Down
91 changes: 37 additions & 54 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,33 @@ type CDCFlowWorkflowState struct {
// Needed to support schema changes.
RelationMessageMapping model.RelationMessageMapping
CurrentFlowStatus protos.FlowStatus
// moved from config here, set by SetupFlow
SrcTableIdNameMapping map[uint32]string
TableNameSchemaMapping map[string]*protos.TableSchema
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
// initially copied from config, all changes are made here though
TableMappings []*protos.TableMapping
}

// returns a new empty PeerFlowState
func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWorkflowState {
tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings))
for _, tableMapping := range cfgTableMappings {
func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflowState {
tableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings))
for _, tableMapping := range cfg.TableMappings {
tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
}
return &CDCFlowWorkflowState{
Progress: []string{"started"},
// 1 more than the limit of 10
SyncFlowStatuses: make([]*model.SyncResponse, 0, 11),
NormalizeFlowStatuses: nil,
ActiveSignal: shared.NoopSignal,
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
RelationMessageMapping: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
SrcTableIdNameMapping: nil,
TableNameSchemaMapping: nil,
FlowConfigUpdates: nil,
SyncFlowOptions: nil,
TableMappings: tableMappings,
SyncFlowStatuses: make([]*model.SyncResponse, 0, 11),
NormalizeFlowStatuses: nil,
ActiveSignal: shared.NoopSignal,
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdates: nil,
SyncFlowOptions: &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
TableMappings: tableMappings,
},
}
}

Expand Down Expand Up @@ -197,11 +192,11 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
return err
}

for tableID, tableName := range res.SrcTableIdNameMapping {
state.SrcTableIdNameMapping[tableID] = tableName
for tableID, tableName := range res.SyncFlowOptions.SrcTableIdNameMapping {
state.SyncFlowOptions.SrcTableIdNameMapping[tableID] = tableName
}
for tableName, tableSchema := range res.TableNameSchemaMapping {
state.TableNameSchemaMapping[tableName] = tableSchema
for tableName, tableSchema := range res.SyncFlowOptions.TableNameSchemaMapping {
state.SyncFlowOptions.TableNameSchemaMapping[tableName] = tableSchema
}
state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...)
state.SyncFlowOptions.TableMappings = state.TableMappings
Expand All @@ -219,8 +214,16 @@ func CDCFlowWorkflowWithConfig(
if cfg == nil {
return nil, fmt.Errorf("invalid connection configs")
}

if state == nil {
state = NewCDCFlowWorkflowState(cfg.TableMappings)
state = NewCDCFlowWorkflowState(cfg)
}

// when we carry forward state, don't remake the options
if state.SyncFlowOptions == nil {
state.SyncFlowOptions = &protos.SyncFlowOptions{
TableMappings: state.TableMappings,
}
}

w := NewCDCFlowWorkflowExecution(ctx)
Expand Down Expand Up @@ -286,8 +289,8 @@ func CDCFlowWorkflowWithConfig(
if err := setupFlowFuture.Get(setupFlowCtx, &setupFlowOutput); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
}
state.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping
state.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping
state.SyncFlowOptions.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping
state.SyncFlowOptions.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

// next part of the setup is to snapshot-initial-copy and setup replication slots.
Expand Down Expand Up @@ -330,14 +333,14 @@ func CDCFlowWorkflowWithConfig(
CurrentName: oldName,
NewName: newName,
// oldName is what was used for the TableNameSchema mapping
TableSchema: state.TableNameSchemaMapping[oldName],
TableSchema: state.SyncFlowOptions.TableNameSchemaMapping[oldName],
})
mapping.DestinationTableIdentifier = newName
// TableNameSchemaMapping is referring to the _resync tables, not the actual names
correctedTableNameSchemaMapping[newName] = state.TableNameSchemaMapping[oldName]
correctedTableNameSchemaMapping[newName] = state.SyncFlowOptions.TableNameSchemaMapping[oldName]
}

state.TableNameSchemaMapping = correctedTableNameSchemaMapping
state.SyncFlowOptions.TableNameSchemaMapping = correctedTableNameSchemaMapping
renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 12 * time.Hour,
HeartbeatTimeout: time.Minute,
Expand All @@ -357,18 +360,6 @@ func CDCFlowWorkflowWithConfig(
}
}

// when we carry forward state, don't remake the options
if state.SyncFlowOptions == nil {
state.SyncFlowOptions = &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
// this means the env variable assignment path is never hit
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
TableMappings: state.TableMappings,
}
}

currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

Expand Down Expand Up @@ -492,16 +483,8 @@ func CDCFlowWorkflowWithConfig(
WaitForCancellation: true,
})

state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: cfg,
SyncFlowOptions: state.SyncFlowOptions,
RelationMessageMapping: state.SyncFlowOptions.RelationMessageMapping,
SrcTableIdNameMapping: state.SyncFlowOptions.SrcTableIdNameMapping,
TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping,
}
w.logger.Info("executing sync flow", slog.String("flowName", cfg.FlowJobName))
fStartFlow := workflow.ExecuteActivity(startFlowCtx, flowable.StartFlow, startFlowInput)
fStartFlow := workflow.ExecuteActivity(startFlowCtx, flowable.StartFlow, cfg, state.SyncFlowOptions)

var syncDone bool
var normalizeSignalError error
Expand All @@ -515,7 +498,7 @@ func CDCFlowWorkflowWithConfig(
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else if childSyncFlowRes != nil {
state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
state.SyncFlowOptions.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
w.logger.Info("Total records synced: ",
slog.Int64("totalRecordsSynced", totalRecordsSynced))
Expand Down Expand Up @@ -550,15 +533,15 @@ func CDCFlowWorkflowWithConfig(
} else {
for i, srcTable := range modifiedSrcTables {
dstTable := modifiedDstTables[i]
state.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
state.SyncFlowOptions.TableNameSchemaMapping[dstTable] = getModifiedSchemaRes.TableNameSchemaMapping[srcTable]
}
}
}

signalFuture := childNormalizeFlowFuture.SignalChildWorkflow(ctx, shared.NormalizeSyncSignalName, model.NormalizeSignal{
Done: false,
SyncBatchID: childSyncFlowRes.CurrentSyncBatchID,
TableNameSchemaMapping: state.TableNameSchemaMapping,
TableNameSchemaMapping: state.SyncFlowOptions.TableNameSchemaMapping,
})
normalizeSignalError = signalFuture.Get(ctx, nil)
} else {
Expand Down
15 changes: 0 additions & 15 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,6 @@ message SyncFlowOptions {
repeated TableMapping table_mappings = 6;
}

// deprecated, unused
message LastSyncState {
int64 checkpoint = 1;
}

message StartFlowInput {
// deprecated, unused
LastSyncState last_sync_state = 1;
FlowConnectionConfigs flow_connection_configs = 2;
SyncFlowOptions sync_flow_options = 3;
map<uint32, RelationMessage> relation_message_mapping = 4;
map<uint32, string> src_table_id_name_mapping = 5;
map<string, TableSchema> table_name_schema_mapping = 6;
}

message StartNormalizeInput {
FlowConnectionConfigs flow_connection_configs = 1;
map<string, TableSchema> table_name_schema_mapping = 2;
Expand Down

0 comments on commit 8c14909

Please sign in to comment.