Skip to content

Commit

Permalink
added explicit schema creation for qrep setup
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 23, 2023
1 parent 47a8bf7 commit c02ad5a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
27 changes: 24 additions & 3 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,23 @@ func (c *SnowflakeConnector) isPartitionSynced(partitionID string) (bool, error)
}

func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
err := c.createQRepMetadataTable()
// NOTE that Snowflake does not support transactional DDL
createMetadataTablesTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
}
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := createMetadataTablesTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.Errorf("unexpected error while rolling back transaction for creating metadata tables: %v", deferErr)
}
}()
err = c.createPeerDBInternalSchema(createMetadataTablesTx)
if err != nil {
return err
}
err = c.createQRepMetadataTable(createMetadataTablesTx)
if err != nil {
return err
}
Expand All @@ -145,10 +161,15 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
}
}

err = createMetadataTablesTx.Commit()
if err != nil {
return fmt.Errorf("unable to commit transaction for creating metadata tables: %w", err)
}

return nil
}

func (c *SnowflakeConnector) createQRepMetadataTable() error {
func (c *SnowflakeConnector) createQRepMetadataTable(createMetadataTableTx *sql.Tx) error {
// Define the schema
schemaStatement := `
CREATE TABLE IF NOT EXISTS %s.%s (
Expand All @@ -161,7 +182,7 @@ func (c *SnowflakeConnector) createQRepMetadataTable() error {
`
queryString := fmt.Sprintf(schemaStatement, c.metadataSchema, qRepMetadataTableName)

_, err := c.database.Exec(queryString)
_, err := createMetadataTableTx.Exec(queryString)
if err != nil {
log.Errorf("failed to create table %s.%s: %v", c.metadataSchema, qRepMetadataTableName, err)
return fmt.Errorf("failed to create table %s.%s: %w", c.metadataSchema, qRepMetadataTableName, err)
Expand Down
16 changes: 12 additions & 4 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ const (
mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS"
createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(MIRROR_JOB_NAME STRING NOT NULL,OFFSET INT NOT NULL,
SYNC_BATCH_ID INT NOT NULL,NORMALIZE_BATCH_ID INT NOT NULL)`
rawTablePrefix = "_PEERDB_RAW"
createPeerDBInternalSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s"
createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL,
rawTablePrefix = "_PEERDB_RAW"
createSchemaSQL = "CREATE TRANSIENT SCHEMA IF NOT EXISTS %s"
createRawTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_UID STRING NOT NULL,
_PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL,
_PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT,
_PEERDB_UNCHANGED_TOAST_COLUMNS STRING)`
Expand Down Expand Up @@ -202,6 +202,14 @@ func (c *SnowflakeConnector) SetupMetadataTables() error {
if err != nil {
return fmt.Errorf("unable to begin transaction for creating metadata tables: %w", err)
}
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := createMetadataTablesTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.Errorf("unexpected error while rolling back transaction for creating metadata tables: %v", deferErr)
}
}()

err = c.createPeerDBInternalSchema(createMetadataTablesTx)
if err != nil {
return err
Expand Down Expand Up @@ -1105,7 +1113,7 @@ func (c *SnowflakeConnector) updateNormalizeMetadata(flowJobName string,
}

func (c *SnowflakeConnector) createPeerDBInternalSchema(createSchemaTx *sql.Tx) error {
_, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createPeerDBInternalSchemaSQL, c.metadataSchema))
_, err := createSchemaTx.ExecContext(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
if err != nil {
return fmt.Errorf("error while creating internal schema for PeerDB: %w", err)
}
Expand Down

0 comments on commit c02ad5a

Please sign in to comment.