From b84cc3614ebb938af556f5235edea5a0fc1fd58b Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 23 Aug 2024 13:50:25 +0530 Subject: [PATCH] feat: fix the flakiness wherein one source might not have its values coerced Signed-off-by: Manan Gupta --- go/vt/vtgate/engine/concatenate.go | 42 ++++++++++++++++-------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/go/vt/vtgate/engine/concatenate.go b/go/vt/vtgate/engine/concatenate.go index 1098d07ecc4..8d527d84033 100644 --- a/go/vt/vtgate/engine/concatenate.go +++ b/go/vt/vtgate/engine/concatenate.go @@ -247,13 +247,14 @@ func (c *Concatenate) parallelStreamExec(inCtx context.Context, vcursor VCursor, // Mutexes for dealing with concurrent access to shared state. var ( - muCallback sync.Mutex // Protects callback - muFields sync.Mutex // Protects field state - condFields = sync.NewCond(&muFields) // Condition var for field arrival - wg errgroup.Group // Wait group for all streaming goroutines - rest = make([]*sqltypes.Result, len(c.Sources)) // Collects first result from each source to derive fields - fieldTypes []evalengine.Type // Cached final field types - resultFields []*querypb.Field + muCallback sync.Mutex // Protects callback + muFields sync.Mutex // Protects field state + condFields = sync.NewCond(&muFields) // Condition var for field arrival + wg errgroup.Group // Wait group for all streaming goroutines + rest = make([]*sqltypes.Result, len(c.Sources)) // Collects first result from each source to derive fields + fieldTypes []evalengine.Type // Cached final field types + resultFields []*querypb.Field // Final fields that need to be set for any result having fields. + needsCoercion = make([]bool, len(c.Sources)) // Tracks if coercion is needed for each individual source ) // Process each result chunk, considering type coercion. @@ -261,19 +262,8 @@ func (c *Concatenate) parallelStreamExec(inCtx context.Context, vcursor VCursor, muCallback.Lock() defer muCallback.Unlock() - // Check if type coercion needed for this source. - // We only need to check if fields are not in NoNeedToTypeCheck set. - needsCoercion := false - for idx, field := range rest[srcIdx].Fields { - _, skip := c.NoNeedToTypeCheck[idx] - if !skip && fieldTypes[idx].Type() != field.Type { - needsCoercion = true - break - } - } - // Apply type coercion if needed. - if needsCoercion { + if needsCoercion[srcIdx] { for _, row := range res.Rows { if err := c.coerceValuesTo(row, fieldTypes, sqlmode); err != nil { return err @@ -308,6 +298,20 @@ func (c *Concatenate) parallelStreamExec(inCtx context.Context, vcursor VCursor, return err } + // Check if we need coercion for each source. + for srcIdx, result := range rest { + srcNeedsCoercion := false + for idx, field := range result.Fields { + _, skip := c.NoNeedToTypeCheck[idx] + // We only need to check if fields are not in NoNeedToTypeCheck set. + if !skip && fieldTypes[idx].Type() != field.Type { + srcNeedsCoercion = true + break + } + } + needsCoercion[srcIdx] = srcNeedsCoercion + } + muFields.Unlock() defer condFields.Broadcast() return callback(resultChunk, currIndex)