Skip to content

Commit

Permalink
(tae): fix non-repeatable read for delete nodes (#4027)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH authored and yingfeng committed Jul 16, 2022
1 parent e3aae17 commit 405e4d2
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 37 deletions.
6 changes: 5 additions & 1 deletion pkg/vm/engine/tae/iface/txnif/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type TxnReader interface {
String() string
Repr() string
GetLSN() uint64

SameTxn(startTs uint64) bool
CommitBefore(startTs uint64) bool
CommitAfter(startTs uint64) bool
}

type TxnHandle interface {
Expand Down Expand Up @@ -137,7 +141,7 @@ type DeleteChain interface {

PrepareRangeDelete(start, end uint32, ts uint64) error
DepthLocked() int
CollectDeletesLocked(ts uint64, collectIndex bool) (DeleteNode, error)
CollectDeletesLocked(ts uint64, collectIndex bool, rwlocker *sync.RWMutex) (DeleteNode, error)
}

type AppendNode interface {
Expand Down
12 changes: 9 additions & 3 deletions pkg/vm/engine/tae/stl/containers/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func withAllocator(opts *Options) *Options {
func TestVector1(t *testing.T) {
opts := new(Options)
opts.Capacity = 1
opts.Allocator = stl.NewSimpleAllocator()
vec := NewVector[int64](opts)
now := time.Now()

Expand Down Expand Up @@ -75,7 +76,9 @@ func TestVector2(t *testing.T) {
}

func TestVector3(t *testing.T) {
vec := NewVector[[]byte]()
opts := new(Options)
opts.Allocator = stl.NewSimpleAllocator()
vec := NewVector[[]byte](opts)
vec.Append([]byte("h1"))
vec.Append([]byte("h2"))
vec.Append([]byte("h3"))
Expand All @@ -98,7 +101,10 @@ func TestVector3(t *testing.T) {
}

func TestVector4(t *testing.T) {
vec := NewVector[[]byte]()
opts := &Options{
Allocator: stl.NewSimpleAllocator(),
}
vec := NewVector[[]byte](opts)
vec.Append([]byte("h1"))
vec.Append([]byte("h2"))
vec.Append([]byte("h3"))
Expand All @@ -123,7 +129,7 @@ func TestVector5(t *testing.T) {
opts := &Options{
Allocator: stl.NewSimpleAllocator(),
}
vec := NewVector[[]byte]()
vec := NewVector[[]byte](opts)
vec.Append([]byte("h1"))
vec.Append([]byte("hh2"))
vec.Append([]byte("hhh3"))
Expand Down
10 changes: 5 additions & 5 deletions pkg/vm/engine/tae/tables/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,9 @@ func (blk *dataBlock) FillColumnUpdates(view *model.ColumnView) (err error) {
return
}

func (blk *dataBlock) FillColumnDeletes(view *model.ColumnView) (err error) {
func (blk *dataBlock) FillColumnDeletes(view *model.ColumnView, rwlocker *sync.RWMutex) (err error) {
deleteChain := blk.mvcc.GetDeleteChain()
n, err := deleteChain.CollectDeletesLocked(view.Ts, false)
n, err := deleteChain.CollectDeletesLocked(view.Ts, false, rwlocker)
if err != nil {
return
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func (blk *dataBlock) ResolveColumnMVCCData(
blk.mvcc.RLock()
err = blk.FillColumnUpdates(view)
if err == nil {
err = blk.FillColumnDeletes(view)
err = blk.FillColumnDeletes(view, blk.mvcc.RWMutex)
}
blk.mvcc.RUnlock()
if err != nil {
Expand Down Expand Up @@ -482,7 +482,7 @@ func (blk *dataBlock) ResolveABlkColumnMVCCData(
blk.mvcc.RLock()
err = blk.FillColumnUpdates(view)
if err == nil {
err = blk.FillColumnDeletes(view)
err = blk.FillColumnDeletes(view, blk.mvcc.RWMutex)
}
blk.mvcc.RUnlock()
if err != nil {
Expand Down Expand Up @@ -856,7 +856,7 @@ func (blk *dataBlock) CollectChangesInRange(startTs, endTs uint64) (view *model.
}
}
deleteChain := blk.mvcc.GetDeleteChain()
view.DeleteMask, view.DeleteLogIndexes, err = deleteChain.CollectDeletesInRange(startTs, endTs)
view.DeleteMask, view.DeleteLogIndexes, err = deleteChain.CollectDeletesInRange(startTs, endTs, blk.mvcc.RWMutex)
blk.mvcc.RUnlock()
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/tables/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (node *appendableNode) flushData(ts uint64, colsData *containers.Batch, opC
return
}
deleteChain := mvcc.GetDeleteChain()
n, err := deleteChain.CollectDeletesLocked(ts, false)
n, err := deleteChain.CollectDeletesLocked(ts, false, mvcc.RWMutex)
mvcc.RUnlock()
if err != nil {
return
Expand Down
30 changes: 15 additions & 15 deletions pkg/vm/engine/tae/tables/updates/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,17 +402,17 @@ func TestDeleteChain1(t *testing.T) {
assert.Nil(t, merged)
assert.Equal(t, 2, chain.Depth())

collected, err := chain.CollectDeletesLocked(txn1.GetStartTS(), false)
collected, err := chain.CollectDeletesLocked(txn1.GetStartTS(), false, nil)
assert.NoError(t, err)
assert.Equal(t, uint32(10), collected.GetCardinalityLocked())
collected, err = chain.CollectDeletesLocked(txn2.GetStartTS(), false)
collected, err = chain.CollectDeletesLocked(txn2.GetStartTS(), false, nil)
assert.NoError(t, err)
assert.Equal(t, uint32(11), collected.GetCardinalityLocked())

collected, err = chain.CollectDeletesLocked(0, false)
collected, err = chain.CollectDeletesLocked(0, false, nil)
assert.NoError(t, err)
assert.Nil(t, collected)
collected, err = chain.CollectDeletesLocked(common.NextGlobalSeqNum(), false)
collected, err = chain.CollectDeletesLocked(common.NextGlobalSeqNum(), false, nil)
assert.NoError(t, err)
assert.Nil(t, collected)

Expand All @@ -421,13 +421,13 @@ func TestDeleteChain1(t *testing.T) {
assert.Nil(t, n1.ApplyCommit(nil))
t.Log(chain.StringLocked())

collected, err = chain.CollectDeletesLocked(0, false)
collected, err = chain.CollectDeletesLocked(0, false, nil)
assert.NoError(t, err)
assert.Nil(t, collected)
collected, err = chain.CollectDeletesLocked(common.NextGlobalSeqNum(), false)
collected, err = chain.CollectDeletesLocked(common.NextGlobalSeqNum(), false, nil)
assert.NoError(t, err)
assert.Equal(t, uint32(10), collected.GetCardinalityLocked())
collected, err = chain.CollectDeletesLocked(txn2.GetStartTS(), false)
collected, err = chain.CollectDeletesLocked(txn2.GetStartTS(), false, nil)
assert.NoError(t, err)
assert.Equal(t, uint32(11), collected.GetCardinalityLocked())

Expand All @@ -439,7 +439,7 @@ func TestDeleteChain1(t *testing.T) {
n3 := chain.AddNodeLocked(txn3, handle.DeleteType(handle.DT_Normal))
n3.RangeDeleteLocked(31, 33)

collected, err = chain.CollectDeletesLocked(txn3.GetStartTS(), false)
collected, err = chain.CollectDeletesLocked(txn3.GetStartTS(), false, nil)
assert.NoError(t, err)
assert.Equal(t, uint32(13), collected.GetCardinalityLocked())
t.Log(chain.StringLocked())
Expand Down Expand Up @@ -503,33 +503,33 @@ func TestDeleteChain2(t *testing.T) {
assert.Nil(t, err)
t.Log(chain.StringLocked())

m, err := chain.CollectDeletesLocked(common.NextGlobalSeqNum(), false)
m, err := chain.CollectDeletesLocked(common.NextGlobalSeqNum(), false, nil)
assert.NoError(t, err)
mask := m.(*DeleteNode).mask
assert.Equal(t, uint64(8), mask.GetCardinality())
m, err = chain.CollectDeletesLocked(txn3.GetCommitTS(), false)
m, err = chain.CollectDeletesLocked(txn3.GetCommitTS(), false, nil)
assert.NoError(t, err)
mask = m.(*DeleteNode).mask
assert.Equal(t, uint64(8), mask.GetCardinality())
m, err = chain.CollectDeletesLocked(txn1.GetCommitTS(), false)
m, err = chain.CollectDeletesLocked(txn1.GetCommitTS(), false, nil)
assert.NoError(t, err)
mask = m.(*DeleteNode).mask
assert.Equal(t, uint64(4), mask.GetCardinality())
m, err = chain.CollectDeletesLocked(txn1.GetCommitTS()-1, false)
m, err = chain.CollectDeletesLocked(txn1.GetCommitTS()-1, false, nil)
assert.NoError(t, err)
assert.Nil(t, m)

mask, _, err = chain.CollectDeletesInRange(0, txn3.GetCommitTS())
mask, _, err = chain.CollectDeletesInRange(0, txn3.GetCommitTS(), nil)
assert.NoError(t, err)
t.Log(mask.String())
assert.Equal(t, uint64(8), mask.GetCardinality())

mask, _, err = chain.CollectDeletesInRange(0, txn3.GetCommitTS()+1)
mask, _, err = chain.CollectDeletesInRange(0, txn3.GetCommitTS()+1, nil)
assert.NoError(t, err)
t.Log(mask.String())
assert.Equal(t, uint64(8), mask.GetCardinality())

mask, _, err = chain.CollectDeletesInRange(txn1.GetCommitTS(), txn3.GetCommitTS()+1)
mask, _, err = chain.CollectDeletesInRange(txn1.GetCommitTS(), txn3.GetCommitTS()+1, nil)
assert.NoError(t, err)
t.Log(mask.String())
assert.Equal(t, uint64(4), mask.GetCardinality())
Expand Down
48 changes: 36 additions & 12 deletions pkg/vm/engine/tae/tables/updates/delchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,16 @@ func (chain *DeleteChain) AddMergeNode() txnif.DeleteNode {
}

// CollectDeletesInRange collects [startTs, endTs)
func (chain *DeleteChain) CollectDeletesInRange(startTs, endTs uint64) (mask *roaring.Bitmap, indexes []*wal.Index, err error) {
n, err := chain.CollectDeletesLocked(startTs, true)
func (chain *DeleteChain) CollectDeletesInRange(
startTs, endTs uint64,
rwlocker *sync.RWMutex) (mask *roaring.Bitmap, indexes []*wal.Index, err error) {
n, err := chain.CollectDeletesLocked(startTs, true, rwlocker)
if err != nil {
return
}
startNode := n.(*DeleteNode)
// n, err = chain.CollectDeletesLocked(endTs-1, true)
n, err = chain.CollectDeletesLocked(endTs, true)
n, err = chain.CollectDeletesLocked(endTs, true, rwlocker)
if err != nil {
return
}
Expand All @@ -222,7 +224,10 @@ func (chain *DeleteChain) CollectDeletesInRange(startTs, endTs uint64) (mask *ro
return
}

func (chain *DeleteChain) CollectDeletesLocked(ts uint64, collectIndex bool) (txnif.DeleteNode, error) {
func (chain *DeleteChain) CollectDeletesLocked(
ts uint64,
collectIndex bool,
rwlocker *sync.RWMutex) (txnif.DeleteNode, error) {
var merged *DeleteNode
var err error
chain.LoopChainLocked(func(n *DeleteNode) bool {
Expand All @@ -239,34 +244,53 @@ func (chain *DeleteChain) CollectDeletesLocked(ts uint64, collectIndex bool) (tx
}
n.RLock()
txn := n.txn
if txn != nil && txn.GetStartTS() == ts {

if txn == nil {
if n.GetCommitTSLocked() <= ts {
if merged == nil {
merged = NewMergedNode(n.GetCommitTSLocked())
}
merged.MergeLocked(n, collectIndex)
}
n.RUnlock()
return true
}

if txn.SameTxn(ts) {
// Use the delete from the same active txn
if merged == nil {
merged = NewMergedNode(n.GetCommitTSLocked())
}
merged.MergeLocked(n, collectIndex)
} else if txn != nil && n.GetCommitTSLocked() > ts {
} else if txn.CommitAfter(ts) {
// Skip txn deletes committed after ts
n.RUnlock()
return true
} else if txn != nil {
} else {
// Wait committing txn with commit ts before ts
n.RUnlock()
if rwlocker != nil {
rwlocker.RUnlock()
}
state := txn.GetTxnState(true)
// logutil.Infof("%d -- wait --> %s: %d", ts, txn.Repr(), state)
// If the txn is rollbacked. skip to the next
if state == txnif.TxnStateRollbacked {
if rwlocker != nil {
rwlocker.RLock()
}
return true
} else if state == txnif.TxnStateUnknown {
err = txnif.ErrTxnInternal
if rwlocker != nil {
rwlocker.RLock()
}
return false
}
n.RLock()
if merged == nil {
merged = NewMergedNode(n.GetCommitTSLocked())
if rwlocker != nil {
rwlocker.RLock()
}
merged.MergeLocked(n, collectIndex)
} else if n.GetCommitTSLocked() <= ts {
n.RLock()
if merged == nil {
merged = NewMergedNode(n.GetCommitTSLocked())
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/tae/txn/txnbase/txnctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (ctx *TxnCtx) Repr() string {
return repr
}

func (ctx *TxnCtx) SameTxn(startTs uint64) bool { return ctx.StartTS == startTs }
func (ctx *TxnCtx) CommitBefore(startTs uint64) bool {
return ctx.GetCommitTS() < startTs
}
func (ctx *TxnCtx) CommitAfter(startTs uint64) bool {
return ctx.GetCommitTS() > startTs
}

func (ctx *TxnCtx) String() string { return ctx.Repr() }
func (ctx *TxnCtx) GetID() uint64 { return ctx.ID }
func (ctx *TxnCtx) GetInfo() []byte { return ctx.Info }
Expand Down

0 comments on commit 405e4d2

Please sign in to comment.