diff --git a/Makefile b/Makefile index c0277bef..e6242f8d 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,12 @@ build: run: go mod tidy && go run main.go run +clean: + go clean -testcache + +test: + go test -v ./... + protolint: buf lint diff --git a/cmd/config_parser.go b/cmd/config_parser.go index 98126cc4..cbc506d1 100644 --- a/cmd/config_parser.go +++ b/cmd/config_parser.go @@ -114,11 +114,17 @@ func poolConfig() (int, *network.Client) { net := globalConfig.String(ref + ".network") address := globalConfig.String(ref + ".address") receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize") + receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize") + receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline") + sendDeadline := globalConfig.Duration(ref + ".sendDeadline") return poolSize, &network.Client{ Network: net, Address: address, ReceiveBufferSize: receiveBufferSize, + ReceiveChunkSize: receiveChunkSize, + ReceiveDeadline: receiveDeadline, + SendDeadline: sendDeadline, } } diff --git a/cmd/run.go b/cmd/run.go index f358ad52..df4abaf7 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -113,6 +113,9 @@ var runCmd = &cobra.Command{ clientConfig.Network, clientConfig.Address, clientConfig.ReceiveBufferSize, + clientConfig.ReceiveChunkSize, + clientConfig.ReceiveDeadline, + clientConfig.SendDeadline, logger, ) @@ -171,6 +174,9 @@ var runCmd = &cobra.Command{ "network": elasticClientConfig.Network, "address": elasticClientConfig.Address, "receiveBufferSize": elasticClientConfig.ReceiveBufferSize, + "receiveChunkSize": elasticClientConfig.ReceiveChunkSize, + "receiveDeadline": elasticClientConfig.ReceiveDeadline.Seconds(), + "sendDeadline": elasticClientConfig.SendDeadline.Seconds(), }, } _, err = hooksConfig.Run( diff --git a/gatewayd.yaml b/gatewayd.yaml index 2de46d2f..e5914c3c 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -16,6 +16,9 @@ clients: network: tcp address: localhost:5432 # receiveBufferSize: 16777216 + # receiveChunkSize: 4096 + # receiveDeadline: 0ms # duration, 0ms/0s means no deadline + # sendDeadline: 0ms # duration, 0ms/0s means no deadline # Pool config pool: @@ -48,7 +51,7 @@ server: proxy: proxy enableTicker: False - tickInterval: 5s # seconds + tickInterval: 5s # duration multiCore: True lockOSThread: False loadBalancer: roundrobin @@ -58,7 +61,7 @@ server: # socketSendBuffer: 16777216 reuseAddress: True reusePort: True - tcpKeepAlive: 3s # seconds + tcpKeepAlive: 3s # duration tcpNoDelay: True plugins: diff --git a/network/client.go b/network/client.go index 93ad74e5..6966b9c8 100644 --- a/network/client.go +++ b/network/client.go @@ -2,14 +2,17 @@ package network import ( "net" + "time" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/rs/zerolog" ) const ( - DefaultSeed = 1000 - DefaultChunkSize = 4096 + DefaultSeed = 1000 + DefaultChunkSize = 4096 + DefaultReceiveDeadline = 0 // 0 means no deadline (timeout) + DefaultSendDeadline = 0 ) type ClientInterface interface { @@ -24,18 +27,26 @@ type Client struct { logger zerolog.Logger - ID string ReceiveBufferSize int + ReceiveChunkSize int + ReceiveDeadline time.Duration + SendDeadline time.Duration + ID string Network string // tcp/udp/unix Address string - // TODO: add read/write deadline and deal with timeouts } var _ ClientInterface = &Client{} // TODO: implement a better connection management algorithm -func NewClient(network, address string, receiveBufferSize int, logger zerolog.Logger) *Client { +//nolint:funlen +func NewClient( + network, address string, + receiveBufferSize, receiveChunkSize int, + receiveDeadline, sendDeadline time.Duration, + logger zerolog.Logger, +) *Client { var client Client client.logger = logger @@ -69,12 +80,41 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo } client.Conn = conn + + if receiveDeadline <= 0 { + client.ReceiveDeadline = DefaultReceiveDeadline + } else { + client.ReceiveDeadline = receiveDeadline + if err := client.Conn.SetReadDeadline(time.Now().Add(client.ReceiveDeadline)); err != nil { + logger.Error().Err(err).Msg("Failed to set receive deadline") + } else { + logger.Debug().Msgf("Receive deadline set to %s", client.ReceiveDeadline) + } + } + + if sendDeadline <= 0 { + client.SendDeadline = DefaultSendDeadline + } else { + client.SendDeadline = sendDeadline + if err := client.Conn.SetWriteDeadline(time.Now().Add(client.SendDeadline)); err != nil { + logger.Error().Err(err).Msg("Failed to set send deadline") + } else { + logger.Debug().Msgf("Send deadline set to %s", client.SendDeadline) + } + } + if receiveBufferSize <= 0 { client.ReceiveBufferSize = DefaultBufferSize } else { client.ReceiveBufferSize = receiveBufferSize } + if receiveChunkSize <= 0 { + client.ReceiveChunkSize = DefaultChunkSize + } else { + client.ReceiveChunkSize = receiveChunkSize + } + logger.Debug().Msgf("New client created: %s", client.Address) client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger) @@ -95,7 +135,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) { var received int buffer := make([]byte, 0, c.ReceiveBufferSize) for { - smallBuf := make([]byte, DefaultChunkSize) + smallBuf := make([]byte, c.ReceiveChunkSize) read, err := c.Conn.Read(smallBuf) switch { case read > 0 && err != nil: @@ -111,7 +151,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) { buffer = append(buffer, smallBuf[:read]...) } - if read == 0 || read < DefaultChunkSize { + if read == 0 || read < c.ReceiveChunkSize { break } } diff --git a/network/client_test.go b/network/client_test.go index d30e1a09..ac77f47b 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -30,7 +30,14 @@ func TestNewClient(t *testing.T) { logger := logging.NewLogger(cfg) - client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) defer client.Close() assert.Equal(t, "tcp", client.Network) @@ -61,7 +68,14 @@ func TestSend(t *testing.T) { logger := logging.NewLogger(cfg) - client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) defer client.Close() assert.NotNil(t, client) @@ -92,7 +106,14 @@ func TestReceive(t *testing.T) { logger := logging.NewLogger(cfg) - client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) defer client.Close() assert.NotNil(t, client) @@ -133,7 +154,14 @@ func TestClose(t *testing.T) { logger := logging.NewLogger(cfg) - client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) assert.NotNil(t, client) client.Close() assert.Equal(t, "", client.ID) diff --git a/network/proxy.go b/network/proxy.go index 8bbbdf1b..4aaee962 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -75,6 +75,9 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError { pr.ClientConfig.Network, pr.ClientConfig.Address, pr.ClientConfig.ReceiveBufferSize, + pr.ClientConfig.ReceiveChunkSize, + pr.ClientConfig.ReceiveDeadline, + pr.ClientConfig.SendDeadline, pr.logger, ) pr.logger.Debug().Msgf("Reused the client %s by putting it back in the pool", client.ID) diff --git a/network/proxy_test.go b/network/proxy_test.go index e6ff4484..743a10e4 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -34,7 +34,14 @@ func TestNewProxy(t *testing.T) { // Create a connection pool pool := pool.NewPool(EmptyPoolCapacity) - client := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) err := pool.Put(client.ID, client) assert.Nil(t, err) diff --git a/network/server.go b/network/server.go index 5b833aa0..eba7abf5 100644 --- a/network/server.go +++ b/network/server.go @@ -189,7 +189,6 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action { if err := s.proxy.PassThrough(gconn); err != nil { s.logger.Error().Err(err).Msg("Failed to pass through traffic") - // TODO: Close the connection *gracefully* switch { case errors.Is(err, gerr.ErrPoolExhausted): case errors.Is(err, gerr.ErrCastFailed): diff --git a/network/server_test.go b/network/server_test.go index 534233c9..b570a4f6 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -91,10 +91,24 @@ func TestRunServer(t *testing.T) { // Create a connection pool pool := pool.NewPool(2) - client1 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client1 := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) err := pool.Put(client1.ID, client1) assert.Nil(t, err) - client2 := NewClient("tcp", "localhost:5432", DefaultBufferSize, logger) + client2 := NewClient( + "tcp", + "localhost:5432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) err = pool.Put(client2.ID, client2) assert.Nil(t, err) @@ -138,7 +152,14 @@ func TestRunServer(t *testing.T) { for { if server.IsRunning() { - client := NewClient("tcp", "127.0.0.1:15432", DefaultBufferSize, logger) + client := NewClient( + "tcp", + "127.0.0.1:15432", + DefaultBufferSize, + DefaultChunkSize, + DefaultReceiveDeadline, + DefaultSendDeadline, + logger) defer client.Close() assert.NotNil(t, client)