From 45c325922a5c2c0848853951b2d8d6c30e50ced1 Mon Sep 17 00:00:00 2001 From: JiHwan Yim Date: Tue, 11 Feb 2025 17:05:52 +0900 Subject: [PATCH] Extend RWMutex Interface for locker package (#1135) This commit extends RWMutex interface for locker package. The original plan was to implement RWMutex for maintaining consistency in updateVersionVector during PushPull operations(#1081). However, this change is currently on hold due to the following reasons: 1. A bug was discovered in Lamport removal logic: #1089 2. The removal logic has been deprecated since v0.5.7 This commit includes only the interface extension while the actual implementation will be addressed in a future update. --------- Co-authored-by: Youngteac Hong --- pkg/locker/locker.go | 71 +++++++++++++-- pkg/locker/locker_test.go | 156 +++++++++++++++++++++++++++++++- server/backend/sync/locker.go | 22 +++++ server/rpc/yorkie_server.go | 2 + test/bench/locker_bench_test.go | 28 ++++++ 5 files changed, 268 insertions(+), 11 deletions(-) diff --git a/pkg/locker/locker.go b/pkg/locker/locker.go index e0af69dd8..7178de902 100644 --- a/pkg/locker/locker.go +++ b/pkg/locker/locker.go @@ -49,7 +49,7 @@ type Locker struct { // lockCtr is used by Locker to represent a lock with a given name. type lockCtr struct { - mu sync.Mutex + mu sync.RWMutex // waiters is the number of waiters waiting to acquire the lock // this is int32 instead of uint32 so we can add `-1` in `dec()` waiters int32 @@ -70,7 +70,7 @@ func (l *lockCtr) count() int32 { return atomic.LoadInt32(&l.waiters) } -// Lock locks the mutex +// Lock locks the mutex for writing func (l *lockCtr) Lock() { l.mu.Lock() } @@ -80,11 +80,21 @@ func (l *lockCtr) TryLock() bool { return l.mu.TryLock() } -// Unlock unlocks the mutex +// Unlock unlocks the mutex for writing func (l *lockCtr) Unlock() { l.mu.Unlock() } +// RLock locks the mutex for reading +func (l *lockCtr) RLock() { + l.mu.RLock() +} + +// RUnlock unlocks the mutex for reading +func (l *lockCtr) RUnlock() { + l.mu.RUnlock() +} + // New creates a new Locker func New() *Locker { return &Locker{ @@ -111,9 +121,7 @@ func (l *Locker) Lock(name string) { l.mu.Unlock() // Lock the nameLock outside the main mutex so we don't block other operations - // once locked then we can decrement the number of waiters for this lock nameLock.Lock() - nameLock.dec() } // TryLock locks a mutex with the given name. If it doesn't exist, one is created. @@ -135,9 +143,7 @@ func (l *Locker) TryLock(name string) bool { l.mu.Unlock() // Lock the nameLock outside the main mutex so we don't block other operations - // once locked then we can decrement the number of waiters for this lock succeeded := nameLock.TryLock() - nameLock.dec() return succeeded } @@ -152,10 +158,59 @@ func (l *Locker) Unlock(name string) error { return ErrNoSuchLock } + nameLock.Unlock() + // Decrement waiters here to ensure the lock isn't deleted prematurely + // while another goroutine might still be using it. + nameLock.dec() + + if nameLock.count() == 0 { + delete(l.locks, name) + } + + l.mu.Unlock() + return nil +} + +// RLock acquires a read lock for the given name. +// If there is no lock for that name, a new one is created. +func (l *Locker) RLock(name string) { + l.mu.Lock() + if l.locks == nil { + l.locks = make(map[string]*lockCtr) + } + + nameLock, exists := l.locks[name] + if !exists { + nameLock = &lockCtr{} + l.locks[name] = nameLock + } + + // increment the nameLock waiters while inside the main mutex + // this makes sure that the lock isn't deleted if `Lock` and `Unlock` are called concurrently + nameLock.inc() + l.mu.Unlock() + + // Lock the nameLock outside the main mutex so we don't block other operations + nameLock.RLock() +} + +// RUnlock releases a read lock for the given name. +func (l *Locker) RUnlock(name string) error { + l.mu.Lock() + nameLock, exists := l.locks[name] + if !exists { + l.mu.Unlock() + return ErrNoSuchLock + } + + nameLock.RUnlock() + // Decrement waiters here to ensure the lock isn't deleted prematurely + // while another goroutine might still be using it. + nameLock.dec() + if nameLock.count() == 0 { delete(l.locks, name) } - nameLock.Unlock() l.mu.Unlock() return nil diff --git a/pkg/locker/locker_test.go b/pkg/locker/locker_test.go index 3d3496aaf..137117478 100644 --- a/pkg/locker/locker_test.go +++ b/pkg/locker/locker_test.go @@ -46,8 +46,8 @@ func TestLockerLock(t *testing.T) { l.Lock("test") ctr := l.locks["test"] - if ctr.count() != 0 { - t.Fatalf("expected waiters to be 0, got :%d", ctr.waiters) + if ctr.count() != 1 { + t.Fatalf("expected waiters to be 1, got :%d", ctr.waiters) } chDone := make(chan struct{}) @@ -59,7 +59,7 @@ func TestLockerLock(t *testing.T) { chWaiting := make(chan struct{}) go func() { for range time.Tick(1 * time.Millisecond) { - if ctr.count() == 1 { + if ctr.count() == 2 { close(chWaiting) break } @@ -88,6 +88,10 @@ func TestLockerLock(t *testing.T) { t.Fatalf("lock should have completed") } + if err := l.Unlock("test"); err != nil { + t.Fatal(err) + } + if ctr.count() != 0 { t.Fatalf("expected waiters to be 0, got: %d", ctr.count()) } @@ -165,3 +169,149 @@ func TestTryLock(t *testing.T) { } } } + +func TestRWLockerRLock(t *testing.T) { + l := New() + l.RLock("test") + ctr := l.locks["test"] + + if ctr.count() != 1 { + t.Fatalf("expected waiters to be 1, got :%d", ctr.waiters) + } + + chDone := make(chan struct{}) + go func() { + l.RLock("test") + close(chDone) + }() + + select { + case <-chDone: + case <-time.After(3 * time.Second): + t.Fatalf("lock should have completed") + } + + if err := l.RUnlock("test"); err != nil { + t.Fatal(err) + } + + if ctr.count() != 1 { + t.Fatalf("expected waiters to be 1, got: %d", ctr.count()) + } + + if _, exists := l.locks["test"]; !exists { + t.Fatal("expected lock not to be deleted") + } + + if err := l.RUnlock("test"); err != nil { + t.Fatal(err) + } + + if ctr.count() != 0 { + t.Fatalf("expected waiters to be 0, got: %d", ctr.count()) + } + + if _, exists := l.locks["test"]; exists { + t.Fatal("expected lock to be deleted") + } +} + +func TestLockRLock(t *testing.T) { + l := New() + + // RLock after Lock + l.RLock("test") + + chDone := make(chan struct{}) + go func() { + l.Lock("test") + close(chDone) + }() + + select { + case <-chDone: + t.Fatal("lock should not have returned while it was still held") + default: + } + + if err := l.RUnlock("test"); err != nil { + t.Fatal(err) + } + + select { + case <-chDone: + case <-time.After(3 * time.Second): + t.Fatalf("lock should have completed") + } + + if err := l.Unlock("test"); err != nil { + t.Fatal(err) + } + + // Lock after RLock + l.Lock("test") + + chDone = make(chan struct{}) + go func() { + l.RLock("test") + close(chDone) + }() + + select { + case <-chDone: + t.Fatal("lock should not have returned while it was still held") + default: + } + + if err := l.Unlock("test"); err != nil { + t.Fatal(err) + } + + select { + case <-chDone: + case <-time.After(3 * time.Second): + t.Fatalf("lock should have completed") + } + + if err := l.RUnlock("test"); err != nil { + t.Fatal(err) + } +} + +func TestRWLockerConcurrency(t *testing.T) { + l := New() + + var wg sync.WaitGroup + for i := 0; i <= 1000; i++ { + wg.Add(1) + go func(i int) { + if i%2 == 0 { + l.Lock("test") + // if there is a concurrency issue, will very likely panic here + assert.NoError(t, l.Unlock("test")) + } else { + l.RLock("test") + // if there is a concurrency issue, will very likely panic here + assert.NoError(t, l.RUnlock("test")) + } + wg.Done() + }(i) + } + + chDone := make(chan struct{}) + go func() { + wg.Wait() + close(chDone) + }() + + select { + case <-chDone: + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for locks to complete") + } + + // Since everything has unlocked this should not exist anymore + if ctr, exists := l.locks["test"]; exists { + t.Fatalf("lock should not exist: %v", ctr) + } +} diff --git a/server/backend/sync/locker.go b/server/backend/sync/locker.go index 4d8eaa96d..e7291d1f1 100644 --- a/server/backend/sync/locker.go +++ b/server/backend/sync/locker.go @@ -73,6 +73,12 @@ type Locker interface { // Unlock unlocks the mutex. Unlock(ctx context.Context) error + + // RLock acquires a read lock with a cancelable context. + RLock(ctx context.Context) error + + // RUnlock releases a read lock previously acquired by RLock. + RUnlock(ctx context.Context) error } type internalLocker struct { @@ -104,3 +110,19 @@ func (il *internalLocker) Unlock(_ context.Context) error { return nil } + +// RLock locks the mutex for reading.. +func (il *internalLocker) RLock(_ context.Context) error { + il.locks.RLock(il.key) + + return nil +} + +// RUnlock unlocks the read lock. +func (il *internalLocker) RUnlock(_ context.Context) error { + if err := il.locks.RUnlock(il.key); err != nil { + return err + } + + return nil +} diff --git a/server/rpc/yorkie_server.go b/server/rpc/yorkie_server.go index 7625ca4b6..d223e7fb9 100644 --- a/server/rpc/yorkie_server.go +++ b/server/rpc/yorkie_server.go @@ -303,6 +303,7 @@ func (s *yorkieServer) PushPullChanges( } project := projects.From(ctx) + if pack.HasChanges() { locker, err := s.backend.Locker.NewLocker( ctx, @@ -519,6 +520,7 @@ func (s *yorkieServer) RemoveDocument( } project := projects.From(ctx) + if pack.HasChanges() { locker, err := s.backend.Locker.NewLocker(ctx, packs.PushPullKey(project.ID, pack.DocumentKey)) if err != nil { diff --git a/test/bench/locker_bench_test.go b/test/bench/locker_bench_test.go index 266e013c2..434e05075 100644 --- a/test/bench/locker_bench_test.go +++ b/test/bench/locker_bench_test.go @@ -22,6 +22,7 @@ package bench import ( + "fmt" "math/rand" "strconv" "testing" @@ -65,3 +66,30 @@ func BenchmarkLockerMoreKeys(b *testing.B) { } }) } + +func BenchmarkRWLocker(b *testing.B) { + b.SetParallelism(128) + + rates := []int{2, 10, 100, 1000} + for _, rate := range rates { + b.Run(fmt.Sprintf("RWLock rate %d", rate), func(b *testing.B) { + benchmarkRWLockerParallel(rate, b) + }) + } +} + +func benchmarkRWLockerParallel(rate int, b *testing.B) { + l := locker.New() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if rand.Intn(rate) == 0 { + l.Lock("test") + assert.NoError(b, l.Unlock("test")) + } else { + l.RLock("test") + assert.NoError(b, l.RUnlock("test")) + } + } + }) +}