diff --git a/transport/conn.go b/transport/conn.go index f6d48e1..48633fd 100644 --- a/transport/conn.go +++ b/transport/conn.go @@ -19,6 +19,7 @@ import ( "io" "net" "sync" + "time" "go.nanomsg.org/mangos/v3" ) @@ -93,6 +94,10 @@ func (p *conn) Send(msg *Message) error { func (p *conn) Close() error { p.Lock() defer p.Unlock() + // TODO: This does nothing during dial/handshake. + // Ultimately this results in a Dialer that cannot Close while handshake is in progress. + // Even with a timeout mechanism it would be good for Dialer.Close to + // be able to abort pending handshakes in the workQ. if p.open { p.open = false return p.c.Close() @@ -166,10 +171,26 @@ type connHeader struct { // As a side effect, the peer's protocol number is stored in the conn. // Also, various properties are initialized. func (p *conn) handshake() error { + + // Rational for a timeout mechanism (which may need to move elsewhere) + // The dial may have worked but that does not guarantee the + // server is going to send us any data. + // No/partial data will cause binary.Read to block indefinitely + // if the socket remains open. Closing the dialer does not work + // because the handshake is not complete (check for p.open). + + // TODO: Timeout should be configurable however that results in a + // large change because each transport dialer would need to + // support it as an option to pass it to the connection + // handshaker. + p.c.SetDeadline(time.Now().Add(5 * time.Second)) + defer p.c.SetDeadline(time.Time{}) + var err error h := connHeader{S: 'S', P: 'P', Proto: p.proto.Self} if err = binary.Write(p.c, binary.BigEndian, &h); err != nil { + // TODO: should this call _ = p.c.Close() return err } if err = binary.Read(p.c, binary.BigEndian, &h); err != nil { @@ -252,6 +273,8 @@ func (h *connHandshaker) Close() { h.closed = true h.cv.Broadcast() for conn := range h.workq { + // This does not do anything because conn.open is + // false and conn.Close is a no-op _ = conn.Close() } for len(h.doneq) != 0 { diff --git a/transport/ipc/ipc_peer_solaris.go b/transport/ipc/ipc_peer_solaris.go index 9c934b7..84dbf48 100644 --- a/transport/ipc/ipc_peer_solaris.go +++ b/transport/ipc/ipc_peer_solaris.go @@ -1,3 +1,4 @@ +//go:build solaris && cgo // +build solaris,cgo // Copyright 2020 The Mangos Authors @@ -49,14 +50,20 @@ import ( import "C" func getPeer(c *net.UnixConn, pipe transport.ConnPipe) { - if f, err := c.File(); err == nil { - mc := &C.mycred_t{} - if C.getucred(C.int(f.Fd()), mc) == 0 { - pipe.SetOption(mangos.OptionPeerPID, int(mc.pid)) - pipe.SetOption(mangos.OptionPeerUID, int(mc.uid)) - pipe.SetOption(mangos.OptionPeerGID, int(mc.gid)) - pipe.SetOption(mangos.OptionPeerZone, int(mc.zid)) - } + // This change was necessary to support SetDeadline and Close + // which aborts pending reads/writes. + // The prior code was calling c.File().Fd(). It was not closing + // the file. Additionally the docs say Fd() causes deadlines to not work. + if sc, err := c.SyscallConn(); err == nil { + sc.Control(func(fd uintptr) { + mc := &C.mycred_t{} + if C.getucred(C.int(fd), mc) == 0 { + pipe.SetOption(mangos.OptionPeerPID, int(mc.pid)) + pipe.SetOption(mangos.OptionPeerUID, int(mc.uid)) + pipe.SetOption(mangos.OptionPeerGID, int(mc.gid)) + pipe.SetOption(mangos.OptionPeerZone, int(mc.zid)) + } + }) } } diff --git a/transport/ipc/ipc_unix.go b/transport/ipc/ipc_unix.go index 3ee107b..e5a4c68 100644 --- a/transport/ipc/ipc_unix.go +++ b/transport/ipc/ipc_unix.go @@ -1,3 +1,4 @@ +//go:build !windows && !plan9 && !js // +build !windows,!plan9,!js // Copyright 2021 The Mangos Authors @@ -50,6 +51,10 @@ type dialer struct { // Dial implements the Dialer Dial method func (d *dialer) Dial() (transport.Pipe, error) { + // TODO: It might be good to pass a context here to abort the dial after + // some timeout deadline. Ideally a dial/handshake would cover both + // initial dial and handshake. This is a bigger change because + // it would need to be implemented by each transport dialer. conn, err := net.DialUnix("unix", nil, d.addr) if err != nil { return nil, err