Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing some extra lock modes #54

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/lock/backend/etcd/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ type EtcdLock struct {
options *lock.LockOptions

scheduler *lock.LockScheduler

// !! Cannot reuse *concurrency.Session across multiple locks since it will break liveliness guarantee A
// locks will share their sessions and therefore keepalives will be sent for all locks, not just a specific lock.
// In the current implementation sessions are forcibly orphaned when the non-blocking call to unlock is
// made so we cannot re-use sessions in that case either -- since the session will be orphaned for all locks
// if the session is re-used.
client *clientv3.Client
mutex *etcdMutex
}
Expand Down
26 changes: 21 additions & 5 deletions internal/lock/backend/etcd/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/alexandreLamarre/dlock/internal/lock/backend/unimplemented"
"github.com/alexandreLamarre/dlock/pkg/constants"
"github.com/alexandreLamarre/dlock/pkg/lock"
"github.com/alexandreLamarre/dlock/pkg/lock/broker"
Expand Down Expand Up @@ -83,11 +84,6 @@ func (e *EtcdLockManager) Health(ctx context.Context) (conditions []string, err
return
}

// !! Cannot reuse *concurrency.Session across multiple locks since it will break liveliness guarantee A
// locks will share their sessions and therefore keepalives will be sent for all locks, not just a specific lock.
// In the current implementation sessions are forcibly orphaned when the non-blocking call to unlock is
// made so we cannot re-use sessions in that case either -- since the session will be orphaned for all locks
// if the session is re-used.
func (e *EtcdLockManager) NewLock(key string, opts ...lock.LockOption) lock.Lock {
options := lock.DefaultLockOptions()
options.Apply(opts...)
Expand All @@ -99,3 +95,23 @@ func (e *EtcdLockManager) NewLock(key string, opts ...lock.LockOption) lock.Lock
options,
)
}

func (e *EtcdLockManager) EXLock(key string, opts ...lock.LockOption) lock.Lock {
return e.NewLock(key, opts...)
}

func (e *EtcdLockManager) PWLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (e *EtcdLockManager) PRLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (e *EtcdLockManager) CWLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (e *EtcdLockManager) CRLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}
24 changes: 24 additions & 0 deletions internal/lock/backend/jetstream/concurrent_read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jetstream

import (
"context"

"github.com/alexandreLamarre/dlock/pkg/lock"
)

type crLock struct {
}

var _ lock.Lock = (*crLock)(nil)

func (l *crLock) Lock(ctx context.Context) (expired <-chan struct{}, err error) {
return nil, lock.ErrLockTypeNotImplemented
}

func (l *crLock) TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error) {
return false, nil, lock.ErrLockTypeNotImplemented
}

func (l *crLock) Unlock() error {
return lock.ErrLockTypeNotImplemented
}
24 changes: 24 additions & 0 deletions internal/lock/backend/jetstream/concurrent_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jetstream

import (
"context"

"github.com/alexandreLamarre/dlock/pkg/lock"
)

type cwLock struct {
}

var _ lock.Lock = (*cwLock)(nil)

func (l *cwLock) Lock(ctx context.Context) (expired <-chan struct{}, err error) {
return nil, lock.ErrLockTypeNotImplemented
}

func (l *cwLock) TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error) {
return false, nil, lock.ErrLockTypeNotImplemented
}

func (l *cwLock) Unlock() error {
return lock.ErrLockTypeNotImplemented
}
134 changes: 134 additions & 0 deletions internal/lock/backend/jetstream/exclusive_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package jetstream

import (
"context"
"errors"
"log/slog"
"strings"

"github.com/alexandreLamarre/dlock/pkg/lock"
backoffv2 "github.com/lestrrat-go/backoff/v2"
"github.com/nats-io/nats.go"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
)

type Lock struct {

Check failure on line 16 in internal/lock/backend/jetstream/exclusive_lock.go

View workflow job for this annotation

GitHub Actions / build

other declaration of Lock

Check failure on line 16 in internal/lock/backend/jetstream/exclusive_lock.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of Lock
prefix string
key string

js nats.JetStreamContext
*lock.LockOptions

scheduler *lock.LockScheduler
mutex *jetstreamMutex

lg *slog.Logger
}

var _ lock.Lock = (*Lock)(nil)

func NewLock(js nats.JetStreamContext, prefix, key string, lg *slog.Logger, options *lock.LockOptions) *Lock {

Check failure on line 31 in internal/lock/backend/jetstream/exclusive_lock.go

View workflow job for this annotation

GitHub Actions / build

other declaration of NewLock

Check failure on line 31 in internal/lock/backend/jetstream/exclusive_lock.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of NewLock
return &Lock{
prefix: prefix,
key: key,
js: js,
lg: lg.With("key", key),
LockOptions: options,
scheduler: lock.NewLockScheduler(),
}
}

func (l *Lock) Key() string {
return l.key
}

func (l *Lock) acquire(ctx context.Context, retrier *backoffv2.Policy) (<-chan struct{}, error) {
var curErr error
mutex := newJetstreamMutex(l.lg, l.js, l.prefix, l.key, l.LockOptions)
done, err := mutex.tryLock()
curErr = err
if err == nil {
l.mutex = &mutex
return done, nil
}
if retrier != nil {
ret := *retrier
acq := ret.Start(ctx)
for backoffv2.Continue(acq) {
done, err := mutex.tryLock()
curErr = err
if err == nil {
l.mutex = &mutex
return done, nil
}
}
return nil, errors.Join(ctx.Err(), curErr)
}
return nil, curErr
}

