diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go index b3eb16c941..b6dc2cff5d 100644 --- a/flow/cmd/validate_peer.go +++ b/flow/cmd/validate_peer.go @@ -32,8 +32,7 @@ func (h *FlowRequestHandler) ValidatePeer( if err != nil { return &protos.ValidatePeerResponse{ Status: protos.ValidatePeerStatus_INVALID, - Message: fmt.Sprintf("peer type is missing or "+ - "your requested configuration for %s peer %s was invalidated: %s", + Message: fmt.Sprintf("%s peer %s was invalidated: %s", req.Peer.Type, req.Peer.Name, err), }, nil } diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index fa3e2d5c73..aea3cb9c96 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -3,6 +3,7 @@ package connbigquery import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "reflect" @@ -142,6 +143,51 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s return client, nil } +// TableCheck: +// 1. Creates a table +// 2. Inserts one row into the table +// 3. Deletes the table +func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error { + dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4) + + newTable := client.DatasetInProject(project, dataset).Table(dummyTable) + + createErr := newTable.Create(ctx, &bigquery.TableMetadata{ + Schema: []*bigquery.FieldSchema{ + { + Name: "dummy", + Type: bigquery.BooleanFieldType, + Repeated: false, + }, + }, + }) + if createErr != nil { + return fmt.Errorf("unable to validate table creation within dataset: %w. "+ + "Please check if bigquery.tables.create permission has been granted", createErr) + } + + var errs []error + insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable)) + insertQuery.DefaultDatasetID = dataset + insertQuery.DefaultProjectID = project + _, insertErr := insertQuery.Run(ctx) + if insertErr != nil { + errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. ", insertErr)) + } + + // Drop the table + deleteErr := newTable.Delete(ctx) + if deleteErr != nil { + errs = append(errs, fmt.Errorf("unable to delete table :%w. ", deleteErr)) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + + return nil +} + // NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig. func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) { bqsa, err := NewBigQueryServiceAccount(config) @@ -166,10 +212,16 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to create BigQuery client: %v", err) } - _, checkErr := client.DatasetInProject(projectID, datasetID).Metadata(ctx) - if checkErr != nil { - slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", checkErr)) - return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr) + _, datasetErr := client.DatasetInProject(projectID, datasetID).Metadata(ctx) + if datasetErr != nil { + slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", datasetErr)) + return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr) + } + + permissionErr := TableCheck(ctx, client, datasetID, projectID) + if permissionErr != nil { + slog.ErrorContext(ctx, "failed to get run mock table check", slog.Any("error", permissionErr)) + return nil, permissionErr } storageClient, err := bqsa.CreateStorageClient(ctx) diff --git a/ui/components/PeerForms/BigqueryConfig.tsx b/ui/components/PeerForms/BigqueryConfig.tsx index db435bb17a..9b423de320 100644 --- a/ui/components/PeerForms/BigqueryConfig.tsx +++ b/ui/components/PeerForms/BigqueryConfig.tsx @@ -55,13 +55,13 @@ export default function BigqueryForm(props: BQProps) { A service account JSON file in BigQuery is a file that contains information which allows PeerDB to securely access BigQuery resources. - +