diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index d1c49bb663..4216440476 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -184,9 +184,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		"flowName": input.FlowConnectionConfigs.FlowJobName,
 	}).Info("pulling records...")
 
-	tblNameMapping := make(map[string]string)
+	tblNameMapping := make(map[string]model.NameAndExclude)
 	for _, v := range input.FlowConnectionConfigs.TableMappings {
-		tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
+		tblNameMapping[v.SourceTableIdentifier] = model.NameAndExclude{
+			Name:    v.DestinationTableIdentifier,
+			Exclude: v.Exclude,
+		}
 	}
 
 	idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 533b4ee0c5..0a18c4e337 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -5,6 +5,7 @@ import (
 	"crypto/sha256"
 	"fmt"
 	"reflect"
+	"slices"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
@@ -25,7 +26,7 @@ type PostgresCDCSource struct {
 	ctx                    context.Context
 	replPool               *pgxpool.Pool
 	SrcTableIDNameMapping  map[uint32]string
-	TableNameMapping       map[string]string
+	TableNameMapping       map[string]model.NameAndExclude
 	slot                   string
 	publication            string
 	relationMessageMapping model.RelationMessageMapping
@@ -44,7 +45,7 @@ type PostgresCDCConfig struct {
 	Slot                   string
 	Publication            string
 	SrcTableIDNameMapping  map[uint32]string
-	TableNameMapping       map[string]string
+	TableNameMapping       map[string]model.NameAndExclude
 	RelationMessageMapping model.RelationMessageMapping
 }
 
@@ -439,7 +440,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// create empty map of string to interface{}
-	items, _, err := p.convertTupleToMap(msg.Tuple, rel)
+	items, _, err := p.convertTupleToMap(msg.Tuple, rel, p.TableNameMapping[tableName].Exclude)
 	if err != nil {
 		return nil, fmt.Errorf("error converting tuple to map: %w", err)
 	}
@@ -447,7 +448,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	return &model.InsertRecord{
 		CheckPointID:         int64(lsn),
 		Items:                items,
-		DestinationTableName: p.TableNameMapping[tableName],
+		DestinationTableName: p.TableNameMapping[tableName].Name,
 		SourceTableName:      tableName,
 	}, nil
 }
@@ -473,12 +474,12 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	// create empty map of string to interface{}
-	oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel)
+	oldItems, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
 	if err != nil {
 		return nil, fmt.Errorf("error converting old tuple to map: %w", err)
 	}
 
-	newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel)
+	newItems, unchangedToastColumns, err := p.convertTupleToMap(msg.NewTuple, rel, p.TableNameMapping[tableName].Exclude)
 	if err != nil {
 		return nil, fmt.Errorf("error converting new tuple to map: %w", err)
 	}
@@ -487,7 +488,7 @@
 		CheckPointID:          int64(lsn),
 		OldItems:              oldItems,
 		NewItems:              newItems,
-		DestinationTableName:  p.TableNameMapping[tableName],
+		DestinationTableName:  p.TableNameMapping[tableName].Name,
 		SourceTableName:       tableName,
 		UnchangedToastColumns: unchangedToastColumns,
 	}, nil
@@ -514,7 +515,7 @@
 	}
 
 	// create empty map of string to interface{}
-	items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
+	items, _, err := p.convertTupleToMap(msg.OldTuple, rel, p.TableNameMapping[tableName].Exclude)
 	if err != nil {
 		return nil, fmt.Errorf("error converting tuple to map: %w", err)
 	}
@@ -522,7 +523,7 @@
 	return &model.DeleteRecord{
 		CheckPointID:         int64(lsn),
 		Items:                items,
-		DestinationTableName: p.TableNameMapping[tableName],
+		DestinationTableName: p.TableNameMapping[tableName].Name,
 		SourceTableName:      tableName,
 	}, nil
 }
