From ee04651b44c7c174ca977f379f75bd846a00b0f5 Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Tue, 14 Nov 2023 21:33:40 +0000 Subject: [PATCH] Handle column exclusion for schema deltas --- flow/activities/flowable.go | 4 ++-- flow/model/model.go | 22 +++++++++++++++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 76b55c0e8e..9692f51fca 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -254,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push") syncResponse := &model.SyncResponse{} syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping - syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas() + syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings) return syncResponse, nil } @@ -323,7 +323,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, if err != nil { return nil, err } - res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas() + res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings) res.RelationMessageMapping = <-recordBatch.RelationMessageMapping pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) diff --git a/flow/model/model.go b/flow/model/model.go index 5d7c627f60..905dc27209 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "slices" "sync" "time" @@ -376,9 +377,28 @@ func (r *CDCRecordStream) WaitAndCheckEmpty() bool { return isEmpty } -func (r *CDCRecordStream) WaitForSchemaDeltas() []*protos.TableSchemaDelta { +func (r *CDCRecordStream) WaitForSchemaDeltas(tableMappings []*protos.TableMapping) []*protos.TableSchemaDelta { schemaDeltas := make([]*protos.TableSchemaDelta, 0) +schemaLoop: for delta := range r.SchemaDeltas { + for _, tm := range tableMappings { + if delta.SrcTableName == tm.SourceTableIdentifier && delta.DstTableName == tm.DestinationTableIdentifier && len(tm.Exclude) != 0 { + added := make([]*protos.DeltaAddedColumn, len(delta.AddedColumns)) + for _, column := range delta.AddedColumns { + if !slices.Contains(tm.Exclude, column.ColumnName) { + added = append(added, column) + } + } + if len(added) != 0 { + schemaDeltas = append(schemaDeltas, &protos.TableSchemaDelta{ + SrcTableName: delta.SrcTableName, + DstTableName: delta.DstTableName, + AddedColumns: added, + }) + } + continue schemaLoop + } + } schemaDeltas = append(schemaDeltas, delta) } return schemaDeltas