Skip to content

Commit

Permalink
executor: fix the incorrect return when hash join encounters error (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 11, 2025
1 parent d90faf1 commit 24d0b8c
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 65 deletions.
1 change: 1 addition & 0 deletions pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ go_test(
deps = [
"//pkg/config",
"//pkg/domain",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/testutil",
"//pkg/expression",
"//pkg/parser/ast",
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/join/hash_join_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,17 @@ func (w *buildWorkerBase) fetchBuildSideRows(ctx context.Context, hashJoinCtx *h

for {
err := checkAndSpillRowTableIfNeeded(fetcherAndWorkerSyncer, spillHelper)
issue59377Intest(&err)
if err != nil {
hasError = true
errCh <- errors.Trace(err)
return
}

err = triggerIntest(2)
if err != nil {
hasError = true
errCh <- errors.Trace(err)
return
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,12 @@ func rehash(oldHashValue uint64, rehashBuf []byte, hash hash.Hash64) uint64 {
return hash.Sum64()
}

func issue59377Intest(err *error) {
failpoint.Inject("Issue59377", func() {
*err = errors.New("Random failpoint error is triggered")
})
}

func triggerIntest(errProbability int) error {
failpoint.Inject("slowWorkers", func(val failpoint.Value) {
if val.(bool) {
Expand Down
158 changes: 93 additions & 65 deletions pkg/executor/join/outer_join_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package join

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand All @@ -29,6 +31,76 @@ import (
"github.com/stretchr/testify/require"
)

func prepareSimpleHashJoinEnv() (*testutil.MockDataSource, *testutil.MockDataSource, *hashJoinInfo, *testutil.MockActionOnExceed) {
hardLimitBytesNum := int64(5000000)
newRootExceedAction := new(testutil.MockActionOnExceed)

ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum)
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
// Consume lots of memory in advance to help to trigger fallback action.
ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999))
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)

leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false)

intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
stringTp := types.NewFieldType(mysql.TypeVarString)
stringTp.AddFlag(mysql.NotNullFlag)

leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp}
rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp}

leftKeys := []*expression.Column{
{Index: 1, RetType: intTp},
{Index: 3, RetType: stringTp},
}
rightKeys := []*expression.Column{
{Index: 0, RetType: intTp},
{Index: 2, RetType: stringTp},
}

param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}

maxRowTableSegmentSize = 100
spillChunkSize = 100
joinType := logicalop.InnerJoin

returnTypes := getReturnTypes(joinType, param)

var buildKeys []*expression.Column
var probeKeys []*expression.Column
if param.rightAsBuildSide {
buildKeys = param.rightKeys
probeKeys = param.leftKeys
} else {
buildKeys = param.leftKeys
probeKeys = param.rightKeys
}

info := &hashJoinInfo{
ctx: ctx,
schema: buildSchema(returnTypes),
leftExec: leftDataSource,
rightExec: rightDataSource,
joinType: joinType,
rightAsBuildSide: param.rightAsBuildSide,
buildKeys: buildKeys,
probeKeys: probeKeys,
lUsed: param.leftUsed,
rUsed: param.rightUsed,
otherCondition: param.otherCondition,
lUsedInOtherCondition: param.leftUsedByOtherCondition,
rUsedInOtherCondition: param.rightUsedByOtherCondition,
}

return leftDataSource, rightDataSource, info, newRootExceedAction
}

func testRandomFail(t *testing.T, ctx *mock.Context, joinType logicalop.JoinType, param spillTestParam, leftDataSource *testutil.MockDataSource, rightDataSource *testutil.MockDataSource) {
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSQLText, 1500000)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
Expand Down Expand Up @@ -267,71 +339,7 @@ func TestOuterJoinUnderApplyExec(t *testing.T) {
}

func TestFallBackAction(t *testing.T) {
hardLimitBytesNum := int64(5000000)
newRootExceedAction := new(testutil.MockActionOnExceed)

ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
ctx.GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, hardLimitBytesNum)
ctx.GetSessionVars().MemTracker.SetActionOnExceed(newRootExceedAction)
// Consume lots of memory in advance to help to trigger fallback action.
ctx.GetSessionVars().MemTracker.Consume(int64(float64(hardLimitBytesNum) * 0.99999))
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(memory.LabelForSQLText, -1)
ctx.GetSessionVars().StmtCtx.MemTracker.AttachTo(ctx.GetSessionVars().MemTracker)

leftDataSource, rightDataSource := buildLeftAndRightDataSource(ctx, leftCols, rightCols, false)

intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
stringTp := types.NewFieldType(mysql.TypeVarString)
stringTp.AddFlag(mysql.NotNullFlag)

leftTypes := []*types.FieldType{intTp, intTp, intTp, stringTp, intTp}
rightTypes := []*types.FieldType{intTp, intTp, stringTp, intTp, intTp}

leftKeys := []*expression.Column{
{Index: 1, RetType: intTp},
{Index: 3, RetType: stringTp},
}
rightKeys := []*expression.Column{
{Index: 0, RetType: intTp},
{Index: 2, RetType: stringTp},
}

param := spillTestParam{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{0, 2, 3, 4}, nil, nil, nil, []int64{5000000, 1700000, 6000000, 1500000, 10000}}

maxRowTableSegmentSize = 100
spillChunkSize = 100
joinType := logicalop.InnerJoin

returnTypes := getReturnTypes(joinType, param)

var buildKeys []*expression.Column
var probeKeys []*expression.Column
if param.rightAsBuildSide {
buildKeys = param.rightKeys
probeKeys = param.leftKeys
} else {
buildKeys = param.leftKeys
probeKeys = param.rightKeys
}

info := &hashJoinInfo{
ctx: ctx,
schema: buildSchema(returnTypes),
leftExec: leftDataSource,
rightExec: rightDataSource,
joinType: joinType,
rightAsBuildSide: param.rightAsBuildSide,
buildKeys: buildKeys,
probeKeys: probeKeys,
lUsed: param.leftUsed,
rUsed: param.rightUsed,
otherCondition: param.otherCondition,
lUsedInOtherCondition: param.leftUsedByOtherCondition,
rUsedInOtherCondition: param.rightUsedByOtherCondition,
}
leftDataSource, rightDataSource, info, newRootExceedAction := prepareSimpleHashJoinEnv()

leftDataSource.PrepareChunks()
rightDataSource.PrepareChunks()
Expand All @@ -340,6 +348,26 @@ func TestFallBackAction(t *testing.T) {
require.Less(t, 0, newRootExceedAction.GetTriggeredNum())
}

func TestIssue59377(t *testing.T) {
leftDataSource, rightDataSource, info, _ := prepareSimpleHashJoinEnv()
leftDataSource.PrepareChunks()
rightDataSource.PrepareChunks()
hashJoinExec := buildHashJoinV2Exec(info)

err := failpoint.Enable("github.com/pingcap/tidb/pkg/executor/join/Issue59377", "return")
require.NoError(t, err)
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/join/Issue59377")

tmpCtx := context.Background()
hashJoinExec.isMemoryClearedForTest = true
err = hashJoinExec.Open(tmpCtx)
require.NoError(t, err)
chk := exec.NewFirstChunk(hashJoinExec)
err = hashJoinExec.Next(tmpCtx, chk)
require.True(t, err != nil)
_ = hashJoinExec.Close()
}

func TestHashJoinRandomFail(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
Expand Down

0 comments on commit 24d0b8c

Please sign in to comment.