Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ANNOUNCE_OK and ANNOUNCE_ERROR handler #116

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ impl AnnounceOk {
pub fn new(track_namespace: Vec<String>) -> Self {
Self { track_namespace }
}

pub fn track_namespace(&self) -> &Vec<String> {
&self.track_namespace
}
}

impl MOQTPayload for AnnounceOk {
Expand Down
36 changes: 34 additions & 2 deletions moqt-server/src/modules/control_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use crate::modules::handlers::{
announce_handler::AnnounceResponse, unannounce_handler::unannounce_handler,
};
use crate::modules::server_processes::announce_error_message::process_announce_error_message;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use grouping

use crate::modules::server_processes::announce_ok_message::process_announce_ok_message;
use crate::modules::server_processes::subscribe_namespace_message::process_subscribe_namespace_message;
use crate::modules::server_processes::{
announce_message::process_announce_message, client_setup_message::process_client_setup_message,
Expand Down Expand Up @@ -214,8 +216,38 @@
}
}
}
// ControlMessageType::AnnounceOk => {}
// ControlMessageType::AnnounceError => {}
ControlMessageType::AnnounceOk => {
match process_announce_ok_message(
&mut payload_buf,

Check warning on line 221 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L220-L221

Added lines #L220 - L221 were not covered by tests
client,
pubsub_relation_manager_repository,
)
.await

Check warning on line 225 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L225

Added line #L225 was not covered by tests
{
Ok(_) => {
return MessageProcessResult::SuccessWithoutResponse;

Check warning on line 228 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L228

Added line #L228 was not covered by tests
}
Err(err) => {
return MessageProcessResult::Failure(
TerminationErrorCode::InternalError,
err.to_string(),

Check warning on line 233 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L230-L233

Added lines #L230 - L233 were not covered by tests
);
}
}
}
ControlMessageType::AnnounceError => {
match process_announce_error_message(&mut payload_buf).await {

Check warning on line 239 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L239

Added line #L239 was not covered by tests
Ok(_) => {
return MessageProcessResult::SuccessWithoutResponse;

Check warning on line 241 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L241

Added line #L241 was not covered by tests
}
Err(err) => {
return MessageProcessResult::Failure(
TerminationErrorCode::InternalError,
err.to_string(),

Check warning on line 246 in moqt-server/src/modules/control_message_handler.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/control_message_handler.rs#L243-L246

Added lines #L243 - L246 were not covered by tests
);
}
}
}
ControlMessageType::SubscribeNamespace => {
match process_subscribe_namespace_message(
&mut payload_buf,
Expand Down
1 change: 1 addition & 0 deletions moqt-server/src/modules/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod announce_handler;
pub(crate) mod announce_ok_handler;
pub(crate) mod server_setup_handler;
pub(crate) mod stream_track_header_handler;
pub(crate) mod subscribe_handler;
Expand Down
71 changes: 71 additions & 0 deletions moqt-server/src/modules/handlers/announce_ok_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use anyhow::Result;
use moqt_core::pubsub_relation_manager_repository::PubSubRelationManagerRepository;
use moqt_core::{messages::control_messages::announce_ok::AnnounceOk, MOQTClient};

pub(crate) async fn announce_ok_handler(
announce_ok_message: AnnounceOk,
client: &mut MOQTClient,
pubsub_relation_manager_repository: &mut dyn PubSubRelationManagerRepository,
) -> Result<()> {
tracing::trace!("announce_ok_handler start.");
tracing::debug!("announce_ok_message: {:#?}", announce_ok_message);

// Record the announced Track Namespace
pubsub_relation_manager_repository
.set_downstream_announced_namespace(
announce_ok_message.track_namespace().clone(),
client.id,
)
.await?;

Ok(())
}

#[cfg(test)]
mod success {
use crate::modules::handlers::announce_ok_handler::announce_ok_handler;
use crate::modules::pubsub_relation_manager::{
commands::PubSubRelationCommand, manager::pubsub_relation_manager,
wrapper::PubSubRelationManagerWrapper,
};
use moqt_core::messages::moqt_payload::MOQTPayload;
use moqt_core::pubsub_relation_manager_repository::PubSubRelationManagerRepository;
use moqt_core::{messages::control_messages::announce_ok::AnnounceOk, moqt_client::MOQTClient};
use tokio::sync::mpsc;

#[tokio::test]
async fn normal_case() {
// Generate ANNOUNCE message
let track_namespace = Vec::from(["test".to_string(), "test".to_string()]);
let announce_ok_message = AnnounceOk::new(track_namespace.clone());
let mut buf = bytes::BytesMut::new();
announce_ok_message.packetize(&mut buf);

// Generate client
let downstream_session_id = 0;
let mut client = MOQTClient::new(downstream_session_id);

// Generate PubSubRelationManagerWrapper
let (track_namespace_tx, mut track_namespace_rx) =
mpsc::channel::<PubSubRelationCommand>(1024);
tokio::spawn(async move { pubsub_relation_manager(&mut track_namespace_rx).await });
let mut pubsub_relation_manager: PubSubRelationManagerWrapper =
PubSubRelationManagerWrapper::new(track_namespace_tx);

let max_subscribe_id = 10;

let _ = pubsub_relation_manager
.setup_subscriber(max_subscribe_id, downstream_session_id)
.await;

// Execute announce_ok_handler and get result
let result = announce_ok_handler(
announce_ok_message,
&mut client,
&mut pubsub_relation_manager,
)
.await;

assert!(result.is_ok());
}
}
2 changes: 2 additions & 0 deletions moqt-server/src/modules/server_processes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub(crate) mod announce_error_message;
pub(crate) mod announce_message;
pub(crate) mod announce_ok_message;
pub(crate) mod client_setup_message;
pub(crate) mod stream_track_header;
pub(crate) mod subscribe_message;
Expand Down
19 changes: 19 additions & 0 deletions moqt-server/src/modules/server_processes/announce_error_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use anyhow::{bail, Result};
use bytes::BytesMut;
use moqt_core::messages::{
control_messages::announce_error::AnnounceError, moqt_payload::MOQTPayload,
};

pub(crate) async fn process_announce_error_message(payload_buf: &mut BytesMut) -> Result<()> {
let announce_error_message = match AnnounceError::depacketize(payload_buf) {
Ok(announce_error_message) => announce_error_message,
Err(err) => {
tracing::error!("{:#?}", err);
bail!(err.to_string());

Check warning on line 12 in moqt-server/src/modules/server_processes/announce_error_message.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/server_processes/announce_error_message.rs#L7-L12

Added lines #L7 - L12 were not covered by tests
}
};

tracing::warn!("announce_error_message: {:#?}", announce_error_message);

Check warning on line 16 in moqt-server/src/modules/server_processes/announce_error_message.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/server_processes/announce_error_message.rs#L16

Added line #L16 was not covered by tests

Ok(())

Check warning on line 18 in moqt-server/src/modules/server_processes/announce_error_message.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/server_processes/announce_error_message.rs#L18

Added line #L18 was not covered by tests
}
29 changes: 29 additions & 0 deletions moqt-server/src/modules/server_processes/announce_ok_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::modules::handlers::announce_ok_handler::announce_ok_handler;
use anyhow::{bail, Result};
use bytes::BytesMut;
use moqt_core::pubsub_relation_manager_repository::PubSubRelationManagerRepository;
use moqt_core::{
messages::{control_messages::announce_ok::AnnounceOk, moqt_payload::MOQTPayload},
MOQTClient,
};

pub(crate) async fn process_announce_ok_message(

Check warning on line 10 in moqt-server/src/modules/server_processes/announce_ok_message.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/server_processes/announce_ok_message.rs#L10

Added line #L10 was not covered by tests
payload_buf: &mut BytesMut,
client: &mut MOQTClient,
pubsub_relation_manager_repository: &mut dyn PubSubRelationManagerRepository,
) -> Result<()> {
let announce_ok_message = match AnnounceOk::depacketize(payload_buf) {
Ok(announce_ok_message) => announce_ok_message,
Err(err) => {
tracing::error!("{:#?}", err);
bail!(err.to_string());

Check warning on line 19 in moqt-server/src/modules/server_processes/announce_ok_message.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/server_processes/announce_ok_message.rs#L15-L19

Added lines #L15 - L19 were not covered by tests
}
};

announce_ok_handler(
announce_ok_message,
client,
pubsub_relation_manager_repository,
)
.await

Check warning on line 28 in moqt-server/src/modules/server_processes/announce_ok_message.rs

View check run for this annotation

Codecov / codecov/patch

moqt-server/src/modules/server_processes/announce_ok_message.rs#L28

Added line #L28 was not covered by tests
}