Skip to content

Commit

Permalink
fix snowflake issues
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 31, 2023
1 parent 333aed0 commit 785c74d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
3 changes: 2 additions & 1 deletion dev-peerdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ then
exit 1
fi

docker compose -f docker-compose-dev.yml up --build
docker compose -f docker-compose-dev.yml up --build\
--no-attach temporal --no-attach pyroscope --no-attach temporal-ui
35 changes: 22 additions & 13 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,29 +208,38 @@ func (s *SnowflakeAvroSyncMethod) addMissingColumns(
}

if len(colsToTypes) > 0 {
// construct an alter table statement
alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName)
tx, err := s.connector.database.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}

for colName, colType := range colsToTypes {
sfColType, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake)
if err != nil {
return fmt.Errorf("failed to convert QValueKind to Snowflake column type: %w", err)
}
alterTableCmd += fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s, ", colName, sfColType)
}
alterTableCmd = strings.TrimSuffix(alterTableCmd, ", ")
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("altering destination table %s with command `%s`", dstTableName, alterTableCmd)
upperCasedColName := strings.ToUpper(colName)
alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName)
alterTableCmd += fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s;", upperCasedColName, sfColType)

if _, err := s.connector.database.Exec(alterTableCmd); err != nil {
return fmt.Errorf("failed to alter destination table: %w", err)
} else {
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("added missing columns to destination table %s", dstTableName)
}).Infof("altering destination table %s with command `%s`", dstTableName, alterTableCmd)

if _, err := tx.Exec(alterTableCmd); err != nil {
return fmt.Errorf("failed to alter destination table: %w", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partition.PartitionId,
}).Infof("successfully added missing columns to destination table %s", dstTableName)
} else {
log.WithFields(log.Fields{
"flowName": flowJobName,
Expand Down

0 comments on commit 785c74d

Please sign in to comment.