Skip to content

Commit

Permalink
edit relay logic
Browse files Browse the repository at this point in the history
  • Loading branch information
OkutaniDaichi0106 committed Jan 11, 2025
1 parent dc36fa5 commit 1f49e3e
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 186 deletions.
16 changes: 8 additions & 8 deletions Samples/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,10 @@ func main() {
slog.Info("Active Tracks", slog.Any("announcements", announcements))

subscription := moqt.Subscription{
Track: moqt.Track{
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
},
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
}
substr, err := sess.OpenSubscribeStream(subscription)
if err != nil {
Expand All @@ -128,12 +126,14 @@ func main() {

buf := make([]byte, 1024)
n, err := stream.Read(buf)
if n > 0 {
slog.Info("received data", slog.String("data", string(buf[:n])))
}

if err != nil {
slog.Error("failed to read data", slog.String("error", err.Error()))
return
}

slog.Debug("received data", slog.String("data", string(buf[:n])))
}
}()

Expand Down
10 changes: 4 additions & 6 deletions Samples/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,10 @@ func main() {
* Subscribe
*/
subscription := moqt.Subscription{
Track: moqt.Track{
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
},
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
}
substr, err := sess.OpenSubscribeStream(subscription)
if err != nil {
Expand Down
26 changes: 10 additions & 16 deletions announce_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type receiveAnnounceStream struct {
stream transport.Stream
mu sync.RWMutex

activeMap map[string]Announcement
ch chan struct{}
annMap map[string]Announcement
ch chan struct{}
// activeCh chan []Announcement

// endedCh chan []Announcement
Expand All @@ -38,9 +38,9 @@ func (ras *receiveAnnounceStream) ReceiveAnnouncements() ([]Announcement, error)
ras.mu.Lock()
defer ras.mu.Unlock()

announcements := make([]Announcement, 0, len(ras.activeMap))
announcements := make([]Announcement, 0, len(ras.annMap))

for _, ann := range ras.activeMap {
for _, ann := range ras.annMap {
announcements = append(announcements, ann)
}

Expand Down Expand Up @@ -134,11 +134,15 @@ func (sas *sendAnnounceStream) SendAnnouncement(announcements []Announcement) er
return nil
}

func (sas *sendAnnounceStream) Close() error { // TODO
return nil
func (sas *sendAnnounceStream) Close() error {
return sas.stream.Close()
}

func (sas *sendAnnounceStream) CloseWithError(err error) error { // TODO
if err == nil {
return sas.stream.Close()
}

return nil
}

Expand All @@ -151,11 +155,6 @@ func announceActiveTrack(sas *sendAnnounceStream, ann Announcement) error {
// Get a suffix part of the Track Path
suffix := strings.TrimPrefix(ann.TrackPath, sas.interest.TrackPrefix+"/")

// Add the Authorization Info
if ann.AuthorizationInfo != "" {
ann.AnnounceParameters.Add(AUTHORIZATION_INFO, ann.AuthorizationInfo)
}

// Initialize an ANNOUNCE message
am := message.AnnounceMessage{
AnnounceStatus: message.ACTIVE,
Expand Down Expand Up @@ -184,11 +183,6 @@ func announceEndedTrack(sas *sendAnnounceStream, ann Announcement) error {
// Get a suffix part of the Track Path
suffix := strings.TrimPrefix(ann.TrackPath, sas.interest.TrackPrefix+"/")

//
if ann.AuthorizationInfo != "" {
ann.AnnounceParameters.Add(AUTHORIZATION_INFO, ann.AuthorizationInfo)
}

// Initialize an ANNOUNCE message
am := message.AnnounceMessage{
AnnounceStatus: message.ENDED,
Expand Down
25 changes: 11 additions & 14 deletions announcement.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,32 @@ const (
type AnnounceStatus message.AnnounceStatus

type Announcement struct {
AnnounceStatus AnnounceStatus

/***/
TrackPath string

/***/
AuthorizationInfo string
AnnounceParameters Parameters
}

func readAnnouncement(r io.Reader, prefix string) (AnnounceStatus, Announcement, error) {
func (a Announcement) AuthorizationInfo() (string, bool) {
return getAuthorizationInfo(a.AnnounceParameters)
}

func readAnnouncement(r io.Reader, prefix string) (Announcement, error) {
var am message.AnnounceMessage
err := am.Decode(r)
if err != nil {
slog.Error("failed to read an ANNOUNCE message", slog.String("error", err.Error()))
return 0, Announcement{}, err
return Announcement{}, err
}

// Get the full track path
trackPath := prefix + "/" + am.TrackPathSuffix

// Initialize an Announcement
ann := Announcement{
return Announcement{
AnnounceStatus: AnnounceStatus(am.AnnounceStatus),
TrackPath: trackPath,
AnnounceParameters: Parameters(am.Parameters),
}

// Set the AuthorizationInfo
if auth, ok := getAuthorizationInfo(ann.AnnounceParameters); ok {
ann.AuthorizationInfo = auth
}

return AnnounceStatus(am.AnnounceStatus), ann, nil
}, nil
}
6 changes: 0 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,6 @@ func listenUniStreams(sess *session, ctx context.Context) {
ReceivedGroup: group,
}

if err != nil {
slog.Error("failed to get a data receive stream", slog.String("error", err.Error()))
closeReceiveStreamWithInternalError(stream, err) // TODO:
return
}

queue, ok := sess.dataReceiveStreamQueues[data.SubscribeID()]
if !ok {
slog.Error("failed to get a data receive stream queue", slog.String("error", "queue not found"))
Expand Down
6 changes: 0 additions & 6 deletions downstream.go

This file was deleted.

10 changes: 5 additions & 5 deletions fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ import (
type GroupSequence message.GroupSequence

/***/
type Fetch struct {
type FetchRequest struct {
SubscribeID SubscribeID
TrackPath string
GroupPriority GroupPriority
GroupSequence GroupSequence
FrameSequence FrameSequence
}

func readFetch(r io.Reader) (Fetch, error) {
func readFetch(r io.Reader) (FetchRequest, error) {
var fm message.FetchMessage
err := fm.Decode(r)
if err != nil {
slog.Error("failed to read a FETCH message", slog.String("error", err.Error()))
return Fetch{}, err
return FetchRequest{}, err
}

req := Fetch{
req := FetchRequest{
SubscribeID: SubscribeID(fm.SubscribeID),
TrackPath: fm.TrackPath,
GroupPriority: GroupPriority(fm.GroupPriority),
Expand All @@ -44,7 +44,7 @@ func readFetch(r io.Reader) (Fetch, error) {
return req, nil
}

func writeFetch(w io.Writer, fetch Fetch) error {
func writeFetch(w io.Writer, fetch FetchRequest) error {
fm := message.FetchMessage{
SubscribeID: message.SubscribeID(fetch.SubscribeID),
TrackPath: fetch.TrackPath,
Expand Down
17 changes: 10 additions & 7 deletions fetch_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type SendFetchStream interface {
ReceiveDataStream() ReceiveDataStream

// Get a fetch
Fetch() Fetch
Fetch() FetchRequest

// Update the fetch
UpdateFetch(FetchUpdate) error
Expand All @@ -30,7 +30,7 @@ var _ SendFetchStream = (*sendFetchStream)(nil)

type sendFetchStream struct {
stream transport.Stream
fetch Fetch
fetch FetchRequest
mu sync.Mutex
}

Expand All @@ -46,7 +46,7 @@ func (sfs *sendFetchStream) ReceiveDataStream() ReceiveDataStream {
}
}

func (sfs *sendFetchStream) Fetch() Fetch {
func (sfs *sendFetchStream) Fetch() FetchRequest {
return sfs.fetch
}

Expand Down Expand Up @@ -108,8 +108,8 @@ type ReceiveFetchStream interface {
// Get a SendDataStream
SendDataStream() SendDataStream

// Get a fetch
Fetch() Fetch
// Get a fetch request
FetchRequest() FetchRequest

// Close the stream
Close() error
Expand All @@ -121,7 +121,7 @@ type ReceiveFetchStream interface {
var _ ReceiveFetchStream = (*receiveFetchStream)(nil)

type receiveFetchStream struct {
fetch Fetch
fetch FetchRequest
stream transport.Stream
mu sync.Mutex
}
Expand All @@ -137,11 +137,14 @@ func (rfs *receiveFetchStream) SendDataStream() SendDataStream {
}
}

func (rfs *receiveFetchStream) Fetch() Fetch {
func (rfs *receiveFetchStream) FetchRequest() FetchRequest {
return rfs.fetch
}

func (rfs *receiveFetchStream) CloseWithError(err error) error {
rfs.mu.Lock()
defer rfs.mu.Unlock()

if err == nil {
return rfs.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion fetch_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func writeFetchUpdate(w io.Writer, update FetchUpdate) error {
return nil
}

func updateFetch(fetch Fetch, update FetchUpdate) (Fetch, error) {
func updateFetch(fetch FetchRequest, update FetchUpdate) (FetchRequest, error) {
fetch.GroupPriority = update.GroupPriority

return fetch, nil
Expand Down
2 changes: 1 addition & 1 deletion parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (p Parameters) ReadAsBool(key uint64) (bool, error) {
var ErrParameterNotFound = errors.New("parameter not found")

const (
ROLE uint64 = 0x00
// ROLE uint64 = 0x00
PATH uint64 = 0x01
MAX_SUBSCRIBE_ID uint64 = 0x02
AUTHORIZATION_INFO uint64 = 0x03
Expand Down
Loading

0 comments on commit 1f49e3e

Please sign in to comment.