Skip to content

Commit

Permalink
move datastream method
Browse files Browse the repository at this point in the history
  • Loading branch information
OkutaniDaichi0106 committed Dec 23, 2024
1 parent d830454 commit 113e2c4
Show file tree
Hide file tree
Showing 21 changed files with 794 additions and 995 deletions.
96 changes: 89 additions & 7 deletions Samples/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,56 @@ func main() {
QUICConfig: &quic.Config{},
}

sess, _, err := c.Dial("https://localhost:8080/path", context.Background())
// Get a setup request
req := moqt.SetupRequest{
URL: "https://localhost:8080/path",
}

// Dial to the server with the setup request
sess, _, err := c.Dial(req, context.Background())
if err != nil {
slog.Error(err.Error())
return
}

// Run a publisher
go func() {
pub := sess.Publisher()

pub.Announce(moqt.Announcement{TrackPathSuffix: "japan/kyoto/kiu"})
moqt.NewTrack()
for {
stream, err := pub.OpenDataStream(track, moqt.Group{})
interest, err := pub.AcceptInterest(context.Background())
if err != nil {
slog.Error("failed to accept an interest", slog.String("error", err.Error()))
return
}

tracks := moqt.NewTracks([]moqt.Track{{
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
}, {
TrackPath: "japan/kyoto/kiu/image",
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
}})

// Announce the tracks
interest.Announce(tracks)

subscription, err := pub.AcceptSubscription(context.Background())
if err != nil {
slog.Error("failed to accept a subscription", slog.String("error", err.Error()))
return
}

if subscription.TrackPath != echoTrackPath {
slog.Error("failed to get a track path", slog.String("error", "track path is invalid"))
return
}

for sequence := moqt.GroupSequence(0); sequence < 30; sequence++ {
stream, err := subscription.OpenDataStream(sequence, 0)
if err != nil {
slog.Error("failed to open a data stream", slog.String("error", err.Error()))
return
Expand All @@ -53,11 +90,56 @@ func main() {
}
}()

go func() {
// Run a subscriber
func() {
sub := sess.Subscriber()

sub.Interest(moqt.Interest{TrackPrefix: "japan/kyoto"})
interest, err := sub.Interest(moqt.Interest{TrackPrefix: echoTrackPrefix})
if err != nil {
slog.Error("failed to get an interest", slog.String("error", err.Error()))
return
}

tracks, err := interest.NextActiveTracks()
if err != nil {
slog.Error("failed to get active tracks", slog.String("error", err.Error()))
return
}

if _, ok := tracks[echoTrackPath]; !ok {
slog.Error("failed to get a track", slog.String("error", "track is not found"))
return
}

subscription, err := sub.Subscribe(moqt.Subscription{
Track: moqt.Track{
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
},
})
if err != nil {
slog.Error("failed to subscribe", slog.String("error", err.Error()))
return
}

for {
stream, err := subscription.AcceptDataStream(context.Background())
if err != nil {
slog.Error("failed to accept a data stream", slog.String("error", err.Error()))
return
}

buf := make([]byte, 1024)
n, err := stream.Read(buf)
if err != nil {
slog.Error("failed to read data", slog.String("error", err.Error()))
return
}

slog.Debug("received data", slog.String("data", string(buf[:n])))
}
}()

}
67 changes: 46 additions & 21 deletions Samples/server/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"context"
"crypto/tls"
"log/slog"
"time"

moqt "github.com/OkutaniDaichi0106/gomoqt"
"github.com/quic-go/quic-go"
Expand Down Expand Up @@ -50,40 +52,63 @@ func main() {
/*
* Interest
*/
interest := moqt.Interest{

interest, err := subscriber.Interest(moqt.Interest{
TrackPrefix: echoTrackPrefix,
}
annstr, err := subscriber.Interest(interest)
})
if err != nil {
slog.Error("failed to interest", slog.String("error", err.Error()))
return
}

/*
* Get Announcements
*/
anns, err := interest.NextActiveTracks()
if err != nil {
slog.Error("failed to get active tracks", slog.String("error", err.Error()))
return
}

_, ok := anns[echoTrackPath]
if !ok {
slog.Error("failed to get the active track", slog.String("error", "track is not active"))
return
}

subscription, err := subscriber.Subscribe(moqt.Subscription{
Track: moqt.Track{
TrackPath: echoTrackPath,
TrackPriority: 0,
GroupOrder: 0,
GroupExpires: 1 * time.Second,
},
})
if err != nil {
slog.Error("failed to subscribe", slog.String("error", err.Error()))
return
}

// Publish to the

for {
ann, err := annstr.Read()

stream, err := subscription.AcceptDataStream(context.Background())
if err != nil {
slog.Error("failed to read an announcement", slog.String("error", err.Error()))
slog.Error("failed to accept a data stream", slog.String("error", err.Error()))
return
}
slog.Info("Received an announcement", slog.Any("announcement", ann))

/*
* Subscribe
*/
subscription := moqt.Subscription{
TrackPath: echoTrackPath,
}
go func(stream moqt.DataReceiveStream) {
for {
buf := make([]byte, 1024)
n, err := stream.Read(buf)
if err != nil {
slog.Error("failed to read data", slog.String("error", err.Error()))
return
}
slog.Info("Received", slog.String("data", string(buf[:n])))

info, err := subscriber.Subscribe(subscription)
if err != nil {
slog.Error("failed to subscribe", slog.String("error", err.Error()))
return
}
}

slog.Info("successfully subscribed", slog.Any("subscription", subscription), slog.Any("info", info))
}(stream)
}
})

Expand Down
66 changes: 51 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,20 @@ func setupConnection(req SetupRequest, conn transport.Connection) (ClientSession
},
}

go listenSession(&sess.session, context.Background()) // TODO:

return sess, rsp, nil
}

