diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go
index b3eb16c941..b6dc2cff5d 100644
--- a/flow/cmd/validate_peer.go
+++ b/flow/cmd/validate_peer.go
@@ -32,8 +32,7 @@ func (h *FlowRequestHandler) ValidatePeer(
if err != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
- Message: fmt.Sprintf("peer type is missing or "+
- "your requested configuration for %s peer %s was invalidated: %s",
+ Message: fmt.Sprintf("%s peer %s was invalidated: %s",
req.Peer.Type, req.Peer.Name, err),
}, nil
}
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index fa3e2d5c73..aea3cb9c96 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -3,6 +3,7 @@ package connbigquery
import (
"context"
"encoding/json"
+ "errors"
"fmt"
"log/slog"
"reflect"
@@ -142,6 +143,51 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s
return client, nil
}
+// TableCheck:
+// 1. Creates a table
+// 2. Inserts one row into the table
+// 3. Deletes the table
+func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error {
+ dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4)
+
+ newTable := client.DatasetInProject(project, dataset).Table(dummyTable)
+
+ createErr := newTable.Create(ctx, &bigquery.TableMetadata{
+ Schema: []*bigquery.FieldSchema{
+ {
+ Name: "dummy",
+ Type: bigquery.BooleanFieldType,
+ Repeated: false,
+ },
+ },
+ })
+ if createErr != nil {
+ return fmt.Errorf("unable to validate table creation within dataset: %w. "+
+ "Please check if bigquery.tables.create permission has been granted", createErr)
+ }
+
+ var errs []error
+ insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable))
+ insertQuery.DefaultDatasetID = dataset
+ insertQuery.DefaultProjectID = project
+ _, insertErr := insertQuery.Run(ctx)
+ if insertErr != nil {
+ errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. ", insertErr))
+ }
+
+ // Drop the table
+ deleteErr := newTable.Delete(ctx)
+ if deleteErr != nil {
+ errs = append(errs, fmt.Errorf("unable to delete table :%w. ", deleteErr))
+ }
+
+ if len(errs) > 0 {
+ return errors.Join(errs...)
+ }
+
+ return nil
+}
+
// NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig.
func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) {
bqsa, err := NewBigQueryServiceAccount(config)
@@ -166,10 +212,16 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
return nil, fmt.Errorf("failed to create BigQuery client: %v", err)
}
- _, checkErr := client.DatasetInProject(projectID, datasetID).Metadata(ctx)
- if checkErr != nil {
- slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", checkErr))
- return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr)
+ _, datasetErr := client.DatasetInProject(projectID, datasetID).Metadata(ctx)
+ if datasetErr != nil {
+ slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", datasetErr))
+ return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr)
+ }
+
+ permissionErr := TableCheck(ctx, client, datasetID, projectID)
+ if permissionErr != nil {
+ slog.ErrorContext(ctx, "failed to get run mock table check", slog.Any("error", permissionErr))
+ return nil, permissionErr
}
storageClient, err := bqsa.CreateStorageClient(ctx)
diff --git a/ui/components/PeerForms/BigqueryConfig.tsx b/ui/components/PeerForms/BigqueryConfig.tsx
index db435bb17a..9b423de320 100644
--- a/ui/components/PeerForms/BigqueryConfig.tsx
+++ b/ui/components/PeerForms/BigqueryConfig.tsx
@@ -55,13 +55,13 @@ export default function BigqueryForm(props: BQProps) {
A service account JSON file in BigQuery is a file that contains
information which allows PeerDB to securely access BigQuery resources.
-
+