Skip to content

Commit

Permalink
fixed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 26, 2023
1 parent 29b580b commit 6629834
Show file tree
Hide file tree
Showing 14 changed files with 993 additions and 993 deletions.
13 changes: 7 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
Schemas: schemas,
AllowTableAdditions: input.FlowConnectionConfigs.AllowTableAdditions,
})
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
Expand Down Expand Up @@ -645,11 +646,11 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
return nil
}

// GetTableSchema returns the schema of a table.
// PopulateTableMappingFromSchemas sets up the TableNameMapping from SchemaMapping for MappingType SCHEMA
func (a *FlowableActivity) PopulateTableMappingFromSchemas(
ctx context.Context,
config *protos.PopulateTableMappingFromSchemasInput,
) (*protos.PopulateTableMappingFromSchemasOutput, error) {
config *protos.ListTablesInSchemasInput,
) (*protos.ListTablesInSchemasOutput, error) {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
Expand All @@ -658,7 +659,7 @@ func (a *FlowableActivity) PopulateTableMappingFromSchemas(

srcSchemas := maps.Keys(config.SchemaMapping)

schemaTablesMapping, err := srcConn.GetAllTablesInSchemas(srcSchemas)
schemaTablesMapping, err := srcConn.ListTablesInSchemas(srcSchemas)
if err != nil {
return nil, fmt.Errorf("failed to get schemaTablesMapping: %w", err)
}
Expand All @@ -670,8 +671,8 @@ func (a *FlowableActivity) PopulateTableMappingFromSchemas(
}
}

return &protos.PopulateTableMappingFromSchemasOutput{
TableMapping: schemaTablesMappingProto,
return &protos.ListTablesInSchemasOutput{
SchemaToTables: schemaTablesMappingProto,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type CDCPullConnector interface {
// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error

// GetAllTablesInSchemas... gets all the tables in multiple schemas
GetAllTablesInSchemas(schemas []string) (map[string][]string, error)
// ListTablesInSchemas... gets all the tables in multiple schemas
ListTablesInSchemas(schemas []string) (map[string][]string, error)
}

type CDCSyncConnector interface {
Expand Down
29 changes: 20 additions & 9 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type postgresCDCSource struct {
typeMap *pgtype.Map
startLSN pglogrepl.LSN
schemas []string
allowTableAdditions bool
}

type postgresCDCConfig struct {
Expand All @@ -42,6 +43,7 @@ type postgresCDCConfig struct {
tableNameMapping map[string]string
relationMessageMapping model.RelationMessageMapping
schemas []string
allowTableAdditions bool
}

// Create a new PostgresCDCSource
Expand All @@ -56,6 +58,7 @@ func NewPostgresCDCSource(cdcConfig *postgresCDCConfig) (*postgresCDCSource, err
relationMessageMapping: cdcConfig.relationMessageMapping,
typeMap: pgtype.NewMap(),
schemas: cdcConfig.schemas,
allowTableAdditions: cdcConfig.allowTableAdditions,
}, nil
}

Expand Down Expand Up @@ -335,12 +338,17 @@ func (p *postgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
if ok || slices.Contains(p.schemas, msg.Namespace) {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
if !ok {
// stop processing, return this to ensure new table is created
return &model.AddedTableRecord{
CheckPointID: int64(xld.WALStart),
TableName: msg.RelationName,
SrcSchema: msg.Namespace,
}, nil
if p.allowTableAdditions {
// stop processing, return this to ensure new table is created
return &model.AddedTableRecord{
CheckPointID: int64(xld.WALStart),
TableName: msg.RelationName,
SrcSchema: msg.Namespace,
}, nil
} else {
// the table is new, but we aren't going to add it to the destination.
delete(p.relationMessageMapping, msg.RelationID)
}
}
}
} else {
Expand Down Expand Up @@ -371,7 +379,8 @@ func (p *postgresCDCSource) processInsertMessage(

rel, ok := p.relationMessageMapping[msg.RelationID]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", msg.RelationID)
log.Warnf("unknown relation id: %d", msg.RelationID)
return nil, nil
}

// create empty map of string to interface{}
Expand Down Expand Up @@ -404,7 +413,8 @@ func (p *postgresCDCSource) processUpdateMessage(

rel, ok := p.relationMessageMapping[msg.RelationID]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", msg.RelationID)
log.Warnf("unknown relation id: %d", msg.RelationID)
return nil, nil
}

// create empty map of string to interface{}
Expand Down Expand Up @@ -443,7 +453,8 @@ func (p *postgresCDCSource) processDeleteMessage(

rel, ok := p.relationMessageMapping[msg.RelationID]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", msg.RelationID)
log.Warnf("unknown relation id: %d", msg.RelationID)
return nil, nil
}

// create empty map of string to interface{}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
tableNameMapping: req.TableNameMapping,
relationMessageMapping: req.RelationMessageMapping,
schemas: req.Schemas,
allowTableAdditions: req.AllowTableAdditions,
})
if err != nil {
return nil, fmt.Errorf("failed to create cdc source: %w", err)
Expand Down Expand Up @@ -882,7 +883,7 @@ func (c *PostgresConnector) recordHeartbeatWithRecover(details ...interface{}) {
activity.RecordHeartbeat(c.ctx, details...)
}

func (c *PostgresConnector) GetAllTablesInSchemas(schemas []string) (map[string][]string, error) {
func (c *PostgresConnector) ListTablesInSchemas(schemas []string) (map[string][]string, error) {
schemaTablesMap := make(map[string][]string)

for _, schema := range schemas {
Expand Down
Loading

0 comments on commit 6629834

Please sign in to comment.