diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e30e61f..042760e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -18,7 +18,13 @@ jobs: - uses: actions/setup-go@v4 with: go-version: ">=1.21.0" - - name: Install + - name: Install redis server package + run: | + curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg + sudo chmod 644 /usr/share/keyrings/redis-archive-keyring.gpg + echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list + sudo apt-get update && sudo apt-get install redis-stack-server + - name: Install dev binaries run: make install - name: Build run: make build diff --git a/.gitignore b/.gitignore index 6246d0d..5f25506 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ testbin/ # data directories default.etcd/ +*.rdb # dev configs .vscode diff --git a/Makefile b/Makefile index f4841a6..c3299ab 100644 --- a/Makefile +++ b/Makefile @@ -3,14 +3,16 @@ GOCMD=go GO_BUILD_FLAGS= -GOBUILDSERVER=$(GOCMD) build $(GO_BUILD_FLAGS) -o ./bin/dlock ./cmd/dlock -GOBUILDCLI=$(GOCMD) build $(GO_BUILD_FLAGS) -o ./bin/dlockctl ./cmd/dlockctl +GOBUILDSERVER=$(GOCMD) build $(GO_BUILD_FLAGS) -ldflags "-w -s" -o ./bin/dlock ./cmd/dlock +GOBUILDCLI=$(GOCMD) build $(GO_BUILD_FLAGS) -ldflags "-w -s" -o ./bin/dlockctl ./cmd/dlockctl GOOS=$(shell go env GOOS) GOARCH=$(shell go env GOARCH) NATS_VERSION=v2.10.9 NATS_BIN=nats-server-$(NATS_VERSION)-$(GOOS)-$(GOARCH) ETCD_VERSION=v3.5.11 ETCD_BIN=etcd-$(ETCD_VERSION)-$(GOOS)-$(GOARCH) +REDIS_VERSION=7.2.0 +REDIS_BIN=redis-stack-server-$(REDIS_VERSION)-v8-x86_64.AppImage install: go install github.com/bufbuild/buf/cmd/buf@v1.29.0 @@ -34,6 +36,9 @@ testbin: wget https://github.com/etcd-io/etcd/releases/download/$(ETCD_VERSION)/$(ETCD_BIN).tar.gz tar -C ./testbin -zxvf $(ETCD_BIN).tar.gz --strip-components=1 
$(ETCD_BIN)/etcd rm $(ETCD_BIN).tar.gz + wget https://packages.redis.io/redis-stack/$(REDIS_BIN) + chmod +x $(REDIS_BIN) + mv $(REDIS_BIN) ./testbin/redis-server clean: rm -rf ./bin diff --git a/README.md b/README.md index 47fcd42..149d9ab 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,7 @@ As apposed to standard single-host locking mechanisms, distributed locks can be | :-------------------------------------------------------: | :----------------: | :-: | :-: | :-: | :-: | :-: | | [Jetstream](https://docs.nats.io/nats-concepts/jetstream) | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | | [Etcd ](https://etcd.io/) | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | +| [Redis ](https://redis.io/) | :white_check_mark: | :x: | :x: | :x: | :x: | :x: | ## Dlock specific guarantees @@ -119,3 +120,4 @@ As apposed to standard single-host locking mechanisms, distributed locks can be - [Distributed Lock Manager](https://en.wikipedia.org/wiki/Distributed_lock_manager). (n.d.). In Wikipedia. Retrieved from https://en.wikipedia.org/wiki/Distributed_lock_manager - Kleppmann, Martin. "Designing Data-Intensive Applications." (2019). 
+- [Redis redlock algorithm](https://redis.io/docs/manual/patterns/distributed-locks/) from https://redis.io/docs/manual/patterns/distributed-locks/ diff --git a/docs/TODO.md b/docs/TODO.md index 3bb3d16..2959bf7 100644 --- a/docs/TODO.md +++ b/docs/TODO.md @@ -16,7 +16,7 @@ ## Backends - [ ] Filesystem-based distributed locks using `flock` -- [ ] Redis support +- [x] Redis support - [ ] Embedded raft / mini-raft support ## Testing @@ -27,4 +27,4 @@ ## Miscellaneous - [ ] Examples folder for distributed lock usecases -- [ ] Internal metrics & traces +- [x] Server & internal metrics & traces instrumentation diff --git a/go.mod b/go.mod index cdf55f9..31e97fd 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,10 @@ replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.8 replace google.golang.org/grpc => google.golang.org/grpc v1.61.0 +replace github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 => github.com/alexandreLamarre/tempredis v0.0.0-20240129193023-7f411f64c2c7 + require ( + github.com/go-redsync/redsync/v4 v4.11.0 github.com/google/uuid v1.6.0 github.com/jwalton/go-supportscolor v1.2.0 github.com/kralicky/gpkg v0.0.0-20240119195700-64f32830b14f @@ -16,10 +19,13 @@ require ( github.com/onsi/ginkgo/v2 v2.15.0 github.com/onsi/gomega v1.31.1 github.com/prometheus/client_golang v1.18.0 + github.com/redis/go-redis/v9 v9.0.2 + github.com/redis/rueidis v1.0.19 github.com/samber/lo v1.39.0 github.com/samber/slog-multi v1.0.2 github.com/samber/slog-sampling v1.4.0 github.com/spf13/cobra v1.8.0 + github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 go.etcd.io/etcd/client/v3 v3.5.11 go.etcd.io/etcd/server/v3 v3.5.11 @@ -47,6 +53,7 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cornelk/hashmap v1.0.8 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 
github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 75553f0..3c19213 100644 --- a/go.sum +++ b/go.sum @@ -1142,6 +1142,8 @@ github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= +github.com/alexandreLamarre/tempredis v0.0.0-20240129193023-7f411f64c2c7 h1:jTMPs0rYyrP5xVUuX0iZpYu9UICZKdPnZ45xw8gHiu4= +github.com/alexandreLamarre/tempredis v0.0.0-20240129193023-7f411f64c2c7/go.mod h1:mpTEMdh0NBnmDxuJ8UACOTeLZjX+yG7vGwF2dTfmj0g= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= @@ -1156,6 +1158,10 @@ github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw= github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ= +github.com/bsm/ginkgo/v2 v2.5.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= +github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8= +github.com/bsm/gomega v1.20.0/go.mod h1:JifAceMQ4crZIWYUKrlGcmbN3bqHogVTADMD2ATsbwk= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= 
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -1193,6 +1199,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= @@ -1232,6 +1240,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= +github.com/go-redsync/redsync/v4 v4.11.0 h1:OPEcAxHBb95EzfwCKWM93ksOwHd5bTce2BD4+R14N6k= +github.com/go-redsync/redsync/v4 v4.11.0/go.mod h1:ZfayzutkgeBmEmBlUR3j+rF6kN44UUGtEdfzhBFZTPc= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod 
h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= @@ -1279,6 +1289,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= +github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= @@ -1477,6 +1489,10 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE= +github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps= +github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo= +github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/pkg/config/v1alpha1/config.go b/pkg/config/v1alpha1/config.go 
index b010eff..24495a9 100644 --- a/pkg/config/v1alpha1/config.go +++ b/pkg/config/v1alpha1/config.go @@ -1,8 +1,9 @@ package v1alpha1 type LockServerConfig struct { - EtcdStorageSpec *EtcdStorageSpec `json:"etcd,omitempty"` - JetStreamStorageSpec *JetStreamStorageSpec `json:"jetstream,omitempty"` + EtcdClientSpec *EtcdClientSpec `json:"etcd,omitempty"` + JetstreamClientSpec *JetstreamClientSpec `json:"jetstream,omitempty"` + RedisClientSpec *RedisClientSpec `json:"redis,omitempty"` } type TracesConfig struct { diff --git a/pkg/config/v1alpha1/etcd.go b/pkg/config/v1alpha1/etcd.go index 5dc6b66..c6948ac 100644 --- a/pkg/config/v1alpha1/etcd.go +++ b/pkg/config/v1alpha1/etcd.go @@ -1,6 +1,6 @@ package v1alpha1 -type EtcdStorageSpec struct { +type EtcdClientSpec struct { // List of etcd endpoints to connect to. Endpoints []string `json:"endpoints,omitempty"` // Configuration for etcd client-cert auth. diff --git a/pkg/config/v1alpha1/jetstream.go b/pkg/config/v1alpha1/jetstream.go index 47d3cce..ff847b8 100644 --- a/pkg/config/v1alpha1/jetstream.go +++ b/pkg/config/v1alpha1/jetstream.go @@ -1,6 +1,6 @@ package v1alpha1 -type JetStreamStorageSpec struct { +type JetstreamClientSpec struct { Endpoint string `json:"endpoint,omitempty"` NkeySeedPath string `json:"nkeySeedPath,omitempty"` } diff --git a/pkg/config/v1alpha1/redis.go b/pkg/config/v1alpha1/redis.go new file mode 100644 index 0000000..4096e27 --- /dev/null +++ b/pkg/config/v1alpha1/redis.go @@ -0,0 +1,6 @@ +package v1alpha1 + +type RedisClientSpec struct { + Network string `json:"network,omitempty"` + Addr string `json:"addr,omitempty"` +} diff --git a/pkg/lock/backend/etcd/util.go b/pkg/lock/backend/etcd/util.go index 4da66f8..8af5d9f 100644 --- a/pkg/lock/backend/etcd/util.go +++ b/pkg/lock/backend/etcd/util.go @@ -10,7 +10,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func NewEtcdClient(ctx context.Context, conf *v1alpha1.EtcdStorageSpec) (*clientv3.Client, error) { +func NewEtcdClient(ctx 
context.Context, conf *v1alpha1.EtcdClientSpec) (*clientv3.Client, error) { var tlsConfig *tls.Config if conf.Certs != nil { var err error diff --git a/pkg/lock/backend/jetstream/jetstream_suite_test.go b/pkg/lock/backend/jetstream/jetstream_suite_test.go index 0fe452b..c6dcdb7 100644 --- a/pkg/lock/backend/jetstream/jetstream_suite_test.go +++ b/pkg/lock/backend/jetstream/jetstream_suite_test.go @@ -45,7 +45,7 @@ var _ = BeforeSuite(func() { js, "test", nil, - logger.New().WithGroup("js-lock"), + logger.NewNop(), ) lmF.Set(lm) @@ -66,13 +66,13 @@ var _ = BeforeSuite(func() { js3, err := jetstream.AcquireJetstreamConn( context.Background(), conf, - logger.New(), + logger.NewNop(), ) Expect(err).NotTo(HaveOccurred()) - x := jetstream.NewLockManager(context.Background(), js1, "test", nil, logger.New()) - y := jetstream.NewLockManager(context.Background(), js2, "test", nil, logger.New()) - z := jetstream.NewLockManager(context.Background(), js3, "test", nil, logger.New()) + x := jetstream.NewLockManager(context.Background(), js1, "test", nil, logger.NewNop()) + y := jetstream.NewLockManager(context.Background(), js2, "test", nil, logger.NewNop()) + z := jetstream.NewLockManager(context.Background(), js3, "test", nil, logger.NewNop()) lmSetF.Set(lo.Tuple3[lock.LockManager, lock.LockManager, lock.LockManager]{ A: x, B: y, C: z, diff --git a/pkg/lock/backend/jetstream/util.go b/pkg/lock/backend/jetstream/util.go index c0cdb41..54c2e7f 100644 --- a/pkg/lock/backend/jetstream/util.go +++ b/pkg/lock/backend/jetstream/util.go @@ -17,7 +17,7 @@ func sanitizePrefix(prefix string) string { return strings.ReplaceAll(strings.ReplaceAll(prefix, "/", "-"), ".", "_") } -func AcquireJetstreamConn(ctx context.Context, conf *v1alpha1.JetStreamStorageSpec, lg *slog.Logger) (nats.JetStreamContext, error) { +func AcquireJetstreamConn(ctx context.Context, conf *v1alpha1.JetstreamClientSpec, lg *slog.Logger) (nats.JetStreamContext, error) { options := []nats.Option{ nats.MaxReconnects(-1), 
nats.RetryOnFailedConnect(true), diff --git a/pkg/lock/backend/redis/errors.go b/pkg/lock/backend/redis/errors.go new file mode 100644 index 0000000..cefe434 --- /dev/null +++ b/pkg/lock/backend/redis/errors.go @@ -0,0 +1,30 @@ +package redis + +import ( + "errors" + "fmt" +) + +var ErrExtendFailed = errors.New("redsync: failed to extend lock") + +// A RedisError is an error communicating with one of the Redis nodes. +type RedisError struct { + Node int + Err error +} + +func (err RedisError) Error() string { + return fmt.Sprintf("node %d: %v", err.Node, err.Err) +} + +// ErrNodeTaken is the error resulting if the lock is already taken in one of +// the cluster's nodes +type ErrNodeTaken struct { + Node int +} + +func (err ErrNodeTaken) Error() string { + return fmt.Sprintf("node #%d: lock already taken", err.Node) +} + +var ErrTaken = errors.New("lock already taken") diff --git a/pkg/lock/backend/redis/lock.go b/pkg/lock/backend/redis/lock.go new file mode 100644 index 0000000..40ba8a2 --- /dev/null +++ b/pkg/lock/backend/redis/lock.go @@ -0,0 +1,155 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/alexandreLamarre/dlock/pkg/lock" + "github.com/alexandreLamarre/dlock/pkg/logger" + "github.com/go-redsync/redsync/v4/redis" + backoffv2 "github.com/lestrrat-go/backoff/v2" + "github.com/samber/lo" +) + +var ( + LockExpiry time.Duration = 60 * time.Second + LockRetryDelay time.Duration = 100 * time.Millisecond + LockExtendDelay time.Duration = 333 * time.Millisecond + LockDriftFactor float64 = 0.01 + LockTimeoutFactor float64 = 0.05 +) + +type Lock struct { + pools []redis.Pool + quorum int + + prefix string + key string + lg *slog.Logger + + scheduler *lock.LockScheduler + mutex *redisMutex + + *lock.LockOptions +} + +func NewLock( + pools []redis.Pool, + quorum int, + prefix, key string, + lg *slog.Logger, + opts *lock.LockOptions, +) *Lock { + return &Lock{ + prefix: prefix, + key: key, + pools: pools, + quorum: 
quorum, + lg: lg, + scheduler: lock.NewLockScheduler(), + LockOptions: opts, + } +} + +var _ lock.Lock = (*Lock)(nil) + +func (l *Lock) Lock(ctx context.Context) (expired <-chan struct{}, err error) { + retry := lo.ToPtr( + backoffv2.Constant( + backoffv2.WithMaxRetries(0), + backoffv2.WithInterval(LockRetryDelay), + backoffv2.WithJitterFactor(0.1), + ), + ) + return l.lock(ctx, retry) +} + +func (l *Lock) TryLock(ctx context.Context) (acquired bool, expired <-chan struct{}, err error) { + closureDone, err := l.lock(ctx, nil) + if err != nil { + if errors.Is(err, ErrTaken) { + l.lg.Debug( + fmt.Sprintf( + "lock already acquired by someone else : >= quorum(%d)", + l.quorum, + ), + ) + return false, nil, nil + } + l.lg.With(logger.Err(err)).Error("failed to acquire lock") + return false, nil, err + } + return true, closureDone, nil +} + +func (l *Lock) lock(ctx context.Context, retrier *backoffv2.Policy) (expired <-chan struct{}, err error) { + if l.Tracer != nil { + ctxSpan, span := l.Tracer.Start(ctx, "Lock/redis-lock") + defer span.End() + ctx = ctxSpan + } + // https://github.com/lestrrat-go/backoff/issues/31 + ctxca, ca := context.WithCancel(ctx) + defer ca() + + var closureDone <-chan struct{} + if err := l.scheduler.Schedule(func() error { + done, err := l.acquire(ctxca, retrier) + if err != nil { + return err + } + closureDone = done + return nil + }); err != nil { + return nil, err + } + + return closureDone, nil +} + +func (l *Lock) acquire(ctx context.Context, retrier *backoffv2.Policy) (<-chan struct{}, error) { + var curErr error + mutex := newRedisMutex(l.prefix, l.key, l.quorum, l.pools, l.lg, l.LockOptions) + done, err := mutex.lock(ctx) + curErr = err + if err == nil { + l.mutex = &mutex + return done, nil + } + if retrier != nil { + ret := *retrier + acq := ret.Start(ctx) + for backoffv2.Continue(acq) { + done, err := mutex.lock(ctx) + curErr = err + if err == nil { + l.mutex = &mutex + return done, nil + } + } + return nil, errors.Join(ctx.Err(), 
curErr) + } + return nil, curErr +} + +func (l *Lock) Unlock() error { + if err := l.scheduler.Done(func() error { + if l.mutex == nil { + return nil + } + mutex := *l.mutex + go func() { + if unlocked, err := mutex.unlock(); err != nil { + l.lg.With(logger.Err(err), "unlocked", unlocked).Warn("failed to unlock") + } + }() + l.mutex = nil + return nil + }); err != nil { + return err + } + return nil +} diff --git a/pkg/lock/backend/redis/lock_manager.go b/pkg/lock/backend/redis/lock_manager.go new file mode 100644 index 0000000..672b51f --- /dev/null +++ b/pkg/lock/backend/redis/lock_manager.go @@ -0,0 +1,42 @@ +package redis + +import ( + "context" + "log/slog" + + "github.com/alexandreLamarre/dlock/pkg/lock" + "github.com/go-redsync/redsync/v4/redis" +) + +type LockManager struct { + ctx context.Context + pools []redis.Pool + quorum int + + prefix string + + lg *slog.Logger +} + +var _ lock.LockManager = (*LockManager)(nil) + +func NewLockManager( + ctx context.Context, + prefix string, + pools []redis.Pool, + lg *slog.Logger, +) *LockManager { + return &LockManager{ + ctx: ctx, + pools: pools, + prefix: prefix, + quorum: len(pools)/2 + 1, + lg: lg, + } +} + +func (lm *LockManager) NewLock(key string, opt ...lock.LockOption) lock.Lock { + options := lock.DefaultLockOptions() + options.Apply(opt...) 
+ return NewLock(lm.pools, lm.quorum, lm.prefix, key, lm.lg, options) +} diff --git a/pkg/lock/backend/redis/mutex.go b/pkg/lock/backend/redis/mutex.go new file mode 100644 index 0000000..ce384c0 --- /dev/null +++ b/pkg/lock/backend/redis/mutex.go @@ -0,0 +1,295 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/alexandreLamarre/dlock/pkg/lock" + "github.com/alexandreLamarre/dlock/pkg/logger" + "github.com/go-redsync/redsync/v4/redis" + "github.com/google/uuid" + "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type redisMutex struct { + lg *slog.Logger + prefix string + mutexKey string + + internalDone chan struct{} + *lock.LockOptions + + quorum int + pools []redis.Pool + + uuid string + + // TODO : make better + until time.Time + + // TODO : all the following are unused + // expiry time.Duration + // driftFactor float64 // nolint:unused + // timeoutFactor float64 + // fencingToken string // this should add extra consistency, can be added to genValue instead + // parentCtx context.Context +} + +func newRedisMutex( + prefix, key string, + quorum int, + pools []redis.Pool, + lg *slog.Logger, + opts *lock.LockOptions, +) redisMutex { + return redisMutex{ + lg: lg.With("prefix", prefix, "key", key, "quorum", quorum), + prefix: prefix, + mutexKey: key, + internalDone: make(chan struct{}), + LockOptions: opts, + quorum: quorum, + pools: pools, + } +} + +func (m *redisMutex) scopedToken() string { + return uuid.New().String() +} + +func (m *redisMutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) { + type result struct { + Node int + Status bool + Err error + } + + ch := make(chan result) + for node, pool := range m.pools { + go func(node int, pool redis.Pool) { + r := result{Node: node} + r.Status, r.Err = actFn(pool) + ch <- r + }(node, pool) + } + n := 0 + var taken []int + var err error + for range m.pools { + r := <-ch + if r.Status { + n++ + 
} else if r.Err != nil { + err = errors.Join(err, &RedisError{Node: r.Node, Err: r.Err}) + } else { + taken = append(taken, r.Node) + err = errors.Join(err, &ErrNodeTaken{Node: r.Node}) + } + } + if len(taken) >= m.quorum { + m.lg.With("taken", taken).Debug("consensus reached elsewhere on given operation") + return n, ErrTaken + } + return n, err +} + +func (m *redisMutex) key() string { + return m.prefix + "-" + m.mutexKey +} + +func (m *redisMutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) { + m.lg.With("fenced", value).Debug("acquiring lock...") + conn, err := pool.Get(ctx) + if err != nil { + return false, err + } + defer conn.Close() + reply, err := conn.SetNX(m.key(), value, LockExpiry) + if err != nil { + m.lg.With("fenced", value).Error("failed to acquire lock", logger.Err(err)) + return false, err + } + m.lg.With("fenced", value).Debug(fmt.Sprintf("acquired lock? %v", reply)) + return reply, nil +} + +func (m *redisMutex) lock(ctx context.Context) (<-chan struct{}, error) { + uuid := m.scopedToken() + m.uuid = uuid + start := time.Now() + + n, lockErr := func() (int, error) { + ctx, ca := context.WithTimeout(ctx, ackTimeoutFactor()) + defer ca() + return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { + return m.acquire(ctx, pool, uuid) + }) + }() + + now := time.Now() + until := now.Add(LockExpiry - now.Sub(start) - expiryDriftFactor()) + + if n >= m.quorum && now.Before(until) { + m.lg.Debug("lock acquired and valid") + // set the deadline before keepalive starts so the goroutine never reads a zero value + m.until = until + return lo.Async(m.keepalive), nil + } + + m.lg.Debug("lock not acquired, or lock acquired but already timed out") + // otherwise, lock should already be expired, due to latency in the system + func() (int, error) { + ctx, ca := context.WithTimeout(ctx, LockExpiry) + defer ca() + return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { + return m.release(ctx, pool, uuid) + }) + }() + + if lockErr == nil { + lockErr = errors.New("lock not acquired: no quorum within validity window") + } + return nil, lockErr +} + +func (m 
*redisMutex) teardown() { + defer close(m.internalDone) + select { + case m.internalDone <- struct{}{}: + default: + } +} + +func (m *redisMutex) unlock() (bool, error) { + defer m.teardown() + m.lg.Debug("unlock requested") + ctx := context.Background() + var span trace.Span + if m.Tracer != nil { + ctx, span = m.Tracer.Start(context.Background(), "Unlock/redis-unlock", trace.WithAttributes( + attribute.KeyValue{ + Key: "key", + Value: attribute.StringValue(m.key()), + }, + )) + defer span.End() + } + + ctx, ca := context.WithTimeout(ctx, LockExpiry) // derive from ctx so the release keeps the span context + defer ca() + + n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { + return m.release(ctx, pool, m.uuid) + }) + if n < m.quorum { + m.lg.With(logger.Err(err)).Warn("failed to release lock no consensus : ") + if span != nil { + span.RecordError(err) + } + return false, err + } + return true, nil +} + +var deleteScript = redis.NewScript(1, ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end +`) + +func (m *redisMutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) { + m.lg.With("fenced", m.uuid).Debug("release lock requested...") + conn, err := pool.Get(ctx) + if err != nil { + return false, err + } + defer conn.Close() + status, err := conn.Eval(deleteScript, m.key(), value) + if err != nil { + return false, err + } + m.lg.With("fenced", m.uuid).Debug(fmt.Sprintf("release lock status : %d", status)) + return status != int64(0), nil +} + +var touchScript = redis.NewScript(1, ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("PEXPIRE", KEYS[1], ARGV[2]) + else + return 0 + end +`) + +func (m *redisMutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) { + conn, err := pool.Get(ctx) + if err != nil { + return false, err // report the connection failure instead of letting it count as "lock taken" + } + defer conn.Close() + status, err := conn.Eval(touchScript, m.key(), value, expiry) // ARGV[2]: new TTL in milliseconds for PEXPIRE + if err != nil { + return false, err + } + 
m.lg.With("fenced", m.uuid).Debug(fmt.Sprintf("touch lock status : %d", status)) + return status != int64(0), nil +} + +func expiryDriftFactor() time.Duration { + return time.Duration(int64(float64(LockExpiry) * LockDriftFactor)) +} + +func ackTimeoutFactor() time.Duration { + return time.Duration(int64(float64(LockExpiry) * LockTimeoutFactor)) +} + +func (m *redisMutex) extend(ctx context.Context) (bool, error) { + m.lg.Debug("extending lock expiry...") + start := time.Now() + n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { + // cast to milliseconds + return m.touch(ctx, pool, m.uuid, int(LockExpiry/time.Millisecond)) + }) + if n < m.quorum { + m.lg.With(logger.Err(err)).Warn("failed to extend lock expiry : ") + return false, err + } + now := time.Now() + until := now.Add(LockExpiry - now.Sub(start) - expiryDriftFactor()) + if now.Before(until) { + m.until = until + return true, nil + } + m.lg.Warn("failed to extend lock expiry : lock already expired") + return false, ErrExtendFailed +} + +func (m *redisMutex) keepalive() struct{} { + // TODO : maybe replace with unused parentCtx + ctx := context.TODO() + t := time.NewTicker(LockExtendDelay) + defer t.Stop() + for { + select { + case <-m.internalDone: + return struct{}{} + case <-ctx.Done(): + return struct{}{} + case <-t.C: + extended, err := m.extend(ctx) + if err != nil { + m.lg.With(logger.Err(err), "extended", extended).Warn("failed to extend lock") + } + now := time.Now() + if now.After(m.until) { + return struct{}{} + } + } + } +} diff --git a/pkg/lock/backend/redis/redis_suite_test.go b/pkg/lock/backend/redis/redis_suite_test.go new file mode 100644 index 0000000..a05eb0c --- /dev/null +++ b/pkg/lock/backend/redis/redis_suite_test.go @@ -0,0 +1,62 @@ +package redis_test + +import ( + "context" + "testing" + + "github.com/alexandreLamarre/dlock/pkg/lock" + "github.com/alexandreLamarre/dlock/pkg/lock/backend/redis" + "github.com/alexandreLamarre/dlock/pkg/logger" + 
"github.com/alexandreLamarre/dlock/pkg/test" + "github.com/alexandreLamarre/dlock/pkg/test/conformance/integration" + "github.com/alexandreLamarre/dlock/pkg/util/future" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" +) + +func TestRedis(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Redis Suite") +} + +var lmF = future.New[lock.LockManager]() +var lmSetF = future.New[lo.Tuple3[ + lock.LockManager, lock.LockManager, lock.LockManager, +]]() + +var _ = BeforeSuite(func() { + if Label("integration").MatchesLabelFilter(GinkgoLabelFilter()) { + env := test.Environment{} + Expect(env.Start()).To(Succeed()) + + conf, err := env.StartRedis() + GinkgoWriter.Write([]byte("started redis....")) + Expect(err).NotTo(HaveOccurred()) + pools := redis.AcquireRedisPool(conf) + + lm := redis.NewLockManager( + context.Background(), + "test", + pools, + logger.NewNop(), + ) + lmF.Set(lm) + + pool1 := redis.AcquireRedisPool(conf) + pool2 := redis.AcquireRedisPool(conf) + pool3 := redis.AcquireRedisPool(conf) + + x := redis.NewLockManager(context.Background(), "test", pool1, logger.NewNop()) + y := redis.NewLockManager(context.Background(), "test", pool2, logger.NewNop()) + z := redis.NewLockManager(context.Background(), "test", pool3, logger.NewNop().WithGroup("redis-lock-pool3")) + + lmSetF.Set(lo.Tuple3[lock.LockManager, lock.LockManager, lock.LockManager]{ + A: x, B: y, C: z, + }) + + DeferCleanup(env.Stop, "Test Suite Finished") + } +}) + +var _ = Describe("Redis Lock Manager", Ordered, Label("integration", "slow"), integration.LockManagerTestSuite(lmF, lmSetF)) diff --git a/pkg/lock/backend/redis/util.go b/pkg/lock/backend/redis/util.go new file mode 100644 index 0000000..cf936b0 --- /dev/null +++ b/pkg/lock/backend/redis/util.go @@ -0,0 +1,97 @@ +package redis + +import ( + "context" + "strings" + "time" + + "github.com/go-redsync/redsync/v4/redis" + redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v9" + goredislib 
"github.com/redis/go-redis/v9" + + "github.com/redis/rueidis" + "github.com/redis/rueidis/rueidiscompat" +) + +func AcquireRedisPool( + clients []*goredislib.Options, +) []redis.Pool { + pools := make([]redis.Pool, len(clients)) + for i, clientOps := range clients { + client := goredislib.NewClient(clientOps) + pools[i] = redsyncgoredis.NewPool(client) + } + return pools +} + +// The following code is copied from: +// https://github.com/go-redsync/redsync/blob/master/redis/rueidis/rueidis.go + +type pool struct { + delegate rueidiscompat.Cmdable +} + +func (p *pool) Get(ctx context.Context) (redis.Conn, error) { + if ctx == nil { + ctx = context.Background() + } + return &conn{p.delegate, ctx}, nil +} + +// NewPool returns a rueidis-based pool implementation. +func NewPool(delegate rueidiscompat.Cmdable) redis.Pool { + return &pool{delegate} +} + +type conn struct { + delegate rueidiscompat.Cmdable + ctx context.Context +} + +func (c *conn) Get(name string) (string, error) { + value, err := c.delegate.Get(c.ctx, name).Result() + return value, noErrNil(err) +} + +func (c *conn) Set(name string, value string) (bool, error) { + reply, err := c.delegate.Set(c.ctx, name, value, 0).Result() + return reply == "OK", err +} + +func (c *conn) SetNX(name string, value string, expiry time.Duration) (bool, error) { + return c.delegate.SetNX(c.ctx, name, value, expiry).Result() +} + +func (c *conn) PTTL(name string) (time.Duration, error) { + return c.delegate.PTTL(c.ctx, name).Result() +} + +func (c *conn) Eval(script *redis.Script, keysAndArgs ...interface{}) (interface{}, error) { + keys := make([]string, script.KeyCount) + args := keysAndArgs + + if script.KeyCount > 0 { + for i := 0; i < script.KeyCount; i++ { + keys[i] = keysAndArgs[i].(string) + } + args = keysAndArgs[script.KeyCount:] + } + + v, err := c.delegate.EvalSha(c.ctx, script.Hash, keys, args...).Result() + if err != nil && strings.Contains(err.Error(), "NOSCRIPT ") { + v, err = c.delegate.Eval(c.ctx, 
script.Src, keys, args...).Result() + } + return v, noErrNil(err) +} + +func (c *conn) Close() error { + // Not needed for this library + return nil +} + +func noErrNil(err error) error { + if err == rueidis.Nil { + return nil + } + return err +} diff --git a/pkg/lock/broker/broker.go b/pkg/lock/broker/broker.go index 7f10c8a..702b171 100644 --- a/pkg/lock/broker/broker.go +++ b/pkg/lock/broker/broker.go @@ -8,26 +8,41 @@ import ( "github.com/alexandreLamarre/dlock/pkg/lock" "github.com/alexandreLamarre/dlock/pkg/lock/backend/etcd" "github.com/alexandreLamarre/dlock/pkg/lock/backend/jetstream" + "github.com/alexandreLamarre/dlock/pkg/lock/backend/redis" "github.com/alexandreLamarre/dlock/pkg/logger" + goredislib "github.com/redis/go-redis/v9" "go.opentelemetry.io/otel/trace" ) func NewLockManager(ctx context.Context, tracer trace.Tracer, lg *slog.Logger, config *v1alpha1.LockServerConfig) lock.LockManager { - if config.EtcdStorageSpec != nil { - cli, err := etcd.NewEtcdClient(ctx, config.EtcdStorageSpec) + if config.EtcdClientSpec != nil { + lg.Info("acquiring etcd client ...") + cli, err := etcd.NewEtcdClient(ctx, config.EtcdClientSpec) if err != nil { lg.With(logger.Err(err)).Warn("failed to acquired etcd client") } return etcd.NewEtcdLockManager(cli, "lock", tracer, lg) } - if config.JetStreamStorageSpec != nil { - cli, err := jetstream.AcquireJetstreamConn(ctx, config.JetStreamStorageSpec, lg) + if config.JetstreamClientSpec != nil { + lg.Info("acquiring jetstream client ...") + cli, err := jetstream.AcquireJetstreamConn(ctx, config.JetstreamClientSpec, lg) if err != nil { lg.With(logger.Err(err)).Warn("failed to acquired jetstream client") } return jetstream.NewLockManager(ctx, cli, "lock", tracer, lg) } + if config.RedisClientSpec != nil { + lg.Info("acquiring redis client ...") + cli := redis.AcquireRedisPool([]*goredislib.Options{ + { + Addr: config.RedisClientSpec.Addr, + Network: config.RedisClientSpec.Network, + }, + }) + return redis.NewLockManager(ctx, 
"lock", cli, lg) + } + return nil } diff --git a/pkg/test/testutil.go b/pkg/test/testutil.go index 33e005d..a93abd3 100644 --- a/pkg/test/testutil.go +++ b/pkg/test/testutil.go @@ -18,6 +18,8 @@ import ( natstest "github.com/nats-io/nats-server/v2/test" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega/gexec" + goredislib "github.com/redis/go-redis/v9" + "github.com/stvp/tempredis" etcdserver "go.etcd.io/etcd/server/v3/embed" ) @@ -59,9 +61,6 @@ func (e *Environment) Stop(cause ...string) error { e.Logger.Info("Stopping test environment") } - os.Unsetenv("NATS_SERVER_URL") - os.Unsetenv("NKEY_SEED_FILENAME") - if e.cancel != nil { e.cancel() var wg sync.WaitGroup @@ -75,12 +74,6 @@ func (e *Environment) Stop(cause ...string) error { } wg.Wait() } - if e.embeddedJS != nil { - e.embeddedJS.Shutdown() - } - if e.embeddedEtcd != nil { - e.embeddedEtcd.Close() - } // if e.mockCtrl != nil { // e.mockCtrl.Finish() // } @@ -137,8 +130,27 @@ func StartCmd(cmd *exec.Cmd) (Session, error) { } +func (e *Environment) StartRedis() ([]*goredislib.Options, error) { + server, err := tempredis.Start( + tempredis.Config{}, + tempredis.WithWriter(ginkgo.GinkgoWriter), + ) + if err != nil { + return nil, err + } + e.addShutdownHook(func() { + server.Term() + }) + e.Logger.Info("Redis server started", "socket", server.Socket()) + return []*goredislib.Options{ + { + Network: "unix", + Addr: server.Socket(), + }}, nil +} + // FIXME: set the ports to freeports -func (e *Environment) StartEtcd() (*v1alpha1.EtcdStorageSpec, error) { +func (e *Environment) StartEtcd() (*v1alpha1.EtcdClientSpec, error) { conf := etcdserver.NewConfig() if err := conf.Validate(); err != nil { panic(err) @@ -155,30 +167,35 @@ func (e *Environment) StartEtcd() (*v1alpha1.EtcdStorageSpec, error) { return nil, err } e.embeddedEtcd = server - return &v1alpha1.EtcdStorageSpec{ + e.addShutdownHook(func() { + server.Close() + }) + e.Logger.Info("Etcd server started", "endpoints", etcdserver.DefaultAdvertiseClientURLs) 
+ return &v1alpha1.EtcdClientSpec{ Endpoints: []string{etcdserver.DefaultAdvertiseClientURLs}, }, nil } -func (e *Environment) StartJetstream() (*v1alpha1.JetStreamStorageSpec, error) { +func (e *Environment) StartJetstream() (*v1alpha1.JetstreamClientSpec, error) { ports := freeport.GetFreePorts(1) opts := natstest.DefaultTestOptions opts.Port = ports[0] opts.StoreDir = e.tempDir - e.embeddedJS = natstest.RunServer(&opts) + server := natstest.RunServer(&opts) + e.embeddedJS = server + e.addShutdownHook(func() { + server.Shutdown() + }) e.embeddedJS.EnableJetStream(nil) if !e.embeddedJS.ReadyForConnections(2 * time.Second) { return nil, errors.New("starting nats server: timeout") } + e.Logger.Info("Jetstream server started", "port", ports[0]) sUrl := fmt.Sprintf("nats://127.0.0.1:%d", ports[0]) - return &v1alpha1.JetStreamStorageSpec{ + return &v1alpha1.JetstreamClientSpec{ Endpoint: sUrl, }, nil } - -func StartEtcd() *v1alpha1.EtcdStorageSpec { - return &v1alpha1.EtcdStorageSpec{} -}