order by ordinal_position when querying information_schema.columns
replace model.ColumnInformation with two ordered string arrays
serprex committed Dec 29, 2023
1 parent 0dda6b9 commit d2ae3b1
Showing 5 changed files with 23 additions and 33 deletions.
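
The motivation for replacing the map: Go does not define an iteration order for maps, so ranging over the old ColumnMap could yield columns in a different order on each run, while two parallel slices keep the order rows were appended in. A minimal sketch with illustrative column names (not from this repository):

package main

import "fmt"

func main() {
	// Map iteration order is unspecified in Go and may change between runs.
	columnMap := map[string]string{"id": "INTEGER", "name": "TEXT", "created_at": "TIMESTAMP"}
	for name, typ := range columnMap {
		fmt.Println(name, typ) // order not guaranteed
	}

	// Two parallel slices keep the order they were appended in, e.g. the
	// ordinal_position order returned by information_schema.columns.
	colNames := []string{"id", "name", "created_at"}
	colTypes := []string{"INTEGER", "TEXT", "TIMESTAMP"}
	for i, name := range colNames {
		fmt.Println(name, colTypes[i]) // always id, name, created_at
	}
}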
3 changes: 2 additions & 1 deletion flow/cmd/peer_data.go
@@ -178,7 +178,8 @@ func (h *FlowRequestHandler) GetColumns(
 cols.column_name = pk.column_name
 WHERE
 cols.table_schema = $3
-AND cols.table_name = $4;
+AND cols.table_name = $4
+ORDER BY cols.ordinal_position;
 `, req.SchemaName, req.TableName, req.SchemaName, req.TableName)
 if err != nil {
 return &protos.TableColumnsResponse{Columns: nil}, err
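
On the Postgres side the ordering guarantee comes entirely from the ORDER BY clause; information_schema.columns has no inherent row order. A standalone sketch of the same pattern (separate from the handler above) using pgx/v5, with a placeholder DSN, schema, and table name:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Placeholder connection string; substitute a reachable database.
	conn, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/db")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, `
		SELECT column_name, data_type
		FROM information_schema.columns
		WHERE table_schema = $1 AND table_name = $2
		ORDER BY ordinal_position`, "public", "my_table")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var name, typ string
		if err := rows.Scan(&name, &typ); err != nil {
			log.Fatal(err)
		}
		fmt.Println(name, typ) // printed in ordinal_position order
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}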
29 changes: 13 additions & 16 deletions flow/connectors/snowflake/qrep.go
@@ -15,7 +15,6 @@ import (
 "github.com/aws/aws-sdk-go/service/s3"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
 "github.com/jackc/pgx/v5/pgtype"
-"golang.org/x/exp/maps"
 "google.golang.org/protobuf/encoding/protojson"
 )

@@ -259,14 +258,13 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig
 destTable := config.DestinationTableIdentifier
 stageName := c.getStageNameForJob(config.FlowJobName)

-colInfo, err := c.getColsFromTable(destTable)
+colNames, _, err := c.getColsFromTable(destTable)
 if err != nil {
 c.logger.Error(fmt.Sprintf("failed to get columns from table %s", destTable), slog.Any("error", err))
 return fmt.Errorf("failed to get columns from table %s: %w", destTable, err)
 }

-allCols := colInfo.Columns
-err = CopyStageToDestination(c, config, destTable, stageName, allCols)
+err = CopyStageToDestination(c, config, destTable, stageName, colNames)
 if err != nil {
 c.logger.Error("failed to copy stage to destination", slog.Any("error", err))
 return fmt.Errorf("failed to copy stage to destination: %w", err)
@@ -281,40 +279,39 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
 return c.dropStage(config.StagingPath, config.FlowJobName)
 }

-func (c *SnowflakeConnector) getColsFromTable(tableName string) (*model.ColumnInformation, error) {
+func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, []string, error) {
 // parse the table name to get the schema and table name
 schemaTable, err := utils.ParseSchemaTable(tableName)
 if err != nil {
-return nil, fmt.Errorf("failed to parse table name: %w", err)
+return nil, nil, fmt.Errorf("failed to parse table name: %w", err)
 }

 //nolint:gosec
 queryString := fmt.Sprintf(`
 SELECT column_name, data_type
 FROM information_schema.columns
 WHERE UPPER(table_name) = '%s' AND UPPER(table_schema) = '%s'
+ORDER BY ordinal_position
 `, strings.ToUpper(schemaTable.Table), strings.ToUpper(schemaTable.Schema))

 rows, err := c.database.QueryContext(c.ctx, queryString)
 if err != nil {
-return nil, fmt.Errorf("failed to execute query: %w", err)
+return nil, nil, fmt.Errorf("failed to execute query: %w", err)
 }
 defer rows.Close()

-var colName pgtype.Text
-var colType pgtype.Text
-columnMap := map[string]string{}
+var colName, colType pgtype.Text
+colNames := make([]string, 0, 8)
+colTypes := make([]string, 0, 8)
 for rows.Next() {
 if err := rows.Scan(&colName, &colType); err != nil {
-return nil, fmt.Errorf("failed to scan row: %w", err)
+return nil, nil, fmt.Errorf("failed to scan row: %w", err)
 }
-columnMap[colName.String] = colType.String
+colNames = append(colNames, colName.String)
+colTypes = append(colTypes, colType.String)
 }

-return &model.ColumnInformation{
-ColumnMap: columnMap,
-Columns: maps.Keys(columnMap),
-}, nil
+return colNames, colTypes, nil
 }

 // dropStage drops the stage for the given job.
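
Read as a whole rather than as a diff, the new accumulation pattern looks like the sketch below. This is a hypothetical helper written against database/sql; the connector itself scans into pgtype.Text values through its own wrapped handle, so treat this as the shape of the approach, not the repository's code:

package columns

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// getOrderedColumns returns column names and types as parallel slices:
// index i in one slice corresponds to index i in the other, and both
// follow the ORDER BY ordinal_position order of the result set.
func getOrderedColumns(ctx context.Context, db *sql.DB, schema, table string) ([]string, []string, error) {
	rows, err := db.QueryContext(ctx, `
		SELECT column_name, data_type
		FROM information_schema.columns
		WHERE UPPER(table_schema) = ? AND UPPER(table_name) = ?
		ORDER BY ordinal_position`, strings.ToUpper(schema), strings.ToUpper(table))
	if err != nil {
		return nil, nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	colNames := make([]string, 0, 8)
	colTypes := make([]string, 0, 8)
	for rows.Next() {
		var name, typ string
		if err := rows.Scan(&name, &typ); err != nil {
			return nil, nil, fmt.Errorf("failed to scan row: %w", err)
		}
		colNames = append(colNames, name)
		colTypes = append(colTypes, typ)
	}
	return colNames, colTypes, rows.Err()
}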
14 changes: 7 additions & 7 deletions flow/connectors/snowflake/qrep_avro_sync.go
@@ -73,19 +73,18 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
 }
 s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage))

-colInfo, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier)
+colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier)
 if err != nil {
 return 0, err
 }

-allCols := colInfo.Columns
 err = s.putFileToStage(avroFile, stage)
 if err != nil {
 return 0, err
 }
 s.connector.logger.Info("pushed avro file to stage", tableLog)

-err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, allCols)
+err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames)
 if err != nil {
 return 0, err
 }
@@ -300,14 +299,15 @@ func (c *SnowflakeConnector) GetCopyTransformation(
 dstTableName string,
 syncedAtCol string,
 ) (*CopyInfo, error) {
-colInfo, colsErr := c.getColsFromTable(dstTableName)
+colNames, colTypes, colsErr := c.getColsFromTable(dstTableName)
 if colsErr != nil {
 return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr)
 }

-transformations := make([]string, 0, len(colInfo.ColumnMap))
-columnOrder := make([]string, 0, len(colInfo.ColumnMap))
-for avroColName, colType := range colInfo.ColumnMap {
+transformations := make([]string, 0, len(colNames))
+columnOrder := make([]string, 0, len(colNames))
+for idx, avroColName := range colNames {
+colType := colTypes[idx]
 normalizedColName := SnowflakeIdentifierNormalize(avroColName)
 columnOrder = append(columnOrder, normalizedColName)
 if avroColName == syncedAtCol {
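
Downstream of the hunk above, the two output lists are built by walking the parallel slices with an index, so the generated column order matches the table's ordinal order on every run. A reduced sketch of that pattern; the uppercasing and the cast expression below are placeholders, not the connector's actual normalization or transformation logic:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Parallel inputs as a getColsFromTable-style helper would return them
	// (illustrative values, not read from a real table).
	colNames := []string{"id", "name", "_PEERDB_SYNCED_AT"}
	colTypes := []string{"NUMBER", "TEXT", "TIMESTAMP_NTZ"}

	transformations := make([]string, 0, len(colNames))
	columnOrder := make([]string, 0, len(colNames))
	for idx, col := range colNames {
		colType := colTypes[idx]      // lines up with col because both slices share one order
		upper := strings.ToUpper(col) // stand-in for the real identifier normalization
		columnOrder = append(columnOrder, upper)
		// Placeholder transformation; the real connector picks an expression based on colType.
		transformations = append(transformations, upper+"::"+colType)
	}
	// Deterministic output: the same column list on every run.
	fmt.Println(strings.Join(columnOrder, ","))
	fmt.Println(strings.Join(transformations, ","))
}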
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
@@ -59,7 +59,7 @@ const (
 _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2
 GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
 getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
-WHERE TABLE_SCHEMA=? AND TABLE_NAME=?`
+WHERE TABLE_SCHEMA=? AND TABLE_NAME=? ORDER BY ORDINAL_POSITION`

 insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)"

8 changes: 0 additions & 8 deletions flow/model/column.go

This file was deleted.
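
For reference, the deleted type can be reconstructed from the call site removed in flow/connectors/snowflake/qrep.go; the file's exact original contents are not shown in this diff, so the sketch below is approximate:

package model

// ColumnInformation paired an unordered name->type map with a name slice
// that was populated via maps.Keys, which does not guarantee any order.
type ColumnInformation struct {
	ColumnMap map[string]string
	Columns   []string
}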
