Skip to content

Commit

Permalink
Merge pull request #54 from gatewayd-io/improve-client-code
Browse files Browse the repository at this point in the history
Improve client code
  • Loading branch information
mostafa authored Dec 22, 2022
2 parents 38e5df1 + 9a4ab9b commit 2dde98e
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 18 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ build:
run:
go mod tidy && go run main.go run

clean:
go clean -testcache

test:
go test -v ./...

protolint:
buf lint

Expand Down
6 changes: 6 additions & 0 deletions cmd/config_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,17 @@ func poolConfig() (int, *network.Client) {
net := globalConfig.String(ref + ".network")
address := globalConfig.String(ref + ".address")
receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize")
receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize")
receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline")
sendDeadline := globalConfig.Duration(ref + ".sendDeadline")

return poolSize, &network.Client{
Network: net,
Address: address,
ReceiveBufferSize: receiveBufferSize,
ReceiveChunkSize: receiveChunkSize,
ReceiveDeadline: receiveDeadline,
SendDeadline: sendDeadline,
}
}

Expand Down
6 changes: 6 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ var runCmd = &cobra.Command{
clientConfig.Network,
clientConfig.Address,
clientConfig.ReceiveBufferSize,
clientConfig.ReceiveChunkSize,
clientConfig.ReceiveDeadline,
clientConfig.SendDeadline,
logger,
)

