Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ouyuanning authored Feb 25, 2025
2 parents a207787 + b5a7d11 commit f4a82df
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 74 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ func (s *Scope) getRelData(c *Compile, blockExprList []*plan.Expr) error {
if err != nil {
return err
}
s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice())

s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice())
} else {
tombstones := s.NodeInfo.Data.GetTombstones()
commited.AttachTombstones(tombstones)
Expand Down Expand Up @@ -973,6 +973,7 @@ func (s *Scope) aggOptimize(c *Compile, rel engine.Relation, ctx context.Context
}); err != nil {
return err
}

if partialResults != nil {
s.NodeInfo.Data = newRelData
//find the last mergegroup
Expand Down
4 changes: 3 additions & 1 deletion pkg/vm/engine/disttae/change_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func (h *CheckpointChangesHandle) initReader(ctx context.Context) (err error) {
); err != nil {
return
}
relData := readutil.NewBlockListRelationData(1)
relData := readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))
h.blockList = blockList
for i, end := 0, blockList.Len(); i < end; i++ {
relData.AppendBlockInfo(blockList.Get(i))
Expand Down
7 changes: 2 additions & 5 deletions pkg/vm/engine/disttae/local_disttae_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewLocalDataSource(
ctx context.Context,
table *txnTable,
txnOffset int,
pState *logtailreplay.PartitionState,
rangesSlice objectio.BlockInfoSlice,
extraTombstones engine.Tombstoner,
skipReadMem bool,
Expand Down Expand Up @@ -75,11 +76,7 @@ func NewLocalDataSource(
}

if source.category != engine.ShardingLocalDataSource {
state, err := table.getPartitionState(ctx)
if err != nil {
return nil, err
}
source.pState = state
source.pState = pState
}

source.table = table
Expand Down
24 changes: 22 additions & 2 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,28 @@ func (p *PartitionState) LogEntry(entry *api.Entry, msg string) {
)
}

func (p *PartitionState) Desc() string {
return fmt.Sprintf("PartitionState(tid:%d) objLen %v, rowsLen %v", p.tid, p.dataObjectsNameIndex.Len(), p.rows.Len())
// Desc renders a human-readable description of the partition state.
//
// The summary reports the table id together with the number of data objects,
// tombstone objects and rows. When short is true only that summary is
// returned; otherwise a "Rows:" section followed by the output of
// LogAllRowEntry is appended to it.
func (p *PartitionState) Desc(short bool) string {
	var out bytes.Buffer

	// Summary line, always present.
	fmt.Fprintf(&out, "tid= %d, dataObjectCnt= %d, tombstoneObjectCnt= %d, rowsCnt= %d",
		p.tid,
		p.dataObjectsNameIndex.Len(),
		p.tombstoneObjectsNameIndex.Len(),
		p.rows.Len())

	// The long form additionally carries the row entries.
	if !short {
		out.WriteString("\n\nRows:\n")
		out.WriteString(p.LogAllRowEntry())
	}

	return out.String()
}

// String returns the long description of the partition state, i.e. the result
// of Desc with short set to false.
func (p *PartitionState) String() string {
	const short = false
	return p.Desc(short)
}

func (p *PartitionState) HandleObjectEntry(ctx context.Context, objectEntry objectio.ObjectEntry, isTombstone bool) (err error) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/vm/engine/disttae/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ func newCNMergeTask(
targets []objectio.ObjectStats,
targetObjSize uint32,
) (*cnMergeTask, error) {
relData := readutil.NewBlockListRelationData(1)

part, err := tbl.getPartitionState(ctx)
if err != nil {
return nil, err
}

relData := readutil.NewBlockListRelationData(1,
readutil.WithPartitionState(part))

source, err := tbl.buildLocalDataSource(
ctx,
0,
Expand Down
11 changes: 10 additions & 1 deletion pkg/vm/engine/disttae/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,16 @@ func doTransferRowids(
); err != nil {
return
}
relData := readutil.NewBlockListRelationData(1)

part, err := table.getPartitionState(ctx)
if err != nil {
return err
}

relData := readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))

for i, end := 0, blockList.Len(); i < end; i++ {
relData.AppendBlockInfo(blockList.Get(i))
}
Expand Down
153 changes: 109 additions & 44 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -538,10 +539,6 @@ func (tbl *txnTable) GetProcess() any {
return tbl.proc.Load()
}

// resetSnapshot stores nil into tbl._partState, dropping whatever partition
// state was previously held there.
func (tbl *txnTable) resetSnapshot() {
tbl._partState.Store(nil)
}

func (tbl *txnTable) CollectTombstones(
ctx context.Context,
txnOffset int,
Expand Down Expand Up @@ -646,9 +643,16 @@ func (tbl *txnTable) Ranges(ctx context.Context, rangesParam engine.RangesParam)

func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesParam) (data engine.RelData, err error) {
needUncommited := rangesParam.Policy&engine.Policy_CollectUncommittedData != 0

var part *logtailreplay.PartitionState
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}

objRelData := &readutil.ObjListRelData{
NeedFirstEmpty: needUncommited,
Rsp: rangesParam.Rsp,
PState: part,
}

if needUncommited {
Expand All @@ -659,12 +663,9 @@ func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesPa
}
}

var part *logtailreplay.PartitionState
// get the table's snapshot
if rangesParam.Policy&engine.Policy_CollectCommittedData != 0 {
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}
if !(rangesParam.Policy&engine.Policy_CollectCommittedData != 0) {
part = nil
}

if err = ForeachSnapshotObjects(
Expand Down Expand Up @@ -800,9 +801,18 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara
return
}

blklist := &readutil.BlockListRelData{}
if part == nil {
if part, err = tbl.getPartitionState(ctx); err != nil {
return
}
}

blklist := readutil.NewBlockListRelationData(
0,
readutil.WithPartitionState(part))
blklist.SetBlockList(blocks)
data = blklist

return
}