@@ -537,6 +538,7 @@ It takes a tuple and a relation message as input and returns
 func (p *PostgresCDCSource) convertTupleToMap(
 	tuple *pglogrepl.TupleData,
 	rel *protos.RelationMessage,
+	exclude []string,
 ) (*model.RecordItems, map[string]struct{}, error) {
 	// if the tuple is nil, return an empty map
 	if tuple == nil {
@@ -547,8 +549,12 @@ func (p *PostgresCDCSource) convertTupleToMap(
 	items := model.NewRecordItems()
 	unchangedToastColumns := make(map[string]struct{})
 
+	// TODO need to adjust idx by subtracting 1 from idx for each previously excluded column?
 	for idx, col := range tuple.Columns {
 		colName := rel.Columns[idx].Name
+		if slices.Contains(exclude, colName) {
+			continue
+		}
 		switch col.DataType {
 		case 'n': // null
 			val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
@@ -669,7 +675,7 @@ func (p *PostgresCDCSource) processRelationMessage(
 		// set it to the source table for now, so we can update the schema on the source side
 		// then at the Workflow level we set it t
 		SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
-		DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
+		DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]].Name,
 		AddedColumns: make([]*protos.DeltaAddedColumn, 0),
 	}
 	for _, column := range currRel.Columns {
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index c9bed66d2f..ded0c16670 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/jackc/pglogrepl"
 	"github.com/jackc/pgx/v5"
 	log "github.com/sirupsen/logrus"
@@ -207,7 +208,7 @@ func (c *PostgresConnector) createSlotAndPublication(
 	s *SlotCheckResult,
 	slot string,
 	publication string,
-	tableNameMapping map[string]string,
+	tableNameMapping map[string]model.NameAndExclude,
 	doInitialCopy bool,
 ) error {
 	/*
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 6c06cb60c4..4c734a7e0f 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -775,9 +775,16 @@ func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.Set
 		return fmt.Errorf("error checking for replication slot and publication: %w", err)
 	}
 
+	tableNameMapping := make(map[string]model.NameAndExclude)
+	for k, v := range req.TableNameMapping {
+		tableNameMapping[k] = model.NameAndExclude{
+			Name:    v,
+			Exclude: make([]string, 0),
+		}
+	}
 	// Create the replication slot and publication
 	err = c.createSlotAndPublication(signal, exists,
-		slotName, publicationName, req.TableNameMapping, req.DoInitialCopy)
+		slotName, publicationName, tableNameMapping, req.DoInitialCopy)
 	if err != nil {
 		return fmt.Errorf("error creating replication slot and publication: %w", err)
 	}
diff --git a/flow/model/model.go b/flow/model/model.go
index 2789dd9625..2014f55a8e 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -12,6 +12,11 @@ import (
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
 
+type NameAndExclude struct {
+	Name    string
+	Exclude []string
+}
+
 type PullRecordsRequest struct {
 	// FlowJobName is the name of the flow job.
 	FlowJobName string
@@ -24,7 +29,7 @@ type PullRecordsRequest struct {
 	//relId to name Mapping
 	SrcTableIDNameMapping map[uint32]string
 	// source to destination table name mapping
-	TableNameMapping map[string]string
+	TableNameMapping map[string]NameAndExclude
 	// tablename to schema mapping
 	TableNameSchemaMapping map[string]*protos.TableSchema
 	// override publication name
diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go
index 4702fef8e8..7aeb15a646 100644
--- a/flow/workflows/setup_flow.go
+++ b/flow/workflows/setup_flow.go
@@ -197,7 +20 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
 	for _, srcTableName := range sortedSourceTables {
 		tableSchema := tableNameSchemaMapping[srcTableName]
 		normalizedTableName := s.tableNameMapping[srcTableName]
+		// TODO srcTableName lookup probably incomplete
+		// TODO lookup would be ", tableSchema)
 	}
diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go
index fab06d3ce1..e6628ceed5 100644
--- a/flow/workflows/snapshot_flow.go
+++ b/flow/workflows/snapshot_flow.go
@@ -3,6 +3,8 @@ package peerflow
 import (
 	"fmt"
 	"regexp"
+	"slices"
+	"strings"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/concurrency"
@@ -139,8 +141,25 @@ func (s *SnapshotFlowExecution) cloneTable(
 		}).Errorf("unable to parse source table")
 		return fmt.Errorf("unable to parse source table: %w", err)
 	}
-	query := fmt.Sprintf("SELECT * FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
-		parsedSrcTable.String(), partitionCol)
+	from := "*"
+	if len(mapping.Exclude) != 0 {
+		var sb strings.Builder
+		// TODO what's the correct key?
+		for _, col := range s.config.TableNameSchemaMapping[srcName].Columns {
+			// TODO optimize lookup
+			if !slices.Contains(mapping.Exclude, col) {
+				if sb.Len() != 0 {
+					sb.WriteString(", ")
+				}
+				// TODO escape
+				sb.WriteString(col)
+			}
+		}
+		from = sb.String()
+	}
+
+	query := fmt.Sprintf("SELECT %s FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
+		from, parsedSrcTable.String(), partitionCol)
 
 	numWorkers := uint32(8)
 	if s.config.SnapshotMaxParallelWorkers > 0 {