Skip to content

Commit

Permalink
Merge pull request #4 from alex123012/big-refactor
Browse files Browse the repository at this point in the history
Refactor proxy, add new retry flag, add tests
  • Loading branch information
alex123012 authored Jun 22, 2023
2 parents 28075be + 05db9e4 commit 118d5fb
Show file tree
Hide file tree
Showing 18 changed files with 757 additions and 106 deletions.
18 changes: 11 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
FROM golang:1.17 AS builder
FROM golang:1.20 AS builder
LABEL Andrey Kolashtov <[email protected]>

ADD . /redis-sentinel-proxy/
WORKDIR /redis-sentinel-proxy
RUN go mod init redis-sentinel-proxy && \
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o redis-sentinel-proxy .
WORKDIR /src
COPY go.mod go.sum /src/
RUN go mod download

FROM alpine:3.14
COPY main.go Makefile /src/
COPY pkg /src/pkg

COPY --from=builder /redis-sentinel-proxy/redis-sentinel-proxy /usr/local/bin/redis-sentinel-proxy
RUN make build CGO_ENABLED=0 GOOS=linux GOARCH=amd64

FROM alpine:3.17

COPY --from=builder /src/bin/redis-sentinel-proxy /usr/local/bin/redis-sentinel-proxy
RUN apk --update --no-cache add redis

ENTRYPOINT ["/usr/local/bin/redis-sentinel-proxy"]
Expand Down
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)
CGO_ENABLED ?= 0

.PHONY: test
test:
go test -v ./...
cd test && ./test.sh

.PHONY: build
build:
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -ldflags '-s -w -extldflags "-static"' -o bin/redis-sentinel-proxy .
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ Small command utility that:

* Proxies all tcp requests that it receives on `PORT` to that master


Usage:

`./redis-sentinel-proxy -listen IP:PORT -sentinel :SENTINEL_PORT -master NAME`
`./redis-sentinel-proxy -listen IP:PORT -sentinel :SENTINEL_PORT -master NAME --resolve-retries 10`

testing
============
- install `docker` and `docker-compose`.
- run `make test`
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/flant/redis-sentinel-proxy

go 1.20

require golang.org/x/sync v0.2.0
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
134 changes: 37 additions & 97 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,120 +1,60 @@
package main

import (
"errors"
"context"
"flag"
"fmt"
"io"
"log"
"net"
"strings"
"time"
)

var (
masterAddr *net.TCPAddr
raddr *net.TCPAddr
saddr *net.TCPAddr
"os"
"os/signal"
"syscall"

localAddr = flag.String("listen", ":9999", "local address")
sentinelAddr = flag.String("sentinel", ":26379", "remote address")
masterName = flag.String("master", "", "name of the master redis node")
masterresolver "github.com/flant/redis-sentinel-proxy/pkg/master_resolver"
"github.com/flant/redis-sentinel-proxy/pkg/proxy"
"golang.org/x/sync/errgroup"
)

func main() {
var (
localAddr = ":9999"
sentinelAddr = ":26379"
masterName = "mymaster"
masterResolveRetries = 3
)

flag.StringVar(&localAddr, "listen", localAddr, "local address")
flag.StringVar(&sentinelAddr, "sentinel", sentinelAddr, "remote address")
flag.StringVar(&masterName, "master", masterName, "name of the master redis node")
flag.IntVar(&masterResolveRetries, "resolve-retries", masterResolveRetries, "number of consecutive retries of the redis master node resolve")
flag.Parse()

laddr, err := net.ResolveTCPAddr("tcp", *localAddr)
if err != nil {
log.Fatal("Failed to resolve local address: %s", err)
}
saddr, err = net.ResolveTCPAddr("tcp", *sentinelAddr)
if err != nil {
log.Fatal("Failed to resolve sentinel address: %s", err)
}

go master()

listener, err := net.ListenTCP("tcp", laddr)
if err != nil {
log.Fatal(err)
}

for {
conn, err := listener.AcceptTCP()
if err != nil {
log.Println(err)
continue
}

go proxy(conn, masterAddr)
}
}

func master() {
var err error
for {
masterAddr, err = getMasterAddr(saddr, *masterName)
if err != nil {
log.Println(err)
}
time.Sleep(1 * time.Second)
}
}

func pipe(r io.Reader, w io.WriteCloser) {
io.Copy(w, r)
w.Close()
}

