Skip to content

Commit

Permalink
Merge pull request #133 from nttcom/refactor/tidy_message_handlers
Browse files Browse the repository at this point in the history
Tidy up message handlers directory
  • Loading branch information
yuki-uchida authored Dec 4, 2024
2 parents 6ea4ffb + ad7ae44 commit ed11b42
Show file tree
Hide file tree
Showing 31 changed files with 94 additions and 65 deletions.
8 changes: 5 additions & 3 deletions moqt-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,24 @@ use wtransport::{
mod modules;
use modules::{
buffer_manager::{buffer_manager, request_buffer, BufferCommand},
control_message_handler::{control_message_handler, MessageProcessResult},
logging,
message_handlers::{
control_message::{control_message_handler, MessageProcessResult},
object_stream::{object_stream_handler, ObjectStreamProcessResult},
stream_header::{stream_header_handler, StreamHeaderProcessResult},
},
moqt_client::MOQTClient,
object_cache_storage::{
object_cache_storage, CacheHeader, CacheObject, ObjectCacheStorageCommand,
ObjectCacheStorageWrapper,
},
object_stream_handler::{object_stream_handler, ObjectStreamProcessResult},
pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
},
send_stream_dispatcher::{
send_stream_dispatcher, SendStreamDispatchCommand, SendStreamDispatcher,
},
stream_header_handler::{stream_header_handler, StreamHeaderProcessResult},
};
pub use moqt_core::constants;
use moqt_core::{
Expand Down
6 changes: 1 addition & 5 deletions moqt-server/src/modules.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
pub mod buffer_manager;
pub(crate) mod control_message_handler;
pub(crate) mod handlers;
pub(crate) mod logging;
pub(crate) mod message_handlers;
pub(crate) mod moqt_client;
pub(crate) mod object_cache_storage;
pub(crate) mod object_stream_handler;
pub(crate) mod pubsub_relation_manager;
pub(crate) mod send_stream_dispatcher;
pub(crate) mod server_processes;
pub(crate) mod stream_header_handler;
3 changes: 3 additions & 0 deletions moqt-server/src/modules/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(crate) mod control_message;
pub(crate) mod object_stream;
pub(crate) mod stream_header;
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
pub(crate) mod handlers;
pub(crate) mod server_processes;

use crate::constants::TerminationErrorCode;
use crate::modules::moqt_client::MOQTClient;
use crate::modules::{
handlers::unannounce_handler::unannounce_handler,
message_handlers::control_message::{
handlers::unannounce_handler::unannounce_handler,
server_processes::{
announce_error_message::process_announce_error_message,
announce_message::process_announce_message,
announce_ok_message::process_announce_ok_message,
client_setup_message::process_client_setup_message,
subscribe_error_message::process_subscribe_error_message,
subscribe_message::process_subscribe_message,
subscribe_namespace_message::process_subscribe_namespace_message,
subscribe_ok_message::process_subscribe_ok_message,
},
},
moqt_client::MOQTClientStatus,
object_cache_storage::ObjectCacheStorageWrapper,
server_processes::{
announce_error_message::process_announce_error_message,
announce_message::process_announce_message,
announce_ok_message::process_announce_ok_message,
client_setup_message::process_client_setup_message,
subscribe_error_message::process_subscribe_error_message,
subscribe_message::process_subscribe_message,
subscribe_namespace_message::process_subscribe_namespace_message,
subscribe_ok_message::process_subscribe_ok_message,
},
};
use crate::SenderToOpenSubscription;
use anyhow::{bail, Result};
Expand All @@ -28,8 +34,6 @@ use moqt_core::{
use std::{collections::HashMap, io::Cursor, sync::Arc};
use tokio::sync::Mutex;

use super::moqt_client::MOQTClient;

#[derive(Debug, PartialEq)]
pub enum MessageProcessResult {
Success(BytesMut),
Expand Down Expand Up @@ -345,7 +349,7 @@ pub async fn control_message_handler(
#[cfg(test)]
pub(crate) mod test_helper_fn {
use crate::modules::{
control_message_handler::{control_message_handler, MessageProcessResult},
message_handlers::control_message::{control_message_handler, MessageProcessResult},
moqt_client::{MOQTClient, MOQTClientStatus},
object_cache_storage::{
object_cache_storage, ObjectCacheStorageCommand, ObjectCacheStorageWrapper,
Expand Down Expand Up @@ -421,7 +425,7 @@ pub(crate) mod test_helper_fn {
#[cfg(test)]
mod success {
use crate::modules::{
control_message_handler::{test_helper_fn, MessageProcessResult},
message_handlers::control_message::{test_helper_fn, MessageProcessResult},
moqt_client::MOQTClientStatus,
};
use moqt_core::control_message_type::ControlMessageType;
Expand Down Expand Up @@ -484,7 +488,7 @@ mod success {
#[cfg(test)]
mod failure {
use crate::modules::{
control_message_handler::{test_helper_fn, MessageProcessResult},
message_handlers::control_message::{test_helper_fn, MessageProcessResult},
moqt_client::MOQTClientStatus,
};
use moqt_core::{constants::TerminationErrorCode, control_message_type::ControlMessageType};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
pub(crate) mod announce_handler;
pub(crate) mod announce_ok_handler;
pub(crate) mod server_setup_handler;
pub(crate) mod stream_subgroup_header_handler;
pub(crate) mod stream_track_header_handler;
pub(crate) mod subscribe_error_handler;
pub(crate) mod subscribe_handler;
pub(crate) mod subscribe_namespace_handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub(crate) async fn announce_handler(
mod success {
use std::sync::Arc;

use crate::modules::handlers::announce_handler::announce_handler;
use super::announce_handler;
use crate::modules::moqt_client::MOQTClient;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
Expand Down Expand Up @@ -289,7 +289,7 @@ mod success {
mod failure {
use std::sync::Arc;

use crate::modules::handlers::announce_handler::announce_handler;
use super::announce_handler;
use crate::modules::moqt_client::MOQTClient;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) async fn announce_ok_handler(

#[cfg(test)]
mod success {
use crate::modules::handlers::announce_ok_handler::announce_ok_handler;
use super::announce_ok_handler;
use crate::modules::moqt_client::MOQTClient;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ mod success {
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
};
use crate::{constants, modules::handlers::server_setup_handler::setup_handler};
use crate::{
constants,
modules::message_handlers::control_message::handlers::server_setup_handler::setup_handler,
};
use moqt_core::messages::control_messages::{
client_setup::ClientSetup,
setup_parameters::{Path, Role, RoleCase, SetupParameter},
Expand Down Expand Up @@ -226,12 +229,13 @@ mod success {

#[cfg(test)]
mod failure {
use super::setup_handler;
use crate::constants;
use crate::modules::moqt_client::MOQTClient;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
};
use crate::{constants, modules::handlers::server_setup_handler::setup_handler};
use moqt_core::messages::control_messages::{
client_setup::ClientSetup,
setup_parameters::{Path, Role, RoleCase, SetupParameter},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async fn delete_downstream_and_upstream_subscription(

#[cfg(test)]
mod success {
use crate::modules::handlers::subscribe_error_handler::subscribe_error_handler;
use super::subscribe_error_handler;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
Expand All @@ -124,13 +124,15 @@ mod success {
send_stream_dispatcher, SendStreamDispatchCommand, SendStreamDispatcher,
};
use crate::MOQTClient;
use moqt_core::constants::StreamDirection;
use moqt_core::messages::control_messages::subscribe::{FilterType, GroupOrder};
use moqt_core::messages::{
control_messages::subscribe_error::{SubscribeError, SubscribeErrorCode},
moqt_payload::MOQTPayload,
};
use moqt_core::pubsub_relation_manager_repository::PubSubRelationManagerRepository;
use moqt_core::{
constants::StreamDirection,
messages::{
control_messages::subscribe::{FilterType, GroupOrder},
control_messages::subscribe_error::{SubscribeError, SubscribeErrorCode},
moqt_payload::MOQTPayload,
},
};
use std::sync::Arc;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -255,7 +257,7 @@ mod success {

#[cfg(test)]
mod failure {
use crate::modules::handlers::subscribe_error_handler::subscribe_error_handler;
use super::subscribe_error_handler;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
Expand All @@ -264,11 +266,13 @@ mod failure {
send_stream_dispatcher, SendStreamDispatchCommand, SendStreamDispatcher,
};
use crate::MOQTClient;
use moqt_core::messages::control_messages::subscribe::{FilterType, GroupOrder};
use moqt_core::messages::control_messages::subscribe_error::{
SubscribeError, SubscribeErrorCode,
use moqt_core::{
messages::control_messages::{
subscribe::{FilterType, GroupOrder},
subscribe_error::{SubscribeError, SubscribeErrorCode},
},
pubsub_relation_manager_repository::PubSubRelationManagerRepository,
};
use moqt_core::pubsub_relation_manager_repository::PubSubRelationManagerRepository;
use tokio::sync::mpsc;

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub(crate) async fn subscribe_handler(
Ok(None)
}

// TODO: Check if “publisher not found” should turn into closing connection
// TODO: Check if “publisher not found” should turn into closing session
None => {
let reason_phrase = "publisher not found".to_string();
let subscribe_error = SubscribeError::new(
Expand Down Expand Up @@ -507,8 +507,8 @@ async fn set_downstream_and_upstream_subscription(

#[cfg(test)]
mod success {
use super::subscribe_handler;
use crate::modules::{
handlers::subscribe_handler::subscribe_handler,
moqt_client::MOQTClient,
object_cache_storage::{
object_cache_storage, CacheHeader, CacheObject, ObjectCacheStorageCommand,
Expand Down Expand Up @@ -985,8 +985,8 @@ mod success {

#[cfg(test)]
mod failure {
use super::subscribe_handler;
use crate::modules::{
handlers::subscribe_handler::subscribe_handler,
moqt_client::MOQTClient,
object_cache_storage::{
object_cache_storage, ObjectCacheStorageCommand, ObjectCacheStorageWrapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ pub(crate) async fn subscribe_namespace_handler(

#[cfg(test)]
mod success {
use super::subscribe_namespace_handler;
use crate::modules::{
handlers::subscribe_namespace_handler::subscribe_namespace_handler,
moqt_client::MOQTClient,
pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
Expand Down Expand Up @@ -199,8 +199,8 @@ mod success {

#[cfg(test)]
mod failure {
use super::subscribe_namespace_handler;
use crate::modules::{
handlers::subscribe_namespace_handler::subscribe_namespace_handler,
moqt_client::MOQTClient,
pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(crate) async fn subscribe_ok_handler(

#[cfg(test)]
mod success {
use crate::modules::handlers::subscribe_ok_handler::subscribe_ok_handler;
use super::subscribe_ok_handler;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
Expand Down Expand Up @@ -240,7 +240,7 @@ mod success {

#[cfg(test)]
mod failure {
use crate::modules::handlers::subscribe_ok_handler::subscribe_ok_handler;
use crate::modules::message_handlers::control_message::handlers::subscribe_ok_handler::subscribe_ok_handler;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ pub(crate) mod announce_error_message;
pub(crate) mod announce_message;
pub(crate) mod announce_ok_message;
pub(crate) mod client_setup_message;
pub(crate) mod stream_track_header;
pub(crate) mod stream_track_subgroup;
pub(crate) mod subscribe_error_message;
pub(crate) mod subscribe_message;
pub(crate) mod subscribe_namespace_message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use moqt_core::{
send_stream_dispatcher_repository::SendStreamDispatcherRepository,
};

use crate::modules::{handlers::announce_handler::announce_handler, moqt_client::MOQTClient};
use crate::modules::{
message_handlers::control_message::handlers::announce_handler::announce_handler,
moqt_client::MOQTClient,
};

pub(crate) async fn process_announce_message(
payload_buf: &mut BytesMut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use moqt_core::{
pubsub_relation_manager_repository::PubSubRelationManagerRepository,
};

use crate::modules::{handlers::announce_ok_handler::announce_ok_handler, moqt_client::MOQTClient};
use crate::modules::{
message_handlers::control_message::handlers::announce_ok_handler::announce_ok_handler,
moqt_client::MOQTClient,
};

pub(crate) async fn process_announce_ok_message(
payload_buf: &mut BytesMut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use moqt_core::{
pubsub_relation_manager_repository::PubSubRelationManagerRepository,
};

use crate::modules::{handlers::server_setup_handler::setup_handler, moqt_client::MOQTClient};
use crate::modules::{
message_handlers::control_message::handlers::server_setup_handler::setup_handler,
moqt_client::MOQTClient,
};

pub(crate) async fn process_client_setup_message(
payload_buf: &mut BytesMut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use moqt_core::{
};

use crate::modules::{
handlers::subscribe_error_handler::subscribe_error_handler, moqt_client::MOQTClient,
message_handlers::control_message::handlers::subscribe_error_handler::subscribe_error_handler,
moqt_client::MOQTClient,
};

pub(crate) async fn process_subscribe_error_message(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::modules::moqt_client::MOQTClient;
use crate::modules::{
handlers::subscribe_handler::subscribe_handler, object_cache_storage::ObjectCacheStorageWrapper,
message_handlers::control_message::handlers::subscribe_handler::subscribe_handler,
object_cache_storage::ObjectCacheStorageWrapper,
};
use crate::SenderToOpenSubscription;
use anyhow::{bail, Result};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use moqt_core::{
};

use crate::modules::{
handlers::subscribe_namespace_handler::subscribe_namespace_handler, moqt_client::MOQTClient,
message_handlers::control_message::handlers::subscribe_namespace_handler::subscribe_namespace_handler,
moqt_client::MOQTClient,
};

pub(crate) async fn process_subscribe_namespace_message(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use moqt_core::{
};

use crate::modules::{
handlers::subscribe_ok_handler::subscribe_ok_handler, moqt_client::MOQTClient,
message_handlers::control_message::handlers::subscribe_ok_handler::subscribe_ok_handler,
moqt_client::MOQTClient,
};

pub(crate) async fn process_subscribe_ok_message(
Expand Down
Loading

0 comments on commit ed11b42

Please sign in to comment.