Skip to content

Commit

Permalink
Fix nil relationMessageMapping not updating original input field (#1340)
Browse files Browse the repository at this point in the history
Introduced hastily in #1335,
`state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping` needs to happen before constructing arguments for activity
  • Loading branch information
serprex authored Feb 20, 2024
1 parent 7228412 commit 37d88f3
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 11 deletions.
4 changes: 4 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
if input.RelationMessageMapping == nil {
input.RelationMessageMapping = make(map[uint32]*protos.RelationMessage)
}

return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
Expand Down
12 changes: 2 additions & 10 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,7 @@ func (p *PostgresCDCSource) processMessage(
p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

if p.relationMessageMapping == nil {
p.relationMessageMapping = map[uint32]*protos.RelationMessage{
msg.RelationID: convertRelationMessageToProto(msg),
}
} else if p.relationMessageMapping[msg.RelationID] == nil {
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
// RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
Expand Down Expand Up @@ -855,11 +851,7 @@ func (p *PostgresCDCSource) processRelationMessage(
}
}

if p.relationMessageMapping == nil {
p.relationMessageMapping = map[uint32]*protos.RelationMessage{currRel.RelationId: currRel}
} else {
p.relationMessageMapping[currRel.RelationId] = currRel
}
p.relationMessageMapping[currRel.RelationId] = currRel
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckpointID: int64(lsn),
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func CDCFlowWorkflowWithConfig(
WaitForCancellation: true,
})

state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping
startFlowInput := &protos.StartFlowInput{
FlowConnectionConfigs: cfg,
SyncFlowOptions: state.SyncFlowOptions,
Expand All @@ -501,7 +502,6 @@ func CDCFlowWorkflowWithConfig(
}
w.logger.Info("executing sync flow", slog.String("flowName", cfg.FlowJobName))
fStartFlow := workflow.ExecuteActivity(startFlowCtx, flowable.StartFlow, startFlowInput)
state.SyncFlowOptions.RelationMessageMapping = state.RelationMessageMapping

var syncDone bool
var normalizeSignalError error
Expand Down

0 comments on commit 37d88f3

Please sign in to comment.