Skip to content

Commit

Permalink
Add option to set max concurrency for table scan operations (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
glkz authored Nov 13, 2024
1 parent 23e4e3a commit 57f2e77
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
12 changes: 6 additions & 6 deletions table/arrow_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"io"
"iter"
"runtime"
"strconv"
"sync"

Expand All @@ -45,7 +44,7 @@ const (
type positionDeletes = []*arrow.Chunked
type perFilePosDeletes = map[string]positionDeletes

func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFilePosDeletes, error) {
func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask, concurrency int) (perFilePosDeletes, error) {
var (
deletesPerFile = make(perFilePosDeletes)
uniqueDeletes = make(map[string]iceberg.DataFile)
Expand All @@ -69,9 +68,9 @@ func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask)
}

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(runtime.NumCPU())
g.SetLimit(concurrency)

perFileChan := make(chan map[string]*arrow.Chunked, runtime.NumCPU())
perFileChan := make(chan map[string]*arrow.Chunked, concurrency)
go func() {
defer close(perFileChan)
for _, v := range uniqueDeletes {
Expand Down Expand Up @@ -213,6 +212,7 @@ type arrowScan struct {
options iceberg.Properties

useLargeTypes bool
concurrency int
}

func (as *arrowScan) projectedFieldIDs() (set[int], error) {
Expand Down Expand Up @@ -534,7 +534,7 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks
taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))

// numWorkers := 1
numWorkers := min(runtime.NumCPU(), len(tasks))
numWorkers := min(as.concurrency, len(tasks))
records := make(chan enumeratedRecord, numWorkers)

var wg sync.WaitGroup
Expand Down Expand Up @@ -592,7 +592,7 @@ func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) (*arr
return resultSchema, func(yield func(arrow.Record, error) bool) {}, nil
}

deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks)
deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks, as.concurrency)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions table/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"fmt"
"iter"
"runtime"
"slices"
"sync"

Expand Down Expand Up @@ -132,6 +131,7 @@ type Scan struct {
limit int64

partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
concurrency int
}

func (scan *Scan) UseRowLimit(n int64) *Scan {
Expand Down Expand Up @@ -294,7 +294,7 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
dataEntries := make([]iceberg.ManifestEntry, 0)
positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)

nworkers := min(runtime.NumCPU(), len(manifestList))
nworkers := min(scan.concurrency, len(manifestList))
var wg sync.WaitGroup

manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
Expand Down Expand Up @@ -434,6 +434,7 @@ func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[
caseSensitive: scan.caseSensitive,
rowLimit: scan.limit,
options: scan.options,
concurrency: scan.concurrency,
}).GetRecords(ctx, tasks)
}

Expand Down
14 changes: 14 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package table

import (
"runtime"
"slices"

"github.com/apache/iceberg-go"
Expand Down Expand Up @@ -110,6 +111,18 @@ func WithLimit(n int64) ScanOption {
}
}

// WitMaxConcurrency sets the maximum concurrency for table scan and plan
// operations. When unset it defaults to runtime.GOMAXPROCS.
func WitMaxConcurrency(n int) ScanOption {
if n <= 0 {
return noopOption
}

return func(scan *Scan) {
scan.concurrency = n
}
}

func WithOptions(opts iceberg.Properties) ScanOption {
if opts == nil {
return noopOption
Expand All @@ -128,6 +141,7 @@ func (t Table) Scan(opts ...ScanOption) *Scan {
selectedFields: []string{"*"},
caseSensitive: true,
limit: ScanNoLimit,
concurrency: runtime.GOMAXPROCS(0),
}

for _, opt := range opts {
Expand Down

0 comments on commit 57f2e77

Please sign in to comment.