From 3895f64df5d1f7d6e27b9afc9ac67c459463ff73 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 21 Mar 2024 10:58:59 +0100 Subject: [PATCH] chore: small fixes Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/concatenate.go | 15 ++++++---- go/vt/vtgate/engine/simple_concatenate.go | 35 ++++++++++++----------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/go/vt/vtgate/engine/concatenate.go b/go/vt/vtgate/engine/concatenate.go index 465c5014e42..ec353013106 100644 --- a/go/vt/vtgate/engine/concatenate.go +++ b/go/vt/vtgate/engine/concatenate.go @@ -277,11 +277,16 @@ func (c *Concatenate) parallelStreamExec( // Check if type coercion needed for this source. // We only need to check if fields are not in NoNeedToTypeCheck set. needsCoercion := false - for idx, field := range c.fieldTypes { - _, skip := c.NoNeedToTypeCheck[idx] - if !skip && field.Type() != res.Fields[idx].Type { - needsCoercion = true - break + if len(res.Fields) < len(c.fieldTypes) { + // if we didn't get enough fields, we'll always coerce + needsCoercion = true + } else { + for idx, field := range c.fieldTypes { + _, skip := c.NoNeedToTypeCheck[idx] + if !skip && field.Type() != res.Fields[idx].Type { + needsCoercion = true + break + } } } diff --git a/go/vt/vtgate/engine/simple_concatenate.go b/go/vt/vtgate/engine/simple_concatenate.go index 518807bb757..4dd6db4367a 100644 --- a/go/vt/vtgate/engine/simple_concatenate.go +++ b/go/vt/vtgate/engine/simple_concatenate.go @@ -19,6 +19,7 @@ package engine import ( "context" "sync" + "sync/atomic" "golang.org/x/sync/errgroup" @@ -94,31 +95,28 @@ func (c *SimpleConcatenate) parallelExec( defer cancel() var ( - wg errgroup.Group - rows sync.Mutex - fieldsWg sync.WaitGroup - fields []*querypb.Field - result *sqltypes.Result + wg errgroup.Group + rows sync.Mutex ) - fieldsWg.Add(1) - + result := &sqltypes.Result{} for i, source := range c.Sources { - vars := copyBindVars(bindVars) // the first source will be used to get the fields - // the other sources will run in parallel + // all sources will run in parallel wg.Go(func() error { + vars := copyBindVars(bindVars) chunk, err := vcursor.ExecutePrimitive(ctx, source, vars, true) if err != nil { cancel() return err } + if i == 0 { - fields = chunk.Fields - fieldsWg.Done() - } else { - fieldsWg.Wait() - chunk.Fields = fields + result.Fields = chunk.Fields + } + + if len(chunk.Rows) == 0 { + return nil } rows.Lock() @@ -175,6 +173,7 @@ func (c *SimpleConcatenate) parallelStreamExec( wg errgroup.Group fieldsWg sync.WaitGroup fields []*querypb.Field + fieldsDone atomic.Bool ) fieldsWg.Add(1) @@ -187,8 +186,12 @@ func (c *SimpleConcatenate) parallelStreamExec( return nil } if i == 0 { - fields = chunk.Fields - fieldsWg.Done() + // for results coming from the first source, we don't need to block + fieldsAlreadyLoaded := fieldsDone.Swap(true) + if !fieldsAlreadyLoaded { + fields = chunk.Fields + fieldsWg.Done() + } } else { fieldsWg.Wait() chunk.Fields = fields