Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove the partition state that is cached in the txn table. #21430

Merged
merged 10 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ func (s *Scope) getRelData(c *Compile, blockExprList []*plan.Expr) error {
if err != nil {
return err
}
s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice())

s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice())
} else {
tombstones := s.NodeInfo.Data.GetTombstones()
commited.AttachTombstones(tombstones)
Expand Down Expand Up @@ -973,6 +973,7 @@ func (s *Scope) aggOptimize(c *Compile, rel engine.Relation, ctx context.Context
}); err != nil {
return err
}

if partialResults != nil {
s.NodeInfo.Data = newRelData
//find the last mergegroup
Expand Down
4 changes: 3 additions & 1 deletion pkg/vm/engine/disttae/change_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func (h *CheckpointChangesHandle) initReader(ctx context.Context) (err error) {
); err != nil {
return
}
relData := readutil.NewBlockListRelationData(1)
relData := readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))
h.blockList = blockList
for i, end := 0, blockList.Len(); i < end; i++ {
relData.AppendBlockInfo(blockList.Get(i))
Expand Down
7 changes: 2 additions & 5 deletions pkg/vm/engine/disttae/local_disttae_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewLocalDataSource(
ctx context.Context,
table *txnTable,
txnOffset int,
pState *logtailreplay.PartitionState,
rangesSlice objectio.BlockInfoSlice,
extraTombstones engine.Tombstoner,
skipReadMem bool,
Expand Down Expand Up @@ -75,11 +76,7 @@ func NewLocalDataSource(
}

if source.category != engine.ShardingLocalDataSource {
state, err := table.getPartitionState(ctx)
if err != nil {
return nil, err
}
source.pState = state
source.pState = pState
}

source.table = table
Expand Down
24 changes: 22 additions & 2 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,28 @@ func (p *PartitionState) LogEntry(entry *api.Entry, msg string) {
)
}

func (p *PartitionState) Desc() string {
return fmt.Sprintf("PartitionState(tid:%d) objLen %v, rowsLen %v", p.tid, p.dataObjectsNameIndex.Len(), p.rows.Len())
func (p *PartitionState) Desc(short bool) string {
buf := bytes.Buffer{}
buf.WriteString(fmt.Sprintf("tid= %d, dataObjectCnt= %d, tombstoneObjectCnt= %d, rowsCnt= %d",
p.tid,
p.dataObjectsNameIndex.Len(),
p.tombstoneObjectsNameIndex.Len(),
p.rows.Len()))

if short {
return buf.String()
}

buf.WriteString("\n\nRows:\n")

str := p.LogAllRowEntry()
buf.WriteString(str)

return buf.String()
}

func (p *PartitionState) String() string {
return p.Desc(false)
}

func (p *PartitionState) HandleObjectEntry(ctx context.Context, objectEntry objectio.ObjectEntry, isTombstone bool) (err error) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/vm/engine/disttae/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ func newCNMergeTask(
targets []objectio.ObjectStats,
targetObjSize uint32,
) (*cnMergeTask, error) {
relData := readutil.NewBlockListRelationData(1)

part, err := tbl.getPartitionState(ctx)
if err != nil {
return nil, err
}

relData := readutil.NewBlockListRelationData(1,
readutil.WithPartitionState(part))

source, err := tbl.buildLocalDataSource(
ctx,
0,
Expand Down
11 changes: 10 additions & 1 deletion pkg/vm/engine/disttae/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,16 @@ func doTransferRowids(
); err != nil {
return
}
relData := readutil.NewBlockListRelationData(1)

part, err := table.getPartitionState(ctx)
if err != nil {
return err
}

relData := readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))

for i, end := 0, blockList.Len(); i < end; i++ {
relData.AppendBlockInfo(blockList.Get(i))
}
Expand Down
153 changes: 109 additions & 44 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -538,10 +539,6 @@ func (tbl *txnTable) GetProcess() any {
return tbl.proc.Load()
}

func (tbl *txnTable) resetSnapshot() {
tbl._partState.Store(nil)
}

func (tbl *txnTable) CollectTombstones(
ctx context.Context,
txnOffset int,
Expand Down Expand Up @@ -646,9 +643,16 @@ func (tbl *txnTable) Ranges(ctx context.Context, rangesParam engine.RangesParam)

func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesParam) (data engine.RelData, err error) {
needUncommited := rangesParam.Policy&engine.Policy_CollectUncommittedData != 0

var part *logtailreplay.PartitionState
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}

objRelData := &readutil.ObjListRelData{
NeedFirstEmpty: needUncommited,
Rsp: rangesParam.Rsp,
PState: part,
}

if needUncommited {
Expand All @@ -659,12 +663,9 @@ func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesPa
}
}

var part *logtailreplay.PartitionState
// get the table's snapshot
if rangesParam.Policy&engine.Policy_CollectCommittedData != 0 {
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}
if !(rangesParam.Policy&engine.Policy_CollectCommittedData != 0) {
part = nil
}

if err = ForeachSnapshotObjects(
Expand Down Expand Up @@ -800,9 +801,18 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara
return
}

blklist := &readutil.BlockListRelData{}
if part == nil {
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}
}

