Skip to content

Commit

Permalink
Handle column exclusion for schema deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 16, 2023
1 parent 92741f4 commit 8888b2e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
4 changes: 2 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
return syncResponse, nil
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
Expand Down
25 changes: 24 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -385,9 +386,31 @@ func (r *CDCRecordStream) WaitAndCheckEmpty() bool {
return isEmpty
}

func (r *CDCRecordStream) WaitForSchemaDeltas() []*protos.TableSchemaDelta {
func (r *CDCRecordStream) WaitForSchemaDeltas(tableMappings []*protos.TableMapping) []*protos.TableSchemaDelta {
schemaDeltas := make([]*protos.TableSchemaDelta, 0)
schemaLoop:
for delta := range r.SchemaDeltas {
for _, tm := range tableMappings {
if delta.SrcTableName == tm.SourceTableIdentifier && delta.DstTableName == tm.DestinationTableIdentifier {
if len(tm.Exclude) == 0 {
break
}
added := make([]*protos.DeltaAddedColumn, len(delta.AddedColumns))
for _, column := range delta.AddedColumns {
if !slices.Contains(tm.Exclude, column.ColumnName) {
added = append(added, column)
}
}
if len(added) != 0 {
schemaDeltas = append(schemaDeltas, &protos.TableSchemaDelta{
SrcTableName: delta.SrcTableName,
DstTableName: delta.DstTableName,
AddedColumns: added,
})
}
continue schemaLoop
}
}
schemaDeltas = append(schemaDeltas, delta)
}
return schemaDeltas
Expand Down

0 comments on commit 8888b2e

Please sign in to comment.