func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr) {
d := net.Dialer{Timeout: 1 * time.Second}
remote, err := d.Dial("tcp", remoteAddr.String())
if err != nil {
log.Println(err)
local.Close()
return
if err := runProxying(localAddr, sentinelAddr, masterName, masterResolveRetries); err != nil {
log.Fatalln(err)
}
go pipe(local, remote)
go pipe(remote, local)
log.Println("Exiting...")
}

func getMasterAddr(sentinelAddress *net.TCPAddr, masterName string) (*net.TCPAddr, error) {
conn, err := net.DialTCP("tcp", nil, sentinelAddress)
if err != nil {
return nil, err
}

defer conn.Close()
func runProxying(localAddr, sentinelAddr, masterName string, masterResolveRetries int) error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

conn.Write([]byte(fmt.Sprintf("sentinel get-master-addr-by-name %s\n", masterName)))
laddr := resolveTCPAddr(localAddr)
saddr := resolveTCPAddr(sentinelAddr)

b := make([]byte, 256)
_, err = conn.Read(b)
if err != nil {
log.Fatal(err)
}
masterAddrResolver := masterresolver.NewRedisMasterResolver(masterName, saddr, masterResolveRetries)
rsp := proxy.NewRedisSentinelProxy(laddr, masterAddrResolver)

parts := strings.Split(string(b), "\r\n")

if len(parts) < 5 {
err = errors.New("Couldn't get master address from sentinel")
return nil, err
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error { return masterAddrResolver.UpdateMasterAddressLoop(ctx) })
eg.Go(func() error { return rsp.Run(ctx) })

//getting the string address for the master node
stringaddr := fmt.Sprintf("%s:%s", parts[2], parts[4])
addr, err := net.ResolveTCPAddr("tcp", stringaddr)
return eg.Wait()
}

func resolveTCPAddr(addr string) *net.TCPAddr {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}

//check that there's actually someone listening on that address
conn2, err := net.DialTCP("tcp", nil, addr)
if err == nil {
defer conn2.Close()
log.Fatalf("Failed resolving tcp address: %s", err)
}

return addr, err
return tcpAddr
}
136 changes: 136 additions & 0 deletions pkg/master_resolver/master_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package masterresolver

import (
"context"
"errors"
"fmt"
"log"
"net"
"strings"
"sync"
"time"

"github.com/flant/redis-sentinel-proxy/pkg/utils"
)

type RedisMasterResolver struct {
masterName string
sentinelAddr *net.TCPAddr
retryOnMasterResolveFail int

masterAddrLock *sync.RWMutex
initialMasterResolveLock chan struct{}

masterAddr string
}

func NewRedisMasterResolver(masterName string, sentinelAddr *net.TCPAddr, retryOnMasterResolveFail int) *RedisMasterResolver {
return &RedisMasterResolver{
masterName: masterName,
sentinelAddr: sentinelAddr,
retryOnMasterResolveFail: retryOnMasterResolveFail,
masterAddrLock: &sync.RWMutex{},
initialMasterResolveLock: make(chan struct{}),
}
}

func (r *RedisMasterResolver) MasterAddress() string {
<-r.initialMasterResolveLock

r.masterAddrLock.RLock()
defer r.masterAddrLock.RUnlock()
return r.masterAddr
}

func (r *RedisMasterResolver) setMasterAddress(masterAddr *net.TCPAddr) {
r.masterAddrLock.Lock()
defer r.masterAddrLock.Unlock()
r.masterAddr = masterAddr.String()
}

func (r *RedisMasterResolver) updateMasterAddress() error {
masterAddr, err := redisMasterFromSentinelAddr(r.sentinelAddr, r.masterName)
if err != nil {
log.Println(err)
return err
}
r.setMasterAddress(masterAddr)
return nil
}

func (r *RedisMasterResolver) UpdateMasterAddressLoop(ctx context.Context) error {
if err := r.initialMasterAdressResolve(); err != nil {
return err
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

var err error
for errCount := 0; errCount <= r.retryOnMasterResolveFail; {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}

err = r.updateMasterAddress()
if err != nil {
errCount++
} else {
errCount = 0
}
}
return err
}

func (r *RedisMasterResolver) initialMasterAdressResolve() error {
defer close(r.initialMasterResolveLock)
return r.updateMasterAddress()
}

func redisMasterFromSentinelAddr(sentinelAddress *net.TCPAddr, masterName string) (*net.TCPAddr, error) {
conn, err := utils.TCPConnectWithTimeout(sentinelAddress.String())
if err != nil {
return nil, fmt.Errorf("error connecting to sentinel: %w", err)
}
defer conn.Close()

getMasterCommand := fmt.Sprintf("sentinel get-master-addr-by-name %s\n", masterName)
if _, err := conn.Write([]byte(getMasterCommand)); err != nil {
return nil, fmt.Errorf("error writing to sentinel: %w", err)
}

b := make([]byte, 256)
if _, err := conn.Read(b); err != nil {
return nil, fmt.Errorf("error getting info from sentinel: %w", err)
}

parts := strings.Split(string(b), "\r\n")

if len(parts) < 5 {
return nil, errors.New("couldn't get master address from sentinel")
}

// getting the string address for the master node
stringaddr := fmt.Sprintf("%s:%s", parts[2], parts[4])
addr, err := net.ResolveTCPAddr("tcp", stringaddr)
if err != nil {
return nil, fmt.Errorf("error resolving redis master: %w", err)
}

// check that there's actually someone listening on that address
if err := checkTCPConnect(addr); err != nil {
return nil, fmt.Errorf("error checking redis master: %w", err)
}
return addr, nil
}

func checkTCPConnect(addr *net.TCPAddr) error {
conn, err := utils.TCPConnectWithTimeout(addr.String())
if err != nil {
return err
}
defer conn.Close()
return nil
}
Loading

0 comments on commit 118d5fb

Please sign in to comment.