blklist := readutil.NewBlockListRelationData(
0,
readutil.WithPartitionState(part))
blklist.SetBlockList(blocks)
data = blklist

return
}

Expand Down Expand Up @@ -1790,6 +1800,46 @@ func BuildLocalDataSource(
engine.GeneralLocalDataSource)
}

func extractPStateFromRelData(
ctx context.Context,
tbl *txnTable,
relData engine.RelData,
) (*logtailreplay.PartitionState, error) {

var part any

if x1, o1 := relData.(*readutil.ObjListRelData); o1 {
part = x1.PState
} else if x2, o2 := relData.(*readutil.BlockListRelData); o2 {
part = x2.GetPState()
}

if part == nil {
// why the partition will be nil ??
sql := ""
if p := tbl.proc.Load().GetStmtProfile(); p != nil {
sql = p.GetSqlOfStmt()
}

logutil.Warn("RELDATA-WITH-EMPTY-PSTATE",
zap.String("db", tbl.db.databaseName),
zap.String("table", tbl.tableName),
zap.String("sql", sql),
zap.String("relDataType", fmt.Sprintf("%T", relData)),
zap.String("relDataContent", relData.String()),
zap.String("stack", string(debug.Stack())))

pState, err := tbl.getPartitionState(ctx)
if err != nil {
return nil, err
}

return pState, nil
}

return part.(*logtailreplay.PartitionState), nil
}

