From b180b17fb974154a32dca1852756d8ded946e058 Mon Sep 17 00:00:00 2001 From: Termina1 Date: Tue, 17 Dec 2024 18:46:49 +0200 Subject: [PATCH] add write timeout, in case peer stalled --- chotki.go | 9 +++++++-- protocol/net.go | 10 ++++++++++ protocol/net_test.go | 4 ++-- protocol/peer.go | 4 ++++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/chotki.go b/chotki.go index 7d13767..9101740 100644 --- a/chotki.go +++ b/chotki.go @@ -84,8 +84,8 @@ type Options struct { ReadMinBufferSizeToProcess int TcpReadBufferSize int TcpWriteBufferSize int - - TlsConfig *tls.Config + WriteTimeout time.Duration + TlsConfig *tls.Config } func (o *Options) SetDefaults() { @@ -119,6 +119,10 @@ func (o *Options) SetDefaults() { o.ReadAccumTimeLimit = 5 * time.Second } + if o.WriteTimeout == 0 { + o.WriteTimeout = 5 * time.Minute + } + o.Merger = &pebble.Merger{ Name: "CRDT", Merge: func(key, value []byte) (pebble.ValueMerger, error) { @@ -262,6 +266,7 @@ func Open(dirname string, opts Options) (*Chotki, error) { BufferMinToProcess: cho.opts.ReadMinBufferSizeToProcess, }, &protocol.TcpBufferSizeOpt{Read: cho.opts.TcpReadBufferSize, Write: cho.opts.TcpWriteBufferSize}, + &protocol.NetWriteTimeoutOpt{Timeout: cho.opts.WriteTimeout}, ) if !exists { diff --git a/protocol/net.go b/protocol/net.go index f6837f2..f2ffdc5 100644 --- a/protocol/net.go +++ b/protocol/net.go @@ -57,6 +57,7 @@ type Net struct { readBufferTcpSize int writeBufferTcpSize int readAccumTimeLimit time.Duration + writeTimeout time.Duration bufferMaxSize int bufferMinToProcess int } @@ -65,6 +66,14 @@ type NetOpt interface { Apply(*Net) } +type NetWriteTimeoutOpt struct { + Timeout time.Duration +} + +func (opt *NetWriteTimeoutOpt) Apply(n *Net) { + n.writeTimeout = opt.Timeout +} + type NetTlsConfigOpt struct { Config *tls.Config } @@ -315,6 +324,7 @@ func (n *Net) keepPeer(name string, conn net.Conn) { peer := &Peer{ inout: n.onInstall(name), conn: conn, + writeTimeout: n.writeTimeout, readAccumtTimeLimit: n.readAccumTimeLimit, bufferMaxSize: n.bufferMaxSize, bufferMinToProcess: n.bufferMinToProcess, diff --git a/protocol/net_test.go b/protocol/net_test.go index 22d2f2d..37d2091 100644 --- a/protocol/net_test.go +++ b/protocol/net_test.go @@ -80,7 +80,7 @@ func TestTCPDepot_Connect(t *testing.T) { lCon := utils.NewFDQueue[Records](16, time.Millisecond, 0) l := NewNet(log, func(_ string) FeedDrainCloserTraced { return &TracedQueue[Records, []byte]{lCon} - }, func(_ string, t Traced) { lCon.Close() }, &NetTlsConfigOpt{tlsConfig("a.chotki.local")}) + }, func(_ string, t Traced) { lCon.Close() }, &NetTlsConfigOpt{tlsConfig("a.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute}) err := l.Listen(loop) assert.Nil(t, err) @@ -88,7 +88,7 @@ func TestTCPDepot_Connect(t *testing.T) { cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0) c := NewNet(log, func(_ string) FeedDrainCloserTraced { return &TracedQueue[Records, []byte]{cCon} - }, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")}) + }, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute}) err = c.Connect(loop) assert.Nil(t, err) diff --git a/protocol/peer.go b/protocol/peer.go index a35c66b..8c6ee41 100644 --- a/protocol/peer.go +++ b/protocol/peer.go @@ -26,6 +26,7 @@ type Peer struct { readAccumtTimeLimit time.Duration bufferMaxSize int bufferMinToProcess int + writeTimeout time.Duration } func (p *Peer) getReadTimeLimit() time.Duration { @@ -148,6 +149,9 @@ func (p *Peer) keepWrite(ctx context.Context) error { p.writeBatchSize.Add(float64(batchSize)) b := net.Buffers(recs) + if p.writeTimeout != 0 { + p.conn.SetWriteDeadline(time.Now().Add(p.writeTimeout)) + } for len(b) > 0 && err == nil { if _, err = b.WriteTo(p.conn); err != nil { return err