diff --git a/pkg/executor/join/BUILD.bazel b/pkg/executor/join/BUILD.bazel index d93d83563288e..49d9f1b3a85ea 100644 --- a/pkg/executor/join/BUILD.bazel +++ b/pkg/executor/join/BUILD.bazel @@ -98,6 +98,7 @@ go_test( deps = [ "//pkg/config", "//pkg/domain", + "//pkg/executor/internal/exec", "//pkg/executor/internal/testutil", "//pkg/expression", "//pkg/parser/ast", diff --git a/pkg/executor/join/hash_join_base.go b/pkg/executor/join/hash_join_base.go index 24718af2da760..6d1c9bda8354f 100644 --- a/pkg/executor/join/hash_join_base.go +++ b/pkg/executor/join/hash_join_base.go @@ -312,14 +312,17 @@ func (w *buildWorkerBase) fetchBuildSideRows(ctx context.Context, hashJoinCtx *h for { err := checkAndSpillRowTableIfNeeded(fetcherAndWorkerSyncer, spillHelper) + issue59377Intest(&err) if err != nil { hasError = true + errCh <- errors.Trace(err) return } err = triggerIntest(2) if err != nil { hasError = true + errCh <- errors.Trace(err) return } diff --git a/pkg/executor/join/hash_join_v2.go b/pkg/executor/join/hash_join_v2.go index ca18d0d58eb63..35756fa872ed8 100644 --- a/pkg/executor/join/hash_join_v2.go +++ b/pkg/executor/join/hash_join_v2.go @@ -1576,6 +1576,12 @@ func rehash(oldHashValue uint64, rehashBuf []byte, hash hash.Hash64) uint64 { return hash.Sum64() } +func issue59377Intest(err *error) { + failpoint.Inject("Issue59377", func() { + *err = errors.New("Random failpoint error is triggered") + }) +} + func triggerIntest(errProbability int) error { failpoint.Inject("slowWorkers", func(val failpoint.Value) { if val.(bool) { diff --git a/pkg/executor/join/outer_join_spill_test.go b/pkg/executor/join/outer_join_spill_test.go index 63c08cb8533b5..452b0031cdad3 100644 --- a/pkg/executor/join/outer_join_spill_test.go +++ b/pkg/executor/join/outer_join_spill_test.go @@ -15,9 +15,11 @@ package join import ( + "context" "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/executor/internal/testutil" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/parser/ast" @@ -29,6 +31,76 @@ import ( "github.com/stretchr/testify/require" ) +func prepareSimpleHashJoinEnv() (*testutil.MockDataSource, *testutil.MockDataSource, *hashJoinInfo, *testutil.MockActionOnExceed) { + hardLimitBytesNum := int64(5000000) + newRootExceedAction := new(testutil.MockActionOnExceed) + + ctx := mock.NewContext() + ctx.GetSessionVars().InitChunkSize = 32 + ctx.GetSessionVars().MaxChunkSize = 32 + ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum) + ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction) + // Consume lots of memory in advance to help to trigger fallback action. + ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999)) + ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) + ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) + + leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false) + + intTp := types.NewFieldType(mysql.TypeLonglong) + intTp.AddFlag(mysql.NotNullFlag) + stringTp := types.NewFieldType(mysql.TypeVarString) + stringTp.AddFlag(mysql.NotNullFlag) + + leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp} + rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp} + + leftKeys := []*expression.Column{ + {Index: 1, RetType: intTp}, + {Index: 3, RetType: stringTp}, + } + rightKeys := []*expression.Column{ + {Index: 0, RetType: intTp}, + {Index: 2, RetType: stringTp}, + } + + param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}} + + maxRowTableSegmentSize = 100 + spillChunkSize = 100 + joinType := logicalop.InnerJoin + + returnTypes := getReturnTypes(joinType, param) + + var buildKeys []*expression.Column + var probeKeys []*expression.Column + if param.rightAsBuildSide { + buildKeys = param.rightKeys + probeKeys = param.leftKeys + } else { + buildKeys = param.leftKeys + probeKeys = param.rightKeys + } + + info := &hashJoinInfo{ + ctx: ctx, + schema: buildSchema(returnTypes), + leftExec: leftDataSource, + rightExec: rightDataSource, + joinType: joinType, + rightAsBuildSide: param.rightAsBuildSide, + buildKeys: buildKeys, + probeKeys: probeKeys, + lUsed: param.leftUsed, + rUsed: param.rightUsed, + otherCondition: param.otherCondition, + lUsedInOtherCondition: param.leftUsedByOtherCondition, + rUsedInOtherCondition: param.rightUsedByOtherCondition, + } + + return leftDataSource, rightDataSource, info, newRootExceedAction +} + func testRandomFail(t *testing.T, ctx *mock.Context, joinType logicalop.JoinType, param spillTestParam, leftDataSource *testutil.MockDataSource, rightDataSource *testutil.MockDataSource) { ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 1500000) ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) @@ -267,71 +339,7 @@ func TestOuterJoinUnderApplyExec(t *testing.T) { } func TestFallBackAction(t *testing.T) { - hardLimitBytesNum := int64(5000000) - newRootExceedAction := new(testutil.MockActionOnExceed) - - ctx := mock.NewContext() - ctx.GetSessionVars().InitChunkSize = 32 - ctx.GetSessionVars().MaxChunkSize = 32 - ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum) - ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction) - // Consume lots of memory in advance to help to trigger fallback action. - ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999)) - ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1) - ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker) - - leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false) - - intTp := types.NewFieldType(mysql.TypeLonglong) - intTp.AddFlag(mysql.NotNullFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - stringTp.AddFlag(mysql.NotNullFlag) - - leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp} - rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp} - - leftKeys := []*expression.Column{ - {Index: 1, RetType: intTp}, - {Index: 3, RetType: stringTp}, - } - rightKeys := []*expression.Column{ - {Index: 0, RetType: intTp}, - {Index: 2, RetType: stringTp}, - } - - param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}} - - maxRowTableSegmentSize = 100 - spillChunkSize = 100 - joinType := logicalop.InnerJoin - - returnTypes := getReturnTypes(joinType, param) - - var buildKeys []*expression.Column - var probeKeys []*expression.Column - if param.rightAsBuildSide { - buildKeys = param.rightKeys - probeKeys = param.leftKeys - } else { - buildKeys = param.leftKeys - probeKeys = param.rightKeys - } - - info := &hashJoinInfo{ - ctx: ctx, - schema: buildSchema(returnTypes), - leftExec: leftDataSource, - rightExec: rightDataSource, - joinType: joinType, - rightAsBuildSide: param.rightAsBuildSide, - buildKeys: buildKeys, - probeKeys: probeKeys, - lUsed: param.leftUsed, - rUsed: param.rightUsed, - otherCondition: param.otherCondition, - lUsedInOtherCondition: param.leftUsedByOtherCondition, - rUsedInOtherCondition: param.rightUsedByOtherCondition, - } + leftDataSource, rightDataSource, info, newRootExceedAction := prepareSimpleHashJoinEnv() leftDataSource.PrepareChunks() rightDataSource.PrepareChunks() @@ -340,6 +348,26 @@ func TestFallBackAction(t *testing.T) { require.Less(t, 0, newRootExceedAction.GetTriggeredNum()) } +func TestIssue59377(t *testing.T) { + leftDataSource, rightDataSource, info, _ := prepareSimpleHashJoinEnv() + leftDataSource.PrepareChunks() + rightDataSource.PrepareChunks() + hashJoinExec := buildHashJoinV2Exec(info) + + err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/Issue59377", "return") + require.NoError(t, err) + defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/Issue59377") + + tmpCtx := context.Background() + hashJoinExec.isMemoryClearedForTest = true + err = hashJoinExec.Open(tmpCtx) + require.NoError(t, err) + chk := exec.NewFirstChunk(hashJoinExec) + err = hashJoinExec.Next(tmpCtx, chk) + require.True(t, err != nil) + _ = hashJoinExec.Close() +} + func TestHashJoinRandomFail(t *testing.T) { ctx := mock.NewContext() ctx.GetSessionVars().InitChunkSize = 32