Skip to content

Commit

Permalink
Rename and reorganize Receiver and Forwarder functions (#158)
Browse files Browse the repository at this point in the history
* rename control stream functions

* rename ObjectDatagramForwarder

* rename ObjectDatagramReceiver

* rename ObjectStreamReceiver

* refactor: reorganize directory structure and rename receiver/forwarder files

* refactor: rename functions to spawn threads
  • Loading branch information
tetter27 authored Jan 19, 2025
1 parent fafe8ca commit 00a262a
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 67 deletions.
3 changes: 2 additions & 1 deletion moqt-server/src/modules/server_processes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod control_stream;
pub(crate) mod data_streams;
pub(crate) mod senders;
pub(crate) mod session_handler;
pub(crate) mod stream_and_datagram;
pub(crate) mod thread_starters;
3 changes: 3 additions & 0 deletions moqt-server/src/modules/server_processes/control_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod bi_stream;
pub(crate) mod handler;
pub(crate) mod sender;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::bi_stream::BiStream;
use crate::modules::{
buffer_manager::{request_buffer, BufferCommand},
message_handlers::control_message::{control_message_handler, MessageProcessResult},
Expand All @@ -12,9 +13,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{self};

use super::stream::BiStream;

pub(crate) async fn handle_bi_recv_stream(
pub(crate) async fn handle_control_stream(
stream: &mut BiStream,
client: Arc<Mutex<MOQTClient>>,
) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::sync::{mpsc::Receiver, Mutex};
use tracing::{self};
use wtransport::SendStream;

pub(crate) async fn forward_control_message(
pub(crate) async fn send_control_stream(
send_stream: Arc<Mutex<SendStream>>,
mut message_rx: Receiver<Arc<Box<dyn MOQTPayload>>>,
) {
Expand Down
2 changes: 2 additions & 0 deletions moqt-server/src/modules/server_processes/data_streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod datagram;
pub(crate) mod stream;
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::sync::Mutex;
use tracing::{self};
use wtransport::Connection;

pub(crate) struct DatagramForwarder {
pub(crate) struct ObjectDatagramForwarder {
session: Arc<Connection>,
senders: Arc<Senders>,
downstream_subscribe_id: u64,
Expand All @@ -31,7 +31,7 @@ pub(crate) struct DatagramForwarder {
sleep_time: Duration,
}

impl DatagramForwarder {
impl ObjectDatagramForwarder {
pub(crate) async fn init(
session: Arc<Connection>,
downstream_subscribe_id: u64,
Expand All @@ -56,7 +56,7 @@ impl DatagramForwarder {

let cache_key = CacheKey::new(upstream_session_id, upstream_subscribe_id);

let datagram_forwarder = DatagramForwarder {
let object_datagram_forwarder = ObjectDatagramForwarder {
session,
senders,
downstream_subscribe_id,
Expand All @@ -65,7 +65,7 @@ impl DatagramForwarder {
sleep_time,
};

Ok(datagram_forwarder)
Ok(object_datagram_forwarder)
}

pub(crate) async fn start(&mut self) -> Result<()> {
Expand Down Expand Up @@ -96,7 +96,7 @@ impl DatagramForwarder {
})
.await?;

tracing::info!("DatagramForwarder finished");
tracing::info!("ObjectDatagramForwarder finished");

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use wtransport::datagram::Datagram;

use self::object_cache_storage::CacheKey;

pub(crate) struct DatagramReceiver {
pub(crate) struct ObjectDatagramReceiver {
buf: Arc<Mutex<BytesMut>>,
senders: Arc<Senders>,
client: Arc<Mutex<MOQTClient>>,
duration: u64,
}

impl DatagramReceiver {
impl ObjectDatagramReceiver {
pub(crate) async fn init(client: Arc<Mutex<MOQTClient>>) -> Self {
let senders = client.lock().await.senders();
let stable_id = client.lock().await.id();
Expand All @@ -39,7 +39,7 @@ impl DatagramReceiver {
// TODO: Set the accurate duration
let duration = 100000;

DatagramReceiver {
ObjectDatagramReceiver {
buf,
senders,
client,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub(crate) mod forwarder;
pub(crate) mod receiver;
pub(crate) mod streams;
pub(crate) mod uni_stream;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::uni_stream::UniSendStream;
use crate::modules::{
buffer_manager::BufferCommand,
message_handlers::{object_stream::StreamObject, stream_header::StreamHeader},
Expand Down Expand Up @@ -25,8 +26,6 @@ use std::{sync::Arc, thread, time::Duration};
use tokio::sync::Mutex;
use tracing::{self};

use super::streams::UniSendStream;

pub(crate) struct ObjectStreamForwarder {
stream: UniSendStream,
senders: Arc<Senders>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use self::{
object_cache_storage::CacheKey, object_stream::StreamObject, stream_header::StreamHeader,
};

use super::streams::UniRecvStream;
use super::uni_stream::UniRecvStream;
use crate::{
modules::{
buffer_manager::{request_buffer, BufferCommand},
Expand Down Expand Up @@ -32,7 +32,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{self};

pub(crate) struct UniStreamReceiver {
pub(crate) struct ObjectStreamReceiver {
stream: UniRecvStream,
buf: Arc<Mutex<BytesMut>>,
senders: Arc<Senders>,
Expand All @@ -43,7 +43,7 @@ pub(crate) struct UniStreamReceiver {
upstream_subscription: Option<Subscription>,
}

impl UniStreamReceiver {
impl ObjectStreamReceiver {
pub(crate) async fn init(stream: UniRecvStream, client: Arc<Mutex<MOQTClient>>) -> Self {
let senders = client.lock().await.senders();
let stable_id = stream.stable_id();
Expand All @@ -52,7 +52,7 @@ impl UniStreamReceiver {
// TODO: Set the accurate duration
let duration = 100000;

UniStreamReceiver {
ObjectStreamReceiver {
stream,
buf,
senders,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl UniStreamReceiver {
})
.await?;

tracing::debug!("UniStreamReceiver finished");
tracing::debug!("ObjectStreamReceiver finished");

Ok(())
}
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 00a262a

Please sign in to comment.