From fda7e0b9bd6de08067bc9d847842d398383c3ae3 Mon Sep 17 00:00:00 2001 From: Mayurjag <63900197+Mayurjag@users.noreply.github.com> Date: Fri, 19 Jan 2024 17:17:28 +0530 Subject: [PATCH] feat: Use of configurable concurrency for extracting assets from BigQuery (#51) * feat: Use of errgroup for configurable concurrency for extracting assets from BigQuery --------- Co-authored-by: Mayur Jagtap Co-authored-by: Haveiss --- plugins/extractors/bigquery/bigquery.go | 55 +++++++++++++++++-------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index fd3e269c4..b04adb381 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/protobuf/types/known/anypb" @@ -56,6 +57,7 @@ type Config struct { UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"` UsageProjectIDs []string `mapstructure:"usage_project_ids"` BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"` + Concurrency int `mapstructure:"concurrency" default:"10"` } type Exclude struct { @@ -122,6 +124,7 @@ type Extractor struct { policyTagClient *datacatalog.PolicyTagManagerClient newClient NewClientFunc randFn randFn + eg *errgroup.Group datasetsDurn metric.Int64Histogram tablesDurn metric.Int64Histogram @@ -204,6 +207,9 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { e.logger.Error("failed to create policy tag manager client", "err", err) } + e.eg = &errgroup.Group{} + e.eg.SetLimit(e.config.Concurrency) + return nil } @@ -211,6 +217,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { pageSize := pickFirstNonZero(e.config.DatasetPageSize, e.config.MaxPageSize, 10) + wg := sync.WaitGroup{} // Fetch and iterate over datasets pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "") for { @@ -227,7 +234,11 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID) continue } - e.extractTable(ctx, ds, emit) + wg.Add(1) + go func(ds *bigquery.Dataset) { + defer wg.Done() + e.extractTable(ctx, ds, emit) + }(ds) } if !hasNext { @@ -235,6 +246,12 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { } } + wg.Wait() + if err := e.eg.Wait(); err != nil { + e.logger.Error("error extracting bigquery tables", "err", err) + return err + } + return nil } @@ -311,22 +328,26 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit continue } - tableFQN := table.FullyQualifiedName() - - e.logger.Debug("extracting table", "table", tableFQN) - tmd, err := e.fetchTableMetadata(ctx, table) - if err != nil { - e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN) - continue - } - - asset, err := e.buildAsset(ctx, table, tmd) - if err != nil { - e.logger.Error("failed to build asset", "err", err, "table", tableFQN) - continue - } - - emit(models.NewRecord(asset)) + table := table + e.eg.Go(func() error { + tableFQN := table.FullyQualifiedName() + + e.logger.Debug("extracting table", "table", tableFQN) + tmd, err := e.fetchTableMetadata(ctx, table) + if err != nil { + e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN) + return nil + } + + asset, err := e.buildAsset(ctx, table, tmd) + if err != nil { + e.logger.Error("failed to build asset", "err", err, "table", tableFQN) + return nil + } + + emit(models.NewRecord(asset)) + return nil + }) } if !hasNext {