Skip to content

Commit

Permalink
using the lock uuid id as the key for the locks in the lock factory (#30
Browse files Browse the repository at this point in the history
)
  • Loading branch information
skeeey authored Dec 19, 2023
1 parent aeb2ee9 commit 7e47485
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 35 deletions.
73 changes: 38 additions & 35 deletions pkg/db/advisory_locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ func (f *AdvisoryLockFactory) NewAdvisoryLock(ctx context.Context, id string, lo
// obtain the advisory lock (blocking)
if err := lock.lock(); err != nil {
UpdateAdvisoryLockCountMetric(lockType, "lock error")
log.Error("Error obtaining the advisory lock")
return "", err
errMsg := fmt.Sprintf("error obtaining the advisory lock for id %s type %s, %v", id, lockType, err)
log.Error(errMsg)
// the lock transaction is already started, if error happens, we return the transaction id, so that the caller
// can end this transaction.
return *lock.uuid, fmt.Errorf(errMsg)
}

f.locks[fmt.Sprintf("%s-%s", id, lockType)] = lock
log.V(4).Info(fmt.Sprintf("Locked advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid))
f.locks[*lock.uuid] = lock
return *lock.uuid, nil
}

Expand All @@ -79,11 +83,15 @@ func (f *AdvisoryLockFactory) NewNonBlockingLock(ctx context.Context, id string,
acquired, err := lock.nonBlockingLock()
if err != nil {
UpdateAdvisoryLockCountMetric(lockType, "lock error")
log.Error(fmt.Sprintf("Error obtaining the non blocking advisory lock for id %s", id))
return "", false, err
errMsg := fmt.Sprintf("error obtaining the non blocking advisory lock for id %s type %s, %v", id, lockType, err)
log.Error(errMsg)
// the lock transaction is already started, if error happens, we return the transaction id, so that the caller
// can end this transaction.
return *lock.uuid, false, fmt.Errorf(errMsg)
}

f.locks[fmt.Sprintf("%s-%s", id, lockType)] = lock
log.V(4).Info(fmt.Sprintf("Locked non blocking advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid))
f.locks[*lock.uuid] = lock
return *lock.uuid, acquired, nil
}

Expand All @@ -108,41 +116,36 @@ func (f *AdvisoryLockFactory) newLock(ctx context.Context, id string, lockType L
func (f *AdvisoryLockFactory) Unlock(ctx context.Context, uuid string) {
log := logger.NewOCMLogger(ctx)

for k, lock := range f.locks {
if lock.uuid == nil {
log.Error("lockOwnerID could not be found in AdvisoryLock")
continue
}

if *lock.uuid != uuid {
continue
}
if uuid == "" {
return
}

lockType := *lock.lockType
lockID := "<missing>"
if lock.id != nil {
lockID = *lock.id
}
lock, ok := f.locks[uuid]
if !ok {
// the resolving UUID belongs to a service call that did *not* initiate the lock.
// we can safely ignore this, knowing the top-most func in the call stack
// will provide the correct UUID.
log.V(4).Info(fmt.Sprintf("Caller not lock owner. Owner %s", uuid))
return
}

if err := lock.unlock(); err != nil {
UpdateAdvisoryLockCountMetric(lockType, "unlock error")
log.Extra("lockID", lockID).Extra("owner", uuid).Error(fmt.Sprintf("Could not unlock, %v", err))
}
lockType := *lock.lockType
lockID := "<missing>"
if lock.id != nil {
lockID = *lock.id
}

UpdateAdvisoryLockCountMetric(lockType, "OK")
UpdateAdvisoryLockDurationMetric(lockType, "OK", lock.startTime)
if err := lock.unlock(); err != nil {
UpdateAdvisoryLockCountMetric(lockType, "unlock error")
log.Extra("lockID", lockID).Extra("owner", uuid).Error(fmt.Sprintf("Could not unlock, %v", err))
}

log.Info(fmt.Sprintf("Unlocked lock id=%s - owner=%s", lockID, uuid))
UpdateAdvisoryLockCountMetric(lockType, "OK")
UpdateAdvisoryLockDurationMetric(lockType, "OK", lock.startTime)

delete(f.locks, k)
return
}
log.V(4).Info(fmt.Sprintf("Unlocked lock id=%s type=%s - owner=%s", lockID, lockType, uuid))

// the resolving UUID belongs to a service call that did *not* initiate the lock.
// we can safely ignore this, knowing the top-most func in the call stack
// will provide the correct UUID.
// This will happen frequently as many pkg/service functions participate in locks.
log.Info(fmt.Sprintf("Caller not lock owner. Owner %s", uuid))
delete(f.locks, uuid)
}

// AdvisoryLock represents a postgres advisory lock
Expand Down
14 changes: 14 additions & 0 deletions test/integration/dinosaurs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,20 @@ func TestUpdateDinosaurWithRacingRequests(t *testing.T) {

// the dinosaur patch request is protected by the advisory lock, so there should only be one update
Expect(updatedCount).To(Equal(1))

// all the locks should be released finally
Eventually(func() error {
var count int
err := h.DBFactory.DirectDB().
QueryRow("select count(*) from pg_locks where locktype='advisory';").
Scan(&count)
Expect(err).NotTo(HaveOccurred(), "Error querying pg_locks: %v", err)

if count != 0 {
return fmt.Errorf("there are %d unreleased advisory lock", count)
}
return nil
}, 5*time.Second, 1*time.Second).Should(Succeed())
}

func TestUpdateDinosaurWithRacingRequests_WithoutLock(t *testing.T) {
Expand Down

0 comments on commit 7e47485

Please sign in to comment.