Expand Down Expand Up @@ -171,6 +174,9 @@ var runCmd = &cobra.Command{
"network": elasticClientConfig.Network,
"address": elasticClientConfig.Address,
"receiveBufferSize": elasticClientConfig.ReceiveBufferSize,
"receiveChunkSize": elasticClientConfig.ReceiveChunkSize,
"receiveDeadline": elasticClientConfig.ReceiveDeadline.Seconds(),
"sendDeadline": elasticClientConfig.SendDeadline.Seconds(),
},
}
_, err = hooksConfig.Run(
Expand Down
7 changes: 5 additions & 2 deletions gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ clients:
network: tcp
address: localhost:5432
# receiveBufferSize: 16777216
# receiveChunkSize: 4096
# receiveDeadline: 0ms # duration, 0ms/0s means no deadline
# sendDeadline: 0ms # duration, 0ms/0s means no deadline

# Pool config
pool:
Expand Down Expand Up @@ -48,7 +51,7 @@ server:
proxy: proxy

enableTicker: False
tickInterval: 5s # seconds
tickInterval: 5s # duration
multiCore: True
lockOSThread: False
loadBalancer: roundrobin
Expand All @@ -58,7 +61,7 @@ server:
# socketSendBuffer: 16777216
reuseAddress: True
reusePort: True
tcpKeepAlive: 3s # seconds
tcpKeepAlive: 3s # duration
tcpNoDelay: True

plugins:
Expand Down
54 changes: 47 additions & 7 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package network

import (
"net"
"time"

gerr "github.com/gatewayd-io/gatewayd/errors"
"github.com/rs/zerolog"
)

const (
DefaultSeed = 1000
DefaultChunkSize = 4096
DefaultSeed = 1000
DefaultChunkSize = 4096
DefaultReceiveDeadline = 0 // 0 means no deadline (timeout)
DefaultSendDeadline = 0
)

type ClientInterface interface {
Expand All @@ -24,18 +27,26 @@ type Client struct {

logger zerolog.Logger

ID string
ReceiveBufferSize int
ReceiveChunkSize int
ReceiveDeadline time.Duration
SendDeadline time.Duration
ID string
Network string // tcp/udp/unix
Address string
// TODO: add read/write deadline and deal with timeouts
}

var _ ClientInterface = &Client{}

// TODO: implement a better connection management algorithm

func NewClient(network, address string, receiveBufferSize int, logger zerolog.Logger) *Client {
//nolint:funlen
func NewClient(
network, address string,
receiveBufferSize, receiveChunkSize int,
receiveDeadline, sendDeadline time.Duration,
logger zerolog.Logger,
) *Client {
var client Client

client.logger = logger
Expand Down Expand Up @@ -69,12 +80,41 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo
}

client.Conn = conn

if receiveDeadline <= 0 {
client.ReceiveDeadline = DefaultReceiveDeadline
} else {
client.ReceiveDeadline = receiveDeadline
if err := client.Conn.SetReadDeadline(time.Now().Add(client.ReceiveDeadline)); err != nil {
logger.Error().Err(err).Msg("Failed to set receive deadline")
} else {
logger.Debug().Msgf("Receive deadline set to %s", client.ReceiveDeadline)
}
}

if sendDeadline <= 0 {
client.SendDeadline = DefaultSendDeadline
} else {
client.SendDeadline = sendDeadline
if err := client.Conn.SetWriteDeadline(time.Now().Add(client.SendDeadline)); err != nil {
logger.Error().Err(err).Msg("Failed to set send deadline")
} else {
logger.Debug().Msgf("Send deadline set to %s", client.SendDeadline)
}
}

if receiveBufferSize <= 0 {
client.ReceiveBufferSize = DefaultBufferSize
} else {
client.ReceiveBufferSize = receiveBufferSize
}

if receiveChunkSize <= 0 {
client.ReceiveChunkSize = DefaultChunkSize
} else {
client.ReceiveChunkSize = receiveChunkSize
}

logger.Debug().Msgf("New client created: %s", client.Address)
client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger)

Expand All @@ -95,7 +135,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) {
var received int
buffer := make([]byte, 0, c.ReceiveBufferSize)
for {
smallBuf := make([]byte, DefaultChunkSize)
smallBuf := make([]byte, c.ReceiveChunkSize)
read, err := c.Conn.Read(smallBuf)
switch {
case read > 0 && err != nil:
Expand All @@ -111,7 +151,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) {
buffer = append(buffer, smallBuf[:read]...)
}

if read == 0 || read < DefaultChunkSize {
if read == 0 || read < c.ReceiveChunkSize {
break
}
}
Expand Down
36 changes: 32 additions & 4 deletions network/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ func TestNewClient(t *testing.T) {

logger := logging.NewLogger(cfg)

client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
defer client.Close()

assert.Equal(t, "tcp", client.Network)
Expand Down Expand Up @@ -61,7 +68,14 @@ func TestSend(t *testing.T) {

logger := logging.NewLogger(cfg)

client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
defer client.Close()

assert.NotNil(t, client)
Expand Down Expand Up @@ -92,7 +106,14 @@ func TestReceive(t *testing.T) {

logger := logging.NewLogger(cfg)

client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
defer client.Close()

assert.NotNil(t, client)
Expand Down Expand Up @@ -133,7 +154,14 @@ func TestClose(t *testing.T) {

logger := logging.NewLogger(cfg)

client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
assert.NotNil(t, client)
client.Close()
assert.Equal(t, "", client.ID)
Expand Down
3 changes: 3 additions & 0 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError {
pr.ClientConfig.Network,
pr.ClientConfig.Address,
pr.ClientConfig.ReceiveBufferSize,
pr.ClientConfig.ReceiveChunkSize,
pr.ClientConfig.ReceiveDeadline,
pr.ClientConfig.SendDeadline,
pr.logger,
)
pr.logger.Debug().Msgf("Reused the client %s by putting it back in the pool", client.ID)
Expand Down
9 changes: 8 additions & 1 deletion network/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ func TestNewProxy(t *testing.T) {

// Create a connection pool
pool := pool.NewPool(EmptyPoolCapacity)
client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
err := pool.Put(client.ID, client)
assert.Nil(t, err)

Expand Down
1 change: 0 additions & 1 deletion network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action {

if err := s.proxy.PassThrough(gconn); err != nil {
s.logger.Error().Err(err).Msg("Failed to pass through traffic")
// TODO: Close the connection *gracefully*
switch {
case errors.Is(err, gerr.ErrPoolExhausted):
case errors.Is(err, gerr.ErrCastFailed):
Expand Down
27 changes: 24 additions & 3 deletions network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,24 @@ func TestRunServer(t *testing.T) {

// Create a connection pool
pool := pool.NewPool(2)
client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client1 := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
err := pool.Put(client1.ID, client1)
assert.Nil(t, err)
client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger)
client2 := NewClient(
"tcp",
"localhost:5432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
err = pool.Put(client2.ID, client2)
assert.Nil(t, err)

Expand Down Expand Up @@ -138,7 +152,14 @@ func TestRunServer(t *testing.T) {

for {
if server.IsRunning() {
client := NewClient("tcp", "127.0.0.1:15432", DefaultBufferSize, logger)
client := NewClient(
"tcp",
"127.0.0.1:15432",
DefaultBufferSize,
DefaultChunkSize,
DefaultReceiveDeadline,
DefaultSendDeadline,
logger)
defer client.Close()

assert.NotNil(t, client)
Expand Down

0 comments on commit 2dde98e

Please sign in to comment.