Skip to content

Commit

Permalink
ignore tables in raw table if no schema known for table (#2317)
Browse files Browse the repository at this point in the history
closes #2312 

however this can cause data to be missed if the schema is missing incorrectly
(as opposed to table being dropped)

---------

Co-authored-by: Kevin Biju <[email protected]>
  • Loading branch information
serprex and heavycrystal authored Dec 4, 2024
1 parent a9d33fd commit 564ee93
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(
ctx context.Context,
flowJobName string,
batchId int64,
tableToSchema map[string]*protos.TableSchema,
) ([]string, error) {
rawTableIdentifier := getRawTableIdentifier(flowJobName)

Expand All @@ -261,7 +262,11 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(
if err := rows.Scan(&result); err != nil {
return nil, fmt.Errorf("failed to read row: %w", err)
}
destinationTableNames = append(destinationTableNames, result.String)
if _, ok := tableToSchema[result.String]; ok {
destinationTableNames = append(destinationTableNames, result.String)
} else {
c.logger.Warn("table not found in table to schema mapping", "table", result.String)
}
}

if err := rows.Err(); err != nil {
Expand Down Expand Up @@ -520,7 +525,7 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
tableToSchema map[string]*protos.TableSchema,
peerdbCols *protos.PeerDBColumns,
) error {
destinationTableNames, err := c.getDistinctTableNamesInBatch(ctx, flowName, batchId)
destinationTableNames, err := c.getDistinctTableNamesInBatch(ctx, flowName, batchId, tableToSchema)
if err != nil {
return err
}
Expand Down

0 comments on commit 564ee93

Please sign in to comment.