From 9fa356e52e9a989fb7ed901af23bac0e3da363b6 Mon Sep 17 00:00:00 2001 From: Gernot Messow Date: Sun, 2 May 2021 21:54:40 +0200 Subject: [PATCH 1/3] Add server SendTo method; necessary changes and new test --- examples/basic_server/basic_server.go | 3 +- osc/osc.go | 91 ++++++++++++++++++++++----- osc/osc_test.go | 64 +++++++++++++++++-- 3 files changed, 136 insertions(+), 22 deletions(-) diff --git a/examples/basic_server/basic_server.go b/examples/basic_server/basic_server.go index 93aad96..8323df7 100644 --- a/examples/basic_server/basic_server.go +++ b/examples/basic_server/basic_server.go @@ -17,6 +17,7 @@ func main() { fmt.Println("Couldn't listen: ", err) } defer conn.Close() + server.SetConnection(conn) fmt.Println("### Welcome to go-osc receiver demo") fmt.Println("Press \"q\" to exit") @@ -25,7 +26,7 @@ func main() { fmt.Println("Start listening on", addr) for { - packet, err := server.ReceivePacket(conn) + packet, err := server.ReceivePacket() if err != nil { fmt.Println("Server error: " + err.Error()) os.Exit(1) diff --git a/osc/osc.go b/osc/osc.go index 7cb6f31..6df758a 100644 --- a/osc/osc.go +++ b/osc/osc.go @@ -24,13 +24,20 @@ const ( // Packet is the interface for Message and Bundle. type Packet interface { encoding.BinaryMarshaler + SenderAddr() net.Addr } // Message represents a single OSC message. An OSC message consists of an OSC // address pattern and zero or more arguments. type Message struct { - Address string - Arguments []interface{} + Address string + Arguments []interface{} + senderAddr net.Addr +} + +// SenderAddr returns the sender address of this message. Returns nil if no address is available +func (m *Message) SenderAddr() net.Addr { + return m.senderAddr } // Verify that Messages implements the Packet interface. @@ -41,9 +48,25 @@ var _ Packet = (*Message)(nil) // elements. The OSC-timetag is a 64-bit fixed point time tag. See // http://opensoundcontrol.org/spec-1_0 for more information. type Bundle struct { - Timetag Timetag - Messages []*Message - Bundles []*Bundle + Timetag Timetag + Messages []*Message + Bundles []*Bundle + senderAddr net.Addr +} + +// SenderAddr returns the sender address of this message. Returns nil if no address is available +func (b *Bundle) SenderAddr() net.Addr { + return b.senderAddr +} + +func (b *Bundle) setSenderAddr(src net.Addr) { + b.senderAddr = src + for _, m := range b.Messages { + m.senderAddr = src + } + for _, b := range b.Bundles { + b.setSenderAddr(src) + } } // Verify that Bundle implements the Packet interface. @@ -63,6 +86,7 @@ type Server struct { Addr string Dispatcher Dispatcher ReadTimeout time.Duration + conn net.PacketConn close func() error } @@ -536,22 +560,37 @@ func (s *Server) ListenAndServe() error { s.Dispatcher = NewStandardDispatcher() } + if s.conn == nil { + s.Listen() + } + + return s.Serve() +} + +// Listen creates the listening port and sets up the connection. +func (s *Server) Listen() error { ln, err := net.ListenPacket("udp", s.Addr) if err != nil { return err } + s.conn = ln s.close = ln.Close + return nil +} - return s.Serve(ln) +// SetConnection sets the connection to use for the server. This is for a case +// where you created the connection manually instead of using ListenAndServer. +func (s *Server) SetConnection(c net.PacketConn) { + s.conn = c } // Serve retrieves incoming OSC packets from the given connection and dispatches // retrieved OSC packets. If something goes wrong an error is returned. -func (s *Server) Serve(c net.PacketConn) error { +func (s *Server) Serve() error { var tempDelay time.Duration for { - msg, err := s.readFromConnection(c) + msg, err := s.readFromConnection() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { @@ -568,6 +607,7 @@ func (s *Server) Serve(c net.PacketConn) error { return err } tempDelay = 0 + go s.Dispatcher.Dispatch(msg) } } @@ -585,36 +625,50 @@ func (s *Server) CloseConnection() error { } // ReceivePacket listens for incoming OSC packets and returns the packet if one is received. -func (s *Server) ReceivePacket(c net.PacketConn) (Packet, error) { - return s.readFromConnection(c) +func (s *Server) ReceivePacket() (Packet, error) { + return s.readFromConnection() } // readFromConnection retrieves OSC packets. -func (s *Server) readFromConnection(c net.PacketConn) (Packet, error) { +func (s *Server) readFromConnection() (Packet, error) { if s.ReadTimeout != 0 { - if err := c.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil { + if err := s.conn.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil { return nil, err } } data := make([]byte, 65535) - n, _, err := c.ReadFrom(data) + n, src, err := s.conn.ReadFrom(data) if err != nil { return nil, err } var start int - p, err := readPacket(bufio.NewReader(bytes.NewBuffer(data)), &start, n) + p, err := readPacket(bufio.NewReader(bytes.NewBuffer(data)), &start, n, src) if err != nil { return nil, err } return p, nil } +// SendTo sends a message to the given address. The sender address will be the address and +// port the server is listening on. +func (s *Server) SendTo(packet Packet, addr net.Addr) error { + data, err := packet.MarshalBinary() + if err != nil { + return err + } + + if _, err = s.conn.WriteTo(data, addr); err != nil { + return err + } + return nil +} + // ParsePacket parses the given msg string and returns a Packet func ParsePacket(msg string) (Packet, error) { var start int - p, err := readPacket(bufio.NewReader(bytes.NewBufferString(msg)), &start, len(msg)) + p, err := readPacket(bufio.NewReader(bytes.NewBufferString(msg)), &start, len(msg), nil) if err != nil { return nil, err } @@ -622,7 +676,7 @@ func ParsePacket(msg string) (Packet, error) { } // receivePacket receives an OSC packet from the given reader. -func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) { +func readPacket(reader *bufio.Reader, start *int, end int, src net.Addr) (Packet, error) { //var buf []byte buf, err := reader.Peek(1) if err != nil { @@ -635,6 +689,7 @@ func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) { if err != nil { return nil, err } + packet.senderAddr = src return packet, nil } if buf[0] == '#' { // An OSC bundle starts with a '#' @@ -642,6 +697,8 @@ func readPacket(reader *bufio.Reader, start *int, end int) (Packet, error) { if err != nil { return nil, err } + packet.setSenderAddr(src) + return packet, nil } @@ -681,7 +738,7 @@ func readBundle(reader *bufio.Reader, start *int, end int) (*Bundle, error) { } *start += 4 - p, err := readPacket(reader, start, end) + p, err := readPacket(reader, start, end, nil) if err != nil { return nil, err } diff --git a/osc/osc_test.go b/osc/osc_test.go index 80bf35b..bacc785 100644 --- a/osc/osc_test.go +++ b/osc/osc_test.go @@ -223,11 +223,12 @@ func TestServerMessageReceiving(t *testing.T) { return } defer c.Close() + server.SetConnection(c) // Start the client start <- true - packet, err := server.ReceivePacket(c) + packet, err := server.ReceivePacket() if err != nil { t.Errorf("server error: %v", err) return @@ -320,10 +321,11 @@ func TestReadTimeout(t *testing.T) { t.Error(err) } defer c.Close() + server.SetConnection(c) // Start the client start <- true - p, err := server.ReceivePacket(c) + p, err := server.ReceivePacket() if err != nil { t.Errorf("server error: %v", err) return @@ -334,13 +336,13 @@ func TestReadTimeout(t *testing.T) { } // Second receive should time out since client is delayed 150 milliseconds - if _, err = server.ReceivePacket(c); err == nil { + if _, err = server.ReceivePacket(); err == nil { t.Errorf("expected error") return } // Next receive should get it - p, err = server.ReceivePacket(c) + p, err = server.ReceivePacket() if err != nil { t.Errorf("server error: %v", err) return @@ -614,6 +616,60 @@ func TestOscMessageMatch(t *testing.T) { } } +func TestServerSend(t *testing.T) { + targetServer := Server{ + Addr: "127.0.0.1:6677", + } + + go func() { + d := NewStandardDispatcher() + d.AddMsgHandler("/message/test", func(msg *Message) { + reply := NewMessage("/reply/test") + err := targetServer.SendTo(reply, msg.SenderAddr()) + if err != nil { + t.Errorf("SendTo failed: %v", err) + } + }) + targetServer.Dispatcher = d + targetServer.ListenAndServe() + }() + + time.Sleep(2 * time.Second) + + result := make(chan bool, 1) + + d := NewStandardDispatcher() + d.AddMsgHandler("/reply/test", func(msg *Message) { + result <- true + }) + clientServer := Server{ + Addr: "127.0.0.1:18536", + Dispatcher: d, + } + + err := clientServer.Listen() + if err != nil { + t.Errorf("Listen failed: %v", err) + } + go clientServer.Serve() + + msg := NewMessage("/message/test") + addr, _ := net.ResolveUDPAddr("udp", targetServer.Addr) + err = clientServer.SendTo(msg, addr) + if err != nil { + t.Errorf("SendTo failed: %v", err) + } + + select { + case r := <-result: + if !r { + t.Error("did not get expected response") + } + case <-time.After(2 * time.Second): + t.Error("unexpected timeout") + } +} + const zero = string(byte(0)) // nulls returns a string of `i` nulls. From e599b12818bc7ceb3d6fc55e4039fe4a1a66906c Mon Sep 17 00:00:00 2001 From: Gernot Messow Date: Mon, 3 May 2021 21:44:22 +0200 Subject: [PATCH 2/3] Added integrated server to client; updated documentation and added example --- README.md | 35 +++++++++++- examples/bidirectional/client/main.go | 23 ++++++++ examples/bidirectional/server/main.go | 19 +++++++ osc/osc.go | 82 ++++++++++++++++++++++++--- osc/osc_test.go | 49 ++++++++++++++++ 5 files changed, 198 insertions(+), 10 deletions(-) create mode 100644 examples/bidirectional/client/main.go create mode 100644 examples/bidirectional/server/main.go diff --git a/README.md b/README.md index ac3e6dd..65025ec 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,13 @@ go get github.com/hypebeast/go-osc ### Client ```go +package main + import "github.com/hypebeast/go-osc/osc" func main() { client := osc.NewClient("localhost", 8765) + defer client.Close() msg := osc.NewMessage("/osc/address") msg.Append(int32(111)) msg.Append(true) @@ -47,6 +50,34 @@ func main() { } ``` +### Client with response handling + +```go +package main + +import "github.com/hypebeast/go-osc/osc" + +func main() { + finished := make(chan struct{}) + + client := osc.NewClient("localhost", 8765) + defer client.Close() + + d := osc.NewStandardDispatcher() + d.AddMsgHandler("/reply", func(msg *osc.Message) { + osc.PrintMessage(msg) + finished <- struct{}{} + }) + client.SetDispatcher(d) + go client.ListenAndServe() + + msg := osc.NewMessage("/message/address") + client.Send(msg) + + <-finished +} +``` + ### Server ```go @@ -55,13 +86,15 @@ package main import "github.com/hypebeast/go-osc/osc" func main() { + var server *osc.Server addr := "127.0.0.1:8765" d := osc.NewStandardDispatcher() d.AddMsgHandler("/message/address", func(msg *osc.Message) { osc.PrintMessage(msg) + server.SendTo(osc.NewMessage("/reply"), msg.SenderAddr()) }) - server := &osc.Server{ + server = &osc.Server{ Addr: addr, Dispatcher:d, } diff --git a/examples/bidirectional/client/main.go b/examples/bidirectional/client/main.go new file mode 100644 index 0000000..cb22711 --- /dev/null +++ b/examples/bidirectional/client/main.go @@ -0,0 +1,23 @@ +package main + +import "github.com/hypebeast/go-osc/osc" + +func main() { + finished := make(chan struct{}) + + client := osc.NewClient("localhost", 8765) + defer client.Close() + + d := osc.NewStandardDispatcher() + d.AddMsgHandler("/reply", func(msg *osc.Message) { + osc.PrintMessage(msg) + finished <- struct{}{} + }) + client.SetDispatcher(d) + go client.ListenAndServe() + + msg := osc.NewMessage("/message/address") + client.Send(msg) + + <-finished +} diff --git a/examples/bidirectional/server/main.go b/examples/bidirectional/server/main.go new file mode 100644 index 0000000..f4edbfb --- /dev/null +++ b/examples/bidirectional/server/main.go @@ -0,0 +1,19 @@ +package main + +import "github.com/hypebeast/go-osc/osc" + +func main() { + var server *osc.Server + addr := "127.0.0.1:8765" + d := osc.NewStandardDispatcher() + d.AddMsgHandler("/message/address", func(msg *osc.Message) { + osc.PrintMessage(msg) + server.SendTo(osc.NewMessage("/reply"), msg.SenderAddr()) + }) + + server = &osc.Server{ + Addr: addr, + Dispatcher: d, + } + server.ListenAndServe() +} diff --git a/osc/osc.go b/osc/osc.go index 6df758a..ac209c2 100644 --- a/osc/osc.go +++ b/osc/osc.go @@ -13,6 +13,7 @@ import ( "reflect" "regexp" "strings" + "sync" "time" ) @@ -75,9 +76,12 @@ var _ Packet = (*Bundle)(nil) // Client enables you to send OSC packets. It sends OSC messages and bundles to // the given IP address and port. type Client struct { - ip string - port int - laddr *net.UDPAddr + ip string + port int + laddr *net.UDPAddr + conn *net.UDPConn + server *Server + mtx sync.Mutex } // Server represents an OSC server. The server listens on Address and Port for @@ -499,7 +503,7 @@ func (b *Bundle) MarshalBinary() ([]byte, error) { // specifies the IP address and `port` defines the target port where the // messages and bundles will be send to. func NewClient(ip string, port int) *Client { - return &Client{ip: ip, port: port, laddr: nil} + return &Client{ip: ip, port: port, laddr: nil, server: &Server{}} } // IP returns the IP address. @@ -514,6 +518,24 @@ func (c *Client) Port() int { return c.port } // SetPort sets a new port. func (c *Client) SetPort(port int) { c.port = port } +// SetConnection sets the connection to use +func (c *Client) SetConnection(conn *net.UDPConn) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.server.SetConnection(c.conn) + c.conn = conn +} + +// Connection returns the current connection +func (c *Client) Connection() *net.UDPConn { + return c.conn +} + +// SetDispatcher sets the dispatcher to use to handle responses. +func (c *Client) SetDispatcher(d Dispatcher) { + c.server.Dispatcher = d +} + // SetLocalAddr sets the local address. func (c *Client) SetLocalAddr(ip string, port int) error { laddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", ip, port)) @@ -524,29 +546,71 @@ func (c *Client) SetLocalAddr(ip string, port int) error { return nil } -// Send sends an OSC Bundle or an OSC Message. -func (c *Client) Send(packet Packet) error { +// Connect explicitly connects to the target address. Normally you don't have to call +// this, as both Send and ListenAndServe establish the connection if necessary. +func (c *Client) Connect() error { + c.mtx.Lock() + defer c.mtx.Unlock() + if c.conn != nil { + return nil // already connected + } addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", c.ip, c.port)) if err != nil { return err } - conn, err := net.DialUDP("udp", c.laddr, addr) + c.conn, err = net.DialUDP("udp", c.laddr, addr) if err != nil { return err } - defer conn.Close() + c.server.SetConnection(c.conn) + fmt.Println("DEBUG: client connected") + return nil +} + +// Close closes the current connection. +func (c *Client) Close() { + if c.conn != nil { + c.conn.Close() + c.conn = nil + } +} +// Send sends an OSC Bundle or an OSC Message. If no connection exists, one will be established. +func (c *Client) Send(packet Packet) error { + if c.conn == nil { + err := c.Connect() + if err != nil { + return err + } + } data, err := packet.MarshalBinary() if err != nil { return err } - if _, err = conn.Write(data); err != nil { + if _, err = c.conn.Write(data); err != nil { return err } return nil } +// ListenAndServe starts the listening and dispatching loop. It listens on the same port, that +// was established by the client. You only need to call this if you expect responses from the server. +// This function only returns when there is an error, so better put it into a go routine. +func (c *Client) ListenAndServe() error { + if c.conn == nil { + err := c.Connect() + if err != nil { + return err + } + } + if c.server.Dispatcher == nil { + c.server.Dispatcher = NewStandardDispatcher() + } + + return c.server.Serve() +} + //// // Server //// diff --git a/osc/osc_test.go b/osc/osc_test.go index bacc785..1b053a0 100644 --- a/osc/osc_test.go +++ b/osc/osc_test.go @@ -3,6 +3,7 @@ package osc import ( "bufio" "bytes" + "fmt" "io" "net" "reflect" @@ -670,6 +671,54 @@ func TestServerSend(t *testing.T) { } } +func TestClientRecv(t *testing.T) { + targetServer := Server{ + Addr: "127.0.0.1:6677", + } + + go func() { + d := NewStandardDispatcher() + d.AddMsgHandler("/message/test", func(msg *Message) { + fmt.Println("DEBUG: targetServer: got /message/test") + reply := NewMessage("/reply/test") + err := targetServer.SendTo(reply, msg.SenderAddr()) + if err != nil { + t.Errorf("SendTo failed: %v", err) + } + }) + targetServer.Dispatcher = d + targetServer.ListenAndServe() + }() + + time.Sleep(2 * time.Second) + + result := make(chan bool, 1) + + d := NewStandardDispatcher() + d.AddMsgHandler("/reply/test", func(msg *Message) { + result <- true + }) + + client := NewClient("127.0.0.1", 6677) + client.SetDispatcher(d) + go client.ListenAndServe() + + msg := NewMessage("/message/test") + err := client.Send(msg) + if err != nil { + t.Errorf("SendTo failed: %v", err) + } + + select { + case r := <-result: + if !r { + t.Error("did not get expected response") + } + case <-time.After(2 * time.Second): + t.Error("unexpected timeout") + } +} + const zero = string(byte(0)) // nulls returns a string of `i` nulls. From 4351418e70b6f04d160bc4a1fe4bec3942fa39a2 Mon Sep 17 00:00:00 2001 From: Gernot Messow Date: Mon, 3 May 2021 21:56:56 +0200 Subject: [PATCH 3/3] Use different ports in different tests; removed debug output; missing error handling --- osc/osc.go | 4 +++- osc/osc_test.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/osc/osc.go b/osc/osc.go index ac209c2..7940f5b 100644 --- a/osc/osc.go +++ b/osc/osc.go @@ -563,7 +563,6 @@ func (c *Client) Connect() error { return err } c.server.SetConnection(c.conn) - fmt.Println("DEBUG: client connected") return nil } @@ -695,6 +694,9 @@ func (s *Server) ReceivePacket() (Packet, error) { // readFromConnection retrieves OSC packets. func (s *Server) readFromConnection() (Packet, error) { + if s.conn == nil { + return nil, errors.New("not connected") + } if s.ReadTimeout != 0 { if err := s.conn.SetReadDeadline(time.Now().Add(s.ReadTimeout)); err != nil { return nil, err diff --git a/osc/osc_test.go b/osc/osc_test.go index 1b053a0..1932019 100644 --- a/osc/osc_test.go +++ b/osc/osc_test.go @@ -673,8 +673,9 @@ func TestServerSend(t *testing.T) { func TestClientRecv(t *testing.T) { targetServer := Server{ - Addr: "127.0.0.1:6677", + Addr: "127.0.0.1:6678", } + defer targetServer.CloseConnection() go func() { d := NewStandardDispatcher() @@ -705,6 +706,7 @@ func TestClientRecv(t *testing.T) { msg := NewMessage("/message/test") err := client.Send(msg) + defer client.Close() if err != nil { t.Errorf("SendTo failed: %v", err) }