func listen(sess *ClientSession, ctx context.Context) {
func listenSession(sess *session, ctx context.Context) {
// Listen the bidirectional streams
listenBiStreams(sess, ctx)
go listenBiStreams(sess, ctx)

// Listen the unidirectional streams
listenUniStreams(sess, ctx)
go listenUniStreams(sess, ctx)

// Listen the datagrams
go listenDatagrams(sess, ctx)
}

func openSessionStream(conn transport.Connection) (SessionStream, error) {
Expand Down Expand Up @@ -223,7 +228,7 @@ func sendSetupRequest(w io.Writer, req SetupRequest) error {
return nil
}

func listenBiStreams(sess *ClientSession, ctx context.Context) {
func listenBiStreams(sess *session, ctx context.Context) {
for {
/*
* Accept a bidirectional stream
Expand Down Expand Up @@ -301,15 +306,8 @@ func listenBiStreams(sess *ClientSession, ctx context.Context) {
return
}

// Get the track
track, ok := sess.publisherManager.tracks[req.TrackPath]
if !ok {
slog.Error("track does not exist", slog.String("track path", req.TrackPath))
req.CloseWithError(ErrTrackDoesNotExist)
return
}

req.Inform(track.Info())
// Enqueue the info-request
sess.publisherManager.receivedInfoRequestQueue.Enqueue(req)
default:
slog.Debug("An unknown type of stream was opend")

Expand All @@ -322,7 +320,7 @@ func listenBiStreams(sess *ClientSession, ctx context.Context) {
}
}

func listenUniStreams(sess *ClientSession, ctx context.Context) {
func listenUniStreams(sess *session, ctx context.Context) {
for {
/*
* Accept a unidirectional stream
Expand Down Expand Up @@ -359,8 +357,15 @@ func listenUniStreams(sess *ClientSession, ctx context.Context) {
return
}

subscription, ok := sess.subscriberManager.getSentSubscription(data.SubscribeID())
if !ok {
slog.Error("failed to get a subscription", slog.String("error", "subscription not found"))
closeReceiveStreamWithInternalError(stream, ErrProtocolViolation) // TODO:
return
}

// Enqueue the receiver
sess.subscriberManager.dataReceiverQueue.Enqueue(data)
subscription.dataReceiveStreamQueue.Enqueue(data)
default:
slog.Debug("An unknown type of stream was opend")

Expand All @@ -373,6 +378,37 @@ func listenUniStreams(sess *ClientSession, ctx context.Context) {
}
}

func listenDatagrams(sess *session, ctx context.Context) {
for {
/*
* Receive a datagram
*/
buf, err := sess.conn.ReceiveDatagram(ctx)
if err != nil {
slog.Error("failed to receive a datagram", slog.String("error", err.Error()))
return
}

// Handle the datagram
go func(buf []byte) {
data, err := newReceivedDatagram(buf)
if err != nil {
slog.Error("failed to get a received datagram", slog.String("error", err.Error()))
return
}

subscription, ok := sess.subscriberManager.getSentSubscription(data.SubscribeID())
if !ok {
slog.Error("failed to get a subscription", slog.String("error", "subscription not found"))
return
}

// Enqueue the datagram
subscription.receivedDatagramQueue.Enqueue(data)
}(buf)
}
}

func closeStreamWithInternalError(stream transport.Stream, err error) {
if err == nil {
stream.Close()
Expand Down
Loading

0 comments on commit 113e2c4

Please sign in to comment.