From 329d77c6ce343a79e1861be5c53c7206d71f07eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jan 2024 17:10:05 +0000 Subject: [PATCH] remove raw schema exists check with mirror jobs table --- flow/connectors/snowflake/snowflake.go | 28 +++----------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7f9b3c33fe..6c50aecfe8 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -26,10 +26,9 @@ import ( ) const ( - mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS" - rawTablePrefix = "_PEERDB_RAW" - createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" - createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, + rawTablePrefix = "_PEERDB_RAW" + createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" + createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, _PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL, _PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT, _PEERDB_UNCHANGED_TOAST_COLUMNS STRING)` @@ -63,7 +62,6 @@ const ( checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? and TABLE_NAME=?` - checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?" getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?" setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?" getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?" @@ -533,16 +531,6 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest }, nil } - rawSchemaExists, err := c.rawSchemaExists(req.FlowJobName) - if err != nil { - return nil, err - } - // sync hasn't created job metadata yet, chill. - if !rawSchemaExists { - return &model.NormalizeResponse{ - Done: false, - }, nil - } destinationTableNames, err := c.getDistinctTableNamesInBatch( req.FlowJobName, req.SyncBatchID, @@ -748,16 +736,6 @@ func getRawTableIdentifier(jobName string) string { return fmt.Sprintf("%s_%s", rawTablePrefix, jobName) } -func (c *SnowflakeConnector) rawSchemaExists(jobName string) (bool, error) { - var result pgtype.Bool - err := c.database.QueryRowContext(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.rawSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) - if err != nil { - return false, fmt.Errorf("error reading result row: %w", err) - } - return result.Bool, nil -} - func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { renameTablesTx, err := c.database.BeginTx(c.ctx, nil) if err != nil {