From 7e47485f3dfa2b646ca8e067b2c4c260d2816907 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 20 Dec 2023 00:43:27 +0800 Subject: [PATCH] using the lock uuid id as the key for the locks in the lock factory (#30) --- pkg/db/advisory_locks.go | 73 ++++++++++++++++-------------- test/integration/dinosaurs_test.go | 14 ++++++ 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/pkg/db/advisory_locks.go b/pkg/db/advisory_locks.go index c854ad4b..3d5f385e 100644 --- a/pkg/db/advisory_locks.go +++ b/pkg/db/advisory_locks.go @@ -59,11 +59,15 @@ func (f *AdvisoryLockFactory) NewAdvisoryLock(ctx context.Context, id string, lo // obtain the advisory lock (blocking) if err := lock.lock(); err != nil { UpdateAdvisoryLockCountMetric(lockType, "lock error") - log.Error("Error obtaining the advisory lock") - return "", err + errMsg := fmt.Sprintf("error obtaining the advisory lock for id %s type %s, %v", id, lockType, err) + log.Error(errMsg) + // the lock transaction is already started, if error happens, we return the transaction id, so that the caller + // can end this transaction. + return *lock.uuid, fmt.Errorf(errMsg) } - f.locks[fmt.Sprintf("%s-%s", id, lockType)] = lock + log.V(4).Info(fmt.Sprintf("Locked advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid)) + f.locks[*lock.uuid] = lock return *lock.uuid, nil } @@ -79,11 +83,15 @@ func (f *AdvisoryLockFactory) NewNonBlockingLock(ctx context.Context, id string, acquired, err := lock.nonBlockingLock() if err != nil { UpdateAdvisoryLockCountMetric(lockType, "lock error") - log.Error(fmt.Sprintf("Error obtaining the non blocking advisory lock for id %s", id)) - return "", false, err + errMsg := fmt.Sprintf("error obtaining the non blocking advisory lock for id %s type %s, %v", id, lockType, err) + log.Error(errMsg) + // the lock transaction is already started, if error happens, we return the transaction id, so that the caller + // can end this transaction. + return *lock.uuid, false, fmt.Errorf(errMsg) } - f.locks[fmt.Sprintf("%s-%s", id, lockType)] = lock + log.V(4).Info(fmt.Sprintf("Locked non blocking advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid)) + f.locks[*lock.uuid] = lock return *lock.uuid, acquired, nil } @@ -108,41 +116,36 @@ func (f *AdvisoryLockFactory) newLock(ctx context.Context, id string, lockType L func (f *AdvisoryLockFactory) Unlock(ctx context.Context, uuid string) { log := logger.NewOCMLogger(ctx) - for k, lock := range f.locks { - if lock.uuid == nil { - log.Error("lockOwnerID could not be found in AdvisoryLock") - continue - } - - if *lock.uuid != uuid { - continue - } + if uuid == "" { + return + } - lockType := *lock.lockType - lockID := "" - if lock.id != nil { - lockID = *lock.id - } + lock, ok := f.locks[uuid] + if !ok { + // the resolving UUID belongs to a service call that did *not* initiate the lock. + // we can safely ignore this, knowing the top-most func in the call stack + // will provide the correct UUID. + log.V(4).Info(fmt.Sprintf("Caller not lock owner. Owner %s", uuid)) + return + } - if err := lock.unlock(); err != nil { - UpdateAdvisoryLockCountMetric(lockType, "unlock error") - log.Extra("lockID", lockID).Extra("owner", uuid).Error(fmt.Sprintf("Could not unlock, %v", err)) - } + lockType := *lock.lockType + lockID := "" + if lock.id != nil { + lockID = *lock.id + } - UpdateAdvisoryLockCountMetric(lockType, "OK") - UpdateAdvisoryLockDurationMetric(lockType, "OK", lock.startTime) + if err := lock.unlock(); err != nil { + UpdateAdvisoryLockCountMetric(lockType, "unlock error") + log.Extra("lockID", lockID).Extra("owner", uuid).Error(fmt.Sprintf("Could not unlock, %v", err)) + } - log.Info(fmt.Sprintf("Unlocked lock id=%s - owner=%s", lockID, uuid)) + UpdateAdvisoryLockCountMetric(lockType, "OK") + UpdateAdvisoryLockDurationMetric(lockType, "OK", lock.startTime) - delete(f.locks, k) - return - } + log.V(4).Info(fmt.Sprintf("Unlocked lock id=%s type=%s - owner=%s", lockID, lockType, uuid)) - // the resolving UUID belongs to a service call that did *not* initiate the lock. - // we can safely ignore this, knowing the top-most func in the call stack - // will provide the correct UUID. - // This will happen frequently as many pkg/service functions participate in locks. - log.Info(fmt.Sprintf("Caller not lock owner. Owner %s", uuid)) + delete(f.locks, uuid) } // AdvisoryLock represents a postgres advisory lock diff --git a/test/integration/dinosaurs_test.go b/test/integration/dinosaurs_test.go index c333cfd9..8e983a8f 100644 --- a/test/integration/dinosaurs_test.go +++ b/test/integration/dinosaurs_test.go @@ -219,6 +219,20 @@ func TestUpdateDinosaurWithRacingRequests(t *testing.T) { // the dinosaur patch request is protected by the advisory lock, so there should only be one update Expect(updatedCount).To(Equal(1)) + + // all the locks should be released finally + Eventually(func() error { + var count int + err := h.DBFactory.DirectDB(). + QueryRow("select count(*) from pg_locks where locktype='advisory';"). + Scan(&count) + Expect(err).NotTo(HaveOccurred(), "Error querying pg_locks: %v", err) + + if count != 0 { + return fmt.Errorf("there are %d unreleased advisory lock", count) + } + return nil + }, 5*time.Second, 1*time.Second).Should(Succeed()) } func TestUpdateDinosaurWithRacingRequests_WithoutLock(t *testing.T) {