Skip to content

Commit

Permalink
Merge branch 'main' into dop
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 28, 2024
2 parents c4c0dee + 794f859 commit 6b65cf1
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 23 deletions.
16 changes: 12 additions & 4 deletions pkg/frontend/routine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,12 @@ func (rm *RoutineManager) cleanKillQueue() {
ar := rm.accountRoutine
ar.killQueueMu.Lock()
defer ar.killQueueMu.Unlock()
for toKillAccount, killRecord := range ar.killIdQueue {
if time.Since(killRecord.killTime) > time.Duration(getPu(rm.service).SV.CleanKillQueueInterval)*time.Minute {
delete(ar.killIdQueue, toKillAccount)
pu := getPu(rm.service)
if pu != nil {
for toKillAccount, killRecord := range ar.killIdQueue {
if time.Since(killRecord.killTime) > time.Duration(pu.SV.CleanKillQueueInterval)*time.Minute {
delete(ar.killIdQueue, toKillAccount)
}
}
}
}
Expand Down Expand Up @@ -520,7 +523,12 @@ func NewRoutineManager(ctx context.Context, service string) (*RoutineManager, er
default:
}
rm.KillRoutineConnections()
time.Sleep(time.Duration(time.Duration(getPu(rm.service).SV.KillRountinesInterval) * time.Second))
pu := getPu(rm.service)
if pu != nil {
time.Sleep(time.Duration(pu.SV.KillRountinesInterval) * time.Second)
} else {
break
}
}
}()

Expand Down
18 changes: 18 additions & 0 deletions pkg/frontend/routine_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -219,3 +220,20 @@ func TestRoutineManager_killClients(t *testing.T) {
})
}
}

func Test_rm(t *testing.T) {
sv, err := getSystemVariables("test/system_vars_config.toml")
if err != nil {
t.Error(err)
}
pu := config.NewParameterUnit(sv, nil, nil, nil)
pu.SV.SkipCheckUser = true
pu.SV.KillRountinesInterval = 1
setPu("", pu)
rm, err := NewRoutineManager(context.Background(), "")
assert.NoError(t, err)
rm.cleanKillQueue()
setPu("", nil)
time.Sleep(2 * time.Second)
rm.cancelCtx()
}
7 changes: 3 additions & 4 deletions pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,10 +1010,6 @@ func lockTalbeIfLockCountIsZero(
}
for idx := 0; idx < len(lockOp.targets); idx++ {
target := lockOp.targets[idx]
// do not lock table or rows at the end for hidden table
if !target.lockTableAtTheEnd {
continue
}
if target.lockRows != nil {
vec, free, err := colexec.GetReadonlyResultFromNoColumnExpression(proc, target.lockRows)
if err != nil {
Expand All @@ -1035,6 +1031,9 @@ func lockTalbeIfLockCountIsZero(
return err
}
} else {
if !target.lockTableAtTheEnd {
continue
}
err := LockTable(lockOp.engine, proc, target.tableID, target.primaryColumnType, false)
if err != nil {
return err
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/plan/opt_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,9 +1155,6 @@ func (builder *QueryBuilder) lockTableIfLockNoRowsAtTheEndForDelAndUpdate() (err
return
}
tableDef := baseNode.TableDef
if !getLockTableAtTheEnd(tableDef) {
return
}
objRef := baseNode.ObjRef
tableIDs := make(map[uint64]bool)
tableIDs[tableDef.TblId] = true
Expand Down Expand Up @@ -1229,7 +1226,7 @@ func (builder *QueryBuilder) lockTableIfLockNoRowsAtTheEndForDelAndUpdate() (err
}

lockTarget.LockRows = lockRows
lockTarget.LockTableAtTheEnd = true
lockTarget.LockTableAtTheEnd = false
}

return
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/plan/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2709,7 +2709,6 @@ func (builder *QueryBuilder) bindSelect(stmt *tree.Select, ctx *BindContext, isR
PrimaryColTyp: pkTyp,
Block: true,
RefreshTsIdxInBat: -1, //unsupport now
LockTableAtTheEnd: getLockTableAtTheEnd(tableDef),
}
if tableDef.Partition != nil {
partTableIDs, _ := getPartTableIdsAndNames(builder.compCtx, objRef, tableDef)
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/plan/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,50 +880,64 @@ func ReCalcNodeStats(nodeID int32, builder *QueryBuilder, recursive bool, leafNo
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_LEFT:
node.Stats.Outcnt = leftStats.Outcnt
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_RIGHT:
node.Stats.Outcnt = rightStats.Outcnt
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_DEDUP:
node.Stats.Outcnt = rightStats.Outcnt
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_OUTER:
node.Stats.Outcnt = leftStats.Outcnt + rightStats.Outcnt
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_SEMI, plan.Node_INDEX:
node.Stats.Outcnt = leftStats.Outcnt * selectivity
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_ANTI:
node.Stats.Outcnt = leftStats.Outcnt * (1 - rightStats.Selectivity) * 0.5
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_SINGLE, plan.Node_MARK:
node.Stats.Outcnt = leftStats.Outcnt
node.Stats.Cost = leftStats.Cost + rightStats.Cost
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_L2: //L2 join is very time-consuming, increase the cost to get more dop
node.Stats.Outcnt = leftStats.Outcnt
node.Stats.Cost = (leftStats.Cost + rightStats.Cost) * 8
node.Stats.HashmapStats.HashmapSize = rightStats.Outcnt
node.Stats.Selectivity = selectivity_out
node.Stats.BlockNum = leftStats.BlockNum * 8
}
node.Stats.BlockNum = leftStats.BlockNum

case plan.Node_AGG:
if needResetHashMapStats {
Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/plan/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2712,11 +2712,12 @@ func offsetToString(offset int) string {
return fmt.Sprintf("+%02d:%02d", hours, minutes)
}

func getLockTableAtTheEnd(tableDef *TableDef) bool {
if tableDef.Pkey.PkeyColName == catalog.FakePrimaryKeyColName || //fake pk, skip
tableDef.Partition != nil || // unsupport partition table
len(tableDef.Pkey.Names) > 1 { // unsupport multi-column primary key
return false
}
return !strings.HasPrefix(tableDef.Name, catalog.IndexTableNamePrefix)
}
// do not lock table if lock no rows now.
// if need to lock table, uncomment these codes
// func getLockTableAtTheEnd(tableDef *TableDef) bool {
// if tableDef.Pkey.PkeyColName == catalog.FakePrimaryKeyColName || //fake pk, skip
// tableDef.Partition != nil { // unsupport partition table
// return false
// }
// return !strings.HasPrefix(tableDef.Name, catalog.IndexTableNamePrefix)
// }
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,6 @@ select * from su_06 where c1>=2 for update;
use select_for_update;
prepare stmt1 from 'delete from su_06 where c3 in (?)';
set @var = 3;
-- @wait:0:commit
execute stmt1 using @var;
select * from su_06;
-- @session}
Expand Down

0 comments on commit 6b65cf1

Please sign in to comment.