Skip to content

Commit

Permalink
fix data race in join engine primitive olap streaming mode execution (#…
Browse files Browse the repository at this point in the history
…14012)

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
vitess-bot[bot] committed Sep 19, 2023
1 parent 69d003a commit 9a2dee5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
23 changes: 13 additions & 10 deletions go/vt/vtgate/engine/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -95,34 +96,36 @@ func (jn *Join) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[st

// TryStreamExecute performs a streaming exec.
func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
joinVars := make(map[string]*querypb.BindVariable)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, wantfields, func(lresult *sqltypes.Result) error {
var fieldNeeded atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_tablet_healthcheck_cache)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revertible)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revert)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Region Sharding example using etcd on ubuntu-latest

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_across_db_versions)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_concurrentdml)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_suite)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_failover)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (topo_connection_cache)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (12)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_throttler)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_unsharded)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_general_heavy)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_scheduler)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_vindex_heavy)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_tablegc)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_consul)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_gen4)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (mysql80)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_declarative) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / End-to-End Test

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_schema)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_recovery)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtctlbackup_sharded_clustertest_heavy)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / End-to-End Test (Race)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revert) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Local example using consul on ubuntu-latest

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_transaction)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_schema_tracker)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (ers_prs_newfeatures_heavy)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_with_keyspaces_to_watch)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtorc) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_partial_keyspace)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (22)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_migrate_vdiff2_convert_tz)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtorc)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_reservedconn)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtbackup_transform)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Local example using etcd on ubuntu-latest

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_stoponreshard_false)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Local example using k8s on ubuntu-latest

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_v2)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_queries)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (15)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress_suite)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_topo)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_topo_etcd)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_vschema)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (21)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / test

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_basic)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_declarative)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_singleton) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_stoponreshard_true)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_scheduler) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revertible) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_singleton)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_ghost) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (18)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress_suite) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_multicell)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_godriver)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_throttler_custom_config)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_topo_consul)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_ghost)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_tablegc) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_readafterwrite)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_backup) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (13)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vttablet_prscomplex)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_suite) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (mysql_server_vault)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_backup)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_recovery) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress) mysql57

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / test

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Backups - E2E

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / test

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

Bool not declared by package atomic (typecheck)

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Backups - Manual

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Query Serving (Schema)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Query Serving (Queries)

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Reparent Old Vtctl

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Reparent Old VTTablet

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

undefined: atomic.Bool

Check failure on line 99 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

undefined: atomic.Bool
fieldNeeded.Store(wantfields)
err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, fieldNeeded.Load(), func(lresult *sqltypes.Result) error {
joinVars := make(map[string]*querypb.BindVariable)
for _, lrow := range lresult.Rows {
for k, col := range jn.Vars {
joinVars[k] = sqltypes.ValueBindVariable(lrow[col])
}
rowSent := false
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), wantfields, func(rresult *sqltypes.Result) error {
var rowSent atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_tablet_healthcheck_cache)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revertible)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revert)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Region Sharding example using etcd on ubuntu-latest

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_across_db_versions)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_concurrentdml)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_suite)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_failover)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (topo_connection_cache)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (12)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_throttler)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_unsharded)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_general_heavy)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_scheduler)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_vindex_heavy)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_tablegc)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_consul)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_gen4)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (mysql80)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_declarative) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / End-to-End Test

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_cellalias)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_schema)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_recovery)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtctlbackup_sharded_clustertest_heavy)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / End-to-End Test (Race)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revert) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Local example using consul on ubuntu-latest

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_transaction)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_schema_tracker)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (ers_prs_newfeatures_heavy)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_with_keyspaces_to_watch)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtorc) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_partial_keyspace)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (22)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_migrate_vdiff2_convert_tz)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtorc)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_reservedconn)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtbackup_transform)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Local example using etcd on ubuntu-latest

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_stoponreshard_false)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Local example using k8s on ubuntu-latest

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_v2)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_queries)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (15)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress_suite)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_topo)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_topo_etcd)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_vschema)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (21)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / test

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_basic)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_declarative)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_singleton) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vstream_stoponreshard_true)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_scheduler) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_revertible) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_singleton)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_ghost) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (schemadiff_vrepl) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (18)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress_suite) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vreplication_multicell)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_godriver)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_throttler_custom_config)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_topo_consul)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_ghost)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (tabletmanager_tablegc) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vtgate_readafterwrite)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_backup) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (13)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (vttablet_prscomplex)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_suite) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (mysql_server_vault)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_backup)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (xb_recovery) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run endtoend tests on Cluster (onlineddl_vrepl_stress) mysql57

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Unit Test (Race)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / test

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Backups - E2E

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / test

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