func (l *Lock) lock(ctx context.Context, retrier *backoffv2.Policy) (<-chan struct{}, error) {
if l.Tracer != nil {
ctxSpan, span := l.Tracer.Start(ctx, "Lock/jetstream-lock", trace.WithAttributes())
defer span.End()
ctx = ctxSpan
}
// https://github.com/lestrrat-go/backoff/issues/31
ctxca, ca := context.WithCancel(ctx)
defer ca()

var closureDone <-chan struct{}
if err := l.scheduler.Schedule(func() error {
done, err := l.acquire(ctxca, retrier)
if err != nil {
return err
}
closureDone = done
return nil
}); err != nil {
return nil, err
}
return closureDone, nil
}

func (l *Lock) Lock(ctx context.Context) (<-chan struct{}, error) {
retry := lo.ToPtr(backoffv2.Constant(
backoffv2.WithMaxRetries(0),
backoffv2.WithInterval(LockRetryDelay),
backoffv2.WithJitterFactor(0.1),
))
return l.lock(ctx, retry)
}

func (l *Lock) Unlock() error {
if err := l.scheduler.Done(func() error {
if l.mutex == nil {
panic("never acquired")
}
mutex := *l.mutex
go func() {
if err := mutex.unlock(); err != nil {
l.lg.Error(err.Error())
}
}()
l.mutex = nil
return nil
}); err != nil {
return err
}
return nil
}

func (l *Lock) TryLock(ctx context.Context) (acquired bool, done <-chan struct{}, err error) {
closureDone, err := l.lock(ctx, nil)
if err != nil {
// hack : jetstream client does not have a stronly typed error for : maxium consumers limit reached
if strings.Contains(err.Error(), "maximum consumers limit reached") {
// the request has gone through but someone else has the lock
return false, nil, nil
}
return false, nil, err
}
return true, closureDone, nil
}
20 changes: 20 additions & 0 deletions internal/lock/backend/jetstream/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log/slog"

"github.com/alexandreLamarre/dlock/internal/lock/backend/unimplemented"
"github.com/alexandreLamarre/dlock/pkg/constants"
"github.com/alexandreLamarre/dlock/pkg/lock"
"github.com/alexandreLamarre/dlock/pkg/lock/broker"
Expand Down Expand Up @@ -73,3 +74,22 @@ func (l *LockManager) NewLock(key string, opts ...lock.LockOption) lock.Lock {
options.Apply(opts...)
return NewLock(l.js, l.prefix, key, l.lg, options)
}

func (l *LockManager) EXLock(key string, opts ...lock.LockOption) lock.Lock {
return l.NewLock(key, opts...)
}

func (l *LockManager) PWLock(_ string, _ ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (l *LockManager) PRLock(_ string, _ ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}
func (l *LockManager) CWLock(_ string, _ ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (l *LockManager) CRLock(_ string, _ ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}
24 changes: 24 additions & 0 deletions internal/lock/backend/jetstream/protected_read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jetstream

import (
"context"

"github.com/alexandreLamarre/dlock/pkg/lock"
)

type prLock struct {
}

var _ lock.Lock = (*prLock)(nil)

func (l *prLock) Lock(ctx context.Context) (expired <-chan struct{}, err error) {
return nil, lock.ErrLockTypeNotImplemented
}

func (l *prLock) TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error) {
return false, nil, lock.ErrLockTypeNotImplemented
}

func (l *prLock) Unlock() error {
return lock.ErrLockTypeNotImplemented
}
24 changes: 24 additions & 0 deletions internal/lock/backend/jetstream/protected_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jetstream

import (
"context"

"github.com/alexandreLamarre/dlock/pkg/lock"
)

type pwLock struct {
}

var _ lock.Lock = (*pwLock)(nil)

func (l *pwLock) Lock(ctx context.Context) (expired <-chan struct{}, err error) {
return nil, lock.ErrLockTypeNotImplemented
}

func (l *pwLock) TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error) {
return false, nil, lock.ErrLockTypeNotImplemented
}

func (l *pwLock) Unlock() error {
return lock.ErrLockTypeNotImplemented
}
21 changes: 21 additions & 0 deletions internal/lock/backend/redis/lock_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"log/slog"

"github.com/alexandreLamarre/dlock/internal/lock/backend/unimplemented"
"github.com/alexandreLamarre/dlock/pkg/constants"
"github.com/alexandreLamarre/dlock/pkg/lock"
"github.com/alexandreLamarre/dlock/pkg/lock/broker"
Expand Down Expand Up @@ -84,3 +85,23 @@ func (lm *LockManager) NewLock(key string, opt ...lock.LockOption) lock.Lock {
options.Apply(opt...)
return NewLock(lm.pools, lm.quorum, lm.prefix, key, lm.lg, options)
}

func (lm *LockManager) EXLock(key string, opts ...lock.LockOption) lock.Lock {
return lm.NewLock(key, opts...)
}

func (lm *LockManager) PWLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (lm *LockManager) PRLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (lm *LockManager) CWLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}

func (lm *LockManager) CRLock(key string, opts ...lock.LockOption) lock.Lock {
return &unimplemented.UnimplementedLock{}
}
23 changes: 23 additions & 0 deletions internal/lock/backend/unimplemented/unimplemented.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package unimplemented

import (
"context"

"github.com/alexandreLamarre/dlock/pkg/lock"
)

type UnimplementedLock struct{}

var _ lock.Lock = (*UnimplementedLock)(nil)

func (u *UnimplementedLock) Lock(ctx context.Context) (expired <-chan struct{}, err error) {
return nil, lock.ErrLockTypeNotImplemented
}

func (u *UnimplementedLock) TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error) {
return false, nil, lock.ErrLockTypeNotImplemented
}

func (u *UnimplementedLock) Unlock() error {
return lock.ErrLockTypeNotImplemented
}
Loading
Loading