diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5e166124f3..aea3cb9c96 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -147,10 +147,10 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s // 1. Creates a table // 2. Inserts one row into the table // 3. Deletes the table -func TableCheck(ctx context.Context, client *bigquery.Client, dataset string) error { +func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error { dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4) - newTable := client.Dataset(dataset).Table(dummyTable) + newTable := client.DatasetInProject(project, dataset).Table(dummyTable) createErr := newTable.Create(ctx, &bigquery.TableMetadata{ Schema: []*bigquery.FieldSchema{ @@ -167,7 +167,9 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string) er } var errs []error - insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s.%s VALUES(true)", dataset, dummyTable)) + insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable)) + insertQuery.DefaultDatasetID = dataset + insertQuery.DefaultProjectID = project _, insertErr := insertQuery.Run(ctx) if insertErr != nil { errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. ", insertErr)) @@ -216,7 +218,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr) } - permissionErr := TableCheck(ctx, client, datasetID) + permissionErr := TableCheck(ctx, client, datasetID, projectID) if permissionErr != nil { slog.ErrorContext(ctx, "failed to get run mock table check", slog.Any("error", permissionErr)) return nil, permissionErr