diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 65502da0bf..d078d39559 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -247,7 +247,9 @@ func (a *FlowableActivity) CreateNormalizedTable( defer shutdown() tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for tableIdentifier, tableSchema := range tableNameSchemaMapping { + for _, tableMapping := range config.TableMappings { + tableIdentifier := tableMapping.DestinationTableIdentifier + tableSchema := tableNameSchemaMapping[tableMapping.DestinationTableIdentifier] existing, err := conn.SetupNormalizedTable( ctx, tx, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 372a691349..a7ea33af26 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -42,22 +42,22 @@ func (c *ClickHouseConnector) SetupNormalizedTable( ctx context.Context, tx interface{}, config *protos.SetupNormalizedTableBatchInput, - tableIdentifier string, - tableSchema *protos.TableSchema, + destinationTableIdentifier string, + sourceTableSchema *protos.TableSchema, ) (bool, error) { - tableAlreadyExists, err := c.checkIfTableExists(ctx, c.config.Database, tableIdentifier) + tableAlreadyExists, err := c.checkIfTableExists(ctx, c.config.Database, destinationTableIdentifier) if err != nil { return false, fmt.Errorf("error occurred while checking if normalized table exists: %w", err) } if tableAlreadyExists && !config.IsResync { - c.logger.Info("[ch] normalized table already exists, skipping", "table", tableIdentifier) + c.logger.Info("[ch] normalized table already exists, skipping", "table", destinationTableIdentifier) return true, nil } normalizedTableCreateSQL, err := generateCreateTableSQLForNormalizedTable( config, - tableIdentifier, - tableSchema, + destinationTableIdentifier, + sourceTableSchema, ) if err != nil { return false, fmt.Errorf("error while generating create table sql for normalized table: %w", err) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e7dc2bf688..b01c3eaf83 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -131,8 +131,8 @@ type NormalizedTablesConnector interface { ctx context.Context, tx any, config *protos.SetupNormalizedTableBatchInput, - tableIdentifier string, - tableSchema *protos.TableSchema, + destinationTableIdentifier string, + sourceTableSchema *protos.TableSchema, ) (bool, error) }