Skip to content

Commit

Permalink
update CollectDeletesLocked (#18677)
Browse files Browse the repository at this point in the history
update CollectDeletesLocked, return tae possible duplicate if delsMap is nil

Approved by: @zhangxu19830126, @XuPeng-SH, @sukki37
  • Loading branch information
jiangxinmeng1 authored Sep 10, 2024
1 parent eebb872 commit 2737645
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ const (
ErrNewTxnInCNRollingRestart uint16 = 20635
ErrPrevCheckpointNotFinished uint16 = 20636
ErrCantDelGCChecker uint16 = 20637
ErrPossibleDuplicate uint16 = 20638

// Group 7: lock service
// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
Expand Down Expand Up @@ -454,6 +455,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrNewTxnInCNRollingRestart: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "new txn in CN rolling restart"},
ErrPrevCheckpointNotFinished: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "prev checkpoint not finished"},
ErrCantDelGCChecker: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can't delete gc checker"},
ErrPossibleDuplicate: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "possible duplicate"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func NewTAEPrepareNoCtx(msg string) *Error {
return newError(Context(), ErrTAEPrepare, msg)
}

func NewPossibleDuplicateNoCtx() *Error {
return newError(Context(), ErrPossibleDuplicate)
}

func NewTxnRWConflictNoCtx() *Error {
return newError(Context(), ErrTxnRWConflict)
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase"

Expand Down Expand Up @@ -9055,3 +9056,58 @@ func TestVisitTombstone(t *testing.T) {
tae.Restart(context.Background())
t.Log(tae.Catalog.SimplePPString(3))
}

func TestDedupObject(t *testing.T) {
defer testutils.AfterTest(t)()
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
options.WithGlobalVersionInterval(time.Microsecond)(opts)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
defer tae.Close()
schema := catalog.MockSchemaAll(2, 1)
schema.BlockMaxRows = 50
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, 50)
defer bat.Close()

tae.CreateRelAndAppend(bat, true)

tae.CompactBlocks(true)

var dedupTxn txnif.AsyncTxn
// var dedupRel handle.Relation
{
txn2, rel2 := tae.GetRelation()
obj := testutil.GetOneBlockMeta(rel2)
task, err := jobs.NewMergeObjectsTask(nil, txn2, []*catalog.ObjectEntry{obj}, tae.Runtime, 0)
assert.NoError(t, err)
err = task.OnExec(context.Background())
assert.NoError(t, err)
{
tae.DeleteAll(true)
}
dedupTxn, _ = tae.GetRelation()
txn2.Commit(ctx)
}
getObjTxn, getObjRel := tae.GetRelation()
obj := testutil.GetOneBlockMeta(getObjRel)
mergeBlockID := *objectio.NewBlockidWithObjectID(&obj.ID, 0)
getObjTxn.Commit(ctx)
pkVec := bat.Vecs[1]
pkType := pkVec.GetType()
keysZM := index.NewZM(pkType.Oid, pkType.Scale)
index.BatchUpdateZM(keysZM, pkVec.GetDownstreamVector())
location := obj.GetLatestCommittedNodeLocked().BaseNode.ObjectLocation()
bf, _ := objectio.FastLoadBF(
ctx,
location,
false,
tae.Runtime.Fs.Service,
)
{
tae.Runtime.TransferDelsMap.Delete(mergeBlockID)
}
err := obj.GetObjectData().BatchDedup(ctx, dedupTxn, pkVec, keysZM, nil, true, bf, common.DebugAllocator)
t.Log(err)
dedupTxn.Commit(ctx)
}
6 changes: 6 additions & 0 deletions pkg/vm/engine/tae/model/transferdels.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,9 @@ func (t *TransDelsForBlks) Prune(gap time.Duration) {
}
}
}

func (t *TransDelsForBlks) Delete(blkID types.Blockid) {
t.Lock()
defer t.Unlock()
delete(t.dels, blkID)
}
9 changes: 8 additions & 1 deletion pkg/vm/engine/tae/tables/updates/delchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
Expand Down Expand Up @@ -392,7 +393,13 @@ func (chain *DeleteChain) CollectDeletesLocked(
} else {
ts := txn.GetStartTS()
rt := chain.mvcc.meta.GetObjectData().GetRuntime()
tsMapping := rt.TransferDelsMap.GetDelsForBlk(*objectio.NewBlockidWithObjectID(&chain.mvcc.meta.ID, chain.mvcc.blkID)).Mapping
blockID := *objectio.NewBlockidWithObjectID(&chain.mvcc.meta.ID, chain.mvcc.blkID)
delsMap := rt.TransferDelsMap.GetDelsForBlk(blockID)
if delsMap == nil {
err = moerr.NewPossibleDuplicateNoCtx()
return false
}
tsMapping := delsMap.Mapping
if tsMapping == nil {
logutil.Warnf("flushtabletail check special dels for %s, no tsMapping", chain.mvcc.meta.ID.String())
return true
Expand Down

0 comments on commit 2737645

Please sign in to comment.