Skip to content

Commit

Permalink
Use Selector in BoundSelector to wait in parallel
Browse files Browse the repository at this point in the history
BoundSelector is only used by table cloning

Selector does some juggling so that Select waits on all futures,
changing BoundSelector to use Selector avoids pathological cases

Imagine parallelism is 4. The time to clone tables is as follows:
15m 1m 2m 3m 4m 5m

Currently, cloning will take 20m (waits for 15m at 4m, then immediately adds 4m & 5m)
Optimally, cloning would take 15m. 4m would start after 1m & another minute later 5m would start
  • Loading branch information
serprex committed Nov 17, 2023
1 parent 085425a commit 38c821a
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions flow/concurrency/bound_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,45 @@ import (
)

type BoundSelector struct {
ctx workflow.Context
limit int
futures []workflow.Future
ferrors []error
ctx workflow.Context
limit int
count int
selector workflow.Selector
ferrors []error
}

func NewBoundSelector(limit int, total int, ctx workflow.Context) *BoundSelector {
return &BoundSelector{
ctx: ctx,
limit: limit,
ctx: ctx,
limit: limit,
selector: workflow.NewSelector(ctx),
}
}

func (s *BoundSelector) SpawnChild(chCtx workflow.Context, w interface{}, args ...interface{}) {
if len(s.futures) >= s.limit {
if s.count >= s.limit {
s.waitOne()
}

future := workflow.ExecuteChildWorkflow(chCtx, w, args...)
s.futures = append(s.futures, future)
s.selector.AddFuture(future, func(f workflow.Future) {
err := f.Get(s.ctx, nil)
if err != nil {
s.ferrors = append(s.ferrors, err)
}
})
s.count += 1
}

func (s *BoundSelector) waitOne() {
if len(s.futures) == 0 {
return
}

f := s.futures[0]
s.futures = s.futures[1:]

err := f.Get(s.ctx, nil)
if err != nil {
s.ferrors = append(s.ferrors, err)
if s.count > 0 {
s.selector.Select(s.ctx)
s.count -= 1
}
}

func (s *BoundSelector) Wait() error {
for len(s.futures) > 0 {
for s.count > 0 {
s.waitOne()
}

Expand Down

0 comments on commit 38c821a

Please sign in to comment.