diff --git a/moqt-server/src/modules/server_processes.rs b/moqt-server/src/modules/server_processes.rs index cd516a0..8a57161 100644 --- a/moqt-server/src/modules/server_processes.rs +++ b/moqt-server/src/modules/server_processes.rs @@ -1,4 +1,5 @@ +pub(crate) mod control_stream; +pub(crate) mod data_streams; pub(crate) mod senders; pub(crate) mod session_handler; -pub(crate) mod stream_and_datagram; pub(crate) mod thread_starters; diff --git a/moqt-server/src/modules/server_processes/control_stream.rs b/moqt-server/src/modules/server_processes/control_stream.rs new file mode 100644 index 0000000..79fb720 --- /dev/null +++ b/moqt-server/src/modules/server_processes/control_stream.rs @@ -0,0 +1,3 @@ +pub(crate) mod bi_stream; +pub(crate) mod handler; +pub(crate) mod sender; diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/stream.rs b/moqt-server/src/modules/server_processes/control_stream/bi_stream.rs similarity index 100% rename from moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/stream.rs rename to moqt-server/src/modules/server_processes/control_stream/bi_stream.rs diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/receiver.rs b/moqt-server/src/modules/server_processes/control_stream/handler.rs similarity index 97% rename from moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/receiver.rs rename to moqt-server/src/modules/server_processes/control_stream/handler.rs index 13a2942..277f265 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/receiver.rs +++ b/moqt-server/src/modules/server_processes/control_stream/handler.rs @@ -1,3 +1,4 @@ +use super::bi_stream::BiStream; use crate::modules::{ buffer_manager::{request_buffer, BufferCommand}, message_handlers::control_message::{control_message_handler, MessageProcessResult}, @@ -12,9 +13,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{self}; -use super::stream::BiStream; - -pub(crate) async fn handle_bi_recv_stream( +pub(crate) async fn handle_control_stream( stream: &mut BiStream, client: Arc>, ) -> Result<()> { diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/forwarder.rs b/moqt-server/src/modules/server_processes/control_stream/sender.rs similarity index 98% rename from moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/forwarder.rs rename to moqt-server/src/modules/server_processes/control_stream/sender.rs index 8641a52..d9bf9cf 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream/forwarder.rs +++ b/moqt-server/src/modules/server_processes/control_stream/sender.rs @@ -16,7 +16,7 @@ use tokio::sync::{mpsc::Receiver, Mutex}; use tracing::{self}; use wtransport::SendStream; -pub(crate) async fn forward_control_message( +pub(crate) async fn send_control_stream( send_stream: Arc>, mut message_rx: Receiver>>, ) { diff --git a/moqt-server/src/modules/server_processes/data_streams.rs b/moqt-server/src/modules/server_processes/data_streams.rs new file mode 100644 index 0000000..7ba1af2 --- /dev/null +++ b/moqt-server/src/modules/server_processes/data_streams.rs @@ -0,0 +1,2 @@ +pub(crate) mod datagram; +pub(crate) mod stream; diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/datagram.rs b/moqt-server/src/modules/server_processes/data_streams/datagram.rs similarity index 100% rename from moqt-server/src/modules/server_processes/stream_and_datagram/datagram.rs rename to moqt-server/src/modules/server_processes/data_streams/datagram.rs diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/datagram/forwarder.rs b/moqt-server/src/modules/server_processes/data_streams/datagram/forwarder.rs similarity index 97% rename from moqt-server/src/modules/server_processes/stream_and_datagram/datagram/forwarder.rs rename to moqt-server/src/modules/server_processes/data_streams/datagram/forwarder.rs index a62a07c..8fcfd7e 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/datagram/forwarder.rs +++ b/moqt-server/src/modules/server_processes/data_streams/datagram/forwarder.rs @@ -22,7 +22,7 @@ use tokio::sync::Mutex; use tracing::{self}; use wtransport::Connection; -pub(crate) struct DatagramForwarder { +pub(crate) struct ObjectDatagramForwarder { session: Arc, senders: Arc, downstream_subscribe_id: u64, @@ -31,7 +31,7 @@ pub(crate) struct DatagramForwarder { sleep_time: Duration, } -impl DatagramForwarder { +impl ObjectDatagramForwarder { pub(crate) async fn init( session: Arc, downstream_subscribe_id: u64, @@ -56,7 +56,7 @@ impl DatagramForwarder { let cache_key = CacheKey::new(upstream_session_id, upstream_subscribe_id); - let datagram_forwarder = DatagramForwarder { + let object_datagram_forwarder = ObjectDatagramForwarder { session, senders, downstream_subscribe_id, @@ -65,7 +65,7 @@ impl DatagramForwarder { sleep_time, }; - Ok(datagram_forwarder) + Ok(object_datagram_forwarder) } pub(crate) async fn start(&mut self) -> Result<()> { @@ -96,7 +96,7 @@ impl DatagramForwarder { }) .await?; - tracing::info!("DatagramForwarder finished"); + tracing::info!("ObjectDatagramForwarder finished"); Ok(()) } diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/datagram/receiver.rs b/moqt-server/src/modules/server_processes/data_streams/datagram/receiver.rs similarity index 98% rename from moqt-server/src/modules/server_processes/stream_and_datagram/datagram/receiver.rs rename to moqt-server/src/modules/server_processes/data_streams/datagram/receiver.rs index 4d1d01b..42ca9ff 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/datagram/receiver.rs +++ b/moqt-server/src/modules/server_processes/data_streams/datagram/receiver.rs @@ -23,14 +23,14 @@ use wtransport::datagram::Datagram; use self::object_cache_storage::CacheKey; -pub(crate) struct DatagramReceiver { +pub(crate) struct ObjectDatagramReceiver { buf: Arc>, senders: Arc, client: Arc>, duration: u64, } -impl DatagramReceiver { +impl ObjectDatagramReceiver { pub(crate) async fn init(client: Arc>) -> Self { let senders = client.lock().await.senders(); let stable_id = client.lock().await.id(); @@ -39,7 +39,7 @@ impl DatagramReceiver { // TODO: Set the accurate duration let duration = 100000; - DatagramReceiver { + ObjectDatagramReceiver { buf, senders, client, diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream.rs b/moqt-server/src/modules/server_processes/data_streams/stream.rs similarity index 65% rename from moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream.rs rename to moqt-server/src/modules/server_processes/data_streams/stream.rs index 5e95ae1..084692c 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream.rs +++ b/moqt-server/src/modules/server_processes/data_streams/stream.rs @@ -1,3 +1,3 @@ pub(crate) mod forwarder; pub(crate) mod receiver; -pub(crate) mod streams; +pub(crate) mod uni_stream; diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/forwarder.rs b/moqt-server/src/modules/server_processes/data_streams/stream/forwarder.rs similarity index 99% rename from moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/forwarder.rs rename to moqt-server/src/modules/server_processes/data_streams/stream/forwarder.rs index e412736..0258a63 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/forwarder.rs +++ b/moqt-server/src/modules/server_processes/data_streams/stream/forwarder.rs @@ -1,3 +1,4 @@ +use super::uni_stream::UniSendStream; use crate::modules::{ buffer_manager::BufferCommand, message_handlers::{object_stream::StreamObject, stream_header::StreamHeader}, @@ -25,8 +26,6 @@ use std::{sync::Arc, thread, time::Duration}; use tokio::sync::Mutex; use tracing::{self}; -use super::streams::UniSendStream; - pub(crate) struct ObjectStreamForwarder { stream: UniSendStream, senders: Arc, diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/receiver.rs b/moqt-server/src/modules/server_processes/data_streams/stream/receiver.rs similarity index 98% rename from moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/receiver.rs rename to moqt-server/src/modules/server_processes/data_streams/stream/receiver.rs index 9b6babc..7ae06c3 100644 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/receiver.rs +++ b/moqt-server/src/modules/server_processes/data_streams/stream/receiver.rs @@ -2,7 +2,7 @@ use self::{ object_cache_storage::CacheKey, object_stream::StreamObject, stream_header::StreamHeader, }; -use super::streams::UniRecvStream; +use super::uni_stream::UniRecvStream; use crate::{ modules::{ buffer_manager::{request_buffer, BufferCommand}, @@ -32,7 +32,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{self}; -pub(crate) struct UniStreamReceiver { +pub(crate) struct ObjectStreamReceiver { stream: UniRecvStream, buf: Arc>, senders: Arc, @@ -43,7 +43,7 @@ pub(crate) struct UniStreamReceiver { upstream_subscription: Option, } -impl UniStreamReceiver { +impl ObjectStreamReceiver { pub(crate) async fn init(stream: UniRecvStream, client: Arc>) -> Self { let senders = client.lock().await.senders(); let stable_id = stream.stable_id(); @@ -52,7 +52,7 @@ impl UniStreamReceiver { // TODO: Set the accurate duration let duration = 100000; - UniStreamReceiver { + ObjectStreamReceiver { stream, buf, senders, @@ -109,7 +109,7 @@ impl UniStreamReceiver { }) .await?; - tracing::debug!("UniStreamReceiver finished"); + tracing::debug!("ObjectStreamReceiver finished"); Ok(()) } diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/streams.rs b/moqt-server/src/modules/server_processes/data_streams/stream/uni_stream.rs similarity index 100% rename from moqt-server/src/modules/server_processes/stream_and_datagram/uni_directional_stream/streams.rs rename to moqt-server/src/modules/server_processes/data_streams/stream/uni_stream.rs diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram.rs b/moqt-server/src/modules/server_processes/stream_and_datagram.rs deleted file mode 100644 index 9e07c72..0000000 --- a/moqt-server/src/modules/server_processes/stream_and_datagram.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub(crate) mod bi_directional_stream; -pub(crate) mod datagram; -pub(crate) mod uni_directional_stream; diff --git a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream.rs b/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream.rs deleted file mode 100644 index f37e194..0000000 --- a/moqt-server/src/modules/server_processes/stream_and_datagram/bi_directional_stream.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub(crate) mod forwarder; -pub(crate) mod receiver; -pub(crate) mod stream; diff --git a/moqt-server/src/modules/server_processes/thread_starters.rs b/moqt-server/src/modules/server_processes/thread_starters.rs index 8bb51d5..96fbe82 100644 --- a/moqt-server/src/modules/server_processes/thread_starters.rs +++ b/moqt-server/src/modules/server_processes/thread_starters.rs @@ -1,12 +1,14 @@ -use super::stream_and_datagram::{ - bi_directional_stream::{ - forwarder::forward_control_message, receiver::handle_bi_recv_stream, stream::BiStream, +use super::{ + control_stream::{ + bi_stream::BiStream, handler::handle_control_stream, sender::send_control_stream, }, - datagram::{forwarder::DatagramForwarder, receiver::DatagramReceiver}, - uni_directional_stream::{ - forwarder::ObjectStreamForwarder, - receiver::UniStreamReceiver, - streams::{UniRecvStream, UniSendStream}, + data_streams::{ + datagram::{forwarder::ObjectDatagramForwarder, receiver::ObjectDatagramReceiver}, + stream::{ + forwarder::ObjectStreamForwarder, + receiver::ObjectStreamReceiver, + uni_stream::{UniRecvStream, UniSendStream}, + }, }, }; use crate::modules::{moqt_client::MOQTClient, send_stream_dispatcher::SendStreamDispatchCommand}; @@ -21,7 +23,7 @@ use tokio::sync::{mpsc, Mutex}; use tracing::{self, Instrument}; use wtransport::{datagram::Datagram, Connection, RecvStream, SendStream}; -async fn spawn_bi_stream_threads( +async fn spawn_control_stream_threads( client: Arc>, send_stream: SendStream, recv_stream: RecvStream, @@ -45,7 +47,7 @@ async fn spawn_bi_stream_threads( let stable_id = client.lock().await.id(); let session_span = tracing::info_span!("Session", stable_id); session_span.in_scope(|| { - tracing::info!("Accepted BI stream"); + tracing::info!("Accepted bi-directional stream"); }); let (message_tx, message_rx) = mpsc::channel::>>(1024); @@ -61,26 +63,26 @@ async fn spawn_bi_stream_threads( // The send_stream is wrapped with a Mutex to make it thread-safe since it can be called from multiple threads for returning and forwarding messages. let shared_send_stream = Arc::new(Mutex::new(send_stream)); - // Spawn thread listenning for WebTransport messages + // Spawn a thread to listen for control messages from the client let send_stream = Arc::clone(&shared_send_stream); let session_span = session_span.clone(); let stream_id = recv_stream.id().into_u64(); tokio::spawn( async move { let mut stream = BiStream::new(stable_id, stream_id, recv_stream, send_stream); - handle_bi_recv_stream(&mut stream, client) + handle_control_stream(&mut stream, client) .instrument(session_span) .await } .in_current_span(), ); - // Thread to forward messages (ANNOUNCE SUBSCRIBE) from the server + // Spawn a thread to send control messages: respond to the client or forward to the other client let send_stream = Arc::clone(&shared_send_stream); tokio::spawn( async move { let session_span = tracing::info_span!("Session", stable_id); - forward_control_message(send_stream, message_rx) + send_control_stream(send_stream, message_rx) .instrument(session_span) .await; } @@ -90,14 +92,14 @@ async fn spawn_bi_stream_threads( Ok(()) } -async fn spawn_uni_recv_stream_thread( +async fn spawn_object_stream_receiver_thread( client: Arc>, recv_stream: RecvStream, ) -> Result<()> { let stable_id = client.lock().await.id(); let session_span = tracing::info_span!("Session", stable_id); session_span.in_scope(|| { - tracing::info!("Accepted UNI Recv stream"); + tracing::info!("Accepted uni-directional recv stream"); }); let stream_id = recv_stream.id().into_u64(); @@ -105,11 +107,11 @@ async fn spawn_uni_recv_stream_thread( async move { let stream = UniRecvStream::new(stable_id, stream_id, recv_stream); let senders = client.lock().await.senders(); - let mut uni_stream_receiver = UniStreamReceiver::init(stream, client) + let mut object_stream_receiver = ObjectStreamReceiver::init(stream, client) .instrument(session_span.clone()) .await; - match uni_stream_receiver + match object_stream_receiver .start() .instrument(session_span.clone()) .await @@ -125,14 +127,17 @@ async fn spawn_uni_recv_stream_thread( } } - let _ = uni_stream_receiver.finish().instrument(session_span).await; + let _ = object_stream_receiver + .finish() + .instrument(session_span) + .await; } .in_current_span(), ); Ok(()) } -async fn spawn_uni_send_stream_thread( +async fn spawn_object_stream_forwarder_thread( client: Arc>, send_stream: SendStream, subscribe_id: u64, @@ -142,7 +147,7 @@ async fn spawn_uni_send_stream_thread( let session_span = tracing::info_span!("Session", stable_id); session_span.in_scope(|| { tracing::info!( - "Open UNI Send stream for stream type: {:?}", + "Open uni-directional send for stream type: {:?}", data_stream_type ); }); @@ -188,25 +193,25 @@ async fn spawn_uni_send_stream_thread( Ok(()) } -async fn spawn_recv_datagram_thread( +async fn spawn_object_datagram_receiver_thread( client: Arc>, datagram: Datagram, ) -> Result<()> { let stable_id = client.lock().await.id(); let session_span = tracing::info_span!("Session", stable_id); session_span.in_scope(|| { - tracing::info!("Accepted Datagram"); + tracing::info!("Received a datagram"); }); // No loop: End after receiving once tokio::spawn( async move { let senders = client.lock().await.senders(); - let mut datagram_receiver = DatagramReceiver::init(client) + let mut object_datagram_receiver = ObjectDatagramReceiver::init(client) .instrument(session_span.clone()) .await; - match datagram_receiver + match object_datagram_receiver .receive_object(datagram) .instrument(session_span) .await @@ -227,7 +232,7 @@ async fn spawn_recv_datagram_thread( Ok(()) } -async fn spawn_send_datagram_thread( +async fn spawn_object_datagram_forwarder_thread( client: Arc>, session: Arc, subscribe_id: u64, @@ -235,18 +240,19 @@ async fn spawn_send_datagram_thread( let stable_id = client.lock().await.id(); let session_span = tracing::info_span!("Session", stable_id); session_span.in_scope(|| { - tracing::info!("Accepted Datagram"); + tracing::info!("Open datagrams send thread"); }); tokio::spawn( async move { let senders = client.lock().await.senders(); - let mut datagram_forwarder = DatagramForwarder::init(session, subscribe_id, client) - .instrument(session_span.clone()) - .await - .unwrap(); + let mut object_datagram_forwarder = + ObjectDatagramForwarder::init(session, subscribe_id, client) + .instrument(session_span.clone()) + .await + .unwrap(); - match datagram_forwarder + match object_datagram_forwarder .start() .instrument(session_span.clone()) .await @@ -254,7 +260,7 @@ async fn spawn_send_datagram_thread( Ok(_) => {} Err(e) => { let code = TerminationErrorCode::InternalError; - let reason = format!("DatagramForwarder: {:?}", e); + let reason = format!("ObjectDatagramForwarder: {:?}", e); tracing::error!(reason); @@ -265,7 +271,10 @@ async fn spawn_send_datagram_thread( } } - let _ = datagram_forwarder.finish().instrument(session_span).await; + let _ = object_datagram_forwarder + .finish() + .instrument(session_span) + .await; } .in_current_span(), ); @@ -284,26 +293,26 @@ pub(crate) async fn select_spawn_thread( tokio::select! { stream = session.accept_bi() => { let (send_stream, recv_stream) = stream?; - spawn_bi_stream_threads(client.clone(), send_stream, recv_stream, is_control_stream_opened).await?; + spawn_control_stream_threads(client.clone(), send_stream, recv_stream, is_control_stream_opened).await?; }, stream = session.accept_uni() => { let recv_stream = stream?; - spawn_uni_recv_stream_thread(client.clone(), recv_stream).await?; + spawn_object_stream_receiver_thread(client.clone(), recv_stream).await?; }, datagram = session.receive_datagram() => { let datagram = datagram?; - spawn_recv_datagram_thread(client.clone(), datagram).await?; + spawn_object_datagram_receiver_thread(client.clone(), datagram).await?; }, - // Waiting for a uni-directional send stream open request and forwarding the message + // Waiting for requests to open a new data stream thread Some((subscribe_id, data_stream_type)) = open_downstream_stream_or_datagram_rx.recv() => { match data_stream_type { DataStreamType::StreamHeaderTrack | DataStreamType::StreamHeaderSubgroup => { let send_stream = session.open_uni().await?.await?; - spawn_uni_send_stream_thread(client.clone(), send_stream, subscribe_id, data_stream_type).await?; + spawn_object_stream_forwarder_thread(client.clone(), send_stream, subscribe_id, data_stream_type).await?; } DataStreamType::ObjectDatagram => { let session = session.clone(); - spawn_send_datagram_thread(client.clone(), session, subscribe_id).await?; + spawn_object_datagram_forwarder_thread(client.clone(), session, subscribe_id).await?; } }