diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index e65e74572d..3fcc8720a0 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -126,7 +126,23 @@ func (c *SnowflakeConnector) isPartitionSynced(partitionID string) (bool, error) } func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { - err := c.createQRepMetadataTable() + // NOTE that Snowflake does not support transactional DDL + createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil) + if err != nil { + return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err) + } + // in case we return after error, ensure transaction is rolled back + defer func() { + deferErr := createMetadataTablesTx.Rollback() + if deferErr != sql.ErrTxDone && deferErr != nil { + log.Errorf("unexpected error while rolling back transaction for creating metadata tables: %v", deferErr) + } + }() + err = c.createPeerDBInternalSchema(createMetadataTablesTx) + if err != nil { + return err + } + err = c.createQRepMetadataTable(createMetadataTablesTx) if err != nil { return err } @@ -145,10 +161,15 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) } } + err = createMetadataTablesTx.Commit() + if err != nil { + return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err) + } + return nil } -func (c *SnowflakeConnector) createQRepMetadataTable() error { +func (c *SnowflakeConnector) createQRepMetadataTable(createMetadataTableTx *sql.Tx) error { // Define the schema schemaStatement := ` CREATE TABLE IF NOT EXISTS %s.%s ( @@ -161,7 +182,7 @@ func (c *SnowflakeConnector) createQRepMetadataTable() error { ` queryString := fmt.Sprintf(schemaStatement, c.metadataSchema, qRepMetadataTableName) - _, err := c.database.Exec(queryString) + _, err := createMetadataTableTx.Exec(queryString) if err != nil { log.Errorf("failed to create table %s.%s: %v", c.metadataSchema, qRepMetadataTableName, err) return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 6463249fc5..a5935bfefe 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -27,9 +27,9 @@ const ( mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS" createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(MIRROR_JOB_NAME STRING NOT NULL,OFFSET INT NOT NULL, SYNC_BATCH_ID INT NOT NULL,NORMALIZE_BATCH_ID INT NOT NULL)` - rawTablePrefix = "_PEERDB_RAW" - createPeerDBInternalSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" - createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, + rawTablePrefix = "_PEERDB_RAW" + createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s" + createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL, _PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL, _PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT, _PEERDB_UNCHANGED_TOAST_COLUMNS STRING)` @@ -202,6 +202,14 @@ func (c *SnowflakeConnector) SetupMetadataTables() error { if err != nil { return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err) } + // in case we return after error, ensure transaction is rolled back + defer func() { + deferErr := createMetadataTablesTx.Rollback() + if deferErr != sql.ErrTxDone && deferErr != nil { + log.Errorf("unexpected error while rolling back transaction for creating metadata tables: %v", deferErr) + } + }() + err = c.createPeerDBInternalSchema(createMetadataTablesTx) if err != nil { return err @@ -1105,7 +1113,7 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string, } func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error { - _, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createPeerDBInternalSchemaSQL, c.metadataSchema)) + _, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema)) if err != nil { return fmt.Errorf("error while creating internal schema for PeerDB: %w", err) }