Skip to content

Commit

Permalink
Make srtconn.ReadPacket and srtconn.WritePacket public
Browse files Browse the repository at this point in the history
  • Loading branch information
ioppermann committed Mar 19, 2024
1 parent 385888d commit 06384dd
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 29 deletions.
19 changes: 13 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@ type Conn interface {
// time limit; see SetDeadline and SetReadDeadline.
Read(p []byte) (int, error)

// ReadPacket reads a packet from the queue of received packets. It blocks
// if the queue is empty. Only data packets are returned. Using ReadPacket
// and Read at the same time may lead to data loss.
ReadPacket() (packet.Packet, error)

// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(p []byte) (int, error)

// WritePacket writes a packet to the write queue. Packets on the write queue
// will be sent to the peer of the connection. Only data packets will be sent.
WritePacket(p packet.Packet) error

// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
Close() error
Expand Down Expand Up @@ -367,9 +376,7 @@ func (c *srtConn) ticker(ctx context.Context) {
}
}

// readPacket reads a packet from the queue of received packets. It blocks
// if the queue is empty. Only data packets are returned.
func (c *srtConn) readPacket() (packet.Packet, error) {
func (c *srtConn) ReadPacket() (packet.Packet, error) {
var p packet.Packet
select {
case <-c.ctx.Done():
Expand Down Expand Up @@ -400,7 +407,7 @@ func (c *srtConn) Read(b []byte) (int, error) {

c.readBuffer.Reset()

p, err := c.readPacket()
p, err := c.ReadPacket()
if err != nil {
return 0, err
}
Expand All @@ -413,9 +420,9 @@ func (c *srtConn) Read(b []byte) (int, error) {
return c.readBuffer.Read(b)
}

// writePacket writes a packet to the write queue. Packets on the write queue
// WritePacket writes a packet to the write queue. Packets on the write queue
// will be sent to the peer of the connection. Only data packets will be sent.
func (c *srtConn) writePacket(p packet.Packet) error {
func (c *srtConn) WritePacket(p packet.Packet) error {
if p.Header().IsControlPacket {
// Ignore control packets
return nil
Expand Down
8 changes: 4 additions & 4 deletions dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (dl *dialer) Read(p []byte) (n int, err error) {
return dl.conn.Read(p)
}

func (dl *dialer) readPacket() (packet.Packet, error) {
func (dl *dialer) ReadPacket() (packet.Packet, error) {
if err := dl.checkConnection(); err != nil {
return nil, err
}
Expand All @@ -713,7 +713,7 @@ func (dl *dialer) readPacket() (packet.Packet, error) {
return nil, fmt.Errorf("no connection")
}

return dl.conn.readPacket()
return dl.conn.ReadPacket()
}

func (dl *dialer) Write(p []byte) (n int, err error) {
Expand All @@ -731,7 +731,7 @@ func (dl *dialer) Write(p []byte) (n int, err error) {
return dl.conn.Write(p)
}

func (dl *dialer) writePacket(p packet.Packet) error {
func (dl *dialer) WritePacket(p packet.Packet) error {
if err := dl.checkConnection(); err != nil {
return err
}
Expand All @@ -743,7 +743,7 @@ func (dl *dialer) writePacket(p packet.Packet) error {
return fmt.Errorf("no connection")
}

return dl.conn.writePacket(p)
return dl.conn.WritePacket(p)
}

func (dl *dialer) SetDeadline(t time.Time) error { return dl.conn.SetDeadline(t) }
Expand Down
21 changes: 2 additions & 19 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ type PubSub interface {
Subscribe(c Conn) error
}

type packetReadWriter interface {
readPacket() (packet.Packet, error)
writePacket(p packet.Packet) error
}

// pubSub is an implementation of the PubSub interface
type pubSub struct {
incoming chan packet.Packet
Expand Down Expand Up @@ -107,12 +102,6 @@ func (pb *pubSub) Publish(c Conn) error {

var p packet.Packet
var err error
conn, ok := c.(packetReadWriter)
if !ok {
err := fmt.Errorf("the provided connection is not a SRT connection")
pb.logger.Print("pubsub:error", 0, 1, func() string { return err.Error() })
return err
}

socketId := c.SocketId()

Expand All @@ -121,7 +110,7 @@ func (pb *pubSub) Publish(c Conn) error {
pb.publish = true

for {
p, err = conn.readPacket()
p, err = c.ReadPacket()
if err != nil {
pb.logger.Print("pubsub:error", socketId, 1, func() string { return err.Error() })
break
Expand All @@ -142,12 +131,6 @@ func (pb *pubSub) Publish(c Conn) error {
func (pb *pubSub) Subscribe(c Conn) error {
l := make(chan packet.Packet, 1024)
socketId := c.SocketId()
conn, ok := c.(packetReadWriter)
if !ok {
err := fmt.Errorf("the provided connection is not a SRT connection")
pb.logger.Print("pubsub:error", 0, 1, func() string { return err.Error() })
return err
}

pb.logger.Print("pubsub:subscribe", socketId, 1, func() string { return "new subscriber" })

Expand All @@ -166,7 +149,7 @@ func (pb *pubSub) Subscribe(c Conn) error {
case <-pb.ctx.Done():
return io.EOF
case p := <-l:
err := conn.writePacket(p)
err := c.WritePacket(p)
p.Decommission()
if err != nil {
pb.logger.Print("pubsub:error", socketId, 1, func() string { return err.Error() })
Expand Down

0 comments on commit 06384dd

Please sign in to comment.