Skip to content

Commit

Permalink
Revert "[snowflake] Cache information schema queries on snowflake" (#1093)

Browse files Browse the repository at this point in the history

Caching is running into issues with things like
```
CREATE MIRROR
CREATE TARGET SCHEMA
DROP MIRROR
DROP TARGET SCHEMA
CREATE TARGET SCHEMA
CREATE MIRROR
```
where the cache tricks the system into thinking metadata tables exist,
and the system gets stuck trying to use tables it thinks it has upserted

It's possible there are other issues, but caching needs to be thought over to address:
1. invalidate cache on specific errors
2. either have code bypass cache or make sure the code doesn't rely on the information being fresh
3. whole system being built to recover/resync better
  • Loading branch information
serprex authored Jan 17, 2024
1 parent 84aaef2 commit c372ad6
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 337 deletions.
296 changes: 0 additions & 296 deletions flow/connectors/snowflake/information_schema_cache.go

This file was deleted.

36 changes: 25 additions & 11 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
Expand Down Expand Up @@ -281,23 +280,38 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
}

func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, []string, error) {
schema, err := c.informationSchemaCache.TableSchemaForTable(tableName)
// parse the table name to get the schema and table name
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
return nil, nil, fmt.Errorf("failed to get schema for table %s: %w", tableName, err)
return nil, nil, fmt.Errorf("failed to parse table name: %w", err)
}

colNames := schema.ColumnNames
colTypes := schema.ColumnTypes
//nolint:gosec
queryString := fmt.Sprintf(`
SELECT column_name, data_type
FROM information_schema.columns
WHERE UPPER(table_name) = '%s' AND UPPER(table_schema) = '%s'
ORDER BY ordinal_position
`, strings.ToUpper(schemaTable.Table), strings.ToUpper(schemaTable.Schema))

dwhColTypes := make([]string, len(colTypes))
for i, colType := range colTypes {
dwhColTypes[i], err = qValueKindToSnowflakeType(qvalue.QValueKind(colType))
if err != nil {
return nil, nil, fmt.Errorf("failed to convert column type %s to DWH type: %w", colType, err)
rows, err := c.database.QueryContext(c.ctx, queryString)
if err != nil {
return nil, nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()

var colName, colType pgtype.Text
colNames := make([]string, 0, 8)
colTypes := make([]string, 0, 8)
for rows.Next() {
if err := rows.Scan(&colName, &colType); err != nil {
return nil, nil, fmt.Errorf("failed to scan row: %w", err)
}
colNames = append(colNames, colName.String)
colTypes = append(colTypes, colType.String)
}

return colNames, dwhColTypes, nil
return colNames, colTypes, nil
}

// dropStage drops the stage for the given job.
Expand Down
Loading

0 comments on commit c372ad6

Please sign in to comment.