From 77a3a080a3ae7c3f12723c9e50c94de53da33ed6 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 25 Jan 2024 20:00:13 +0530 Subject: [PATCH] support cross project --- flow/connectors/bigquery/bigquery.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5e166124f3..aea3cb9c96 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -147,10 +147,10 @@ func (bqsa *BigQueryServiceAccount) CreateStorageClient(ctx context.Context) (*s // 1. Creates a table // 2. Inserts one row into the table // 3. Deletes the table -func TableCheck(ctx context.Context, client *bigquery.Client, dataset string) error { +func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, project string) error { dummyTable := "peerdb_validate_dummy_" + shared.RandomString(4) - newTable := client.Dataset(dataset).Table(dummyTable) + newTable := client.DatasetInProject(project, dataset).Table(dummyTable) createErr := newTable.Create(ctx, &bigquery.TableMetadata{ Schema: []*bigquery.FieldSchema{ @@ -167,7 +167,9 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string) er } var errs []error - insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s.%s VALUES(true)", dataset, dummyTable)) + insertQuery := client.Query(fmt.Sprintf("INSERT INTO %s VALUES(true)", dummyTable)) + insertQuery.DefaultDatasetID = dataset + insertQuery.DefaultProjectID = project _, insertErr := insertQuery.Run(ctx) if insertErr != nil { errs = append(errs, fmt.Errorf("unable to validate insertion into table: %w. ", insertErr)) @@ -216,7 +218,7 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (* return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr) } - permissionErr := TableCheck(ctx, client, datasetID) + permissionErr := TableCheck(ctx, client, datasetID, projectID) if permissionErr != nil { slog.ErrorContext(ctx, "failed to get run mock table check", slog.Any("error", permissionErr)) return nil, permissionErr