Bool not declared by package atomic (typecheck)

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Backups - Manual

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Query Serving (Schema)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Query Serving (Queries)

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Reparent Old Vtctl

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Run Upgrade Downgrade Test - Reparent Old VTTablet

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 25

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

undefined: atomic.Bool

Check failure on line 107 in go/vt/vtgate/engine/join.go

View workflow job for this annotation

GitHub Actions / Docker Test Cluster 10

undefined: atomic.Bool
err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), fieldNeeded.Load(), func(rresult *sqltypes.Result) error {
result := &sqltypes.Result{}
if wantfields {
if fieldNeeded.Load() {
// This code is currently unreachable because the first result
// will always be just the field info, which will cause the outer
// wantfields code path to be executed. But this may change in the future.
wantfields = false
fieldNeeded.Store(false)
result.Fields = joinFields(lresult.Fields, rresult.Fields, jn.Cols)
}
for _, rrow := range rresult.Rows {
result.Rows = append(result.Rows, joinRows(lrow, rrow, jn.Cols))
}
if len(rresult.Rows) != 0 {
rowSent = true
rowSent.Store(true)
}
return callback(result)
})
if err != nil {
return err
}
if jn.Opcode == LeftJoin && !rowSent {
if jn.Opcode == LeftJoin && !rowSent.Load() {
result := &sqltypes.Result{}
result.Rows = [][]sqltypes.Value{joinRows(
lrow,
Expand All @@ -132,8 +135,8 @@ func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars
return callback(result)
}
}
if wantfields {
wantfields = false
if fieldNeeded.Load() {
fieldNeeded.Store(false)
for k := range jn.Vars {
joinVars[k] = sqltypes.NullBindVariable
}
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3782,3 +3782,38 @@ func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
}

func TestStreamJoinQuery(t *testing.T) {
ctx := utils.LeakCheckContext(t)

Check failure on line 3787 in go/vt/vtgate/executor_select_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

LeakCheckContext not declared by package utils (typecheck)

// Special setup: Don't use createExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
u := createSandbox(KsTestUnsharded)
s := createSandbox(KsTestSharded)
s.VSchema = executorVSchema
u.VSchema = unshardedVSchema
serv := newSandboxForCells(ctx, []string{cell})
resolver := newTestResolver(ctx, hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := createExecutor(ctx, serv, cell, resolver)
defer executor.Close()

Check failure on line 3803 in go/vt/vtgate/executor_select_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

executor.Close undefined (type *Executor has no field or method Close) (typecheck)

sql := "select u.foo, u.apa, ue.bar, ue.apa from user u join user_extra ue on u.foo = ue.bar"
result, err := executorStream(ctx, executor, sql)
require.NoError(t, err)
wantResult := &sqltypes.Result{
Fields: append(sandboxconn.SingleRowResult.Fields, sandboxconn.SingleRowResult.Fields...),
}
wantRow := append(sandboxconn.StreamRowResult.Rows[0], sandboxconn.StreamRowResult.Rows[0]...)
for i := 0; i < 64; i++ {
wantResult.Rows = append(wantResult.Rows, wantRow)
}
require.Equal(t, len(wantResult.Rows), len(result.Rows))
for idx := 0; idx < 64; idx++ {
utils.MustMatch(t, wantResult.Rows[idx], result.Rows[idx], "mismatched on: ", strconv.Itoa(idx))
}
}

0 comments on commit 9a2dee5

Please sign in to comment.