From 92a9fe83601a2890411c34aeb77b249adf7c7af1 Mon Sep 17 00:00:00 2001 From: mezei Date: Wed, 1 Feb 2023 17:07:52 -0500 Subject: [PATCH 1/2] Added dial/handshake timeout for non-responsive connections Change ipc solaris to use SyscallConn which enables deadlines/close/abort to work --- transport/conn.go | 23 +++++++++++++++++++++++ transport/ipc/ipc_peer_solaris.go | 23 +++++++++++++++-------- transport/ipc/ipc_unix.go | 5 +++++ 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/transport/conn.go b/transport/conn.go index f6d48e16..afa3b957 100644 --- a/transport/conn.go +++ b/transport/conn.go @@ -19,6 +19,7 @@ import ( "io" "net" "sync" + "time" "go.nanomsg.org/mangos/v3" ) @@ -90,6 +91,10 @@ func (p *conn) Send(msg *Message) error { } // Close implements the Pipe Close method. +// TODO: This does nothing during dial/handshake. +// Ultimately this results in a Dialer that cannot Close while handshake is in progress. +// Even with a timeout mechanism it would be good for Dialer.Close to +// be able to abort pending handshakes in the workQ. func (p *conn) Close() error { p.Lock() defer p.Unlock() @@ -166,10 +171,26 @@ type connHeader struct { // As a side effect, the peer's protocol number is stored in the conn. // Also, various properties are initialized. func (p *conn) handshake() error { + + // Rationale for a timeout mechanism (which may need to move elsewhere) + // The dial may have worked but that does not guarantee the + // server is going to send us any data. + // No/partial data will cause binary.Read to block indefinitely + // if the socket remains open. Closing the dialer does not work + // because the handshake is not complete (check for p.open). + + // TODO: Timeout should be configurable however that results in a + // large change because each transport dialer would need to + // support it as an option to pass it to the connection + // handshaker. 
+ p.c.SetDeadline(time.Now().Add(5 * time.Second)) + defer p.c.SetDeadline(time.Time{}) + var err error h := connHeader{S: 'S', P: 'P', Proto: p.proto.Self} if err = binary.Write(p.c, binary.BigEndian, &h); err != nil { + // TODO: should this call _ = p.c.Close() return err } if err = binary.Read(p.c, binary.BigEndian, &h); err != nil { @@ -252,6 +273,8 @@ func (h *connHandshaker) Close() { h.closed = true h.cv.Broadcast() for conn := range h.workq { + // This does not do anything because conn.open is + // false and conn.Close is a no-op _ = conn.Close() } for len(h.doneq) != 0 { diff --git a/transport/ipc/ipc_peer_solaris.go b/transport/ipc/ipc_peer_solaris.go index 9c934b76..84dbf48e 100644 --- a/transport/ipc/ipc_peer_solaris.go +++ b/transport/ipc/ipc_peer_solaris.go @@ -1,3 +1,4 @@ +//go:build solaris && cgo // +build solaris,cgo // Copyright 2020 The Mangos Authors @@ -49,14 +50,20 @@ import ( import "C" func getPeer(c *net.UnixConn, pipe transport.ConnPipe) { - if f, err := c.File(); err == nil { - mc := &C.mycred_t{} - if C.getucred(C.int(f.Fd()), mc) == 0 { - pipe.SetOption(mangos.OptionPeerPID, int(mc.pid)) - pipe.SetOption(mangos.OptionPeerUID, int(mc.uid)) - pipe.SetOption(mangos.OptionPeerGID, int(mc.gid)) - pipe.SetOption(mangos.OptionPeerZone, int(mc.zid)) - } + // This change was necessary to support SetDeadline and Close + // which aborts pending reads/writes. + // The prior code was calling c.File().Fd(). It was not closing + // the file. Additionally the docs say Fd() causes deadlines to not work. 
+ if sc, err := c.SyscallConn(); err == nil { + sc.Control(func(fd uintptr) { + mc := &C.mycred_t{} + if C.getucred(C.int(fd), mc) == 0 { + pipe.SetOption(mangos.OptionPeerPID, int(mc.pid)) + pipe.SetOption(mangos.OptionPeerUID, int(mc.uid)) + pipe.SetOption(mangos.OptionPeerGID, int(mc.gid)) + pipe.SetOption(mangos.OptionPeerZone, int(mc.zid)) + } + }) } } diff --git a/transport/ipc/ipc_unix.go b/transport/ipc/ipc_unix.go index 3ee107b4..422ee26e 100644 --- a/transport/ipc/ipc_unix.go +++ b/transport/ipc/ipc_unix.go @@ -1,3 +1,4 @@ +//go:build !windows && !plan9 && !js // +build !windows,!plan9,!js // Copyright 2021 The Mangos Authors @@ -50,6 +51,10 @@ type dialer struct { // Dial implements the Dialer Dial method func (d *dialer) Dial() (transport.Pipe, error) { + // TODO: It might be good to pass a context here to abort the dial after + // some timeout deadline. Ideally a dial/handshake would cover both + // initial dial and handshake. This is a bigger change because + // it would need to be implemented each transport dialer. conn, err := net.DialUnix("unix", nil, d.addr) if err != nil { return nil, err From d6ac9b3038a2c3ff4f0e6ccc3c5566f6f2f92740 Mon Sep 17 00:00:00 2001 From: mezei Date: Wed, 1 Feb 2023 17:12:09 -0500 Subject: [PATCH 2/2] comments --- transport/conn.go | 8 ++++---- transport/ipc/ipc_unix.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/transport/conn.go b/transport/conn.go index afa3b957..48633fdc 100644 --- a/transport/conn.go +++ b/transport/conn.go @@ -91,13 +91,13 @@ func (p *conn) Send(msg *Message) error { } // Close implements the Pipe Close method. -// TODO: This does nothing during dial/handshake. -// Ultimately this results in a Dialer that cannot Close while handshake is in progress. -// Even with a timeout mechanism it would be good for Dialer.Close to -// be able to abort pending handshakes in the workQ. 
func (p *conn) Close() error { p.Lock() defer p.Unlock() + // TODO: This does nothing during dial/handshake. + // Ultimately this results in a Dialer that cannot Close while handshake is in progress. + // Even with a timeout mechanism it would be good for Dialer.Close to + // be able to abort pending handshakes in the workQ. if p.open { p.open = false return p.c.Close() diff --git a/transport/ipc/ipc_unix.go b/transport/ipc/ipc_unix.go index 422ee26e..e5a4c688 100644 --- a/transport/ipc/ipc_unix.go +++ b/transport/ipc/ipc_unix.go @@ -54,7 +54,7 @@ func (d *dialer) Dial() (transport.Pipe, error) { // TODO: It might be good to pass a context here to abort the dial after // some timeout deadline. Ideally a dial/handshake would cover both // initial dial and handshake. This is a bigger change because - // it would need to be implemented each transport dialer. + // it would need to be implemented by each transport dialer. conn, err := net.DialUnix("unix", nil, d.addr) if err != nil { return nil, err