diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 06f6813fa8..33f4f6c002 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -254,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push") syncResponse := &model.SyncResponse{} syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping - syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas() + syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings) return syncResponse, nil } @@ -323,7 +323,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, if err != nil { return nil, err } - res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas() + res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings) res.RelationMessageMapping = <-recordBatch.RelationMessageMapping pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) diff --git a/flow/model/model.go b/flow/model/model.go index 4e285c85d4..02e85327e0 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "slices" "sync" "time" @@ -385,9 +386,28 @@ func (r *CDCRecordStream) WaitAndCheckEmpty() bool { return isEmpty } -func (r *CDCRecordStream) WaitForSchemaDeltas() []*protos.TableSchemaDelta { +func (r *CDCRecordStream) WaitForSchemaDeltas(tableMappings []*protos.TableMapping) []*protos.TableSchemaDelta { schemaDeltas := make([]*protos.TableSchemaDelta, 0) +schemaLoop: for delta := range r.SchemaDeltas { + for _, tm := range tableMappings { + if delta.SrcTableName == tm.SourceTableIdentifier && delta.DstTableName == tm.DestinationTableIdentifier && len(tm.Exclude) != 0 { + added := make([]*protos.DeltaAddedColumn, len(delta.AddedColumns)) + for _, column := range delta.AddedColumns { + if !slices.Contains(tm.Exclude, column.ColumnName) { + added = append(added, column) + } + } + if len(added) != 0 { + schemaDeltas = append(schemaDeltas, &protos.TableSchemaDelta{ + SrcTableName: delta.SrcTableName, + DstTableName: delta.DstTableName, + AddedColumns: added, + }) + } + continue schemaLoop + } + } schemaDeltas = append(schemaDeltas, delta) } return schemaDeltas