Skip to content

Commit

Permalink
fixed lints pt.1
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 4, 2023
1 parent 03a8f0a commit 85db75b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
14 changes: 9 additions & 5 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,17 @@ func (p *PostgresCDCSource) processRelationMessage(
// retrieve initial RelationMessage for table changed.
prevRel := p.relationMessageMapping[currRel.RelationId]
// creating maps for lookup later
prevRelMap := make(map[string]*uint32)
currRelMap := make(map[string]*uint32)
prevRelMap := make(map[string]*protos.PostgresTableIdentifier)
currRelMap := make(map[string]*protos.PostgresTableIdentifier)
for _, column := range prevRel.Columns {
prevRelMap[column.Name] = &column.DataType
prevRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
}
}
for _, column := range currRel.Columns {
currRelMap[column.Name] = &column.DataType
currRelMap[column.Name] = &protos.PostgresTableIdentifier{
RelId: column.DataType,
}
}

schemaDelta := &protos.TableSchemaDelta{
Expand All @@ -571,7 +575,7 @@ func (p *PostgresCDCSource) processRelationMessage(
})
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if *prevRelMap[column.Name] != *currRelMap[column.Name] {
} else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,8 @@ func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDel
}()

for _, droppedColumn := range schemaDelta.DroppedColumns {
_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"", schemaDelta.DstTableName,
droppedColumn))
_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"",
schemaDelta.DstTableName, droppedColumn))
if err != nil {
return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
schemaDelta.DstTableName, err)
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.T

var columnName, columnType string
for rows.Next() {
rows.Scan(&columnName, &columnType)
err = rows.Scan(&columnName, &columnType)
if err != nil {
return nil, fmt.Errorf("error reading row for schema of table %s: %w", tableName, err)
}
genericColType, err := snowflakeTypeToQValueKind(columnType)
if err != nil {
// we use string for invalid types
Expand Down

0 comments on commit 85db75b

Please sign in to comment.