Skip to content

Commit

Permalink
check bigquery permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 22, 2024
1 parent 98581a4 commit b0bf835
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
3 changes: 1 addition & 2 deletions flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func (h *FlowRequestHandler) ValidatePeer(
if err != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("peer type is missing or "+
"your requested configuration for %s peer %s was invalidated: %s",
Message: fmt.Sprintf("%s peer %s was invalidated: %s",
req.Peer.Type, req.Peer.Name, err),
}, nil
}
Expand Down
59 changes: 55 additions & 4 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,51 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s
return client, nil
}

// TableCheck:
// 1. Creates a table
// 2. Inserts one row into the table
// 3. Deletes the table
func TableCheck(ctx context.Context, client *bigquery.Client, dataset string) error {
dummyTable := "peerdb_validate_dummy"

newTable := client.Dataset(dataset).Table(dummyTable)

createErr := newTable.Create(ctx, &bigquery.TableMetadata{
Schema: []*bigquery.FieldSchema{
{
Name: "dummy",
Type: bigquery.BooleanFieldType,
Repeated: false,
},
},
})
if createErr != nil {
return fmt.Errorf("unable to validate table creation within dataset: %v."+
"Please check if bigquery.tables.create permission has been granted", createErr)
}

errors := []string{}
insertQuery := client.Query("INSERT INTO %s VALUES(true)")
_, insertErr := insertQuery.Run(ctx)
if insertErr != nil {
errors = append(errors, fmt.Sprintf("unable to validate insertion into table: %v.", insertErr)+
"Please check if bigquery.jobs.create permission has been granted")
}

// Drop the table
deleteErr := newTable.Delete(ctx)
if deleteErr != nil {
errors = append(errors, fmt.Sprintf("unable to delete table :%v.", deleteErr)+
"Please check if bigquery.tables.delete permission has been granted")
}

if len(errors) > 0 {
return fmt.Errorf(strings.Join(errors, ","))
}

return nil
}

// NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig.
func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) {
bqsa, err := NewBigQueryServiceAccount(config)
Expand All @@ -154,10 +199,16 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
}

datasetID := config.GetDatasetId()
_, checkErr := client.Dataset(datasetID).Metadata(ctx)
if checkErr != nil {
slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", checkErr))
return nil, fmt.Errorf("failed to get dataset metadata: %v", checkErr)
_, datasetErr := client.Dataset(datasetID).Metadata(ctx)
if datasetErr != nil {
slog.ErrorContext(ctx, "failed to get dataset metadata", slog.Any("error", datasetErr))
return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr)
}

permissionErr := TableCheck(ctx, client, datasetID)
if permissionErr != nil {
slog.ErrorContext(ctx, "failed to get run mock table check", slog.Any("error", permissionErr))
return nil, permissionErr
}

storageClient, err := bqsa.CreateStorageClient(ctx)
Expand Down

0 comments on commit b0bf835

Please sign in to comment.