Skip to content

Commit

Permalink
edit relay process
Browse files Browse the repository at this point in the history
  • Loading branch information
OkutaniDaichi0106 committed Dec 26, 2024
1 parent f577fe0 commit fd5a23e
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 114 deletions.
118 changes: 118 additions & 0 deletions data.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,127 @@
package moqt

import "github.com/OkutaniDaichi0106/gomoqt/internal/transport"

// /*
// * data interface is implemented by dataReceiveStream and receivedDatagram.
// */
// type data interface {
// Group
// io.Reader
// }

type dataFragment interface {

//
SubscribeID() SubscribeID
TrackPriority() TrackPriority
GroupOrder() GroupOrder

//
GroupPriority() GroupPriority
GroupSequence() GroupSequence

Payload() []byte
}

var _ dataFragment = (*streamDataFragment)(nil)

type streamDataFragment struct {
trackPriority TrackPriority
groupOrder GroupOrder

streamID transport.StreamID
receivedGroup
payload []byte
}

func (d streamDataFragment) StreamID() transport.StreamID {
return d.streamID
}

func (d streamDataFragment) TrackPriority() TrackPriority {
return d.trackPriority
}

func (d streamDataFragment) GroupOrder() GroupOrder {
return d.groupOrder
}

func (d streamDataFragment) Payload() []byte {
return d.payload
}

var _ dataFragment = (*datagramData)(nil)

type datagramData struct {
trackPriority TrackPriority
groupOrder GroupOrder

receivedGroup
payload []byte
}

func (d datagramData) TrackPriority() TrackPriority {
return d.trackPriority
}

func (d datagramData) GroupOrder() GroupOrder {
return d.groupOrder
}

func (d datagramData) Payload() []byte {
return d.payload
}

/*
* dataQueue implements heap.Interface.
*/
type dataQueue []dataFragment

func (q dataQueue) Len() int {
return len(q)
}

func (q dataQueue) Less(i, j int) bool {
return schedule(q[i], q[j])
}

func (q dataQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
}

func (q *dataQueue) Push(x interface{}) {
*q = append(*q, x.(dataFragment))
}

func (q *dataQueue) Pop() interface{} {
old := *q
n := len(old)
x := old[n-1]
*q = old[:n-1]
return x
}

