Skip to content

Commit

Permalink
Add new APIs to enable simulcast probes without dropping packets
Browse files Browse the repository at this point in the history
  • Loading branch information
cptpcrd committed May 29, 2024
1 parent f598d3b commit a732e10
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 14 deletions.
7 changes: 6 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ type streamSession interface {
decrypt([]byte) error
}

type newStream struct {
readStream readStream
payloadType uint8
}

type session struct {
localContextMutex sync.Mutex
localContext, remoteContext *Context
localOptions, remoteOptions []ContextOption

newStream chan readStream
newStream chan newStream
acceptStreamTimeout time.Time

started chan interface{}
Expand Down
10 changes: 5 additions & 5 deletions session_srtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewSessionSRTCP(conn net.Conn, config *Config) (*SessionSRTCP, error) { //n
localOptions: localOpts,
remoteOptions: remoteOpts,
readStreams: map[uint32]readStream{},
newStream: make(chan readStream),
newStream: make(chan newStream),
acceptStreamTimeout: config.AcceptStreamTimeout,
started: make(chan interface{}),
closed: make(chan interface{}),
Expand Down Expand Up @@ -93,17 +93,17 @@ func (s *SessionSRTCP) OpenReadStream(ssrc uint32) (*ReadStreamSRTCP, error) {

// AcceptStream returns a stream to handle RTCP for a single SSRC
func (s *SessionSRTCP) AcceptStream() (*ReadStreamSRTCP, uint32, error) {
stream, ok := <-s.newStream
newStream, ok := <-s.newStream
if !ok {
return nil, 0, errStreamAlreadyClosed
}

readStream, ok := stream.(*ReadStreamSRTCP)
readStream, ok := newStream.readStream.(*ReadStreamSRTCP)
if !ok {
return nil, 0, errFailedTypeAssertion
}

return readStream, stream.GetSSRC(), nil
return readStream, readStream.GetSSRC(), nil
}

// Close ends the session
Expand Down Expand Up @@ -172,7 +172,7 @@ func (s *SessionSRTCP) decrypt(buf []byte) error {
if !s.session.acceptStreamTimeout.IsZero() {
_ = s.session.nextConn.SetReadDeadline(time.Time{})
}
s.session.newStream <- r // Notify AcceptStream
s.session.newStream <- newStream{readStream: r} // Notify AcceptStream
}

readStream, ok := r.(*ReadStreamSRTCP)
Expand Down
24 changes: 16 additions & 8 deletions session_srtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewSessionSRTP(conn net.Conn, config *Config) (*SessionSRTP, error) { //nol
localOptions: localOpts,
remoteOptions: remoteOpts,
readStreams: map[uint32]readStream{},
newStream: make(chan readStream),
newStream: make(chan newStream),
acceptStreamTimeout: config.AcceptStreamTimeout,
started: make(chan interface{}),
closed: make(chan interface{}),
Expand Down Expand Up @@ -93,19 +93,26 @@ func (s *SessionSRTP) OpenReadStream(ssrc uint32) (*ReadStreamSRTP, error) {
return nil, errFailedTypeAssertion
}

// AcceptStream returns a stream to handle RTCP for a single SSRC
// AcceptStream returns a stream to handle RTP for a single SSRC
func (s *SessionSRTP) AcceptStream() (*ReadStreamSRTP, uint32, error) {
stream, ok := <-s.newStream
readStream, ssrc, _, err := s.AcceptStreamWithPayloadType()
return readStream, ssrc, err
}

// AcceptStreamWithPayloadType returns a stream to handle RTP for a single SSRC.
// It returns the payload type as well as the SSRC.
func (s *SessionSRTP) AcceptStreamWithPayloadType() (*ReadStreamSRTP, uint32, uint8, error) {
newStream, ok := <-s.newStream
if !ok {
return nil, 0, errStreamAlreadyClosed
return nil, 0, 0, errStreamAlreadyClosed
}

readStream, ok := stream.(*ReadStreamSRTP)
readStream, ok := newStream.readStream.(*ReadStreamSRTP)
if !ok {
return nil, 0, errFailedTypeAssertion
return nil, 0, 0, errFailedTypeAssertion

Check warning on line 112 in session_srtp.go

View check run for this annotation

Codecov / codecov/patch

session_srtp.go#L112

Added line #L112 was not covered by tests
}

return readStream, stream.GetSSRC(), nil
return readStream, readStream.GetSSRC(), newStream.payloadType, nil
}

// Close ends the session
Expand Down Expand Up @@ -178,7 +185,8 @@ func (s *SessionSRTP) decrypt(buf []byte) error {
if !s.session.acceptStreamTimeout.IsZero() {
_ = s.session.nextConn.SetReadDeadline(time.Time{})
}
s.session.newStream <- r // Notify AcceptStream
// notify AcceptStream
s.session.newStream <- newStream{readStream: r, payloadType: h.PayloadType}
}

readStream, ok := r.(*ReadStreamSRTP)
Expand Down
25 changes: 25 additions & 0 deletions stream_srtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"io"
"sync"
"sync/atomic"
"time"

"github.com/pion/rtp"
Expand All @@ -27,6 +28,9 @@ type ReadStreamSRTP struct {
isInited bool

buffer io.ReadWriteCloser

peekedPacket atomic.Value

Check failure on line 32 in stream_srtp.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
peekedPacketMu sync.Mutex
}

// Used by getOrCreateReadStream
Expand Down Expand Up @@ -74,8 +78,29 @@ func (r *ReadStreamSRTP) write(buf []byte) (n int, err error) {
return n, err
}

// Peek reads the next full RTP packet from the nextConn, but queues it internally.
// The next call to Read (or the next call to Peek without a call to Read in between)
// will return the same packet again.
func (r *ReadStreamSRTP) Peek(buf []byte) (int, error) {
r.peekedPacketMu.Lock()
defer r.peekedPacketMu.Unlock()
if pkt, ok := r.peekedPacket.Swap((*[]byte)(nil)).(*[]byte); ok && pkt != nil {
return copy(buf, *pkt), nil

Check warning on line 88 in stream_srtp.go

View check run for this annotation

Codecov / codecov/patch

stream_srtp.go#L84-L88

Added lines #L84 - L88 were not covered by tests
}
n, err := r.buffer.Read(buf)
if err == nil {
peekedPacket := make([]byte, n)
copy(peekedPacket, buf)
r.peekedPacket.Store(&peekedPacket)

Check warning on line 94 in stream_srtp.go

View check run for this annotation

Codecov / codecov/patch

stream_srtp.go#L90-L94

Added lines #L90 - L94 were not covered by tests
}
return n, err

Check warning on line 96 in stream_srtp.go

View check run for this annotation

Codecov / codecov/patch

stream_srtp.go#L96

Added line #L96 was not covered by tests
}

// Read reads and decrypts full RTP packet from the nextConn
func (r *ReadStreamSRTP) Read(buf []byte) (int, error) {
if pkt, ok := r.peekedPacket.Swap((*[]byte)(nil)).(*[]byte); ok && pkt != nil {
return copy(buf, *pkt), nil

Check warning on line 102 in stream_srtp.go

View check run for this annotation

Codecov / codecov/patch

stream_srtp.go#L102

Added line #L102 was not covered by tests
}
return r.buffer.Read(buf)
}

Expand Down

0 comments on commit a732e10

Please sign in to comment.