Skip to content

Commit

Permalink
feat: fix the flakiness wherein one source might not have its values …
Browse files Browse the repository at this point in the history
…coerced

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 23, 2024
1 parent 84461a7 commit b84cc36
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,33 +247,23 @@ func (c *Concatenate) parallelStreamExec(inCtx context.Context, vcursor VCursor,

// Mutexes for dealing with concurrent access to shared state.
var (
muCallback sync.Mutex // Protects callback
muFields sync.Mutex // Protects field state
condFields = sync.NewCond(&muFields) // Condition var for field arrival
wg errgroup.Group // Wait group for all streaming goroutines
rest = make([]*sqltypes.Result, len(c.Sources)) // Collects first result from each source to derive fields
fieldTypes []evalengine.Type // Cached final field types
resultFields []*querypb.Field
muCallback sync.Mutex // Protects callback
muFields sync.Mutex // Protects field state
condFields = sync.NewCond(&muFields) // Condition var for field arrival
wg errgroup.Group // Wait group for all streaming goroutines
rest = make([]*sqltypes.Result, len(c.Sources)) // Collects first result from each source to derive fields
fieldTypes []evalengine.Type // Cached final field types
resultFields []*querypb.Field // Final fields that need to be set for any result having fields.
needsCoercion = make([]bool, len(c.Sources)) // Tracks if coercion is needed for each individual source
)

// Process each result chunk, considering type coercion.
callback := func(res *sqltypes.Result, srcIdx int) error {
muCallback.Lock()
defer muCallback.Unlock()

// Check if type coercion needed for this source.
// We only need to check if fields are not in NoNeedToTypeCheck set.
needsCoercion := false
for idx, field := range rest[srcIdx].Fields {
_, skip := c.NoNeedToTypeCheck[idx]
if !skip && fieldTypes[idx].Type() != field.Type {
needsCoercion = true
break
}
}

// Apply type coercion if needed.
if needsCoercion {
if needsCoercion[srcIdx] {
for _, row := range res.Rows {
if err := c.coerceValuesTo(row, fieldTypes, sqlmode); err != nil {
return err
Expand Down Expand Up @@ -308,6 +298,20 @@ func (c *Concatenate) parallelStreamExec(inCtx context.Context, vcursor VCursor,
return err
}

// Check if we need coercion for each source.
for srcIdx, result := range rest {
srcNeedsCoercion := false
for idx, field := range result.Fields {
_, skip := c.NoNeedToTypeCheck[idx]
// We only need to check if fields are not in NoNeedToTypeCheck set.
if !skip && fieldTypes[idx].Type() != field.Type {
srcNeedsCoercion = true
break
}
}
needsCoercion[srcIdx] = srcNeedsCoercion
}

muFields.Unlock()
defer condFields.Broadcast()
return callback(resultChunk, currIndex)
Expand Down

0 comments on commit b84cc36

Please sign in to comment.