func schedule(a, b dataFragment) bool {
if a.SubscribeID() != b.SubscribeID() {
if a.TrackPriority() != b.TrackPriority() {
return a.TrackPriority() < b.TrackPriority()
}
}

if a.GroupPriority() != b.GroupPriority() {
return a.GroupPriority() < b.GroupPriority()
}

switch a.GroupOrder() {
case DEFAULT:
return true
case ASCENDING:
return a.GroupSequence() < b.GroupSequence()
case DESCENDING:
return a.GroupSequence() > b.GroupSequence()
default:
}

return false
}
10 changes: 5 additions & 5 deletions data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (

type DataSendStream interface {
transport.SendStream
Group
SentGroup
}

var _ DataSendStream = (*dataSendStream)(nil)

type dataSendStream struct {
transport.SendStream
SentGroup
sentGroup
}

func (stream dataSendStream) Write(buf []byte) (int, error) {
Expand All @@ -34,7 +34,7 @@ func (stream dataSendStream) Write(buf []byte) (int, error) {

type DataReceiveStream interface {
transport.ReceiveStream
Group
ReceivedGroup
}

func newDataReceiveStream(stream transport.ReceiveStream) (DataReceiveStream, error) {
Expand All @@ -46,15 +46,15 @@ func newDataReceiveStream(stream transport.ReceiveStream) (DataReceiveStream, er

return &dataReceiveStream{
ReceiveStream: stream,
ReceivedGroup: group,
receivedGroup: group,
}, nil
}

var _ DataReceiveStream = (*dataReceiveStream)(nil)

type dataReceiveStream struct {
transport.ReceiveStream
ReceivedGroup
receivedGroup
}

func (stream dataReceiveStream) Read(buf []byte) (int, error) {
Expand Down
16 changes: 8 additions & 8 deletions datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

type ReceivedDatagram interface {
io.Reader
// io.Reader
Payload() []byte
Group
ReceivedGroup
}

func newReceivedDatagram(datagram []byte) (ReceivedDatagram, error) {
Expand All @@ -25,25 +25,25 @@ func newReceivedDatagram(datagram []byte) (ReceivedDatagram, error) {
}

return &receivedDatagram{
ReceivedGroup: group,
receivedGroup: group,
payload: datagram[len(datagram)-reader.Len():],
}, nil
}

var _ ReceivedDatagram = (*receivedDatagram)(nil)

type receivedDatagram struct {
ReceivedGroup
receivedGroup
payload []byte
}

func (d receivedDatagram) Payload() []byte {
return d.payload
}

func (d receivedDatagram) Read(buf []byte) (int, error) {
return copy(buf, d.payload), nil
}
// func (d receivedDatagram) Read(buf []byte) (int, error) {
// return copy(buf, d.payload), nil
// }

type receivedDatagramQueue struct {
mu sync.Mutex
Expand Down Expand Up @@ -104,7 +104,7 @@ type SentDatagram interface {
var _ SentDatagram = (*sentDatagram)(nil)

type sentDatagram struct {
SentGroup
sentGroup
payload []byte
}

Expand Down
2 changes: 1 addition & 1 deletion fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (fetch *ReceivedFetch) OpenDataStream(id SubscribeID, sequence GroupSequenc

return dataSendStream{
SendStream: fetch.stream,
SentGroup: SentGroup{
sentGroup: sentGroup{
subscribeID: id,
groupSequence: sequence,
groupPriority: priority,
Expand Down
43 changes: 32 additions & 11 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ type Group interface {
GroupSequence() GroupSequence
GroupPriority() GroupPriority
}
type ReceivedGroup interface {
Group
ReceivedAt() time.Time
}

var _ ReceivedGroup = (*receivedGroup)(nil)

type ReceivedGroup struct {
type receivedGroup struct {
subscribeID SubscribeID

groupSequence GroupSequence
Expand All @@ -28,19 +34,30 @@ type ReceivedGroup struct {
receivedAt time.Time // TODO:
}

func (g ReceivedGroup) SubscribeID() SubscribeID {
func (g receivedGroup) SubscribeID() SubscribeID {
return g.subscribeID
}

func (g ReceivedGroup) GroupSequence() GroupSequence {
func (g receivedGroup) GroupSequence() GroupSequence {
return g.groupSequence
}

func (g ReceivedGroup) GroupPriority() GroupPriority {
func (g receivedGroup) GroupPriority() GroupPriority {
return g.groupPriority
}

type SentGroup struct {
func (g receivedGroup) ReceivedAt() time.Time {
return g.receivedAt
}

type SentGroup interface {
Group
SentAt() time.Time
}

var _ SentGroup = (*sentGroup)(nil)

type sentGroup struct {
subscribeID SubscribeID

groupSequence GroupSequence
Expand All @@ -54,29 +71,33 @@ type SentGroup struct {
sentAt time.Time // TODO:
}

func (g SentGroup) SubscribeID() SubscribeID {
func (g sentGroup) SubscribeID() SubscribeID {
return g.subscribeID
}

func (g SentGroup) GroupSequence() GroupSequence {
func (g sentGroup) GroupSequence() GroupSequence {
return g.groupSequence
}

func (g SentGroup) GroupPriority() GroupPriority {
func (g sentGroup) GroupPriority() GroupPriority {
return g.groupPriority
}

func readGroup(r io.Reader) (ReceivedGroup, error) {
func (g sentGroup) SentAt() time.Time {
return g.sentAt
}

func readGroup(r io.Reader) (receivedGroup, error) {
// Read a GROUP message
var gm message.GroupMessage
err := gm.Decode(r)
if err != nil {
slog.Error("failed to read a GROUP message", slog.String("error", err.Error()))
return ReceivedGroup{}, err
return receivedGroup{}, err
}

//
return ReceivedGroup{
return receivedGroup{
subscribeID: SubscribeID(gm.SubscribeID),
groupSequence: GroupSequence(gm.GroupSequence),
groupPriority: GroupPriority(gm.GroupPriority),
Expand Down
35 changes: 0 additions & 35 deletions priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,3 @@ const (
ASCENDING GroupOrder = 0x1
DESCENDING GroupOrder = 0x2
)

type data interface {
//
SubscribeID() SubscribeID
TrackPriority() TrackPriority
GroupOrder() GroupOrder

//
GroupPriority() GroupPriority
GroupSequence() GroupSequence
}

func schedule(a, b data) bool {
if a.SubscribeID() != b.SubscribeID() {
if a.TrackPriority() != b.TrackPriority() {
return a.TrackPriority() < b.TrackPriority()
}
}

if a.GroupPriority() != b.GroupPriority() {
return a.GroupPriority() < b.GroupPriority()
}

switch a.GroupOrder() {
case DEFAULT:
return true
case ASCENDING:
return a.GroupSequence() < b.GroupSequence()
case DESCENDING:
return a.GroupSequence() > b.GroupSequence()
default:
}

return false
}
2 changes: 1 addition & 1 deletion publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func openGroupStream(conn transport.Connection) (transport.SendStream, error) {
return stream, nil
}

func sendDatagram(conn transport.Connection, g SentGroup, payload []byte) error {
func sendDatagram(conn transport.Connection, g sentGroup, payload []byte) error {
if g.groupSequence == 0 {
return errors.New("0 sequence number")
}
Expand Down
Loading

0 comments on commit fd5a23e

Please sign in to comment.