From 1c5b4c4b97cad49f7e318bde9281c4fcebbfb63b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 15 Oct 2024 19:15:20 +0530 Subject: [PATCH 1/2] fix setup flow for table addition --- flow/activities/flowable.go | 4 +++- flow/connectors/clickhouse/normalize.go | 12 ++++++------ flow/connectors/core.go | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 65502da0bf..31a2b9a920 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -247,7 +247,9 @@ func (a *FlowableActivity) CreateNormalizedTable( defer shutdown() tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) - for tableIdentifier, tableSchema := range tableNameSchemaMapping { + for _, tableMapping := range config.TableMappings { + tableIdentifier := tableMapping.DestinationTableIdentifier + tableSchema := tableNameSchemaMapping[tableMapping.SourceTableIdentifier] existing, err := conn.SetupNormalizedTable( ctx, tx, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 372a691349..a7ea33af26 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -42,22 +42,22 @@ func (c *ClickHouseConnector) SetupNormalizedTable( ctx context.Context, tx interface{}, config *protos.SetupNormalizedTableBatchInput, - tableIdentifier string, - tableSchema *protos.TableSchema, + destinationTableIdentifier string, + sourceTableSchema *protos.TableSchema, ) (bool, error) { - tableAlreadyExists, err := c.checkIfTableExists(ctx, c.config.Database, tableIdentifier) + tableAlreadyExists, err := c.checkIfTableExists(ctx, c.config.Database, destinationTableIdentifier) if err != nil { return false, fmt.Errorf("error occurred while checking if normalized table exists: %w", err) } if tableAlreadyExists && !config.IsResync { - c.logger.Info("[ch] normalized table already exists, skipping", "table", tableIdentifier) + c.logger.Info("[ch] normalized table already exists, skipping", "table", destinationTableIdentifier) return true, nil } normalizedTableCreateSQL, err := generateCreateTableSQLForNormalizedTable( config, - tableIdentifier, - tableSchema, + destinationTableIdentifier, + sourceTableSchema, ) if err != nil { return false, fmt.Errorf("error while generating create table sql for normalized table: %w", err) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e7dc2bf688..b01c3eaf83 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -131,8 +131,8 @@ type NormalizedTablesConnector interface { ctx context.Context, tx any, config *protos.SetupNormalizedTableBatchInput, - tableIdentifier string, - tableSchema *protos.TableSchema, + destinationTableIdentifier string, + sourceTableSchema *protos.TableSchema, ) (bool, error) } From 3da405bf47a263e771478363a9b5867b22efac1e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 15 Oct 2024 23:06:52 +0530 Subject: [PATCH 2/2] minor fix --- flow/activities/flowable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 31a2b9a920..d078d39559 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -249,7 +249,7 @@ func (a *FlowableActivity) CreateNormalizedTable( tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) for _, tableMapping := range config.TableMappings { tableIdentifier := tableMapping.DestinationTableIdentifier - tableSchema := tableNameSchemaMapping[tableMapping.SourceTableIdentifier] + tableSchema := tableNameSchemaMapping[tableMapping.DestinationTableIdentifier] existing, err := conn.SetupNormalizedTable( ctx, tx,