From c372ad62b49e4439e9f8d180598da34dce826a4e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Wed, 17 Jan 2024 23:46:40 +0000
Subject: [PATCH] Revert "[snowflake] Cache information schema queries on
 snowflake" (#1093)

Caching is running into issues with sequences like

```
CREATE MIRROR
CREATE TARGET SCHEMA
DROP MIRROR
DROP TARGET SCHEMA
CREATE TARGET SCHEMA
CREATE MIRROR
```

where the cache tricks the system into thinking the metadata tables still
exist, and the system gets stuck trying to use tables it believes it has
already upserted.

There may be other issues as well, but caching needs to be rethought to
address:
1. invalidate the cache on specific errors
2. either have code bypass the cache, or make sure the code doesn't rely
   on the information being fresh
3. build the whole system to recover/resync better
---
 .../snowflake/information_schema_cache.go | 296 ------------------
 flow/connectors/snowflake/qrep.go         |  36 ++-
 flow/connectors/snowflake/snowflake.go    |  94 ++++--
 flow/go.mod                               |   1 -
 flow/go.sum                               |   2 -
 flow/peerdbenv/config.go                  |   6 -
 6 files changed, 98 insertions(+), 337 deletions(-)
 delete mode 100644 flow/connectors/snowflake/information_schema_cache.go

diff --git a/flow/connectors/snowflake/information_schema_cache.go b/flow/connectors/snowflake/information_schema_cache.go
deleted file mode 100644
index c599e328ad..0000000000
--- a/flow/connectors/snowflake/information_schema_cache.go
+++ /dev/null
@@ -1,296 +0,0 @@
-package connsnowflake
-
-import (
-	"context"
-	"database/sql"
-	"fmt"
-	"log/slog"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/PeerDB-io/peer-flow/connectors/utils"
-	"github.com/PeerDB-io/peer-flow/generated/protos"
-	"github.com/PeerDB-io/peer-flow/model/qvalue"
-	"github.com/PeerDB-io/peer-flow/peerdbenv"
-
-	"github.com/hashicorp/golang-lru/v2/expirable"
-)
-
-const (
-	schemaExistsSQL = `
-	SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE UPPER(SCHEMA_NAME)=?
-	`
-
-	tableExistsSQL = `
-	SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
-	WHERE UPPER(TABLE_SCHEMA)=? and UPPER(TABLE_NAME)=?
-	`
-
-	tableSchemaSQL = `
-	SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
-	WHERE UPPER(TABLE_SCHEMA)=? AND UPPER(TABLE_NAME)=? ORDER BY ORDINAL_POSITION
-	`
-
-	tableSchemasInSchema = `
-	SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
-	WHERE UPPER(TABLE_SCHEMA)=? ORDER BY TABLE_NAME, ORDINAL_POSITION
-	`
-)
-
-type dbCacheKey struct {
-	dbName string
-	value  string
-}
-
-type informationSchemaCache struct {
-	tableSchemaCache  *expirable.LRU[dbCacheKey, *protos.TableSchema]
-	schemaExistsCache *expirable.LRU[dbCacheKey, bool]
-	tableExistsCache  *expirable.LRU[dbCacheKey, bool]
-}
-
-func newInformationSchemaCache() *informationSchemaCache {
-	schemaExistsCache := expirable.NewLRU[dbCacheKey, bool](100_000, nil, time.Hour*1)
-	tableExistsCache := expirable.NewLRU[dbCacheKey, bool](100_000, nil, time.Hour*1)
-
-	tsCacheExpiry := peerdbenv.PeerDBSnowflakeTableSchemaCacheSeconds()
-	tableSchemaCache := expirable.NewLRU[dbCacheKey, *protos.TableSchema](100_000, nil, tsCacheExpiry)
-
-	return &informationSchemaCache{
-		tableSchemaCache:  tableSchemaCache,
-		schemaExistsCache: schemaExistsCache,
-		tableExistsCache:  tableExistsCache,
-	}
-}
-
-var (
-	cache     *informationSchemaCache
-	cacheInit sync.Once
-)
-
-type SnowflakeInformationSchemaCache struct {
-	ctx      context.Context
-	database *sql.DB
-	logger   slog.Logger
-	dbname   string
-}
-
-func NewSnowflakeInformationSchemaCache(
-	ctx context.Context,
-	database *sql.DB,
-	logger slog.Logger,
-	dbname string,
-) *SnowflakeInformationSchemaCache {
-	cacheInit.Do(func() {
-		cache = newInformationSchemaCache()
-	})
-	return &SnowflakeInformationSchemaCache{
-		ctx:      ctx,
-		database: database,
-		logger:   logger,
-		dbname:   dbname,
-	}
-}
-
-// SchemaExists returns true if the schema exists in the database.
-func (c *SnowflakeInformationSchemaCache) SchemaExists(schemaName string) (bool, error) {
-	schemaName = strings.ToUpper(schemaName)
-	cacheKey := dbCacheKey{
-		dbName: c.dbname,
-		value:  schemaName,
-	}
-
-	if cachedExists, ok := cache.schemaExistsCache.Get(cacheKey); ok {
-		if cachedExists {
-			return true, nil
-		}
-	}
-
-	// If a schema doesn't exist in the cache, fall back to the database.
-	row := c.database.QueryRowContext(c.ctx, schemaExistsSQL, schemaName)
-
-	var exists bool
-	err := row.Scan(&exists)
-	if err != nil {
-		return false, fmt.Errorf("error querying Snowflake peer for schema %s: %w", schemaName, err)
-	}
-
-	if exists {
-		err = c.cacheAllTablesInSchema(schemaName)
-		if err != nil {
-			return false, fmt.Errorf("error caching all tables in schema %s: %w", schemaName, err)
-		}
-	}
-
-	cache.schemaExistsCache.Add(cacheKey, exists)
-
-	return exists, nil
-}
-
-// TableExists returns true if the table exists in the database.
-func (c *SnowflakeInformationSchemaCache) TableExists(schemaName string, tableName string) (bool, error) {
-	schemaName = strings.ToUpper(schemaName)
-	tableName = strings.ToUpper(tableName)
-
-	schemaTable := &utils.SchemaTable{
-		Schema: schemaName,
-		Table:  tableName,
-	}
-
-	cacheKey := dbCacheKey{
-		dbName: c.dbname,
-		value:  schemaTable.String(),
-	}
-
-	if cachedExists, ok := cache.tableExistsCache.Get(cacheKey); ok {
-		if cachedExists {
-			return true, nil
-		}
-	}
-
-	row := c.database.QueryRowContext(c.ctx, tableExistsSQL, schemaTable.Schema, schemaTable.Table)
-
-	var exists bool
-	err := row.Scan(&exists)
-	if err != nil {
-		return false, fmt.Errorf("error querying Snowflake peer for table %s: %w", tableName, err)
-	}
-
-	cache.tableExistsCache.Add(cacheKey, exists)
-
-	return exists, nil
-}
-
-func (c *SnowflakeInformationSchemaCache) TableSchemaForTable(tableName string) (*protos.TableSchema, error) {
-	tableName = strings.ToUpper(tableName)
-	cacheKey := dbCacheKey{
-		dbName: c.dbname,
-		value:  tableName,
-	}
-
-	schemaTable, err := utils.ParseSchemaTable(tableName)
-	if err != nil {
-		return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
-	}
-
-	exists, err := c.SchemaExists(schemaTable.Schema)
-	if err != nil {
-		return nil, fmt.Errorf("error while checking if schema exists: %w", err)
-	}
-
-	if !exists {
-		return nil, fmt.Errorf("schema %s does not exist", schemaTable.Schema)
-	}
-
-	if cachedSchema, ok := cache.tableSchemaCache.Get(cacheKey); ok {
-		return cachedSchema, nil
-	}
-
-	rows, err := c.database.QueryContext(c.ctx, tableSchemaSQL, schemaTable.Schema, schemaTable.Table)
-	if err != nil {
-		return nil, fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", tableName, err)
-	}
-
-	defer func() {
-		err = rows.Close()
-		if err != nil {
-			c.logger.Error("error while closing rows for reading schema of table",
-				slog.String("tableName", tableName),
-				slog.Any("error", err))
-		}
-	}()
-
-	var columnName, columnType sql.NullString
-	columnNames := make([]string, 0, 8)
-	columnTypes := make([]string, 0, 8)
-	for rows.Next() {
-		err = rows.Scan(&columnName, &columnType)
-		if err != nil {
-			return nil, fmt.Errorf("error reading row for schema of table %s: %w", tableName, err)
-		}
-
-		genericColType, err := snowflakeTypeToQValueKind(columnType.String)
-		if err != nil {
-			// we use string for invalid types
-			genericColType = qvalue.QValueKindString
-		}
-
-		columnNames = append(columnNames, columnName.String)
-		columnTypes = append(columnTypes, string(genericColType))
-	}
-
-	tblSchema := &protos.TableSchema{
-		TableIdentifier: tableName,
-		ColumnNames:     columnNames,
-		ColumnTypes:     columnTypes,
-	}
-
-	cache.tableSchemaCache.Add(cacheKey, tblSchema)
-
-	return tblSchema, nil
-}
-
-// cacheAllTablesInSchema caches all tables in a schema as exists.
-func (c *SnowflakeInformationSchemaCache) cacheAllTablesInSchema(schemaName string) error {
-	c.logger.Info("[snowflake] caching all table schemas in schema", slog.String("schemaName", schemaName))
-
-	schemaName = strings.ToUpper(schemaName)
-
-	rows, err := c.database.QueryContext(c.ctx, tableSchemasInSchema, schemaName)
-	if err != nil {
-		return fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", schemaName, err)
-	}
-
-	defer func() {
-		err = rows.Close()
-		if err != nil {
-			c.logger.Error("error while closing rows for reading schema of table",
-				slog.String("schemaName", schemaName),
-				slog.Any("error", err))
-		}
-	}()
-
-	// current schema for a given table.
-	cs := make(map[string]*protos.TableSchema)
-
-	var tableName, columnName, columnType sql.NullString
-	for rows.Next() {
-		err = rows.Scan(&tableName, &columnName, &columnType)
-		if err != nil {
-			return fmt.Errorf("error reading row for schema of table %s: %w", schemaName, err)
-		}
-
-		colType, err := snowflakeTypeToQValueKind(columnType.String)
-		if err != nil {
-			colType = qvalue.QValueKindString
-		}
-
-		if _, ok := cs[tableName.String]; !ok {
-			cs[tableName.String] = &protos.TableSchema{
-				TableIdentifier: tableName.String,
-				ColumnNames:     make([]string, 0, 8),
-				ColumnTypes:     make([]string, 0, 8),
-			}
-		}
-
-		cs[tableName.String].ColumnNames = append(cs[tableName.String].ColumnNames, columnName.String)
-		cs[tableName.String].ColumnTypes = append(cs[tableName.String].ColumnTypes, string(colType))
-	}
-
-	for _, tblSchema := range cs {
-		schemaTable := &utils.SchemaTable{
-			Schema: schemaName,
-			Table:  strings.ToUpper(tblSchema.TableIdentifier),
-		}
-
-		stCacheKey := dbCacheKey{
-			dbName: c.dbname,
-			value:  schemaTable.String(),
-		}
-
-		cache.tableExistsCache.Add(stCacheKey, true)
-		cache.tableSchemaCache.Add(stCacheKey, tblSchema)
-	}
-
-	return nil
-}
diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go
index 441e09d8a0..0c43ab6a00 100644
--- a/flow/connectors/snowflake/qrep.go
+++ b/flow/connectors/snowflake/qrep.go
@@ -10,7 +10,6 @@ import (
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
-	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/s3"
@@ -281,23 +280,38 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
 }
 
 func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, []string, error) {
-	schema, err := c.informationSchemaCache.TableSchemaForTable(tableName)
+	// parse the table name to get the schema and table name
+	schemaTable, err := utils.ParseSchemaTable(tableName)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to get schema for table %s: %w", tableName, err)
+		return nil, nil, fmt.Errorf("failed to parse table name: %w", err)
 	}
 
-	colNames := schema.ColumnNames
-	colTypes := schema.ColumnTypes
+	//nolint:gosec
+	queryString := fmt.Sprintf(`
+	SELECT column_name, data_type
+	FROM information_schema.columns
+	WHERE UPPER(table_name) = '%s' AND UPPER(table_schema) = '%s'
+	ORDER BY ordinal_position
+	`, strings.ToUpper(schemaTable.Table), strings.ToUpper(schemaTable.Schema))
 
-	dwhColTypes := make([]string, len(colTypes))
-	for i, colType := range colTypes {
-		dwhColTypes[i], err = qValueKindToSnowflakeType(qvalue.QValueKind(colType))
-		if err != nil {
-			return nil, nil, fmt.Errorf("failed to convert column type %s to DWH type: %w", colType, err)
+	rows, err := c.database.QueryContext(c.ctx, queryString)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to execute query: %w", err)
+	}
+	defer rows.Close()
+
+	var colName, colType pgtype.Text
+	colNames := make([]string, 0, 8)
+	colTypes := make([]string, 0, 8)
+	for rows.Next() {
+		if err := rows.Scan(&colName, &colType); err != nil {
+			return nil, nil, fmt.Errorf("failed to scan row: %w", err)
 		}
+		colNames = append(colNames, colName.String)
+		colTypes = append(colTypes, colType.String)
 	}
 
-	return colNames, dwhColTypes, nil
+	return colNames, colTypes, nil
 }
 
 // dropStage drops the stage for the given job.
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 58833871a2..929b840c29 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -58,6 +58,8 @@ const (
 	ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s
 	WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2
 	GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
+	getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
+	WHERE TABLE_SCHEMA=? AND TABLE_NAME=? ORDER BY ORDINAL_POSITION`
 
 	insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)"
 
@@ -65,6 +67,8 @@ const (
 	WHERE MIRROR_JOB_NAME=?`
 	updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"
 
+	checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
+	WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
 	checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
 	getLastOffsetSQL            = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
 	setLastOffsetSQL            = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
@@ -72,14 +76,14 @@ const (
 	getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
 	dropTableIfExistsSQL            = "DROP TABLE IF EXISTS %s.%s"
 	deleteJobMetadataSQL            = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+	checkSchemaExistsSQL            = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
 )
 
 type SnowflakeConnector struct {
-	ctx                    context.Context
-	database               *sql.DB
-	logger                 slog.Logger
-	informationSchemaCache *SnowflakeInformationSchemaCache
-	metadataSchema         string
+	ctx            context.Context
+	database       *sql.DB
+	metadataSchema string
+	logger         slog.Logger
 }
 
 // creating this to capture array results from snowflake.
@@ -143,16 +147,11 @@ func NewSnowflakeConnector(ctx context.Context,
 	}
 
 	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
-	logger := *slog.With(slog.String(string(shared.FlowNameKey), flowName))
-
-	informationSchemaCache := NewSnowflakeInformationSchemaCache(ctx, database, logger, snowflakeProtoConfig.Database)
-
 	return &SnowflakeConnector{
-		ctx:                    ctx,
-		database:               database,
-		metadataSchema:         metadataSchema,
-		logger:                 logger,
-		informationSchemaCache: informationSchemaCache,
+		ctx:            ctx,
+		database:       database,
+		metadataSchema: metadataSchema,
+		logger:         *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 	}, nil
 }
 
@@ -224,7 +223,7 @@ func (c *SnowflakeConnector) GetTableSchema(
 ) (*protos.GetTableSchemaBatchOutput, error) {
 	res := make(map[string]*protos.TableSchema)
 	for _, tableName := range req.TableIdentifiers {
-		tableSchema, err := c.informationSchemaCache.TableSchemaForTable(tableName)
+		tableSchema, err := c.getTableSchemaForTable(strings.ToUpper(tableName))
 		if err != nil {
 			return nil, err
 		}
@@ -237,6 +236,49 @@ func (c *SnowflakeConnector) GetTableSchema(
 	}, nil
 }
 
+func (c *SnowflakeConnector) getTableSchemaForTable(tableName string) (*protos.TableSchema, error) {
+	schemaTable, err := utils.ParseSchemaTable(tableName)
+	if err != nil {
+		return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
+	}
+	rows, err := c.database.QueryContext(c.ctx, getTableSchemaSQL, schemaTable.Schema, schemaTable.Table)
+	if err != nil {
+		return nil, fmt.Errorf("error querying Snowflake peer for schema of table %s: %w", tableName, err)
+	}
+	defer func() {
+		err = rows.Close()
+		if err != nil {
+			c.logger.Error("error while closing rows for reading schema of table",
+				slog.String("tableName", tableName),
+				slog.Any("error", err))
+		}
+	}()
+
+	var columnName, columnType pgtype.Text
+	columnNames := make([]string, 0, 8)
+	columnTypes := make([]string, 0, 8)
+	for rows.Next() {
+		err = rows.Scan(&columnName, &columnType)
+		if err != nil {
+			return nil, fmt.Errorf("error reading row for schema of table %s: %w", tableName, err)
+		}
+		genericColType, err := snowflakeTypeToQValueKind(columnType.String)
+		if err != nil {
+			// we use string for invalid types
+			genericColType = qvalue.QValueKindString
+		}
+
+		columnNames = append(columnNames, columnName.String)
+		columnTypes = append(columnTypes, string(genericColType))
+	}
+
+	return &protos.TableSchema{
+		TableIdentifier: tableName,
+		ColumnNames:     columnNames,
+		ColumnTypes:     columnTypes,
+	}, nil
+}
+
 func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
 	rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier),
 		jobName)
@@ -707,12 +749,14 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
 		}
 	}()
 
-	metadataSchemaExists, err := c.informationSchemaCache.SchemaExists(c.metadataSchema)
+	row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema)
+	var schemaExists pgtype.Bool
+	err = row.Scan(&schemaExists)
 	if err != nil {
 		return fmt.Errorf("unable to check if internal schema exists: %w", err)
 	}
 
-	if metadataSchemaExists {
+	if schemaExists.Bool {
 		_, err = syncFlowCleanupTx.ExecContext(c.ctx,
 			fmt.Sprintf(dropTableIfExistsSQL, c.metadataSchema,
 				getRawTableIdentifier(jobName)))
 		if err != nil {
@@ -739,7 +783,12 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
 }
 
 func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableIdentifier string) (bool, error) {
-	return c.informationSchemaCache.TableExists(schemaIdentifier, tableIdentifier)
+	var result pgtype.Bool
+	err := c.database.QueryRowContext(c.ctx, checkIfTableExistsSQL, schemaIdentifier, tableIdentifier).Scan(&result)
+	if err != nil {
+		return false, fmt.Errorf("error while reading result row: %w", err)
+	}
+	return result.Bool, nil
 }
 
 func generateCreateTableSQLForNormalizedTable(
@@ -861,12 +910,15 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, normali
 }
 
 func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error {
-	metaSchemaExists, err := c.informationSchemaCache.SchemaExists(c.metadataSchema)
+	// check if the internal schema exists
+	row := createSchemaTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, c.metadataSchema)
+	var schemaExists pgtype.Bool
+	err := row.Scan(&schemaExists)
 	if err != nil {
-		return fmt.Errorf("unable to check if internal schema exists: %w", err)
+		return fmt.Errorf("error while reading result row: %w", err)
 	}
 
-	if metaSchemaExists {
+	if schemaExists.Bool {
 		c.logger.Info(fmt.Sprintf("internal schema %s already exists", c.metadataSchema))
 		return nil
 	}
diff --git a/flow/go.mod b/flow/go.mod
index eb3eccd246..5239fe48b1 100644
--- a/flow/go.mod
+++ b/flow/go.mod
@@ -133,7 +133,6 @@ require (
 	github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
 	github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
-	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/jackc/pgio v1.0.0 // indirect
 	github.com/jackc/pgpassfile v1.0.0 // indirect
 	github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
diff --git a/flow/go.sum b/flow/go.sum
index 6c865c3566..7e8d745158 100644
--- a/flow/go.sum
+++ b/flow/go.sum
@@ -260,8 +260,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
-github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
-github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
 github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw=
diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go
index 1ba9bb0f64..ca238899ac 100644
--- a/flow/peerdbenv/config.go
+++ b/flow/peerdbenv/config.go
@@ -74,9 +74,3 @@ func PeerDBCatalogDatabase() string {
 func PeerDBEnableWALHeartbeat() bool {
 	return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
 }
-
-// PEERDB_SNOWFLAKE_TABLE_SCHEMA_CACHE_SECONDS
-func PeerDBSnowflakeTableSchemaCacheSeconds() time.Duration {
-	x := getEnvInt("PEERDB_SNOWFLAKE_TABLE_SCHEMA_CACHE_SECONDS", 600)
-	return time.Duration(x) * time.Second
-}
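
As a follow-up to remediation idea 1 in the commit message ("invalidate the cache on specific errors"), here is a minimal, hypothetical sketch of what error-driven invalidation could look like if the cache is ever reintroduced. Nothing below exists in this patch: `invalidateOnMissingObject` is an invented name, and matching on Snowflake's "does not exist or not authorized" message text is an assumption for illustration; a real implementation should inspect the driver's typed error codes instead.

```go
package connsnowflake

import (
	"strings"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

// invalidateOnMissingObject drops a cached entry when Snowflake reports
// that the underlying object is gone, so a DROP/CREATE cycle outside the
// mirror cannot leave a stale "exists" entry behind. The substring match
// on the error message is a placeholder assumption, not production logic.
func invalidateOnMissingObject[K comparable, V any](c *expirable.LRU[K, V], key K, err error) {
	if err != nil && strings.Contains(err.Error(), "does not exist or not authorized") {
		c.Remove(key)
	}
}
```

Callers would invoke this after any query that touched a cached schema or table, e.g. `invalidateOnMissingObject(cache.tableExistsCache, cacheKey, err)`, so the next lookup falls through to INFORMATION_SCHEMA instead of trusting the stale entry.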