Expand Down Expand Up @@ -1790,6 +1800,46 @@ func BuildLocalDataSource(
engine.GeneralLocalDataSource)
}

// extractPStateFromRelData returns the partition state carried by relData,
// falling back to tbl.getPartitionState when relData does not carry a usable
// one.
//
// Only *readutil.ObjListRelData (through its PState field) and
// *readutil.BlockListRelData (through GetPState) are inspected; any other
// RelData implementation is treated as carrying no partition state.
//
// The carried value is accepted only when it is a non-nil
// *logtailreplay.PartitionState. A plain `== nil` comparison on the interface
// value is not sufficient: an interface holding a typed nil pointer compares
// unequal to nil, which would let a nil partition state escape to the caller.
// A value of an unexpected dynamic type takes the fallback path as well,
// rather than panicking on a failed type assertion.
//
// When the fallback is taken, a warning with the database, table, current SQL
// statement, relData type/content and the call stack is logged first. An error
// is returned only if tbl.getPartitionState fails.
func extractPStateFromRelData(
	ctx context.Context,
	tbl *txnTable,
	relData engine.RelData,
) (*logtailreplay.PartitionState, error) {

	var carried any

	switch rd := relData.(type) {
	case *readutil.ObjListRelData:
		carried = rd.PState
	case *readutil.BlockListRelData:
		carried = rd.GetPState()
	}

	// Comma-ok assertion plus a pointer nil check covers the untyped nil, the
	// typed nil pointer and the unexpected-type cases in one place.
	if pState, ok := carried.(*logtailreplay.PartitionState); ok && pState != nil {
		return pState, nil
	}

	// The relData carries no usable partition state. This is unexpected, so
	// record enough context to find the producer of this relData.
	sql := ""
	if proc := tbl.proc.Load(); proc != nil {
		if p := proc.GetStmtProfile(); p != nil {
			sql = p.GetSqlOfStmt()
		}
	}

	logutil.Warn("RELDATA-WITH-EMPTY-PSTATE",
		zap.String("db", tbl.db.databaseName),
		zap.String("table", tbl.tableName),
		zap.String("sql", sql),
		zap.String("relDataType", fmt.Sprintf("%T", relData)),
		zap.String("relDataContent", relData.String()),
		zap.String("stack", string(debug.Stack())))

	pState, err := tbl.getPartitionState(ctx)
	if err != nil {
		return nil, err
	}

	return pState, nil
}

