diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index c4f9273bb1a30..c1193b4d7b8a6 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -634,8 +634,8 @@ func (s *Scope) getRelData(c *Compile, blockExprList []*plan.Expr) error { if err != nil { return err } - s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice()) + s.NodeInfo.Data.AppendBlockInfoSlice(commited.GetBlockInfoSlice()) } else { tombstones := s.NodeInfo.Data.GetTombstones() commited.AttachTombstones(tombstones) @@ -973,6 +973,7 @@ func (s *Scope) aggOptimize(c *Compile, rel engine.Relation, ctx context.Context }); err != nil { return err } + if partialResults != nil { s.NodeInfo.Data = newRelData //find the last mergegroup diff --git a/pkg/vm/engine/disttae/change_handle.go b/pkg/vm/engine/disttae/change_handle.go index e18fd5d635a74..220eea717cada 100644 --- a/pkg/vm/engine/disttae/change_handle.go +++ b/pkg/vm/engine/disttae/change_handle.go @@ -201,7 +201,9 @@ func (h *CheckpointChangesHandle) initReader(ctx context.Context) (err error) { ); err != nil { return } - relData := readutil.NewBlockListRelationData(1) + relData := readutil.NewBlockListRelationData( + 1, + readutil.WithPartitionState(part)) h.blockList = blockList for i, end := 0, blockList.Len(); i < end; i++ { relData.AppendBlockInfo(blockList.Get(i)) diff --git a/pkg/vm/engine/disttae/local_disttae_datasource.go b/pkg/vm/engine/disttae/local_disttae_datasource.go index 32a0b5c9eee94..e125c80fb0b82 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource.go @@ -46,6 +46,7 @@ func NewLocalDataSource( ctx context.Context, table *txnTable, txnOffset int, + pState *logtailreplay.PartitionState, rangesSlice objectio.BlockInfoSlice, extraTombstones engine.Tombstoner, skipReadMem bool, @@ -75,11 +76,7 @@ func NewLocalDataSource( } if source.category != engine.ShardingLocalDataSource { - state, err := table.getPartitionState(ctx) - if err != nil { - return nil, err - } - source.pState = state + source.pState = pState } source.table = table diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go index 80549d693cd51..378c0769287c9 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go @@ -95,8 +95,28 @@ func (p *PartitionState) LogEntry(entry *api.Entry, msg string) { ) } -func (p *PartitionState) Desc() string { - return fmt.Sprintf("PartitionState(tid:%d) objLen %v, rowsLen %v", p.tid, p.dataObjectsNameIndex.Len(), p.rows.Len()) +func (p *PartitionState) Desc(short bool) string { + buf := bytes.Buffer{} + buf.WriteString(fmt.Sprintf("tid= %d, dataObjectCnt= %d, tombstoneObjectCnt= %d, rowsCnt= %d", + p.tid, + p.dataObjectsNameIndex.Len(), + p.tombstoneObjectsNameIndex.Len(), + p.rows.Len())) + + if short { + return buf.String() + } + + buf.WriteString("\n\nRows:\n") + + str := p.LogAllRowEntry() + buf.WriteString(str) + + return buf.String() +} + +func (p *PartitionState) String() string { + return p.Desc(false) } func (p *PartitionState) HandleObjectEntry(ctx context.Context, objectEntry objectio.ObjectEntry, isTombstone bool) (err error) { diff --git a/pkg/vm/engine/disttae/merge.go b/pkg/vm/engine/disttae/merge.go index 2b19b6941828c..87785225b6ef9 100644 --- a/pkg/vm/engine/disttae/merge.go +++ b/pkg/vm/engine/disttae/merge.go @@ -83,7 +83,15 @@ func newCNMergeTask( targets []objectio.ObjectStats, targetObjSize uint32, ) (*cnMergeTask, error) { - relData := readutil.NewBlockListRelationData(1) + + part, err := tbl.getPartitionState(ctx) + if err != nil { + return nil, err + } + + relData := readutil.NewBlockListRelationData(1, + readutil.WithPartitionState(part)) + source, err := tbl.buildLocalDataSource( ctx, 0, diff --git a/pkg/vm/engine/disttae/transfer.go b/pkg/vm/engine/disttae/transfer.go index b9818d7779951..adc102756d25a 100644 --- a/pkg/vm/engine/disttae/transfer.go +++ b/pkg/vm/engine/disttae/transfer.go @@ -479,7 +479,16 @@ func doTransferRowids( ); err != nil { return } - relData := readutil.NewBlockListRelationData(1) + + part, err := table.getPartitionState(ctx) + if err != nil { + return err + } + + relData := readutil.NewBlockListRelationData( + 1, + readutil.WithPartitionState(part)) + for i, end := 0, blockList.Len(); i < end; i++ { relData.AppendBlockInfo(blockList.Get(i)) } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 9652b3171a424..644d498310833 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "runtime/debug" "strconv" "strings" "sync" @@ -538,10 +539,6 @@ func (tbl *txnTable) GetProcess() any { return tbl.proc.Load() } -func (tbl *txnTable) resetSnapshot() { - tbl._partState.Store(nil) -} - func (tbl *txnTable) CollectTombstones( ctx context.Context, txnOffset int, @@ -646,9 +643,16 @@ func (tbl *txnTable) Ranges(ctx context.Context, rangesParam engine.RangesParam) func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesParam) (data engine.RelData, err error) { needUncommited := rangesParam.Policy&engine.Policy_CollectUncommittedData != 0 + + var part *logtailreplay.PartitionState + if part, err = tbl.getPartitionState(ctx); err != nil { + return + } + objRelData := &readutil.ObjListRelData{ NeedFirstEmpty: needUncommited, Rsp: rangesParam.Rsp, + PState: part, } if needUncommited { @@ -659,12 +663,9 @@ func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesPa } } - var part *logtailreplay.PartitionState // get the table's snapshot - if rangesParam.Policy&engine.Policy_CollectCommittedData != 0 { - if part, err = tbl.getPartitionState(ctx); err != nil { - return - } + if !(rangesParam.Policy&engine.Policy_CollectCommittedData != 0) { + part = nil } if err = ForeachSnapshotObjects( @@ -800,9 +801,18 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara return } - blklist := &readutil.BlockListRelData{} + if part == nil { + if part, err = tbl.getPartitionState(ctx); err != nil { + return + } + } + + blklist := readutil.NewBlockListRelationData( + 0, + readutil.WithPartitionState(part)) blklist.SetBlockList(blocks) data = blklist + return } @@ -1790,6 +1800,46 @@ func BuildLocalDataSource( engine.GeneralLocalDataSource) } +func extractPStateFromRelData( + ctx context.Context, + tbl *txnTable, + relData engine.RelData, +) (*logtailreplay.PartitionState, error) { + + var part any + + if x1, o1 := relData.(*readutil.ObjListRelData); o1 { + part = x1.PState + } else if x2, o2 := relData.(*readutil.BlockListRelData); o2 { + part = x2.GetPState() + } + + if part == nil { + // why the partition will be nil ?? + sql := "" + if p := tbl.proc.Load().GetStmtProfile(); p != nil { + sql = p.GetSqlOfStmt() + } + + logutil.Warn("RELDATA-WITH-EMPTY-PSTATE", + zap.String("db", tbl.db.databaseName), + zap.String("table", tbl.tableName), + zap.String("sql", sql), + zap.String("relDataType", fmt.Sprintf("%T", relData)), + zap.String("relDataContent", relData.String()), + zap.String("stack", string(debug.Stack()))) + + pState, err := tbl.getPartitionState(ctx) + if err != nil { + return nil, err + } + + return pState, nil + } + + return part.(*logtailreplay.PartitionState), nil +} + func (tbl *txnTable) buildLocalDataSource( ctx context.Context, txnOffset int, @@ -1800,6 +1850,13 @@ func (tbl *txnTable) buildLocalDataSource( switch relData.GetType() { case engine.RelDataObjList, engine.RelDataBlockList: + var pState *logtailreplay.PartitionState + + pState, err = extractPStateFromRelData(ctx, tbl, relData) + if err != nil { + return nil, err + } + ranges := relData.GetBlockInfoSlice() skipReadMem := !bytes.Equal( objectio.EncodeBlockInfo(ranges.Get(0)), objectio.EmptyBlockInfoBytes) @@ -1823,6 +1880,7 @@ func (tbl *txnTable) buildLocalDataSource( ctx, tbl, txnOffset, + pState, ranges, relData.GetTombstones(), skipReadMem, @@ -1866,8 +1924,16 @@ func (tbl *txnTable) BuildReaders( //relData maybe is nil, indicate that only read data from memory. if relData == nil || relData.DataCnt() == 0 { - relData = readutil.NewBlockListRelationData(1) + part, err := tbl.getPartitionState(ctx) + if err != nil { + return nil, err + } + + relData = readutil.NewBlockListRelationData( + 1, + readutil.WithPartitionState(part)) } + blkCnt := relData.DataCnt() newNum := num if blkCnt < num { @@ -1921,44 +1987,42 @@ func (tbl *txnTable) BuildShardingReaders( func (tbl *txnTable) getPartitionState( ctx context.Context, ) (*logtailreplay.PartitionState, error) { + if !tbl.db.op.IsSnapOp() { - if tbl._partState.Load() == nil { - ps, err := tbl.tryToSubscribe(ctx) - if err != nil { - return nil, err - } - if ps == nil { - ps = tbl.getTxn().engine.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId).Snapshot() - } - tbl._partState.Store(ps) - if tbl.tableId == catalog.MO_COLUMNS_ID { - logutil.Info("open partition state for mo_columns", - zap.String("txn", tbl.db.op.Txn().DebugString()), - zap.String("desc", ps.Desc()), - zap.String("pointer", fmt.Sprintf("%p", ps))) - } + ps, err := tbl.tryToSubscribe(ctx) + if err != nil { + return nil, err + } + if ps == nil { + ps = tbl.getTxn().engine.GetOrCreateLatestPart(tbl.db.databaseId, tbl.tableId).Snapshot() + } + + if tbl.tableId == catalog.MO_COLUMNS_ID { + logutil.Info("open partition state for mo_columns", + zap.String("txn", tbl.db.op.Txn().DebugString()), + zap.String("desc", ps.Desc(true)), + zap.String("pointer", fmt.Sprintf("%p", ps))) } - return tbl._partState.Load(), nil + + return ps, nil } // for snapshot txnOp - if tbl._partState.Load() == nil { - ps, err := tbl.getTxn().engine.getOrCreateSnapPart( - ctx, - tbl, - types.TimestampToTS(tbl.db.op.Txn().SnapshotTS)) - if err != nil { - return nil, err - } - logutil.Infof("Get partition state for snapshot read, tbl:%p, table name:%s, tid:%v, txn:%s, ps:%p", - tbl, - tbl.tableName, - tbl.tableId, - tbl.db.op.Txn().DebugString(), - ps) - tbl._partState.Store(ps) + ps, err := tbl.getTxn().engine.getOrCreateSnapPart( + ctx, + tbl, + types.TimestampToTS(tbl.db.op.Txn().SnapshotTS)) + if err != nil { + return nil, err } - return tbl._partState.Load(), nil + logutil.Infof("Get partition state for snapshot read, tbl:%p, table name:%s, tid:%v, txn:%s, ps:%p", + tbl, + tbl.tableName, + tbl.tableId, + tbl.db.op.Txn().DebugString(), + ps) + + return ps, nil } func (tbl *txnTable) tryToSubscribe(ctx context.Context) (ps *logtailreplay.PartitionState, err error) { @@ -2298,6 +2362,8 @@ func (tbl *txnTable) GetNonAppendableObjectStats(ctx context.Context) ([]objecti return objStats, nil } +// Reset what? +// TODO: txnTable should be stateless func (tbl *txnTable) Reset(op client.TxnOperator) error { ws := op.GetWorkspace() if ws == nil { @@ -2312,7 +2378,6 @@ func (tbl *txnTable) Reset(op client.TxnOperator) error { tbl.proc.Store(txn.proc) tbl.createdInTxn = false tbl.lastTS = op.SnapshotTS() - tbl.resetSnapshot() return nil } diff --git a/pkg/vm/engine/disttae/txn_table_delegate.go b/pkg/vm/engine/disttae/txn_table_delegate.go index 788af6045ba3d..791053460e39d 100644 --- a/pkg/vm/engine/disttae/txn_table_delegate.go +++ b/pkg/vm/engine/disttae/txn_table_delegate.go @@ -311,7 +311,14 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang return nil, err } - ret := readutil.NewBlockListRelationData(0) + part, err := tbl.origin.getPartitionState(ctx) + if err != nil { + return nil, err + } + + ret := readutil.NewBlockListRelationData( + 0, + readutil.WithPartitionState(part)) for i := 0; i < rs.DataCnt(); i++ { blk := rs.GetBlockInfo(i) @@ -569,7 +576,14 @@ func (tbl *txnTableDelegate) BuildShardingReaders( //relData maybe is nil, indicate that only read data from memory. if relData == nil || relData.DataCnt() == 0 { - relData = readutil.NewBlockListRelationData(1) + part, err2 := tbl.origin.getPartitionState(ctx) + if err2 != nil { + return nil, err2 + } + + relData = readutil.NewBlockListRelationData( + 1, + readutil.WithPartitionState(part)) } blkCnt := relData.DataCnt() diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index f67a6b2f022fa..054f9f0e28dbe 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -815,13 +815,6 @@ func (txn *Transaction) RollbackLastStatement(ctx context.Context) error { txn.incrStatementCalled = false return nil } -func (txn *Transaction) resetSnapshot() error { - txn.tableCache.Range(func(key, value interface{}) bool { - value.(*txnTableDelegate).origin.resetSnapshot() - return true - }) - return nil -} func (txn *Transaction) IncrSQLCount() { n := txn.sqlCount.Add(1) @@ -844,8 +837,7 @@ func (txn *Transaction) advanceSnapshot( return err } - // reset to get the latest partitionstate - return txn.resetSnapshot() + return nil } // For RC isolation, update the snapshot TS of transaction for each statement. @@ -983,7 +975,6 @@ type txnTable struct { tableDef *plan.TableDef seqnums []uint16 typs []types.Type - _partState atomic.Pointer[logtailreplay.PartitionState] primaryIdx int // -1 means no primary key primarySeqnum int // -1 means no primary key clusterByIdx int // -1 means no clusterBy key diff --git a/pkg/vm/engine/readutil/relation_data.go b/pkg/vm/engine/readutil/relation_data.go index 8736e55323728..79105ee692366 100644 --- a/pkg/vm/engine/readutil/relation_data.go +++ b/pkg/vm/engine/readutil/relation_data.go @@ -17,13 +17,11 @@ package readutil import ( "bytes" "fmt" - - "github.com/matrixorigin/matrixone/pkg/pb/plan" - plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine" ) @@ -191,11 +189,27 @@ func (rd *EmptyRelationData) DataCnt() int { return 0 } +func WithPartitionState(part any) func(*BlockListRelData) { + return func(blrd *BlockListRelData) { + blrd.pState = part + } +} + // emptyCnt is the number of empty blocks preserved -func NewBlockListRelationData(emptyCnt int) *BlockListRelData { - return &BlockListRelData{ +func NewBlockListRelationData( + emptyCnt int, + opts ...func(op *BlockListRelData), +) *BlockListRelData { + + blrd := &BlockListRelData{ blklist: objectio.MakeBlockInfoSlice(emptyCnt), } + + for i := range opts { + opts[i](blrd) + } + + return blrd } func NewBlockListRelationDataOfObject( @@ -210,6 +224,7 @@ func NewBlockListRelationDataOfObject( } type ObjListRelData struct { + PState any NeedFirstEmpty bool expanded bool TotalBlocks uint32 @@ -223,6 +238,10 @@ func (or *ObjListRelData) expand() { or.expanded = true or.blocklistRelData.blklist = objectio.MultiObjectStatsToBlockInfoSlice(or.Objlist, or.NeedFirstEmpty) } + + if or.blocklistRelData.pState == nil { + or.blocklistRelData.pState = or.PState + } } func (or *ObjListRelData) AppendObj(obj *objectio.ObjectStats) { @@ -258,6 +277,11 @@ func (or *ObjListRelData) Split(cpunum int) []engine.RelData { or.expand() return or.blocklistRelData.Split(cpunum) } + + if or.blocklistRelData.pState == nil { + or.blocklistRelData.pState = or.PState + } + //split by range shuffle result := make([]engine.RelData, cpunum) for i := range result { @@ -304,6 +328,7 @@ func (or *ObjListRelData) Split(cpunum int) []engine.RelData { if totalBlocks != int(or.TotalBlocks) { panic("wrong blocks cnt after objlist reldata split!") } + return result } @@ -370,6 +395,7 @@ type BlockListRelData struct { // blkList[0] is a empty block info blklist objectio.BlockInfoSlice + pState any // tombstones tombstones engine.Tombstoner } @@ -387,6 +413,8 @@ func (relData *BlockListRelData) String() string { } else { w.WriteString("\tTombstones: nil\n") } + + w.WriteString(fmt.Sprintf("\nPState: %v", relData.pState)) return w.String() } @@ -421,6 +449,10 @@ func (relData *BlockListRelData) Split(i int) []engine.RelData { return shards } +func (relData *BlockListRelData) GetPState() any { + return relData.pState +} + func (relData *BlockListRelData) GetBlockInfoSlice() objectio.BlockInfoSlice { return relData.blklist.GetAllBytes() } @@ -428,6 +460,7 @@ func (relData *BlockListRelData) GetBlockInfoSlice() objectio.BlockInfoSlice { func (relData *BlockListRelData) BuildEmptyRelData(i int) engine.RelData { l := make([]byte, 0, objectio.BlockInfoSize*i) return &BlockListRelData{ + pState: relData.pState, blklist: l, } } @@ -534,6 +567,7 @@ func (relData *BlockListRelData) GetTombstones() engine.Tombstoner { func (relData *BlockListRelData) DataSlice(i, j int) engine.RelData { blist := objectio.BlockInfoSlice(relData.blklist.Slice(i, j)) return &BlockListRelData{ + pState: relData.pState, blklist: blist, tombstones: relData.tombstones, } diff --git a/pkg/vm/engine/readutil/relation_data_test.go b/pkg/vm/engine/readutil/relation_data_test.go index d3eb88fbd519e..00caf18668313 100644 --- a/pkg/vm/engine/readutil/relation_data_test.go +++ b/pkg/vm/engine/readutil/relation_data_test.go @@ -54,6 +54,6 @@ func TestEmptyRelationData(t *testing.T) { require.Panics(t, func() { relData.DataSlice(0, 0) }) - require.Equal(t, 0, relData.DataCnt()) + require.Equal(t, 0, relData.DataCnt()) } diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index dfd7620e13f46..80a228c22504a 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -1242,6 +1242,30 @@ func Test_ShardingLocalReader(t *testing.T) { } + // test build reader using nil relData + { + //start to build sharding readers. + _, rel, txn, err := disttaeEngine.GetTable(ctx, databaseName, tableName) + require.NoError(t, err) + + shardSvr := testutil.MockShardService() + delegate, _ := disttae.MockTableDelegate(rel, shardSvr) + num := 10 + _, err = delegate.BuildShardingReaders( + ctx, + rel.GetProcess(), + nil, + nil, + num, + 0, + false, + 0, + ) + + require.NoError(t, err) + require.NoError(t, txn.Commit(ctx)) + } + { //start to build sharding readers. _, rel, txn, err := disttaeEngine.GetTable(ctx, databaseName, tableName) @@ -1250,6 +1274,8 @@ func Test_ShardingLocalReader(t *testing.T) { relData, err := rel.Ranges(ctx, engine.DefaultRangesParam) require.NoError(t, err) + fmt.Println(relData.String()) + shardSvr := testutil.MockShardService() delegate, _ := disttae.MockTableDelegate(rel, shardSvr) num := 10