Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate peer: permission check for snowflake #1126

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
_PEERDB_TIMESTAMP INT NOT NULL,_PEERDB_DESTINATION_TABLE_NAME STRING NOT NULL,_PEERDB_DATA STRING NOT NULL,
_PEERDB_RECORD_TYPE INTEGER NOT NULL, _PEERDB_MATCH_DATA STRING,_PEERDB_BATCH_ID INT,
_PEERDB_UNCHANGED_TOAST_COLUMNS STRING)`
createDummyTableSQL = "CREATE TABLE IF NOT EXISTS %s.%s(_PEERDB_DUMMY_COL STRING)"
rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
toVariantColumnName = "VAR_COLS"
Expand Down Expand Up @@ -75,6 +76,7 @@ const (
getLastSyncNormalizeBatchID_SQL = "SELECT SYNC_BATCH_ID, NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropSchemaIfExistsSQL = "DROP SCHEMA IF EXISTS %s"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
)

Expand Down Expand Up @@ -104,6 +106,64 @@ type UnchangedToastColumnResult struct {
UnchangedToastColumns ArrayString
}

func TableCheck(ctx context.Context, database *sql.DB) error {
dummySchema := "PEERDB_DUMMY_SCHEMA_" + shared.RandomString(4)
dummyTable := "PEERDB_DUMMY_TABLE_" + shared.RandomString(4)

// In a transaction, create a table, insert a row into the table and then drop the table
// If any of these steps fail, the transaction will be rolled back
tx, err := database.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
// in case we return after error, ensure transaction is rolled back
defer func() {
deferErr := tx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
activity.GetLogger(ctx).Error("error while rolling back transaction for table check",
slog.Any("error", deferErr))
}
}()

// create schema
_, err = tx.ExecContext(ctx, fmt.Sprintf(createSchemaSQL, dummySchema))
if err != nil {
return fmt.Errorf("failed to create schema: %w", err)
}

// create table
_, err = tx.ExecContext(ctx, fmt.Sprintf(createDummyTableSQL, dummySchema, dummyTable))
if err != nil {
return fmt.Errorf("failed to create table: %w", err)
}

// insert row
_, err = tx.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s.%s VALUES ('dummy')", dummySchema, dummyTable))
if err != nil {
return fmt.Errorf("failed to insert row: %w", err)
}

// drop table
_, err = tx.ExecContext(ctx, fmt.Sprintf(dropTableIfExistsSQL, dummySchema, dummyTable))
if err != nil {
return fmt.Errorf("failed to drop table: %w", err)
}

// drop schema
_, err = tx.ExecContext(ctx, fmt.Sprintf(dropSchemaIfExistsSQL, dummySchema))
if err != nil {
return fmt.Errorf("failed to drop schema: %w", err)
}

// commit transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

func NewSnowflakeConnector(ctx context.Context,
snowflakeProtoConfig *protos.SnowflakeConfig,
) (*SnowflakeConnector, error) {
Expand Down Expand Up @@ -140,6 +200,11 @@ func NewSnowflakeConnector(ctx context.Context,
return nil, fmt.Errorf("failed to open connection to Snowflake peer: %w", err)
}

err = TableCheck(ctx, database)
if err != nil {
return nil, fmt.Errorf("could not validate snowflake peer: %w", err)
}

metadataSchema := "_PEERDB_INTERNAL"
if snowflakeProtoConfig.MetadataSchema != nil {
metadataSchema = *snowflakeProtoConfig.MetadataSchema
Expand Down
Loading