Skip to content

Commit

Permalink
add basic redis client config to dlock server config
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandreLamarre committed Jan 29, 2024
1 parent 55c7fd1 commit 583d3ad
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 21 deletions.
5 changes: 3 additions & 2 deletions pkg/config/v1alpha1/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package v1alpha1

type LockServerConfig struct {
EtcdStorageSpec *EtcdStorageSpec `json:"etcd,omitempty"`
JetStreamStorageSpec *JetStreamStorageSpec `json:"jetstream,omitempty"`
EtcdClientSpec *EtcdClientSpec `json:"etcd,omitempty"`
JetstreamClientSpec *JetstreamClientSpec `json:"jetstream,omitempty"`
RedisClientSpec *RedisClientSpec `json:"redis,omitempty"`
}

type TracesConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/v1alpha1/etcd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package v1alpha1

type EtcdStorageSpec struct {
type EtcdClientSpec struct {
// List of etcd endpoints to connect to.
Endpoints []string `json:"endpoints,omitempty"`
// Configuration for etcd client-cert auth.
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/v1alpha1/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package v1alpha1

type JetStreamStorageSpec struct {
type JetstreamClientSpec struct {
Endpoint string `json:"endpoint,omitempty"`
NkeySeedPath string `json:"nkeySeedPath,omitempty"`
}
6 changes: 6 additions & 0 deletions pkg/config/v1alpha1/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package v1alpha1

type RedisClientSpec struct {
Network string `json:"network,omitempty"`
Addr string `json:"addr,omitempty"`
}
2 changes: 1 addition & 1 deletion pkg/lock/backend/etcd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

func NewEtcdClient(ctx context.Context, conf *v1alpha1.EtcdStorageSpec) (*clientv3.Client, error) {
func NewEtcdClient(ctx context.Context, conf *v1alpha1.EtcdClientSpec) (*clientv3.Client, error) {
var tlsConfig *tls.Config
if conf.Certs != nil {
var err error
Expand Down
2 changes: 1 addition & 1 deletion pkg/lock/backend/jetstream/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func sanitizePrefix(prefix string) string {
return strings.ReplaceAll(strings.ReplaceAll(prefix, "/", "-"), ".", "_")
}

func AcquireJetstreamConn(ctx context.Context, conf *v1alpha1.JetStreamStorageSpec, lg *slog.Logger) (nats.JetStreamContext, error) {
func AcquireJetstreamConn(ctx context.Context, conf *v1alpha1.JetstreamClientSpec, lg *slog.Logger) (nats.JetStreamContext, error) {
options := []nats.Option{
nats.MaxReconnects(-1),
nats.RetryOnFailedConnect(true),
Expand Down
23 changes: 19 additions & 4 deletions pkg/lock/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,41 @@ import (
"github.com/alexandreLamarre/dlock/pkg/lock"
"github.com/alexandreLamarre/dlock/pkg/lock/backend/etcd"
"github.com/alexandreLamarre/dlock/pkg/lock/backend/jetstream"
"github.com/alexandreLamarre/dlock/pkg/lock/backend/redis"
"github.com/alexandreLamarre/dlock/pkg/logger"
goredislib "github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/trace"
)

func NewLockManager(ctx context.Context, tracer trace.Tracer, lg *slog.Logger, config *v1alpha1.LockServerConfig) lock.LockManager {
if config.EtcdStorageSpec != nil {
cli, err := etcd.NewEtcdClient(ctx, config.EtcdStorageSpec)
if config.EtcdClientSpec != nil {
lg.Info("acquiring etcd client ...")
cli, err := etcd.NewEtcdClient(ctx, config.EtcdClientSpec)
if err != nil {
lg.With(logger.Err(err)).Warn("failed to acquired etcd client")
}
return etcd.NewEtcdLockManager(cli, "lock", tracer, lg)
}

if config.JetStreamStorageSpec != nil {
cli, err := jetstream.AcquireJetstreamConn(ctx, config.JetStreamStorageSpec, lg)
if config.JetstreamClientSpec != nil {
lg.Info("acquiring jetstream client ...")
cli, err := jetstream.AcquireJetstreamConn(ctx, config.JetstreamClientSpec, lg)
if err != nil {
lg.With(logger.Err(err)).Warn("failed to acquired jetstream client")
}
return jetstream.NewLockManager(ctx, cli, "lock", tracer, lg)
}

if config.RedisClientSpec != nil {
lg.Info("acquiring redis client ...")
cli := redis.AcquireRedisPool([]*goredislib.Options{
{
Addr: config.RedisClientSpec.Addr,
Network: config.RedisClientSpec.Network,
},
})
return redis.NewLockManager(ctx, "lock", cli, lg)
}

return nil
}
18 changes: 7 additions & 11 deletions pkg/test/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ func (e *Environment) Stop(cause ...string) error {
e.Logger.Info("Stopping test environment")
}

os.Unsetenv("NATS_SERVER_URL")
os.Unsetenv("NKEY_SEED_FILENAME")

if e.cancel != nil {
e.cancel()
var wg sync.WaitGroup
Expand Down Expand Up @@ -141,6 +138,7 @@ func (e *Environment) StartRedis() ([]*goredislib.Options, error) {
e.addShutdownHook(func() {
server.Term()
})
e.Logger.Info("Redis server started", "socket", server.Socket())
return []*goredislib.Options{
{
Network: "unix",
Expand All @@ -149,7 +147,7 @@ func (e *Environment) StartRedis() ([]*goredislib.Options, error) {
}

// FIXME: set the ports to freeports
func (e *Environment) StartEtcd() (*v1alpha1.EtcdStorageSpec, error) {
func (e *Environment) StartEtcd() (*v1alpha1.EtcdClientSpec, error) {
conf := etcdserver.NewConfig()
if err := conf.Validate(); err != nil {
panic(err)
Expand All @@ -169,12 +167,13 @@ func (e *Environment) StartEtcd() (*v1alpha1.EtcdStorageSpec, error) {
e.addShutdownHook(func() {
server.Close()
})
return &v1alpha1.EtcdStorageSpec{
e.Logger.Info("Etcd server started", "endpoints", etcdserver.DefaultAdvertiseClientURLs)
return &v1alpha1.EtcdClientSpec{
Endpoints: []string{etcdserver.DefaultAdvertiseClientURLs},
}, nil
}

func (e *Environment) StartJetstream() (*v1alpha1.JetStreamStorageSpec, error) {
func (e *Environment) StartJetstream() (*v1alpha1.JetstreamClientSpec, error) {
ports := freeport.GetFreePorts(1)

opts := natstest.DefaultTestOptions
Expand All @@ -190,13 +189,10 @@ func (e *Environment) StartJetstream() (*v1alpha1.JetStreamStorageSpec, error) {
if !e.embeddedJS.ReadyForConnections(2 * time.Second) {
return nil, errors.New("starting nats server: timeout")
}
e.Logger.Info("Jetstream server started", "port", ports[0])

sUrl := fmt.Sprintf("nats://127.0.0.1:%d", ports[0])
return &v1alpha1.JetStreamStorageSpec{
return &v1alpha1.JetstreamClientSpec{
Endpoint: sUrl,
}, nil
}

func StartEtcd() *v1alpha1.EtcdStorageSpec {
return &v1alpha1.EtcdStorageSpec{}
}

0 comments on commit 583d3ad

Please sign in to comment.