Skip to content

Commit

Permalink
Validate peer: permission check for snowflake (#1126)
Browse files Browse the repository at this point in the history
Checks:
1. Create schema
2. Create table
3. Insertion into table
4. Drop table
5. Drop schema
  • Loading branch information
Amogh-Bharadwaj authored Jan 24, 2024
1 parent 63f3717 commit 932820f
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
_PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL,
_PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT,
_PEERDB_UNCHANGED_TOAST_COLUMNS STRING)`
createDummyTableSQL = "CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_DUMMY_COL STRING)"
rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
toVariantColumnName = "VAR_COLS"
Expand Down Expand Up @@ -76,6 +77,7 @@ const (
getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropSchemaIfExistsSQL = "DROP SCHEMA IF EXISTS %s"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
)

Expand Down Expand Up @@ -105,6 +107,64 @@ type UnchangedToastColumnResult struct {
UnchangedToastColumns ArrayString
}

func TableCheck(ctx context.Context, database *sql.DB) error {
dummySchema := "PEERDB_DUMMY_SCHEMA_" + shared.RandomString(4)
dummyTable := "PEERDB_DUMMY_TABLE_" + shared.RandomString(4)

// In a transaction, create a table, insert a row into the table and then drop the table
// If any of these steps fail, the transaction will be rolled back
tx, err := database.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := tx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
activity.GetLogger(ctx).Error("error while rolling back transaction for table check",
slog.Any("error", deferErr))
}
}()

// create schema
_, err = tx.ExecContext(ctx, fmt.Sprintf(createSchemaSQL, dummySchema))
if err != nil {
return fmt.Errorf("failed to create schema: %w", err)
}

// create table
_, err = tx.ExecContext(ctx, fmt.Sprintf(createDummyTableSQL, dummySchema, dummyTable))
if err != nil {
return fmt.Errorf("failed to create table: %w", err)
}

// insert row
_, err = tx.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s.%s VALUES ('dummy')", dummySchema, dummyTable))
if err != nil {
return fmt.Errorf("failed to insert row: %w", err)
}

// drop table
_, err = tx.ExecContext(ctx, fmt.Sprintf(dropTableIfExistsSQL, dummySchema, dummyTable))
if err != nil {
return fmt.Errorf("failed to drop table: %w", err)
}

// drop schema
_, err = tx.ExecContext(ctx, fmt.Sprintf(dropSchemaIfExistsSQL, dummySchema))
if err != nil {
return fmt.Errorf("failed to drop schema: %w", err)
}

// commit transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

func NewSnowflakeConnector(ctx context.Context,
snowflakeProtoConfig *protos.SnowflakeConfig,
) (*SnowflakeConnector, error) {
Expand Down Expand Up @@ -141,6 +201,11 @@ func NewSnowflakeConnector(ctx context.Context,
return nil, fmt.Errorf("failed to open connection to Snowflake peer: %w", err)
}

err = TableCheck(ctx, database)
if err != nil {
return nil, fmt.Errorf("could not validate snowflake peer: %w", err)
}

metadataSchema := "_PEERDB_INTERNAL"
if snowflakeProtoConfig.MetadataSchema != nil {
metadataSchema = *snowflakeProtoConfig.MetadataSchema
Expand Down

0 comments on commit 932820f

Please sign in to comment.