func (tbl *txnTable) buildLocalDataSource(
ctx context.Context,
txnOffset int,
Expand All @@ -1800,6 +1850,13 @@ func (tbl *txnTable) buildLocalDataSource(

switch relData.GetType() {
case engine.RelDataObjList, engine.RelDataBlockList:
var pState *logtailreplay.PartitionState

pState, err = extractPStateFromRelData(ctx, tbl, relData)
if err != nil {
return nil, err
}

ranges := relData.GetBlockInfoSlice()
skipReadMem := !bytes.Equal(
objectio.EncodeBlockInfo(ranges.Get(0)), objectio.EmptyBlockInfoBytes)
Expand All @@ -1823,6 +1880,7 @@ func (tbl *txnTable) buildLocalDataSource(
ctx,
tbl,
txnOffset,
pState,
ranges,
relData.GetTombstones(),
skipReadMem,
Expand Down Expand Up @@ -1866,8 +1924,16 @@ func (tbl *txnTable) BuildReaders(

//relData maybe is nil, indicate that only read data from memory.
if relData == nil || relData.DataCnt() == 0 {
relData = readutil.NewBlockListRelationData(1)
part, err := tbl.getPartitionState(ctx)
if err != nil {
return nil, err
}

relData = readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))
}

blkCnt := relData.DataCnt()
newNum := num
if blkCnt < num {
Expand Down Expand Up @@ -1921,44 +1987,42 @@ func (tbl *txnTable) BuildShardingReaders(
func (tbl *txnTable) getPartitionState(
ctx context.Context,
) (*logtailreplay.PartitionState, error) {

if !tbl.db.op.IsSnapOp() {
if tbl._partState.Load() == nil {
ps, err := tbl.tryToSubscribe(ctx)
if err != nil {
return nil, err
}
if ps == nil {
ps = tbl.getTxn().engine.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId).Snapshot()
}
tbl._partState.Store(ps)
if tbl.tableId == catalog.MO_COLUMNS_ID {
logutil.Info("open partition state for mo_columns",
zap.String("txn", tbl.db.op.Txn().DebugString()),
zap.String("desc", ps.Desc()),
zap.String("pointer", fmt.Sprintf("%p", ps)))
}
ps, err := tbl.tryToSubscribe(ctx)
if err != nil {
return nil, err
}
if ps == nil {
ps = tbl.getTxn().engine.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId).Snapshot()
}

if tbl.tableId == catalog.MO_COLUMNS_ID {
logutil.Info("open partition state for mo_columns",
zap.String("txn", tbl.db.op.Txn().DebugString()),
zap.String("desc", ps.Desc(true)),
zap.String("pointer", fmt.Sprintf("%p", ps)))
}
return tbl._partState.Load(), nil

return ps, nil
}

// for snapshot txnOp
if tbl._partState.Load() == nil {
ps, err := tbl.getTxn().engine.getOrCreateSnapPart(
ctx,
tbl,
types.TimestampToTS(tbl.db.op.Txn().SnapshotTS))
if err != nil {
return nil, err
}
logutil.Infof("Get partition state for snapshot read, tbl:%p, table name:%s, tid:%v, txn:%s, ps:%p",
tbl,
tbl.tableName,
tbl.tableId,
tbl.db.op.Txn().DebugString(),
ps)
tbl._partState.Store(ps)
ps, err := tbl.getTxn().engine.getOrCreateSnapPart(
ctx,
tbl,
types.TimestampToTS(tbl.db.op.Txn().SnapshotTS))
if err != nil {
return nil, err
}
return tbl._partState.Load(), nil
logutil.Infof("Get partition state for snapshot read, tbl:%p, table name:%s, tid:%v, txn:%s, ps:%p",
tbl,
tbl.tableName,
tbl.tableId,
tbl.db.op.Txn().DebugString(),
ps)

return ps, nil
}

func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.PartitionState, err error) {
Expand Down Expand Up @@ -2298,6 +2362,8 @@ func (tbl *txnTable) GetNonAppendableObjectStats(ctx context.Context) ([]objecti
return objStats, nil
}

// Reset what?
// TODO: txnTable should be stateless
func (tbl *txnTable) Reset(op client.TxnOperator) error {
ws := op.GetWorkspace()
if ws == nil {
Expand All @@ -2312,7 +2378,6 @@ func (tbl *txnTable) Reset(op client.TxnOperator) error {
tbl.proc.Store(txn.proc)
tbl.createdInTxn = false
tbl.lastTS = op.SnapshotTS()
tbl.resetSnapshot()
return nil
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/vm/engine/disttae/txn_table_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,14 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang
return nil, err
}

ret := readutil.NewBlockListRelationData(0)
part, err := tbl.origin.getPartitionState(ctx)
if err != nil {
return nil, err
}

ret := readutil.NewBlockListRelationData(
0,
readutil.WithPartitionState(part))

for i := 0; i < rs.DataCnt(); i++ {
blk := rs.GetBlockInfo(i)
Expand Down Expand Up @@ -569,7 +576,14 @@ func (tbl *txnTableDelegate) BuildShardingReaders(

//relData maybe is nil, indicate that only read data from memory.
if relData == nil || relData.DataCnt() == 0 {
relData = readutil.NewBlockListRelationData(1)
part, err2 := tbl.origin.getPartitionState(ctx)
if err2 != nil {
return nil, err2
}

relData = readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))
}

blkCnt := relData.DataCnt()
Expand Down
Loading
Loading