diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index d6504322c..94155343e 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -253,6 +253,7 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch( ctx context.Context, flowJobName string, batchId int64, + tableToSchema map[string]*protos.TableSchema, ) ([]string, error) { rawTableName := c.getRawTableName(flowJobName) @@ -283,7 +284,11 @@ func (c *BigQueryConnector) getDistinctTableNamesInBatch( } if len(row) > 0 { value := row[0].(string) - distinctTableNames = append(distinctTableNames, value) + if _, ok := tableToSchema[value]; ok { + distinctTableNames = append(distinctTableNames, value) + } else { + c.logger.Warn("table not found in table to schema mapping", "table", value) + } } } @@ -446,6 +451,7 @@ func (c *BigQueryConnector) mergeTablesInThisBatch( ctx, flowName, batchId, + tableToSchema, ) if err != nil { return fmt.Errorf("couldn't get distinct table names to normalize: %w", err) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index fabe07a35..f7f6d8f98 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -259,6 +259,7 @@ func (c *ClickHouseConnector) NormalizeRecords( req.FlowJobName, req.SyncBatchID, normBatchID, + req.TableNameSchemaMapping, ) if err != nil { c.logger.Error("[clickhouse] error while getting distinct table names in batch", "error", err) @@ -484,6 +485,7 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch( flowJobName string, syncBatchID int64, normalizeBatchID int64, + tableToSchema map[string]*protos.TableSchema, ) ([]string, error) { rawTbl := c.getRawTableName(flowJobName) @@ -507,7 +509,11 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch( return nil, errors.New("table name is not valid") } - tableNames = append(tableNames, tableName.String) + if _, ok := tableToSchema[tableName.String]; ok { + tableNames = append(tableNames, tableName.String) + } else { + c.logger.Warn("table not found in table to schema mapping", "table", tableName.String) + } } if err := rows.Err(); err != nil { diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 70b0d15d1..172a45c6b 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "slices" "strings" "github.com/jackc/pgerrcode" @@ -594,6 +595,7 @@ func (c *PostgresConnector) getDistinctTableNamesInBatch( flowJobName string, syncBatchID int64, normalizeBatchID int64, + tableToSchema map[string]*protos.TableSchema, ) ([]string, error) { rawTableIdentifier := getRawTableIdentifier(flowJobName) @@ -607,7 +609,13 @@ func (c *PostgresConnector) getDistinctTableNamesInBatch( if err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } - return destinationTableNames, nil + return slices.DeleteFunc(destinationTableNames, func(name string) bool { + if _, ok := tableToSchema[name]; !ok { + c.logger.Warn("table not found in table to schema mapping", "table", name) + return true + } + return false + }), nil } func (c *PostgresConnector) getTableNametoUnchangedCols( diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 435df65ed..02c61b2eb 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -643,7 +643,7 @@ func (c *PostgresConnector) NormalizeRecords( } destinationTableNames, err := c.getDistinctTableNamesInBatch( - ctx, req.FlowJobName, req.SyncBatchID, normBatchID) + ctx, req.FlowJobName, req.SyncBatchID, normBatchID, req.TableNameSchemaMapping) if err != nil { return nil, err }