Skip to content

Commit

Permalink
chore: small fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Mar 21, 2024
1 parent 4b34a80 commit 3895f64
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 21 deletions.
15 changes: 10 additions & 5 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,16 @@ func (c *Concatenate) parallelStreamExec(
// Check if type coercion needed for this source.
// We only need to check if fields are not in NoNeedToTypeCheck set.
needsCoercion := false
for idx, field := range c.fieldTypes {
_, skip := c.NoNeedToTypeCheck[idx]
if !skip && field.Type() != res.Fields[idx].Type {
needsCoercion = true
break
if len(res.Fields) < len(c.fieldTypes) {
// if we didn't get enough fields, we'll always coerce
needsCoercion = true
} else {
for idx, field := range c.fieldTypes {
_, skip := c.NoNeedToTypeCheck[idx]
if !skip && field.Type() != res.Fields[idx].Type {
needsCoercion = true
break
}
}
}

Expand Down
35 changes: 19 additions & 16 deletions go/vt/vtgate/engine/simple_concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"context"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -94,31 +95,28 @@ func (c *SimpleConcatenate) parallelExec(
defer cancel()

var (
wg errgroup.Group
rows sync.Mutex
fieldsWg sync.WaitGroup
fields []*querypb.Field
result *sqltypes.Result
wg errgroup.Group
rows sync.Mutex
)

fieldsWg.Add(1)

result := &sqltypes.Result{}
for i, source := range c.Sources {
vars := copyBindVars(bindVars)
// the first source will be used to get the fields
// the other sources will run in parallel
// all sources will run in parallel
wg.Go(func() error {
vars := copyBindVars(bindVars)
chunk, err := vcursor.ExecutePrimitive(ctx, source, vars, true)
if err != nil {
cancel()
return err
}

if i == 0 {
fields = chunk.Fields
fieldsWg.Done()
} else {
fieldsWg.Wait()
chunk.Fields = fields
result.Fields = chunk.Fields
}

if len(chunk.Rows) == 0 {
return nil
}

rows.Lock()
Expand Down Expand Up @@ -175,6 +173,7 @@ func (c *SimpleConcatenate) parallelStreamExec(
wg errgroup.Group
fieldsWg sync.WaitGroup
fields []*querypb.Field
fieldsDone atomic.Bool
)

fieldsWg.Add(1)
Expand All @@ -187,8 +186,12 @@ func (c *SimpleConcatenate) parallelStreamExec(
return nil
}
if i == 0 {
fields = chunk.Fields
fieldsWg.Done()
// for results coming from the first source, we don't need to block
fieldsAlreadyLoaded := fieldsDone.Swap(true)
if !fieldsAlreadyLoaded {
fields = chunk.Fields
fieldsWg.Done()
}
} else {
fieldsWg.Wait()
chunk.Fields = fields
Expand Down

0 comments on commit 3895f64

Please sign in to comment.