func (tbl *txnTable) buildLocalDataSource(
ctx context.Context,
txnOffset int,
Expand All @@ -1800,6 +1850,13 @@ func (tbl *txnTable) buildLocalDataSource(

switch relData.GetType() {
case engine.RelDataObjList, engine.RelDataBlockList:
var pState *logtailreplay.PartitionState

pState, err = extractPStateFromRelData(ctx, tbl, relData)
if err != nil {
return nil, err
}

ranges := relData.GetBlockInfoSlice()
skipReadMem := !bytes.Equal(
objectio.EncodeBlockInfo(ranges.Get(0)), objectio.EmptyBlockInfoBytes)
Expand All @@ -1823,6 +1880,7 @@ func (tbl *txnTable) buildLocalDataSource(
ctx,
tbl,
txnOffset,
pState,
ranges,
relData.GetTombstones(),
skipReadMem,
Expand Down Expand Up @@ -1866,8 +1924,16 @@ func (tbl *txnTable) BuildReaders(

// relData may be nil, which indicates that data should be read from memory only.
if relData == nil || relData.DataCnt() == 0 {
relData = readutil.NewBlockListRelationData(1)
part, err := tbl.getPartitionState(ctx)
if err != nil {
return nil, err
}

relData = readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))
}

blkCnt := relData.DataCnt()
newNum := num
if blkCnt < num {
Expand Down Expand Up @@ -1921,44 +1987,42 @@ func (tbl *txnTable) BuildShardingReaders(
func (tbl *txnTable) getPartitionState(
ctx context.Context,
) (*logtailreplay.PartitionState, error) {

if !tbl.db.op.IsSnapOp() {
if tbl._partState.Load() == nil {
ps, err := tbl.tryToSubscribe(ctx)
if err != nil {
return nil, err
}
if ps == nil {
ps = tbl.getTxn().engine.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId).Snapshot()
}
tbl._partState.Store(ps)
if tbl.tableId == catalog.MO_COLUMNS_ID {
logutil.Info("open partition state for mo_columns",
zap.String("txn", tbl.db.op.Txn().DebugString()),
zap.String("desc", ps.Desc()),
zap.String("pointer", fmt.Sprintf("%p", ps)))
}
ps, err := tbl.tryToSubscribe(ctx)
if err != nil {
return nil, err
}
if ps == nil {
ps = tbl.getTxn().engine.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId).Snapshot()
}

if tbl.tableId == catalog.MO_COLUMNS_ID {
logutil.Info("open partition state for mo_columns",
zap.String("txn", tbl.db.op.Txn().DebugString()),
zap.String("desc", ps.Desc(true)),
zap.String("pointer", fmt.Sprintf("%p", ps)))
}
return tbl._partState.Load(), nil

return ps, nil
}

// for snapshot txnOp
if tbl._partState.Load() == nil {
ps, err := tbl.getTxn().engine.getOrCreateSnapPart(
ctx,
tbl,
types.TimestampToTS(tbl.db.op.Txn().SnapshotTS))
if err != nil {
return nil, err
}
logutil.Infof("Get partition state for snapshot read, tbl:%p, table name:%s, tid:%v, txn:%s, ps:%p",
tbl,
tbl.tableName,
tbl.tableId,
tbl.db.op.Txn().DebugString(),
ps)
tbl._partState.Store(ps)
ps, err := tbl.getTxn().engine.getOrCreateSnapPart(
ctx,
tbl,
types.TimestampToTS(tbl.db.op.Txn().SnapshotTS))
if err != nil {
return nil, err
}
return tbl._partState.Load(), nil
logutil.Infof("Get partition state for snapshot read, tbl:%p, table name:%s, tid:%v, txn:%s, ps:%p",
tbl,
tbl.tableName,
tbl.tableId,
tbl.db.op.Txn().DebugString(),
ps)

return ps, nil
}

func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.PartitionState, err error) {
Expand Down Expand Up @@ -2298,6 +2362,8 @@ func (tbl *txnTable) GetNonAppendableObjectStats(ctx context.Context) ([]objecti
return objStats, nil
}

// Reset what?
// TODO: txnTable should be stateless
func (tbl *txnTable) Reset(op client.TxnOperator) error {
ws := op.GetWorkspace()
if ws == nil {
Expand All @@ -2312,7 +2378,6 @@ func (tbl *txnTable) Reset(op client.TxnOperator) error {
tbl.proc.Store(txn.proc)
tbl.createdInTxn = false
tbl.lastTS = op.SnapshotTS()
tbl.resetSnapshot()
return nil
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/vm/engine/disttae/txn_table_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,14 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang
return nil, err
}

ret := readutil.NewBlockListRelationData(0)
part, err := tbl.origin.getPartitionState(ctx)
if err != nil {
return nil, err
}

ret := readutil.NewBlockListRelationData(
0,
readutil.WithPartitionState(part))

for i := 0; i < rs.DataCnt(); i++ {
blk := rs.GetBlockInfo(i)
Expand Down Expand Up @@ -569,7 +576,14 @@ func (tbl *txnTableDelegate) BuildShardingReaders(

// relData may be nil, which indicates that data should be read from memory only.
if relData == nil || relData.DataCnt() == 0 {
relData = readutil.NewBlockListRelationData(1)
part, err2 := tbl.origin.getPartitionState(ctx)
if err2 != nil {
return nil, err2
}

relData = readutil.NewBlockListRelationData(
1,
readutil.WithPartitionState(part))
}

blkCnt := relData.DataCnt()
Expand Down
Loading

0 comments on commit f4a82df

Please sign in to comment.