diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 807d690692..1c5f713416 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -187,9 +187,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, tblNameMapping := make(map[string]model.NameAndExclude) for _, v := range input.FlowConnectionConfigs.TableMappings { + exclude := make(map[string]struct{}, len(v.Exclude)) + for _, col := range v.Exclude { + exclude[col] = struct{}{} + } tblNameMapping[v.SourceTableIdentifier] = model.NameAndExclude { Name: v.DestinationTableIdentifier, - Exclude: v.Exclude, + Exclude: exclude, } } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index e1007cf28c..6836c9a181 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -5,7 +5,6 @@ import ( "crypto/sha256" "fmt" "reflect" - "slices" "time" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -539,7 +538,7 @@ It takes a tuple and a relation message as input and returns func (p *PostgresCDCSource) convertTupleToMap( tuple *pglogrepl.TupleData, rel *protos.RelationMessage, - exclude []string, + exclude map[string]struct{}, ) (*model.RecordItems, map[string]struct{}, error) { // if the tuple is nil, return an empty map if tuple == nil { @@ -550,10 +549,9 @@ func (p *PostgresCDCSource) convertTupleToMap( items := model.NewRecordItems() unchangedToastColumns := make(map[string]struct{}) - // TODO need to adjust idx by subtracting 1 from idx for each previously excluded column? for idx, col := range tuple.Columns { colName := rel.Columns[idx].Name - if slices.Contains(exclude, colName) { + if _, ok := exclude[colName]; ok { continue } switch col.DataType { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 692ef27320..658af02824 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -786,7 +786,7 @@ func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.Set for k, v := range req.TableNameMapping { tableNameMapping[k] = model.NameAndExclude { Name: v, - Exclude: make([]string, 0), + Exclude: make(map[string]struct{}, 0), } } // Create the replication slot and publication diff --git a/flow/model/model.go b/flow/model/model.go index 2014f55a8e..9ac58f7608 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -14,7 +14,7 @@ import ( type NameAndExclude struct { Name string - Exclude []string + Exclude map[string]struct{} } type PullRecordsRequest struct { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 7aeb15a646..689db44ffc 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -197,16 +197,19 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( for _, srcTableName := range sortedSourceTables { tableSchema := tableNameSchemaMapping[srcTableName] normalizedTableName := s.tableNameMapping[srcTableName] - // TODO srcTableName lookup probably incomplete - // TODO lookup would be