Skip to content

Commit

Permalink
Lock both src and dst parent for rename op to avoid txn conflicts and…
Browse files Browse the repository at this point in the history
… performance

Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit committed Jan 17, 2025
1 parent fac5515 commit 1c13135
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 14 deletions.
28 changes: 28 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,34 @@ func (r *baseMeta) txUnlock(idx uint) {
r.txlocks[idx%nlocks].Unlock()
}

// txBatchLock acquires the transaction locks covering every inode in inodes
// and returns a function that releases them.
//
// Each inode maps to the lock slot uint(ino) % nlocks. Slots are locked in
// ascending order, so concurrent batch callers always take overlapping slots
// in the same order, and each distinct slot is locked only once even when
// several inodes share it. The inodes argument is never modified.
//
// The returned function must be called exactly once; the usual form is
//
//	defer r.txBatchLock(a, b)()
func (r *baseMeta) txBatchLock(inodes ...Ino) func() {
	switch len(inodes) {
	case 0:
		return func() {}
	case 1: // most cases
		// Capture the index by value so the unlock closure does not depend on
		// the caller's slice, which could change before unlock runs.
		idx := uint(inodes[0])
		r.txLock(idx)
		return func() { r.txUnlock(idx) }
	default: // for rename and more
		// Work on a private list of slots. A call such as txBatchLock(s...)
		// hands over the caller's slice itself, so sorting inodes in place
		// would reorder the caller's data and race with any other goroutine
		// passing the same slice.
		slots := make([]uint, len(inodes))
		for i, ino := range inodes {
			slots[i] = uint(ino) % nlocks
		}
		sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] })

		// Compact the sorted slots in place, locking each distinct slot once.
		locked := slots[:0]
		for _, slot := range slots {
			if n := len(locked); n > 0 && locked[n-1] == slot {
				continue // Go does not support recursive locks
			}
			r.txLock(slot)
			locked = append(locked, slot)
		}
		return func() {
			for _, slot := range locked {
				r.txUnlock(slot)
			}
		}
	}
}

func (r *baseMeta) OnMsg(mtype uint32, cb MsgCallback) {
r.msgCallbacks.Lock()
defer r.msgCallbacks.Unlock()
Expand Down
60 changes: 60 additions & 0 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"os"
"reflect"
"runtime"
Expand Down Expand Up @@ -3282,3 +3283,62 @@ func TestSymlinkCache(t *testing.T) {
cache.doClean()
require.Equal(t, int32(8000), cache.size.Load())
}

// TestTxBatchLock checks that txBatchLock handles empty, single, multiple and
// duplicated inodes, and that concurrent batch callers run to completion
// instead of blocking each other forever.
func TestTxBatchLock(t *testing.T) {
	var base baseMeta

	// lockOnce takes the batch lock for inodes and releases it on return.
	lockOnce := func(inodes ...Ino) {
		unlock := base.txBatchLock(inodes...)
		defer unlock()
	}

	// contend runs 100 rounds. Every round starts one goroutine per batch,
	// each of which locks and unlocks its batch; contend returns once all
	// goroutines have finished.
	contend := func(first, second []Ino) {
		var wg sync.WaitGroup
		for round := 0; round < 100; round++ {
			wg.Add(2)
			for _, batch := range [][]Ino{first, second} {
				go func(batch []Ino) {
					defer wg.Done()
					lockOnce(batch...)
				}(batch)
			}
		}
		wg.Wait()
	}

	lockOnce()               // no inode
	lockOnce(2)              // a single inode
	lockOnce(1, 2)           // two inodes
	lockOnce(1, 1, nlocks+1) // no reentrant: all three share one lock slot

	// no deadlock - sequential: the first batch is 1..10; the second batch
	// hits the same slots 1..10 through inodes whose values run high to low.
	const span = 10
	ascending := make([]Ino, span)
	shifted := make([]Ino, span)
	for i := 0; i < span; i++ {
		ascending[i] = Ino(i + 1)
		shifted[i] = Ino(i+1) + Ino(nlocks)*Ino(span-1-i)
	}
	contend(ascending, shifted)

	// no deadlock - fuzz testing: two batches of 100 random inodes each.
	randomA := make([]Ino, 0, 100)
	randomB := make([]Ino, 0, 100)
	for len(randomA) < 100 {
		randomA = append(randomA, Ino(rand.Uint64()+1))
		randomB = append(randomB, Ino(rand.Uint64()+1))
	}
	contend(randomA, randomB)
}
7 changes: 2 additions & 5 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,10 +908,7 @@ func (m *dbMeta) txn(f func(s *xorm.Session) error, inodes ...Ino) error {
inodes = []Ino{1}
}

if len(inodes) > 0 {
m.txLock(uint(inodes[0]))
defer m.txUnlock(uint(inodes[0]))
}
defer m.txBatchLock(inodes...)()
var lastErr error
for i := 0; i < 50; i++ {
_, err := m.db.Transaction(func(s *xorm.Session) (interface{}, error) {
Expand Down Expand Up @@ -2252,7 +2249,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
}
}
return err
})
}, parentSrc, parentDst)
if err == nil && !exchange && trash == 0 {
if dino > 0 && dn.Type == TypeFile && dn.Nlink == 0 {
m.fileDeleted(opened, false, dino, dn.Length)
Expand Down
11 changes: 2 additions & 9 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,10 +809,7 @@ func (m *kvMeta) txn(ctx context.Context, f func(tx *kvTxn) error, inodes ...Ino
}
start := time.Now()
defer func() { m.txDist.Observe(time.Since(start).Seconds()) }()
if len(inodes) > 0 {
m.txLock(uint(inodes[0]))
defer m.txUnlock(uint(inodes[0]))
}
defer m.txBatchLock(inodes...)()
var lastErr error
for i := 0; i < 50; i++ {
err := m.client.txn(ctx, f, i)
Expand Down Expand Up @@ -1491,10 +1488,6 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
var dtyp uint8
var tattr Attr
var newSpace, newInode int64
lockParent := parentSrc
if isTrash(lockParent) {
lockParent = parentDst
}
err := m.txn(ctx, func(tx *kvTxn) error {
opened = false
dino, dtyp = 0, 0
Expand Down Expand Up @@ -1731,7 +1724,7 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
tx.set(m.inodeKey(parentDst), m.marshal(&dattr))
}
return nil
}, lockParent)
}, parentSrc, parentDst)
if err == nil && !exchange && trash == 0 {
if dino > 0 && dtyp == TypeFile && tattr.Nlink == 0 {
m.fileDeleted(opened, false, dino, tattr.Length)
Expand Down

0 comments on commit 1c13135

Please sign in to comment.