Skip to content

Commit

Permalink
Fix: Support multiple subgroup streams (#164)
Browse files Browse the repository at this point in the history
* fix: support multiple subgroup stream

* fix: spawn thread per subgroup stream

* fix: create stream per subgroup

* check subgroup support

* refactor: separate files

* fix: start forwarders for current group if contents exists
  • Loading branch information
tetter27 authored Feb 11, 2025
1 parent 722f0b2 commit 0b19666
Show file tree
Hide file tree
Showing 24 changed files with 3,606 additions and 2,364 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ Supported version: draft-ietf-moq-transport-06
- [ ] TRACK_STATUS
- [x] SUBSCRIBE_NAMESPACE_OK
- [x] SUBSCRIBE_NAMESPACE_ERROR
- [ ] Data Streams
- [x] Data Streams
- [x] Datagram
- [x] Track Stream
- [ ] Subgroup Stream
- [x] Subgroup Stream
- [ ] Features
- [x] Manage Publisher / Subscriber
- [x] Forword Messages
Expand Down
10 changes: 8 additions & 2 deletions js/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ <h4>Header</h4>
<label>Track Alias: <input type="text" name="object-track-alias" id="" value="0" /> </label>
<br />
<div id="subgroupHeaderContents" style="display: none;">
<label>Group ID: <input type="text" name="subgroup-group-id" id="" value="0" /> </label>
Group ID: <p id="mutableGroupId">0</p>
<br />
<button type="button" id="descendMutableGroupIdBtn">&lt;</button>
<button type="button" id="ascendMutableGroupIdBtn">&gt;</button>
<br />
Subgroup ID: <p id="mutableSubgroupId">0</p>
<br />
<label>Subgroup ID: <input type="text" name="subgroup-id" id="" value="0" /> </label>
<button type="button" id="descendMutableSubgroupIdBtn">&lt;</button>
<button type="button" id="ascendMutableSubgroupIdBtn">&gt;</button>
<br />
</div>
<label>Publisher Priority: <input type="text" name="publisher-priority" id="" value="0" /> </label>
Expand Down
54 changes: 43 additions & 11 deletions js/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import init, { MOQTClient } from './pkg/moqt_client_sample'
init().then(async () => {
console.log('init wasm-pack')

let headerSend = false
let trackHeaderSent = false
const subgroupHeaderSent = new Set()
let objectId = 0n
let mutableGroupId = 0n
let mutableSubgroupId = 0n

const connectBtn = document.getElementById('connectBtn')
connectBtn.addEventListener('click', async () => {
Expand Down Expand Up @@ -99,6 +101,7 @@ init().then(async () => {

const objectIdElement = document.getElementById('objectId')
const mutableGroupIdElement = document.getElementById('mutableGroupId')
const mutableSubgroupIdElement = document.getElementById('mutableSubgroupId')

const sendSetupBtn = document.getElementById('sendSetupBtn')
sendSetupBtn.addEventListener('click', async () => {
Expand Down Expand Up @@ -195,9 +198,9 @@ init().then(async () => {
const objectPayloadArray = new TextEncoder().encode(objectPayloadString)

// send header if it is the first time
if (!headerSend) {
if (!trackHeaderSent) {
await client.sendTrackStreamHeaderMessage(BigInt(subscribeId), BigInt(trackAlias), publisherPriority)
headerSend = true
trackHeaderSent = true
}

await client.sendTrackStreamObject(BigInt(subscribeId), mutableGroupId, objectId++, objectPayloadArray)
Expand All @@ -208,38 +211,44 @@ init().then(async () => {
sendSubgroupObjectBtn.addEventListener('click', async () => {
console.log('send subgroup stream object btn clicked')
const subscribeId = form['object-subscribe-id'].value
const groupId = form['subgroup-group-id'].value
const subgroupId = form['subgroup-id'].value
const trackAlias = form['object-track-alias'].value
const publisherPriority = form['publisher-priority'].value
const objectPayloadString = form['object-payload'].value

// encode the text to the object array
const objectPayloadArray = new TextEncoder().encode(objectPayloadString)
const key = `${mutableGroupId}:${mutableSubgroupId}`

// send header if it is the first time
if (!headerSend) {
if (!subgroupHeaderSent.has(key)) {
await client.sendSubgroupStreamHeaderMessage(
BigInt(subscribeId),
BigInt(trackAlias),
BigInt(groupId),
BigInt(subgroupId),
mutableGroupId,
mutableSubgroupId,
publisherPriority
)
headerSend = true
subgroupHeaderSent.add(key)
}

await client.sendSubgroupStreamObject(subscribeId, objectId++, objectPayloadArray)
await client.sendSubgroupStreamObject(
subscribeId,
mutableGroupId,
mutableSubgroupId,
objectId++,
objectPayloadArray
)
objectIdElement.textContent = objectId
})

const ascendMutableGroupId = document.getElementById('ascendMutableGroupIdBtn')
ascendMutableGroupId.addEventListener('click', async () => {
mutableGroupId++
mutableSubgroupId = 0n
objectId = 0n
console.log('ascend mutableGroupId', mutableGroupId)

mutableGroupIdElement.textContent = mutableGroupId
mutableSubgroupIdElement.textContent = mutableSubgroupId
objectIdElement.textContent = objectId
})

Expand All @@ -249,9 +258,32 @@ init().then(async () => {
return
}
mutableGroupId--
mutableSubgroupId = 0n
objectId = 0n
console.log('descend mutableGroupId', mutableGroupId)
mutableGroupIdElement.textContent = mutableGroupId
mutableSubgroupIdElement.textContent = mutableSubgroupId
objectIdElement.textContent = objectId
})

const ascendMutableSubgroupId = document.getElementById('ascendMutableSubgroupIdBtn')
ascendMutableSubgroupId.addEventListener('click', async () => {
mutableSubgroupId++
objectId = 0n
console.log('ascend mutableSubgroupId', mutableSubgroupId)
mutableSubgroupIdElement.textContent = mutableSubgroupId
objectIdElement.textContent = objectId
})

const descendMutableSubroupId = document.getElementById('descendMutableSubgroupIdBtn')
descendMutableSubroupId.addEventListener('click', async () => {
if (mutableSubgroupId === 0n) {
return
}
mutableSubgroupId--
objectId = 0n
console.log('descend mutableSubgroupId', mutableSubgroupId)
mutableSubgroupIdElement.textContent = mutableSubgroupId
objectIdElement.textContent = objectId
})

Expand Down
94 changes: 62 additions & 32 deletions moqt-client-sample/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ fn main() {
utils::set_panic_hook();
}

#[cfg(web_sys_unstable_apis)]
type SubscribeId = u64;
#[cfg(web_sys_unstable_apis)]
type GroupId = u64;
#[cfg(web_sys_unstable_apis)]
type SubgroupId = u64;
#[cfg(web_sys_unstable_apis)]
type WriterKey = (SubscribeId, Option<(GroupId, SubgroupId)>);

#[cfg(web_sys_unstable_apis)]
#[wasm_bindgen]
pub struct MOQTClient {
Expand All @@ -76,7 +85,7 @@ pub struct MOQTClient {
transport: Rc<RefCell<Option<web_sys::WebTransport>>>,
control_stream_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>,
datagram_writer: Rc<RefCell<Option<web_sys::WritableStreamDefaultWriter>>>,
stream_writers: Rc<RefCell<HashMap<u64, web_sys::WritableStreamDefaultWriter>>>,
stream_writers: Rc<RefCell<HashMap<WriterKey, web_sys::WritableStreamDefaultWriter>>>,
callbacks: Rc<RefCell<MOQTCallbacks>>,
}

Expand Down Expand Up @@ -557,7 +566,7 @@ impl MOQTClient {
.get_writer()?;
*self.datagram_writer.borrow_mut() = Some(datagram_writer);
}
"track" | "subgroup" => {
"track" => {
let send_uni_stream = web_sys::WritableStream::from(
JsFuture::from(
self.transport
Expand All @@ -570,9 +579,13 @@ impl MOQTClient {
);
let send_uni_stream_writer = send_uni_stream.get_writer()?;

let writer_key = (subscribe_id, None);
self.stream_writers
.borrow_mut()
.insert(subscribe_id, send_uni_stream_writer);
.insert(writer_key, send_uni_stream_writer);
}
"subgroup" => {
// Writer will be generated when sending in a new Subgroup Stream
}
_ => {}
}
Expand Down Expand Up @@ -780,7 +793,8 @@ impl MOQTClient {
publisher_priority: u8,
) -> Result<JsValue, JsValue> {
let stream_writers = self.stream_writers.borrow();
if let Some(writer) = stream_writers.get(&subscribe_id) {
let writer_key = (subscribe_id, None);
if let Some(writer) = stream_writers.get(&writer_key) {
let track_stream_header_message =
track_stream::Header::new(subscribe_id, track_alias, publisher_priority).unwrap();
let mut track_stream_header_message_buf = BytesMut::new();
Expand Down Expand Up @@ -820,7 +834,8 @@ impl MOQTClient {
object_payload: Vec<u8>,
) -> Result<JsValue, JsValue> {
let stream_writers = self.stream_writers.borrow();
if let Some(writer) = stream_writers.get(&subscribe_id) {
let writer_key = (subscribe_id, None);
if let Some(writer) = stream_writers.get(&writer_key) {
let track_stream_object =
track_stream::Object::new(group_id, object_id, None, object_payload).unwrap();
let mut track_stream_object_buf = BytesMut::new();
Expand Down Expand Up @@ -856,44 +871,59 @@ impl MOQTClient {
subgroup_id: u64,
publisher_priority: u8,
) -> Result<JsValue, JsValue> {
let stream_writers = self.stream_writers.borrow();
if let Some(writer) = stream_writers.get(&subscribe_id) {
let subgroup_stream_header_message = subgroup_stream::Header::new(
subscribe_id,
track_alias,
group_id,
subgroup_id,
publisher_priority,
)
.unwrap();
let mut subgroup_stream_header_message_buf = BytesMut::new();
let _ =
subgroup_stream_header_message.packetize(&mut subgroup_stream_header_message_buf);

let mut buf = Vec::new();
// Message Type
buf.extend(write_variable_integer(
u8::from(DataStreamType::StreamHeaderSubgroup) as u64,
));
buf.extend(subgroup_stream_header_message_buf);

let buffer = js_sys::Uint8Array::new_with_length(buf.len() as u32);
buffer.copy_from(&buf);
JsFuture::from(writer.write_with_chunk(&buffer)).await
} else {
return Err(JsValue::from_str("stream_writer is None"));
let mut stream_writers = self.stream_writers.borrow_mut();
let writer_key = (subscribe_id, Some((group_id, subgroup_id)));
if stream_writers.get(&writer_key).is_none() {
let send_uni_stream = web_sys::WritableStream::from(
JsFuture::from(
self.transport
.borrow()
.as_ref()
.unwrap()
.create_unidirectional_stream(),
)
.await?,
);
let send_uni_stream_writer = send_uni_stream.get_writer()?;
stream_writers.insert(writer_key, send_uni_stream_writer);
}

let writer = stream_writers.get(&writer_key).unwrap();
let subgroup_stream_header_message = subgroup_stream::Header::new(
subscribe_id,
track_alias,
group_id,
subgroup_id,
publisher_priority,
)
.unwrap();
let mut subgroup_stream_header_message_buf = BytesMut::new();
let _ = subgroup_stream_header_message.packetize(&mut subgroup_stream_header_message_buf);

let mut buf = Vec::new();
// Message Type
buf.extend(write_variable_integer(
u8::from(DataStreamType::StreamHeaderSubgroup) as u64,
));
buf.extend(subgroup_stream_header_message_buf);

let buffer = js_sys::Uint8Array::new_with_length(buf.len() as u32);
buffer.copy_from(&buf);
JsFuture::from(writer.write_with_chunk(&buffer)).await
}

#[wasm_bindgen(js_name = sendSubgroupStreamObject)]
pub async fn send_subgroup_stream_object(
&self,
subscribe_id: u64,
group_id: u64,
subgroup_id: u64,
object_id: u64,
object_payload: Vec<u8>,
) -> Result<JsValue, JsValue> {
let stream_writers = self.stream_writers.borrow();
if let Some(writer) = stream_writers.get(&subscribe_id) {
let writer_key = (subscribe_id, Some((group_id, subgroup_id)));
if let Some(writer) = stream_writers.get(&writer_key) {
let subgroup_stream_object =
subgroup_stream::Object::new(object_id, None, object_payload).unwrap();
let mut subgroup_stream_object_buf = BytesMut::new();
Expand Down
7 changes: 5 additions & 2 deletions moqt-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ pub use modules::config::MOQTConfig;
use modules::{
buffer_manager::{buffer_manager, BufferCommand},
logging::init_logging,
object_cache_storage::{object_cache_storage, ObjectCacheStorageCommand},
object_cache_storage::{
cache::SubgroupStreamId, commands::ObjectCacheStorageCommand, storage::object_cache_storage,
},
pubsub_relation_manager::{commands::PubSubRelationCommand, manager::pubsub_relation_manager},
send_stream_dispatcher::{send_stream_dispatcher, SendStreamDispatchCommand},
server_processes::{
Expand All @@ -23,7 +25,8 @@ use moqt_core::{
};

type SubscribeId = u64;
pub(crate) type SenderToOpenSubscription = Sender<(SubscribeId, DataStreamType)>;
pub(crate) type SenderToOpenSubscription =
Sender<(SubscribeId, DataStreamType, Option<SubgroupStreamId>)>;
pub(crate) type TerminationError = (TerminationErrorCode, String);

pub struct MOQTServer {
Expand Down
5 changes: 3 additions & 2 deletions moqt-server/src/modules/message_handlers/control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::modules::{
},
},
moqt_client::MOQTClientStatus,
object_cache_storage::ObjectCacheStorageWrapper,
object_cache_storage::wrapper::ObjectCacheStorageWrapper,
};
use crate::SenderToOpenSubscription;
use anyhow::{bail, Result};
Expand Down Expand Up @@ -352,7 +352,8 @@ pub(crate) mod test_helper_fn {
message_handlers::control_message::{control_message_handler, MessageProcessResult},
moqt_client::{MOQTClient, MOQTClientStatus},
object_cache_storage::{
object_cache_storage, ObjectCacheStorageCommand, ObjectCacheStorageWrapper,
commands::ObjectCacheStorageCommand, storage::object_cache_storage,
wrapper::ObjectCacheStorageWrapper,
},
pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
Expand Down
Loading

0 comments on commit 0b19666

Please sign in to comment.