diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index f9383d8c5e..f6e687435d 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -178,7 +178,8 @@ func (h *FlowRequestHandler) GetColumns( cols.column_name = pk.column_name WHERE cols.table_schema = $3 - AND cols.table_name = $4; + AND cols.table_name = $4 + ORDER BY cols.ordinal_position; `, req.SchemaName, req.TableName, req.SchemaName, req.TableName) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index ab460f01ea..0c43ab6a00 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -15,7 +15,6 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/jackc/pgx/v5/pgtype" - "golang.org/x/exp/maps" "google.golang.org/protobuf/encoding/protojson" ) @@ -259,14 +258,13 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig destTable := config.DestinationTableIdentifier stageName := c.getStageNameForJob(config.FlowJobName) - colInfo, err := c.getColsFromTable(destTable) + colNames, _, err := c.getColsFromTable(destTable) if err != nil { c.logger.Error(fmt.Sprintf("failed to get columns from table %s", destTable), slog.Any("error", err)) return fmt.Errorf("failed to get columns from table %s: %w", destTable, err) } - allCols := colInfo.Columns - err = CopyStageToDestination(c, config, destTable, stageName, allCols) + err = CopyStageToDestination(c, config, destTable, stageName, colNames) if err != nil { c.logger.Error("failed to copy stage to destination", slog.Any("error", err)) return fmt.Errorf("failed to copy stage to destination: %w", err) @@ -281,11 +279,11 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error { return c.dropStage(config.StagingPath, config.FlowJobName) } -func (c *SnowflakeConnector) getColsFromTable(tableName string) (*model.ColumnInformation, error) { +func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, []string, error) { // parse the table name to get the schema and table name schemaTable, err := utils.ParseSchemaTable(tableName) if err != nil { - return nil, fmt.Errorf("failed to parse table name: %w", err) + return nil, nil, fmt.Errorf("failed to parse table name: %w", err) } //nolint:gosec @@ -293,28 +291,27 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) (*model.ColumnIn SELECT column_name, data_type FROM information_schema.columns WHERE UPPER(table_name) = '%s' AND UPPER(table_schema) = '%s' + ORDER BY ordinal_position `, strings.ToUpper(schemaTable.Table), strings.ToUpper(schemaTable.Schema)) rows, err := c.database.QueryContext(c.ctx, queryString) if err != nil { - return nil, fmt.Errorf("failed to execute query: %w", err) + return nil, nil, fmt.Errorf("failed to execute query: %w", err) } defer rows.Close() - var colName pgtype.Text - var colType pgtype.Text - columnMap := map[string]string{} + var colName, colType pgtype.Text + colNames := make([]string, 0, 8) + colTypes := make([]string, 0, 8) for rows.Next() { if err := rows.Scan(&colName, &colType); err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) + return nil, nil, fmt.Errorf("failed to scan row: %w", err) } - columnMap[colName.String] = colType.String + colNames = append(colNames, colName.String) + colTypes = append(colTypes, colType.String) } - return &model.ColumnInformation{ - ColumnMap: columnMap, - Columns: maps.Keys(columnMap), - }, nil + return colNames, colTypes, nil } // dropStage drops the stage for the given job. diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 86ceed4b5d..b9f7e270c4 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -73,19 +73,18 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( } s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage)) - colInfo, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier) + colNames, _, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier) if err != nil { return 0, err } - allCols := colInfo.Columns err = s.putFileToStage(avroFile, stage) if err != nil { return 0, err } s.connector.logger.Info("pushed avro file to stage", tableLog) - err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, allCols) + err = CopyStageToDestination(s.connector, s.config, s.config.DestinationTableIdentifier, stage, colNames) if err != nil { return 0, err } @@ -300,14 +299,15 @@ func (c *SnowflakeConnector) GetCopyTransformation( dstTableName string, syncedAtCol string, ) (*CopyInfo, error) { - colInfo, colsErr := c.getColsFromTable(dstTableName) + colNames, colTypes, colsErr := c.getColsFromTable(dstTableName) if colsErr != nil { return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr) } - transformations := make([]string, 0, len(colInfo.ColumnMap)) - columnOrder := make([]string, 0, len(colInfo.ColumnMap)) - for avroColName, colType := range colInfo.ColumnMap { + transformations := make([]string, 0, len(colNames)) + columnOrder := make([]string, 0, len(colNames)) + for idx, avroColName := range colNames { + colType := colTypes[idx] normalizedColName := SnowflakeIdentifierNormalize(avroColName) columnOrder = append(columnOrder, normalizedColName) if avroColName == syncedAtCol { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 35c82adf76..9a2e28b20c 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -59,7 +59,7 @@ const ( _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2 GROUP BY _PEERDB_DESTINATION_TABLE_NAME` getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA=? AND TABLE_NAME=?` + WHERE TABLE_SCHEMA=? AND TABLE_NAME=? ORDER BY ORDINAL_POSITION` insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)" diff --git a/flow/model/column.go b/flow/model/column.go deleted file mode 100644 index 5cbf25dc2e..0000000000 --- a/flow/model/column.go +++ /dev/null @@ -1,8 +0,0 @@ -package model - -type ColumnInformation struct { - // This is a mapping from column name to column type - // Example: "name" -> "VARCHAR" - ColumnMap map[string]string - Columns []string // List of column names -}