From 85db75bb450503a4792416ce5efbb0fdb0dd2556 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Wed, 4 Oct 2023 23:03:07 +0530 Subject: [PATCH] fixed lints pt.1 --- flow/connectors/postgres/cdc.go | 14 +++++++++----- flow/connectors/postgres/postgres.go | 4 ++-- flow/connectors/snowflake/snowflake.go | 5 ++++- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 76ce9e5640..6bf823ab9b 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -545,13 +545,17 @@ func (p *PostgresCDCSource) processRelationMessage( // retrieve initial RelationMessage for table changed. prevRel := p.relationMessageMapping[currRel.RelationId] // creating maps for lookup later - prevRelMap := make(map[string]*uint32) - currRelMap := make(map[string]*uint32) + prevRelMap := make(map[string]*protos.PostgresTableIdentifier) + currRelMap := make(map[string]*protos.PostgresTableIdentifier) for _, column := range prevRel.Columns { - prevRelMap[column.Name] = &column.DataType + prevRelMap[column.Name] = &protos.PostgresTableIdentifier{ + RelId: column.DataType, + } } for _, column := range currRel.Columns { - currRelMap[column.Name] = &column.DataType + currRelMap[column.Name] = &protos.PostgresTableIdentifier{ + RelId: column.DataType, + } } schemaDelta := &protos.TableSchemaDelta{ @@ -571,7 +575,7 @@ func (p *PostgresCDCSource) processRelationMessage( }) // present in previous and current relation messages, but data types have changed. // so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first. - } else if *prevRelMap[column.Name] != *currRelMap[column.Name] { + } else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId { schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name) schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{ ColumnName: column.Name, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index c86c729fc9..fdca7f07e4 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -688,8 +688,8 @@ func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDel }() for _, droppedColumn := range schemaDelta.DroppedColumns { - _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"", schemaDelta.DstTableName, - droppedColumn)) + _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"", + schemaDelta.DstTableName, droppedColumn)) if err != nil { return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn, schemaDelta.DstTableName, err) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index b60d9871bc..d0c702656e 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -256,7 +256,10 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T var columnName, columnType string for rows.Next() { - rows.Scan(&columnName, &columnType) + err = rows.Scan(&columnName, &columnType) + if err != nil { + return nil, fmt.Errorf("error reading row for schema of table %s: %w", tableName, err) + } genericColType, err := snowflakeTypeToQValueKind(columnType) if err != nil { // we use string for invalid types