diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go index 5bbc1cfb2c..6a77fbe5ca 100644 --- a/flow/cmd/validate_peer.go +++ b/flow/cmd/validate_peer.go @@ -55,6 +55,18 @@ func (h *FlowRequestHandler) ValidatePeer( } } + validationConn, ok := conn.(connectors.ValidationConnector) + if ok { + validErr := validationConn.ValidateCheck(ctx) + if validErr != nil { + return &protos.ValidatePeerResponse{ + Status: protos.ValidatePeerStatus_INVALID, + Message: fmt.Sprintf("failed to validate %s peer %s: %v", + req.Peer.Type, req.Peer.Name, validErr), + }, nil + } + } + connErr := conn.ConnectionActive(ctx) if connErr != nil { return &protos.ValidatePeerResponse{ diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5b2b8b01f8..db56cfd9fc 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -131,14 +131,14 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s return client, nil } -// TableCheck: +// ValidateCheck: // 1. Creates a table // 2. Inserts one row into the table // 3. 
Deletes the table -func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error { +func (c *BigQueryConnector) ValidateCheck(ctx context.Context) error { dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4) - newTable := client.DatasetInProject(project, dataset).Table(dummyTable) + newTable := c.client.DatasetInProject(c.projectID, c.datasetID).Table(dummyTable) createErr := newTable.Create(ctx, &bigquery.TableMetadata{ Schema: []*bigquery.FieldSchema{ @@ -155,9 +155,9 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, pr } var errs []error - insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable)) - insertQuery.DefaultDatasetID = dataset - insertQuery.DefaultProjectID = project + insertQuery := c.client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable)) + insertQuery.DefaultDatasetID = c.datasetID + insertQuery.DefaultProjectID = c.projectID _, insertErr := insertQuery.Run(ctx) if insertErr != nil { errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. 
", insertErr)) @@ -207,12 +207,6 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr) } - permissionErr := TableCheck(ctx, client, datasetID, projectID) - if permissionErr != nil { - logger.Error("failed to get run mock table check", "error", permissionErr) - return nil, permissionErr - } - storageClient, err := bqsa.CreateStorageClient(ctx) if err != nil { return nil, fmt.Errorf("failed to create Storage client: %v", err) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 2390ad1109..a4e959b505 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -56,27 +56,32 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error } // Creates and drops a dummy table to validate the peer -func ValidateClickhouse(ctx context.Context, conn *sql.DB) error { +func (c *ClickhouseConnector) ValidateCheck(ctx context.Context) error { validateDummyTableName := "peerdb_validation_" + shared.RandomString(4) // create a table - _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory", + _, err := c.database.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory", validateDummyTableName)) if err != nil { return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err) } // insert a row - _, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName)) + _, err = c.database.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName)) if err != nil { return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err) } // drop the table - _, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName) + _, err = c.database.ExecContext(ctx, "DROP TABLE IF EXISTS 
"+validateDummyTableName) if err != nil { return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err) } + validateErr := ValidateS3(ctx, c.creds) + if validateErr != nil { + return fmt.Errorf("failed to validate S3 bucket: %w", validateErr) + } + return nil } @@ -90,11 +95,6 @@ func NewClickhouseConnector( return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } - err = ValidateClickhouse(ctx, database) - if err != nil { - return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err) - } - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { logger.Error("failed to create postgres metadata store", "error", err) @@ -122,11 +122,6 @@ func NewClickhouseConnector( clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) } - validateErr := ValidateS3(ctx, clickhouseS3Creds) - if validateErr != nil { - return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr) - } - return &ClickhouseConnector{ database: database, pgMetadata: pgMetadata, diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 988b8e3f28..39e31f8171 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -27,6 +27,14 @@ type Connector interface { ConnectionActive(context.Context) error } +type ValidationConnector interface { + Connector + + // ValidateCheck performs validation for the connectors, + // usually includes permissions to create and use objects (tables, schema etc). 
+ ValidateCheck(context.Context) error +} + type GetTableSchemaConnector interface { Connector @@ -279,4 +287,9 @@ var ( _ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{} _ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{} + + _ ValidationConnector = &connsnowflake.SnowflakeConnector{} + _ ValidationConnector = &connclickhouse.ClickhouseConnector{} + _ ValidationConnector = &connbigquery.BigQueryConnector{} + _ ValidationConnector = &conns3.S3Connector{} ) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index de24f7e090..ddb6061b0d 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -89,10 +89,10 @@ func (c *S3Connector) Close() error { return nil } -func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error { +func (c *S3Connector) ValidateCheck(ctx context.Context) error { reader := strings.NewReader(time.Now().Format(time.RFC3339)) - bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketURL) + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(c.url) if parseErr != nil { return fmt.Errorf("failed to parse bucket url: %w", parseErr) } @@ -100,7 +100,7 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta // Write an empty file and then delete it // to check if we have write permissions bucketName := aws.String(bucketPrefix.Bucket) - _, putErr := s3Client.PutObject(ctx, &s3.PutObjectInput{ + _, putErr := c.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: bucketName, Key: aws.String(_peerDBCheck), Body: reader, @@ -109,7 +109,7 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta return fmt.Errorf("failed to write to bucket: %w", putErr) } - _, delErr := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ + _, delErr := c.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: bucketName, Key: aws.String(_peerDBCheck), }) @@ -118,8 +118,8 @@ func ValidCheck(ctx 
context.Context, s3Client *s3.Client, bucketURL string, meta } // check if we can ping external metadata - if metadataDB != nil { - err := metadataDB.Ping(ctx) + if c.pgMetadata != nil { + err := c.pgMetadata.Ping(ctx) if err != nil { return fmt.Errorf("failed to ping external metadata: %w", err) } @@ -129,12 +129,6 @@ func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, meta } func (c *S3Connector) ConnectionActive(ctx context.Context) error { - validErr := ValidCheck(ctx, &c.client, c.url, c.pgMetadata) - if validErr != nil { - c.logger.Error("failed to validate s3 connector:", "error", validErr) - return validErr - } - return nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index b1877320e1..f67fe59df0 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -104,10 +104,11 @@ type UnchangedToastColumnResult struct { UnchangedToastColumns ArrayString } -func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) error { +func (c *SnowflakeConnector) ValidateCheck(ctx context.Context) error { + schemaName := c.rawSchema // check if schema exists var schemaExists sql.NullBool - err := database.QueryRowContext(ctx, checkIfSchemaExistsSQL, schemaName).Scan(&schemaExists) + err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, schemaName).Scan(&schemaExists) if err != nil { return fmt.Errorf("error while checking if schema exists: %w", err) } @@ -116,9 +117,9 @@ func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) e // In a transaction, create a table, insert a row into the table and then drop the table // If any of these steps fail, the transaction will be rolled back - tx, err := database.BeginTx(ctx, nil) + tx, err := c.database.BeginTx(ctx, nil) if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to begin transaction for table check: %w", err) } 
// in case we return after error, ensure transaction is rolled back defer func() { @@ -158,7 +159,7 @@ func ValidationCheck(ctx context.Context, database *sql.DB, schemaName string) e // commit transaction err = tx.Commit() if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) + return fmt.Errorf("failed to commit transaction for table check: %w", err) } return nil @@ -212,11 +213,6 @@ func NewSnowflakeConnector( rawSchema = *snowflakeProtoConfig.MetadataSchema } - err = ValidationCheck(ctx, database, rawSchema) - if err != nil { - return nil, fmt.Errorf("could not validate snowflake peer: %w", err) - } - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { return nil, fmt.Errorf("could not connect to metadata store: %w", err)