diff --git a/flow/cmd/validate_peer.go b/flow/cmd/validate_peer.go index e3b04e88ba..deb15f7de7 100644 --- a/flow/cmd/validate_peer.go +++ b/flow/cmd/validate_peer.go @@ -32,8 +32,7 @@ func (h *FlowRequestHandler) ValidatePeer( if err != nil { return &protos.ValidatePeerResponse{ Status: protos.ValidatePeerStatus_INVALID, - Message: fmt.Sprintf("peer type is missing or "+ - "your requested configuration for %s peer %s was invalidated: %s", + Message: fmt.Sprintf("%s peer %s was invalidated: %s", req.Peer.Type, req.Peer.Name, err), }, nil } diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7177e4bb42..f9cb284696 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -141,6 +141,51 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s return client, nil } +// TableCheck: +// 1. Creates a table +// 2. Inserts one row into the table +// 3. Deletes the table +func TableCheck(ctx context.Context, client *bigquery.Client, dataset string) error { + dummyTable := "peerdb_validate_dummy" + + newTable := client.Dataset(dataset).Table(dummyTable) + + createErr := newTable.Create(ctx, &bigquery.TableMetadata{ + Schema: []*bigquery.FieldSchema{ + { + Name: "dummy", + Type: bigquery.BooleanFieldType, + Repeated: false, + }, + }, + }) + if createErr != nil { + return fmt.Errorf("unable to validate table creation within dataset: %v."+ + "Please check if bigquery.tables.create permission has been granted", createErr) + } + + errors := []string{} + insertQuery := client.Query("INSERT INTO %s VALUES(true)") + _, insertErr := insertQuery.Run(ctx) + if insertErr != nil { + errors = append(errors, fmt.Sprintf("unable to validate insertion into table: %v.", insertErr)+ + "Please check if bigquery.jobs.create permission has been granted") + } + + // Drop the table + deleteErr := newTable.Delete(ctx) + if deleteErr != nil { + errors = append(errors, fmt.Sprintf("unable to delete table :%v.", deleteErr)+ + "Please check if bigquery.tables.delete permission has been granted") + } + + if len(errors) > 0 { + return fmt.Errorf(strings.Join(errors, ",")) + } + + return nil +} + // NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig. func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) { bqsa, err := NewBigQueryServiceAccount(config) @@ -154,10 +199,16 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* } datasetID := config.GetDatasetId() - _, checkErr := client.Dataset(datasetID).Metadata(ctx) - if checkErr != nil { - slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", checkErr)) - return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr) + _, datasetErr := client.Dataset(datasetID).Metadata(ctx) + if datasetErr != nil { + slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", datasetErr)) + return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr) + } + + permissionErr := TableCheck(ctx, client, datasetID) + if permissionErr != nil { + slog.ErrorContext(ctx, "failed to get run mock table check", slog.Any("error", permissionErr)) + return nil, permissionErr } storageClient, err := bqsa.CreateStorageClient(ctx)