diff --git a/Cargo.lock b/Cargo.lock index 95abbae3..1c8bd2af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,24 +471,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" -[[package]] -name = "bytesio" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff19234db41a12c2dedbfda4b113b7e4bc39a3dba3ce29ff0ce258cad0a54129" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "failure", - "futures", - "log", - "rand 0.3.23", - "tokio", - "tokio-stream", - "tokio-util 0.6.10", -] - [[package]] name = "bytesio" version = "0.3.3" @@ -1357,20 +1339,7 @@ version = "0.2.0" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.3", - "failure", - "log", -] - -[[package]] -name = "h264-decoder" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8f343949d0d608d23ba6ebf2d79337ef188749bae7143c187da2851e80bc171" -dependencies = [ - "byteorder", - "bytes", - "bytesio 0.3.1", + "bytesio", "failure", "log", ] @@ -1427,10 +1396,10 @@ dependencies = [ "commonlib", "failure", "log", - "streamhub 0.1.1", + "streamhub", "tokio", "tokio-util 0.6.10", - "xflv 0.3.0", + "xflv", "xmpegts", ] @@ -1552,9 +1521,9 @@ dependencies = [ "failure", "futures", "log", - "streamhub 0.1.1", + "streamhub", "tokio", - "xflv 0.3.0", + "xflv", ] [[package]] @@ -2231,8 +2200,8 @@ dependencies = [ "clap", "env_logger", "log", - "rtmp 0.4.0", - "streamhub 0.1.0", + "rtmp", + "streamhub", "tokio", ] @@ -2474,32 +2443,6 @@ dependencies = [ "webrtc-util", ] -[[package]] -name = "rtmp" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93e619cfc092d254ad1b5f08f99b88af0721a10ab257822109adc4f7a16e0db1" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "bytesio 0.3.1", - "chrono", - "failure", - "h264-decoder 0.2.1", - "hmac 0.11.0", - "indexmap 1.9.3", - "log", - "rand 0.3.23", - "reqwest", - "serde", - "serde_json", - "sha2 0.9.9", - "streamhub 0.1.0", - "tokio", - "xflv 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "rtmp" version = "0.4.1" @@ -2507,11 +2450,11 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.3", + "bytesio", "chrono", "commonlib", "failure", - "h264-decoder 0.2.0", + "h264-decoder", "hex", "hmac 0.11.0", "indexmap 1.9.3", @@ -2520,9 +2463,9 @@ dependencies = [ "serde", "serde_json", "sha2 0.9.9", - "streamhub 0.1.1", + "streamhub", "tokio", - "xflv 0.3.0", + "xflv", ] [[package]] @@ -2862,28 +2805,6 @@ dependencies = [ "der", ] -[[package]] -name = "streamhub" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2ed21d266e3a637f461a706bb1bfa6eb01b7acf57c1b42f8c0aa5bb05f00d71" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "bytesio 0.3.1", - "chrono", - "failure", - "indexmap 1.9.3", - "log", - "rand 0.8.5", - "reqwest", - "serde", - "serde_json", - "tokio", - "xflv 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "streamhub" version = "0.1.1" @@ -2891,7 +2812,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.3", + "bytesio", "chrono", "failure", "indexmap 1.9.3", @@ -2901,7 +2822,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "xflv 0.3.0", + "xflv", ] [[package]] @@ -3958,29 +3879,14 @@ version = "0.3.0" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.3", + "bytesio", "failure", - "h264-decoder 0.2.0", + "h264-decoder", "indexmap 1.9.3", "log", "serde", ] -[[package]] -name = "xflv" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99aa4657821f6191a4640e7f7566f8aece2efd6269845403b520dd26266a7577" -dependencies = [ - "byteorder", - "bytes", - "bytesio 0.3.1", - "failure", - "h264-decoder 0.2.1", - "log", - "serde", -] - [[package]] name = "xiu" version = "0.9.0" @@ -3995,11 +3901,11 @@ dependencies = [ "httpflv", "libc", "log", - "rtmp 0.4.1", + "rtmp", "serde", "serde_derive", "serde_json", - "streamhub 0.1.1", + "streamhub", "tokio", "tokio-metrics", "toml", @@ -4013,7 +3919,7 @@ version = "0.2.0" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.3", + "bytesio", "failure", ] @@ -4025,7 +3931,7 @@ dependencies = [ "base64 0.21.2", "byteorder", "bytes", - "bytesio 0.3.3", + "bytesio", "chrono", "commonlib", "failure", @@ -4036,7 +3942,7 @@ dependencies = [ "log", "rand 0.8.5", "serde_json", - "streamhub 0.1.1", + "streamhub", "tokio", ] @@ -4048,17 +3954,17 @@ dependencies = [ "audiopus", "byteorder", "bytes", - "bytesio 0.3.3", + "bytesio", "commonlib", "failure", "fdk-aac", "http 0.2.9", "indexmap 1.9.3", "log", - "streamhub 0.1.1", + "streamhub", "tokio", "webrtc", - "xflv 0.3.0", + "xflv", ] [[package]] diff --git a/application/pprtmp/CHANGELOG.md b/application/pprtmp/CHANGELOG.md index 0b934067..0856d134 100644 --- a/application/pprtmp/CHANGELOG.md +++ b/application/pprtmp/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.1.4] - 2021-08-11 +- Reference new RTMP and streamhub versions. + ## [0.1.3] - 2021-05-18 - Reference new RTMP and streamhub versions. diff --git a/application/pprtmp/Cargo.toml b/application/pprtmp/Cargo.toml index 22f2e578..8d449701 100644 --- a/application/pprtmp/Cargo.toml +++ b/application/pprtmp/Cargo.toml @@ -16,8 +16,9 @@ log = "0.4.0" env_logger = "0.10.0" clap = "4.1.4" -rtmp = "0.4.0" -streamhub = "0.1.0" +streamhub = { path = "../../library/streamhub/" } +rtmp = { path = "../../protocol/rtmp/" } + [dependencies.tokio] version = "1.26.0" diff --git a/application/pprtmp/src/main.rs b/application/pprtmp/src/main.rs index d3e3312b..04c03855 100644 --- a/application/pprtmp/src/main.rs +++ b/application/pprtmp/src/main.rs @@ -2,7 +2,7 @@ use { anyhow::Result, clap::{value_parser, Arg, Command}, rtmp::session::client_session::ClientSession, - rtmp::session::client_session::ClientType, + rtmp::session::client_session::ClientSessionType, rtmp::utils::RtmpUrlParser, std::env, std::process::exit, @@ -59,13 +59,13 @@ async fn main() -> Result<()> { log::error!("err: {}", err); } pull_parser.append_port(String::from("1935")); - let stream1 = TcpStream::connect(pull_parser.raw_domain_name.clone()).await?; + let stream1 = TcpStream::connect(pull_parser.host_with_port.clone()).await?; let mut pull_client_session = ClientSession::new( stream1, - ClientType::Play, - pull_parser.raw_domain_name, + ClientSessionType::Pull, + pull_parser.host_with_port, pull_parser.app_name.clone(), - pull_parser.raw_stream_name, + pull_parser.stream_name_with_query, producer.clone(), 0, ); @@ -83,13 +83,13 @@ async fn main() -> Result<()> { } push_parser.append_port(String::from("1935")); // push the rtmp stream from local to remote rtmp server - let stream2 = TcpStream::connect(push_parser.raw_domain_name.clone()).await?; + let stream2 = TcpStream::connect(push_parser.host_with_port.clone()).await?; let mut push_client_session = ClientSession::new( stream2, - ClientType::Publish, - push_parser.raw_domain_name, + ClientSessionType::Push, + push_parser.host_with_port, push_parser.app_name, - push_parser.raw_stream_name, + push_parser.stream_name_with_query, producer.clone(), 0, ); diff --git a/application/xiu/CHANGELOG.md b/application/xiu/CHANGELOG.md index 7a691ac2..ee36f2af 100644 --- a/application/xiu/CHANGELOG.md +++ b/application/xiu/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.13.0] - 2021-08-11 +- Feat: Abstract streamhub message notifications. by @karaler +- Feat: Add optional push_password for simple authentication. by @Ceron257 +- Feat: Implement Authorization header support for WebRTC. by @Ceron257 +- Fix: XIU cannot receive RTSP stream published by gstreamer #135 +- Feat: Support Rtsp pull client logic, and you can pull a remote RTSP stream using HTTP API to XIU now. + ## [0.12.7] - 2021-05-18 - Fix: RTMP publish single AAC from ffmpeg client. by @suzp1984 - Fix: RTMP Auth failing due to empty string query string in packet. by @radiohertz diff --git a/application/xiu/src/api.rs b/application/xiu/src/api.rs index de31deed..ce4abebf 100644 --- a/application/xiu/src/api.rs +++ b/application/xiu/src/api.rs @@ -9,7 +9,7 @@ use { serde_json::Value, std::sync::Arc, streamhub::{ - define::{self, StreamHubEventSender}, + define::{self, RelayType, StreamHubEventSender}, stream::StreamIdentifier, utils::Uuid, }, @@ -25,7 +25,7 @@ struct ApiResponse { // the input to our `KickOffClient` handler #[derive(Deserialize)] -struct KickOffClient { +struct KickOffClientParams { uuid: String, } @@ -36,12 +36,21 @@ struct QueryWholeStreamsParams { } #[derive(Deserialize)] -struct QueryStream { +struct QueryStreamParams { identifier: StreamIdentifier, // if specify uuid, then query the stream by uuid and filter no used data. uuid: Option, } +#[derive(Deserialize)] +struct RelayStreamParams { + //guaranteed by the user to be unique + id: String, + identifier: Option, + server_address: Option, + relay_type: RelayType, +} + #[derive(Clone)] struct ApiService { channel_event_producer: StreamHubEventSender, @@ -93,7 +102,7 @@ impl ApiService { } } - async fn query_stream(&self, stream: QueryStream) -> Json> { + async fn query_stream(&self, stream: QueryStreamParams) -> Json> { let uuid = if let Some(uid) = stream.uuid { Uuid::from_str2(&uid) } else { @@ -132,7 +141,7 @@ impl ApiService { } } - async fn kick_off_client(&self, id: KickOffClient) -> Result { + async fn kick_off_client(&self, id: KickOffClientParams) -> Result { let id_result = Uuid::from_str2(&id.uuid); if let Some(id) = id_result { @@ -145,6 +154,103 @@ impl ApiService { Ok(String::from("ok")) } + + async fn start_relay_stream(&self, relay_info: RelayStreamParams) -> Json> { + if relay_info.identifier.is_none() || relay_info.server_address.is_none() { + let api_response = ApiResponse { + error_code: -1, + desp: String::from("identifier or server_address is none"), + data: Value::Null, + }; + return Json(api_response); + } + + let (result_sender, result_receiver) = oneshot::channel(); + + let hub_event = define::StreamHubEvent::ApiStartRelayStream { + id: relay_info.id, + identifier: relay_info.identifier.unwrap(), + server_address: relay_info.server_address.unwrap(), + relay_type: relay_info.relay_type, + result_sender, + }; + + if let Err(err) = self.channel_event_producer.send(hub_event) { + log::error!("send api relay_stream event error: {}", err); + } + + match result_receiver.await { + Ok(val) => match val { + Ok(()) => { + let api_response = ApiResponse { + error_code: 0, + desp: String::from("succ"), + data: Value::Null, + }; + Json(api_response) + } + Err(err) => { + let api_response = ApiResponse { + error_code: -1, + desp: String::from("failed"), + data: serde_json::json!(err.to_string()), + }; + Json(api_response) + } + }, + Err(err) => { + let api_response = ApiResponse { + error_code: -1, + desp: String::from("failed"), + data: serde_json::json!(err.to_string()), + }; + Json(api_response) + } + } + } + + async fn stop_relay_stream(&self, relay_info: RelayStreamParams) -> Json> { + let (result_sender, result_receiver) = oneshot::channel(); + + let hub_event = define::StreamHubEvent::ApiStopRelayStream { + id: relay_info.id, + relay_type: relay_info.relay_type, + result_sender, + }; + + if let Err(err) = self.channel_event_producer.send(hub_event) { + log::error!("send api relay_stream event error: {}", err); + } + + match result_receiver.await { + Ok(val) => match val { + Ok(()) => { + let api_response = ApiResponse { + error_code: 0, + desp: String::from("succ"), + data: Value::Null, + }; + Json(api_response) + } + Err(err) => { + let api_response = ApiResponse { + error_code: -1, + desp: String::from("failed"), + data: serde_json::json!(err.to_string()), + }; + Json(api_response) + } + }, + Err(err) => { + let api_response = ApiResponse { + error_code: -1, + desp: String::from("failed"), + data: serde_json::json!(err.to_string()), + }; + Json(api_response) + } + } + } } pub async fn run(producer: StreamHubEventSender, port: usize) { @@ -161,23 +267,35 @@ pub async fn run(producer: StreamHubEventSender, port: usize) { }; let api_query_stream = api.clone(); - let query_stream = move |Json(stream): Json| async move { + let query_stream = move |Json(stream): Json| async move { api_query_stream.query_stream(stream).await }; let api_kick_off = api.clone(); - let kick_off = move |Json(id): Json| async move { + let kick_off = move |Json(id): Json| async move { match api_kick_off.kick_off_client(id).await { Ok(response) => response, Err(_) => "error".to_owned(), } }; + let api_start_relay_stream = api.clone(); + let start_relay_stream = move |Json(params): Json| async move { + api_start_relay_stream.start_relay_stream(params).await + }; + + let api_stop_relay_stream = api.clone(); + let stop_relay_stream = move |Json(params): Json| async move { + api_stop_relay_stream.stop_relay_stream(params).await + }; + let app = Router::new() .route("/", get(root)) .route("/api/query_whole_streams", get(query_streams)) .route("/api/query_stream", post(query_stream)) - .route("/api/kick_off_client", post(kick_off)); + .route("/api/kick_off_client", post(kick_off)) + .route("/api/start_relay_stream", post(start_relay_stream)) + .route("/api/stop_relay_stream", post(stop_relay_stream)); log::info!("Http api server listening on http://0.0.0.0:{}", port); axum::Server::bind(&([0, 0, 0, 0], port as u16).into()) diff --git a/application/xiu/src/config/mod.rs b/application/xiu/src/config/mod.rs index 13d2a0ff..e8bd24bd 100644 --- a/application/xiu/src/config/mod.rs +++ b/application/xiu/src/config/mod.rs @@ -44,6 +44,7 @@ impl Config { if rtsp_port > 0 { rtsp_config = Some(RtspConfig { enabled: true, + relay_enabled: false, port: rtsp_port, auth: None, }); @@ -123,6 +124,7 @@ pub struct RtspConfig { pub enabled: bool, pub port: usize, pub auth: Option, + pub relay_enabled: bool, } #[derive(Debug, Deserialize, Clone)] @@ -187,7 +189,7 @@ pub struct HttpNotifierConfig { pub struct AuthSecretConfig { pub key: String, pub password: String, - pub push_password: Option + pub push_password: Option, } #[derive(Debug, Deserialize, Clone, Default)] @@ -211,7 +213,7 @@ fn test_toml_parse() { Err(err) => println!("{}", err), } - let str = fs::read_to_string("./src/config/config.toml"); + let str = fs::read_to_string("./src/config/examples/config1.toml"); match str { Ok(val) => { diff --git a/application/xiu/src/main.rs b/application/xiu/src/main.rs index 91df0b99..78aae8c1 100644 --- a/application/xiu/src/main.rs +++ b/application/xiu/src/main.rs @@ -15,7 +15,7 @@ async fn main() -> Result<()> { let mut cmd = Command::new("XIU") .bin_name("xiu") - .version("0.12.7") + .version("0.13.0") .author("HarlanC ") .about("A secure and easy to use live media server, hope you love it!!!") .arg( diff --git a/application/xiu/src/service.rs b/application/xiu/src/service.rs index 0b084770..3e3f9cc3 100644 --- a/application/xiu/src/service.rs +++ b/application/xiu/src/service.rs @@ -1,7 +1,8 @@ +use crate::config::{AuthConfig, AuthSecretConfig}; use commonlib::auth::AuthType; use rtmp::remuxer::RtmpRemuxer; use std::sync::Arc; -use crate::config::{AuthConfig, AuthSecretConfig}; +use xrtsp::relay::pull_client_manager::RtspPullClientManager; use { super::api, @@ -16,7 +17,7 @@ use { relay::{pull_client::PullClient, push_client::PushClient}, rtmp::RtmpServer, }, - streamhub::{notify::Notifier, notify::http::HttpNotifier, StreamsHub}, + streamhub::{notify::http::HttpNotifier, notify::Notifier, StreamsHub}, tokio, xrtsp::rtsp::RtspServer, xwebrtc::webrtc::WebRTCServer, @@ -255,6 +256,19 @@ impl Service { log::error!("rtsp server error: {}", err); } }); + + if rtsp_cfg_value.relay_enabled { + let mut rtsp_relay_manager = RtspPullClientManager::new( + stream_hub.get_client_event_consumer(), + stream_hub.get_hub_event_sender(), + ); + + tokio::spawn(async move { + if let Err(err) = rtsp_relay_manager.run().await { + log::error!("rtsp relay manager error: {}", err); + } + }); + } } Ok(()) diff --git a/confs/local/hls.Cargo.toml b/confs/local/hls.Cargo.toml index 9acdd6c7..2ed64950 100644 --- a/confs/local/hls.Cargo.toml +++ b/confs/local/hls.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hls" description = "hls library." -version = "0.4.1" +version = "0.4.2" authors = ["HarlanC "] edition = "2018" description = "a common library for xiu project." diff --git a/confs/online/flv.Cargo.toml b/confs/online/flv.Cargo.toml index 02239dfa..2eb1fb41 100644 --- a/confs/online/flv.Cargo.toml +++ b/confs/online/flv.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "xflv" description = "flv library." -version = "0.4.3" +version = "0.4.4" authors = ["HarlanC "] description = "a h264 decoder" diff --git a/confs/online/hls.Cargo.toml b/confs/online/hls.Cargo.toml index 3d666363..b8e3f2b2 100644 --- a/confs/online/hls.Cargo.toml +++ b/confs/online/hls.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hls" description = "hls library." -version = "0.5.4" +version = "0.5.5" authors = ["HarlanC "] @@ -30,5 +30,5 @@ serde_json = { version = "1", default-features = false, features = [ ] } bytesio = "0.3.3" -streamhub = "0.2.3" -commonlib = "0.1.1" +streamhub = "0.2.4" +commonlib = "0.1.2" diff --git a/confs/online/streamhub.Cargo.toml b/confs/online/streamhub.Cargo.toml index 89660f83..7ee13bce 100644 --- a/confs/online/streamhub.Cargo.toml +++ b/confs/online/streamhub.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "streamhub" description = "It receives streams from publishers(rtmp/rtsp etc.) and send streams to subscribers(rtmp/rtsp/httpflv/hls)" -version = "0.2.3" +version = "0.2.4" edition = "2021" authors = ["HarlanC "] license = "MIT" @@ -27,7 +27,7 @@ serde_json = { version = "1", default-features = false, features = [ ] } serde = { version = "1.0", features = ["derive", "rc"] } -xflv = "0.4.3" +xflv = "0.4.4" bytesio = "0.3.3" [dependencies.tokio] diff --git a/confs/online/webrtc.Cargo.toml b/confs/online/webrtc.Cargo.toml index 5aa19f3c..4520aa2d 100644 --- a/confs/online/webrtc.Cargo.toml +++ b/confs/online/webrtc.Cargo.toml @@ -1,6 +1,6 @@ [package] name = "xwebrtc" -version = "0.3.4" +version = "0.3.5" description = "A whip/whep library." edition = "2021" authors = ["HarlanC "] @@ -23,6 +23,6 @@ fdk-aac = "0.6.0" audiopus = "0.3.0-rc.0" bytesio = "0.3.3" -streamhub = "0.2.3" -xflv = "0.4.3" -commonlib = "0.1.1" +streamhub = "0.2.4" +xflv = "0.4.4" +commonlib = "0.1.2" diff --git a/confs/online/xiu.Cargo.toml b/confs/online/xiu.Cargo.toml index ccb1909f..3c04c496 100644 --- a/confs/online/xiu.Cargo.toml +++ b/confs/online/xiu.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "xiu" description = "A powerful live server by Rust ." -version = "0.12.7" +version = "0.13.0" authors = ["HarlanC Option { + let local_address = format!("0.0.0.0:{local_port}"); + + if let Ok(local_socket) = UdpSocket::bind(local_address).await { + return Some(Self { + socket: local_socket, + }); + } + None + } + pub fn get_local_port(&self) -> Option { if let Ok(local_addr) = self.socket.local_addr() { log::info!("local address: {}", local_addr); @@ -66,6 +78,60 @@ impl UdpIO { } } +pub async fn new_udpio_pair() -> Option<(UdpIO, UdpIO)> { + let mut next_local_port = 0; + let first_local_port; + + // get the first available port + if let Some(udpio_0) = UdpIO::new_with_local_port(next_local_port).await { + if let Some(local_port_0) = udpio_0.get_local_port() { + first_local_port = local_port_0; + } else { + log::error!("cannot get local port"); + return None; + } + + if first_local_port == 65535 { + next_local_port = 1; + } else if let Some(udpio_1) = UdpIO::new_with_local_port(first_local_port + 1).await { + return Some((udpio_0, udpio_1)); + } else if first_local_port + 1 == 65535 { + next_local_port = 1; + } else { + next_local_port = first_local_port + 2; + } + } else { + return None; + } + + loop { + log::trace!("next local port: {next_local_port} and first port: {first_local_port}"); + + if next_local_port == 65535 { + next_local_port = 1; + continue; + } + + if next_local_port == first_local_port { + return None; + } + + if let Some(udpio_0) = UdpIO::new_with_local_port(next_local_port).await { + if let Some(udpio_1) = UdpIO::new_with_local_port(next_local_port + 1).await { + return Some((udpio_0, udpio_1)); + } else if next_local_port + 1 == 65535 { + next_local_port = 1; + } else { + next_local_port += 2; + } + } else { + // try next port + next_local_port += 1; + } + } + //None +} + #[async_trait] impl TNetIO for UdpIO { fn get_net_type(&self) -> NetType { @@ -147,3 +213,73 @@ impl TNetIO for TcpIO { } } } + +#[cfg(test)] +mod tests { + + use super::new_udpio_pair; + use super::UdpIO; + + use tokio; + + #[tokio::test] + async fn test_new_udpio_pair() { + if let Some((udpio1, udpid2)) = new_udpio_pair().await { + println!( + "{:?} == {:?}", + udpio1.get_local_port(), + udpid2.get_local_port() + ); + } + } + + #[tokio::test] + async fn test_new_udpio_pair2() { + println!("test_new_udpio_pair2 begin..."); + let mut socket: Vec = Vec::new(); + + for i in 1..=65535 { + println!("cur port:== {}", i); + //if i % 2 == 1 { + println!("cur port: {}", i); + if let Some(udpio) = UdpIO::new_with_local_port(i).await { + socket.push(udpio) + } else { + println!("new local port fail: {}", i); + } + //} + } + + println!("socket size: {}", socket.len()); + + if let Some((udpio1, udpid2)) = new_udpio_pair().await { + println!( + "{:?} == {:?}", + udpio1.get_local_port(), + udpid2.get_local_port() + ); + } + } + + #[tokio::test] + async fn test_new_udpio_pair3() { + // get the first available port + + let mut first_local_port = 0; + if let Some(udpio_0) = UdpIO::new_with_local_port(0).await { + if let Some(local_port_0) = udpio_0.get_local_port() { + first_local_port = local_port_0; + } + + // std::mem::drop(udpio_0); + } + //The object udpio_0 is automatically cleared and released when it goes out of scope here. + println!("first_local_port: {}", first_local_port); + + if (UdpIO::new_with_local_port(first_local_port).await).is_some() { + println!("success") + } else { + println!("fail") + } + } +} diff --git a/library/codec/h264/CHANGELOG.md b/library/codec/h264/CHANGELOG.md index 2da45933..9711ca41 100644 --- a/library/codec/h264/CHANGELOG.md +++ b/library/codec/h264/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.2.4] - 2021-08-11 +- Reference bytesio v0.3.4. + ## [0.2.3] - 2021-05-18 - Reference bytesio v0.3.3. diff --git a/library/common/CHANGELOG.md b/library/common/CHANGELOG.md index 3428a165..5c060961 100644 --- a/library/common/CHANGELOG.md +++ b/library/common/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.1.2] - 2021-08-11 +- Implement Authorization header support for WebRTC. +- Add get header for HttpResponse. + ## [0.1.1] - 2021-03-15 - Upgrade failure library. diff --git a/library/common/src/auth.rs b/library/common/src/auth.rs index 07296424..d205083a 100644 --- a/library/common/src/auth.rs +++ b/library/common/src/auth.rs @@ -44,14 +44,27 @@ pub fn get_secret(carrier: &SecretCarrier) -> Result { value: AuthErrorValue::InvalidTokenFormat, }); let (prefix, token) = scanf!(header, " ", String, String); - if prefix.is_none() || token.is_none() { - invalid_format - } else { - if prefix.unwrap() != "Bearer" { - invalid_format - } else { - Ok(token.unwrap()) - } + + //if prefix.is_none() || token.is_none() { + // invalid_format + //} else if prefix.unwrap() != "Bearer" { + // invalid_format + //} else { + // Ok(token.unwrap()) + //} + //fix cargo clippy --fix --allow-dirty --allow-no-vcs warnings + match token { + Some(token_val) => match prefix { + Some(prefix_val) => { + if prefix_val != "Bearer" { + invalid_format + } else { + Ok(token_val) + } + } + None => invalid_format, + }, + None => invalid_format, } } } diff --git a/library/common/src/http.rs b/library/common/src/http.rs index 33ac07f9..b46a1087 100644 --- a/library/common/src/http.rs +++ b/library/common/src/http.rs @@ -75,32 +75,31 @@ impl Unmarshal for Uri { let path_with_query = match uri.schema { Schema::RTSP => { - let rtsp_path_with_query = if let Some(rtsp_url_without_prefix) = - url.strip_prefix("rtsp://") - { - /*split host:port and path?query*/ - - if let Some(index) = rtsp_url_without_prefix.find('/') { - let path_with_query = &rtsp_url_without_prefix[index + 1..]; - /*parse host and port*/ - let host_with_port = &rtsp_url_without_prefix[..index]; - let (host_val, port_val) = scanf!(host_with_port, ':', String, u16); - if let Some(host) = host_val { - uri.host = host; - } - if let Some(port) = port_val { - uri.port = Some(port); - } + let rtsp_path_with_query = + if let Some(rtsp_url_without_prefix) = url.strip_prefix("rtsp://") { + /*split host:port and path?query*/ + + if let Some(index) = rtsp_url_without_prefix.find('/') { + let path_with_query = &rtsp_url_without_prefix[index + 1..]; + /*parse host and port*/ + let host_with_port = &rtsp_url_without_prefix[..index]; + let (host_val, port_val) = scanf!(host_with_port, ':', String, u16); + if let Some(host) = host_val { + uri.host = host; + } + if let Some(port) = port_val { + uri.port = Some(port); + } - path_with_query + path_with_query + } else { + log::error!("cannot find split '/' for host:port and path?query."); + return None; + } } else { - log::error!("cannot find split '/' for host:port and path?query."); + log::error!("cannot find RTSP prefix."); return None; - } - } else { - log::error!("cannot find RTSP prefix."); - return None; - }; + }; rtsp_path_with_query } Schema::WEBRTC => url, @@ -282,6 +281,12 @@ pub struct HttpResponse { pub body: Option, } +impl HttpResponse { + pub fn get_header(&self, header_name: &String) -> Option<&String> { + self.headers.get(header_name) + } +} + impl Unmarshal for HttpResponse { fn unmarshal(request_data: &str) -> Option { let mut http_response = HttpResponse::default(); diff --git a/library/container/flv/CHANGELOG.md b/library/container/flv/CHANGELOG.md index b3249812..cc1c841d 100644 --- a/library/container/flv/CHANGELOG.md +++ b/library/container/flv/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.4.4] - 2021-08-11 +- Reference bytesio v0.3.4. + ## [0.4.3] - 2021-05-18 - fix RTMP publish single AAC from ffmpeg client. diff --git a/library/container/mpegts/CHANGELOG.md b/library/container/mpegts/CHANGELOG.md index d83d939f..c1205609 100644 --- a/library/container/mpegts/CHANGELOG.md +++ b/library/container/mpegts/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate + +## [0.2.4] - 2021-08-11 +- Reference bytesio v0.3.4. + ## [0.2.3] - 2021-05-18 - Reference bytesio v0.3.3. diff --git a/library/streamhub/CHANGELOG.md b/library/streamhub/CHANGELOG.md index c68e3d76..1be30396 100644 --- a/library/streamhub/CHANGELOG.md +++ b/library/streamhub/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.2.4] - 2021-08-11 +- Abstract streamhub message notifications. +- Add HTTP API for RTSP pull client. +- Some refactor work. + ## [0.2.3] - 2021-05-18 - Reference bytesio v0.3.3. diff --git a/library/streamhub/src/define.rs b/library/streamhub/src/define.rs index ef00f8f0..d396377a 100644 --- a/library/streamhub/src/define.rs +++ b/library/streamhub/src/define.rs @@ -1,4 +1,5 @@ use chrono::{DateTime, Local}; +use serde::Deserialize; use serde_json::Value; use xflv::define::{AacProfile, AvcCodecId, AvcLevel, AvcProfile, SoundFormat}; @@ -19,41 +20,50 @@ use { utils::Uuid, }; +/* Subscribe streams from stream hub */ #[derive(Debug, Serialize, Clone, Eq, PartialEq)] pub enum SubscribeType { - /* Remote client request playing rtmp stream.*/ - PlayerRtmp, - /* Remote client request playing http-flv stream.*/ - PlayerHttpFlv, - /* Remote client request playing hls stream.*/ - PlayerHls, - /* Remote/local client request playing rtsp stream.*/ - PlayerRtsp, - /* Local client request playing webrtc stream, it's used for protocol remux.*/ - PlayerWebrtc, - /* Remote client request playing rtsp or webrtc(whep) raw rtp stream.*/ - PlayerRtp, - GenerateHls, - /* Local client *subscribe* from local rtmp session - and *publish* (relay push) the stream to remote server.*/ - PublisherRtmp, + /* Remote client request pulling(play) a rtmp stream.*/ + RtmpPull, + /* Remote request to play httpflv triggers remux from RTMP to httpflv. */ + RtmpRemux2HttpFlv, + /* The publishing of RTMP stream triggers remuxing from RTMP to HLS protocol.(NOTICE:It is not triggerred by players.)*/ + RtmpRemux2Hls, + /* Relay(Push) local RTMP stream from stream hub to other RTMP nodes.*/ + RtmpRelay, + /* Remote client request pulling(play) a rtsp stream.*/ + RtspPull, + /* The publishing of RTSP stream triggers remuxing from RTSP to RTMP protocol.*/ + RtspRemux2Rtmp, + /* Relay(Push) local RTSP stream to other RTSP nodes.*/ + RtspRelay, + /* Remote client request pulling(play) stream through whep.*/ + WhepPull, + /* Remuxing webrtc stream to RTMP */ + WebRTCRemux2Rtmp, + /* Relay(Push) the local webRTC stream to other nodes using Whip.*/ + WhipRelay, + /* Pull rtp stream by subscribing from stream hub.*/ + RtpPull, } -//session publish type +/* Publish streams to stream hub */ #[derive(Debug, Serialize, Clone, Eq, PartialEq)] pub enum PublishType { - /* Receive rtmp stream from remote push client */ - PushRtmp, - /* Local client *publish* the rtmp stream to local session, - the rtmp stream is *subscribed* (pull) from remote server.*/ - RelayRtmp, + /* Receive rtmp stream from remote push client. */ + RtmpPush, + /* Relay(Pull) remote RTMP stream to local stream hub. */ + RtmpRelay, /* Receive rtsp stream from remote push client */ - PushRtsp, - RelayRtsp, - /* Receive webrtc stream from remote push client(whip), */ - PushWebRTC, + RtspPush, + /* Relay(Pull) remote RTSP stream to local stream hub. */ + RtspRelay, + /* Receive whip stream from remote push client. */ + WhipPush, + /* Relay(Pull) remote WebRTC stream to local stream hub using Whep. */ + WhepRelay, /* It used for publishing raw rtp data of rtsp/whbrtc(whip) */ - PushRtp, + RtpPush, } #[derive(Debug, Serialize, Clone)] @@ -186,6 +196,9 @@ pub type PubEventExecuteResultSender = oneshot::Sender< StreamHubError, >, >; +// The trait bound `BroadcastEvent: Clone` should be satisfied, so here we cannot use oneshot. +pub type BroadcastEventExecuteResultSender = mpsc::Sender>; +pub type ApiRelayStreamResultSender = oneshot::Sender>; pub type TransceiverEventExecuteResultSender = oneshot::Sender; #[async_trait] @@ -225,7 +238,7 @@ pub enum PubDataType { Both, } -#[derive(Clone, Serialize)] +#[derive(Clone, Serialize, Debug)] pub enum StreamHubEventMessage { Subscribe { identifier: StreamIdentifier, @@ -246,6 +259,13 @@ pub enum StreamHubEventMessage { NotSupport {}, } +//we can pub frame or packet or both. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RelayType { + Pull, + Push, +} + #[derive(Serialize)] pub enum StreamHubEvent { Subscribe { @@ -279,7 +299,20 @@ pub enum StreamHubEvent { }, #[serde(skip_serializing)] ApiKickClient { id: Uuid }, - + #[serde(skip_serializing)] + ApiStartRelayStream { + id: String, + identifier: StreamIdentifier, + server_address: String, + relay_type: RelayType, + result_sender: ApiRelayStreamResultSender, + }, + #[serde(skip_serializing)] + ApiStopRelayStream { + id: String, + relay_type: RelayType, + result_sender: ApiRelayStreamResultSender, + }, #[serde(skip_serializing)] Request { identifier: StreamIdentifier, @@ -290,21 +323,34 @@ pub enum StreamHubEvent { impl StreamHubEvent { pub fn to_message(&self) -> StreamHubEventMessage { match self { - StreamHubEvent::Subscribe { identifier, info, result_sender: _result_sender } => { - StreamHubEventMessage::Subscribe { identifier: identifier.clone(), info: info.clone() } - } + StreamHubEvent::Subscribe { + identifier, + info, + result_sender: _result_sender, + } => StreamHubEventMessage::Subscribe { + identifier: identifier.clone(), + info: info.clone(), + }, StreamHubEvent::UnSubscribe { identifier, info } => { - StreamHubEventMessage::UnSubscribe { identifier: identifier.clone(), info: info.clone() } - } - StreamHubEvent::Publish { identifier, info, result_sender: _result_sender, stream_handler: _stream_handler } => { - StreamHubEventMessage::Publish { identifier: identifier.clone(), info: info.clone() } - } - StreamHubEvent::UnPublish { identifier, info } => { - StreamHubEventMessage::UnPublish { identifier: identifier.clone(), info: info.clone() } - } - _ => { - StreamHubEventMessage::NotSupport {} + StreamHubEventMessage::UnSubscribe { + identifier: identifier.clone(), + info: info.clone(), + } } + StreamHubEvent::Publish { + identifier, + info, + result_sender: _result_sender, + stream_handler: _stream_handler, + } => StreamHubEventMessage::Publish { + identifier: identifier.clone(), + info: info.clone(), + }, + StreamHubEvent::UnPublish { identifier, info } => StreamHubEventMessage::UnPublish { + identifier: identifier.clone(), + info: info.clone(), + }, + _ => StreamHubEventMessage::NotSupport {}, } } } @@ -339,11 +385,25 @@ impl fmt::Display for TransceiverEvent { #[derive(Debug, Clone)] pub enum BroadcastEvent { /*Need publish(push) a stream to other rtmp server*/ - Publish { identifier: StreamIdentifier }, - UnPublish { identifier: StreamIdentifier }, + Publish { + identifier: StreamIdentifier, + }, + UnPublish { + identifier: StreamIdentifier, + }, /*Need subscribe(pull) a stream from other rtmp server*/ - Subscribe { identifier: StreamIdentifier }, - UnSubscribe { identifier: StreamIdentifier }, + Subscribe { + id: String, + identifier: StreamIdentifier, + server_address: Option, + result_sender: Option, + }, + UnSubscribe { + id: String, + result_sender: Option, + //identifier: StreamIdentifier, + //server_address: Option, + }, } pub enum StatisticData { diff --git a/library/streamhub/src/errors.rs b/library/streamhub/src/errors.rs index 0239dde4..f57b6481 100644 --- a/library/streamhub/src/errors.rs +++ b/library/streamhub/src/errors.rs @@ -31,6 +31,8 @@ pub enum StreamHubErrorValue { RecvError(RecvError), #[fail(display = "Serde json error")] SerdeError(Error), + #[fail(display = "the client session error: {}", _0)] + RtspClientSessionError(String), } #[derive(Debug)] pub struct StreamHubError { @@ -85,6 +87,14 @@ impl From for StreamHubError { } } +impl From for StreamHubError { + fn from(error: String) -> Self { + StreamHubError { + value: StreamHubErrorValue::RtspClientSessionError(error), + } + } +} + // impl From for ChannelError { // fn from(error: CacheError) -> Self { // ChannelError { diff --git a/library/streamhub/src/lib.rs b/library/streamhub/src/lib.rs index e8d3e983..abad6a0d 100644 --- a/library/streamhub/src/lib.rs +++ b/library/streamhub/src/lib.rs @@ -1,6 +1,6 @@ use define::{ - FrameDataReceiver, PacketDataReceiver, PacketDataSender, StatisticData, StatisticDataReceiver, - StatisticDataSender, + FrameDataReceiver, PacketDataReceiver, PacketDataSender, RelayType, StatisticData, + StatisticDataReceiver, StatisticDataSender, }; use serde_json::{json, Value}; use statistics::{StatisticSubscriber, StatisticsStream}; @@ -407,7 +407,7 @@ impl StreamDataTransceiver { } TransceiverEvent::UnSubscribe { info } => { match info.sub_type { - SubscribeType::PlayerRtp | SubscribeType::PlayerWebrtc => { + SubscribeType::RtpPull | SubscribeType::WhepPull => { packet_senders.lock().await.remove(&info.id); } _ => { @@ -568,6 +568,7 @@ impl StreamsHub { pub async fn event_loop(&mut self) { while let Some(event) = self.hub_event_receiver.recv().await { let message = event.to_message(); + match event { StreamHubEvent::Publish { identifier, @@ -725,7 +726,6 @@ impl StreamsHub { uuid, result_sender, } => { - log::info!("api_statistic1: stream identifier: {:?}", identifier); let result = match self.api_statistic(top_n, identifier, uuid).await { Ok(rv) => rv, Err(err) => { @@ -743,6 +743,32 @@ impl StreamsHub { log::error!("api_kick_off_client api error: {}", err); } } + StreamHubEvent::ApiStartRelayStream { + id, + identifier, + server_address, + relay_type, + result_sender, + } => { + let result = self + .api_start_relay_stream(id, &relay_type, identifier, server_address) + .await; + + if let Err(err) = result_sender.send(result) { + log::error!("event_loop api error: {:?}", err); + } + } + StreamHubEvent::ApiStopRelayStream { + id, + relay_type, + result_sender, + } => { + let result = self.api_stop_relay_stream(id, &relay_type).await; + + if let Err(err) = result_sender.send(result) { + log::error!("event_loop api error: {:?}", err); + } + } StreamHubEvent::Request { identifier, sender } => { if let Err(err) = self.request(&identifier, sender) { log::error!("event_loop request error: {}", err); @@ -868,6 +894,69 @@ impl StreamsHub { Ok(()) } + async fn api_start_relay_stream( + &mut self, + id: String, + relay_type: &RelayType, + identifier: StreamIdentifier, + server_address: String, + ) -> Result<(), StreamHubError> { + let (result_sender, mut result_receiver) = mpsc::channel(1); + + match relay_type { + RelayType::Pull => { + let client_event = BroadcastEvent::Subscribe { + id, + identifier, + server_address: Some(server_address), + result_sender: Some(result_sender), + }; + + //send subscribe info to pull clients + self.client_event_sender + .send(client_event) + .map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendError, + })?; + } + RelayType::Push => {} + } + + if let Some(received_message) = result_receiver.recv().await { + return received_message; + } + Ok(()) + } + + async fn api_stop_relay_stream( + &mut self, + id: String, + relay_type: &RelayType, + ) -> Result<(), StreamHubError> { + let (result_sender, mut result_receiver) = mpsc::channel(1); + match relay_type { + RelayType::Pull => { + let client_event = BroadcastEvent::UnSubscribe { + id, + result_sender: Some(result_sender), + }; + + //send subscribe info to pull clients + self.client_event_sender + .send(client_event) + .map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendError, + })?; + } + RelayType::Push => {} + } + + if let Some(received_message) = result_receiver.recv().await { + return received_message; + } + Ok(()) + } + //player subscribe a stream pub async fn subscribe( &mut self, @@ -894,7 +983,10 @@ impl StreamsHub { log::info!("subscribe: try to pull stream, identifier: {}", identifer); let client_event = BroadcastEvent::Subscribe { + id: String::from("rtmp_relay"), identifier: identifer.clone(), + server_address: None, + result_sender: None, }; //send subscribe info to pull clients diff --git a/protocol/hls/CHANGELOG.md b/protocol/hls/CHANGELOG.md index 6f9c29fb..a4316e37 100644 --- a/protocol/hls/CHANGELOG.md +++ b/protocol/hls/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.5.5] - 2021-08-11 +- Some refactor work. + ## [0.5.4] - 2021-05-18 - fix RTMP publish single AAC from ffmpeg client. diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index c923d263..088b7bb7 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -51,7 +51,7 @@ impl FlvDataReceiver { } pub async fn run(&mut self) -> Result<(), HlsError> { - self.subscribe_from_rtmp_channels(self.app_name.clone(), self.stream_name.clone()) + self.subscribe_from_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; self.receive_flv_data().await?; @@ -85,14 +85,14 @@ impl FlvDataReceiver { } self.media_processor.clear_files()?; - self.unsubscribe_from_rtmp_channels().await + self.unsubscribe_from_stream_hub().await } pub fn flush_response_data(&mut self) -> Result<(), HlsError> { Ok(()) } - pub async fn subscribe_from_rtmp_channels( + pub async fn subscribe_from_stream_hub( &mut self, app_name: String, stream_name: String, @@ -100,7 +100,7 @@ impl FlvDataReceiver { /*the sub info is only used to transfer from RTMP to HLS, but not for client player */ let sub_info = SubscriberInfo { id: self.subscriber_id, - sub_type: SubscribeType::GenerateHls, + sub_type: SubscribeType::RtmpRemux2Hls, sub_data_type: streamhub::define::SubDataType::Frame, notify_info: NotifyInfo { request_url: String::from(""), @@ -135,10 +135,10 @@ impl FlvDataReceiver { Ok(()) } - pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HlsError> { + pub async fn unsubscribe_from_stream_hub(&mut self) -> Result<(), HlsError> { let sub_info = SubscriberInfo { id: self.subscriber_id, - sub_type: SubscribeType::PlayerHls, + sub_type: SubscribeType::RtmpRemux2Hls, sub_data_type: streamhub::define::SubDataType::Frame, notify_info: NotifyInfo { request_url: String::from(""), @@ -156,7 +156,7 @@ impl FlvDataReceiver { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}", err); + log::error!("unsubscribe_from_stream_hub err {}", err); } Ok(()) diff --git a/protocol/hls/src/server.rs b/protocol/hls/src/server.rs index 47a312a6..3bf10af9 100644 --- a/protocol/hls/src/server.rs +++ b/protocol/hls/src/server.rs @@ -38,7 +38,7 @@ async fn handle_connection(State(auth): State>, req: Request) if auth_val .authenticate( &stream_name, - &query_string.map(|q| SecretCarrier::Query(q)), + &query_string.map(SecretCarrier::Query), true, ) .is_err() diff --git a/protocol/httpflv/CHANGELOG.md b/protocol/httpflv/CHANGELOG.md index 500a58c9..9f4d209e 100644 --- a/protocol/httpflv/CHANGELOG.md +++ b/protocol/httpflv/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.4.5] - 2021-08-11 +- some refactor work. + ## [0.4.4] - 2021-05-18 - fix RTMP publish single AAC from ffmpeg client. diff --git a/protocol/httpflv/src/httpflv.rs b/protocol/httpflv/src/httpflv.rs index b27ced80..e2aa90f7 100644 --- a/protocol/httpflv/src/httpflv.rs +++ b/protocol/httpflv/src/httpflv.rs @@ -70,7 +70,7 @@ impl HttpFlv { } pub async fn run(&mut self) -> Result<(), HttpFLvError> { - self.subscribe_from_rtmp_channels().await?; + self.subscribe_from_stream_hub().await?; self.send_media_stream().await?; Ok(()) @@ -146,7 +146,7 @@ impl HttpFlv { break; } } - self.unsubscribe_from_rtmp_channels().await + self.unsubscribe_from_stream_hub().await } //used for the http-flv protocol @@ -218,10 +218,10 @@ impl HttpFlv { Ok(()) } - pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { + pub async fn unsubscribe_from_stream_hub(&mut self) -> Result<(), HttpFLvError> { let sub_info = SubscriberInfo { id: self.subscriber_id, - sub_type: SubscribeType::PlayerHttpFlv, + sub_type: SubscribeType::RtmpRemux2HttpFlv, sub_data_type: SubDataType::Frame, notify_info: NotifyInfo { request_url: self.request_url.clone(), @@ -239,16 +239,16 @@ impl HttpFlv { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}", err); + log::error!("unsubscribe_from_stream_hub err {}", err); } Ok(()) } - pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { + pub async fn subscribe_from_stream_hub(&mut self) -> Result<(), HttpFLvError> { let sub_info = SubscriberInfo { id: self.subscriber_id, - sub_type: SubscribeType::PlayerHttpFlv, + sub_type: SubscribeType::RtmpRemux2HttpFlv, sub_data_type: SubDataType::Frame, notify_info: NotifyInfo { request_url: self.request_url.clone(), @@ -286,7 +286,7 @@ impl HttpFlv { id: self.subscriber_id, remote_addr: self.remote_addr.to_string(), start_time: chrono::Local::now(), - sub_type: SubscribeType::PlayerHttpFlv, + sub_type: SubscribeType::RtmpRemux2HttpFlv, }; if let Err(err) = sender.send(statistic_subscriber) { log::error!("send statistic_subscriber err: {}", err); diff --git a/protocol/httpflv/src/server.rs b/protocol/httpflv/src/server.rs index afb4e8e5..28eddf34 100644 --- a/protocol/httpflv/src/server.rs +++ b/protocol/httpflv/src/server.rs @@ -39,7 +39,7 @@ async fn handle_connection( if auth_val .authenticate( &stream_name, - &query_string.map(|q| SecretCarrier::Query(q)), + &query_string.map(SecretCarrier::Query), true, ) .is_err() diff --git a/protocol/rtmp/CHANGELOG.md b/protocol/rtmp/CHANGELOG.md index dcc500ef..f56c5212 100644 --- a/protocol/rtmp/CHANGELOG.md +++ b/protocol/rtmp/CHANGELOG.md @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate - +## [0.6.5] - 2021-08-11 +- Some refactor work. ## [0.6.4] - 2021-05-18 - Fix: RTMP publish single AAC from ffmpeg client. diff --git a/protocol/rtmp/src/relay/pull_client.rs b/protocol/rtmp/src/relay/pull_client.rs index fc8d1a10..84f7ebdd 100644 --- a/protocol/rtmp/src/relay/pull_client.rs +++ b/protocol/rtmp/src/relay/pull_client.rs @@ -2,7 +2,7 @@ use streamhub::stream::StreamIdentifier; use { super::errors::ClientError, - crate::session::client_session::{ClientSession, ClientType}, + crate::session::client_session::{ClientSession, ClientSessionType}, streamhub::define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, tokio::net::TcpStream, }; @@ -32,11 +32,14 @@ impl PullClient { let event = self.client_event_consumer.recv().await?; if let BroadcastEvent::Subscribe { + id: _, identifier: StreamIdentifier::Rtmp { app_name, stream_name, }, + server_address: _, + result_sender: _, } = event { log::info!( @@ -48,7 +51,7 @@ impl PullClient { let mut client_session = ClientSession::new( stream, - ClientType::Play, + ClientSessionType::Pull, self.address.clone(), app_name.clone(), stream_name.clone(), diff --git a/protocol/rtmp/src/relay/push_client.rs b/protocol/rtmp/src/relay/push_client.rs index 97c00c67..47489fa1 100644 --- a/protocol/rtmp/src/relay/push_client.rs +++ b/protocol/rtmp/src/relay/push_client.rs @@ -1,6 +1,6 @@ use { super::errors::ClientError, - crate::session::client_session::{ClientSession, ClientType}, + crate::session::client_session::{ClientSession, ClientSessionType}, streamhub::{ define::{StreamHubEventSender, BroadcastEvent, BroadcastEventReceiver}, stream::StreamIdentifier, @@ -51,7 +51,7 @@ impl PushClient { let mut client_session = ClientSession::new( stream, - ClientType::Publish, + ClientSessionType::Push, self.address.clone(), app_name, stream_name, diff --git a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs index b55f47e6..dc62ae94 100644 --- a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs +++ b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs @@ -92,14 +92,14 @@ impl Rtsp2RtmpRemuxerSession { pub async fn publish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .publish_to_channels(self.app_name.clone(), self.stream_name.clone(), 0) + .publish_to_stream_hub(self.app_name.clone(), self.stream_name.clone(), 0) .await?; Ok(()) } pub async fn unpublish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) + .unpublish_to_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; Ok(()) } @@ -108,7 +108,7 @@ impl Rtsp2RtmpRemuxerSession { let (event_result_sender, event_result_receiver) = oneshot::channel(); let sub_info = SubscriberInfo { id: self.subscribe_id, - sub_type: SubscribeType::PlayerRtmp, + sub_type: SubscribeType::RtspRemux2Rtmp, sub_data_type: streamhub::define::SubDataType::Frame, notify_info: NotifyInfo { request_url: String::from(""), @@ -138,7 +138,7 @@ impl Rtsp2RtmpRemuxerSession { pub async fn unsubscribe_rtsp(&mut self) -> Result<(), RtmpRemuxerError> { let sub_info = SubscriberInfo { id: self.subscribe_id, - sub_type: SubscribeType::PlayerRtsp, + sub_type: SubscribeType::RtspRemux2Rtmp, sub_data_type: streamhub::define::SubDataType::Frame, notify_info: NotifyInfo { request_url: String::from(""), @@ -153,7 +153,7 @@ impl Rtsp2RtmpRemuxerSession { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}", err); + log::error!("unsubscribe_rtsp err {}", err); } Ok(()) diff --git a/protocol/rtmp/src/remuxer/whip2rtmp.rs b/protocol/rtmp/src/remuxer/whip2rtmp.rs index 80684800..e1118483 100644 --- a/protocol/rtmp/src/remuxer/whip2rtmp.rs +++ b/protocol/rtmp/src/remuxer/whip2rtmp.rs @@ -105,14 +105,14 @@ impl Whip2RtmpRemuxerSession { pub async fn publish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .publish_to_channels(self.app_name.clone(), self.stream_name.clone(), 1) + .publish_to_stream_hub(self.app_name.clone(), self.stream_name.clone(), 1) .await?; Ok(()) } pub async fn unpublish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) + .unpublish_to_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; Ok(()) } @@ -122,7 +122,7 @@ impl Whip2RtmpRemuxerSession { let sub_info = SubscriberInfo { id: self.subscribe_id, - sub_type: SubscribeType::PlayerRtmp, + sub_type: SubscribeType::WebRTCRemux2Rtmp, sub_data_type: streamhub::define::SubDataType::Frame, notify_info: NotifyInfo { request_url: String::from(""), @@ -153,7 +153,7 @@ impl Whip2RtmpRemuxerSession { pub async fn unsubscribe_whip(&mut self) -> Result<(), RtmpRemuxerError> { let sub_info = SubscriberInfo { id: self.subscribe_id, - sub_type: SubscribeType::PlayerRtmp, + sub_type: SubscribeType::WebRTCRemux2Rtmp, sub_data_type: streamhub::define::SubDataType::Frame, notify_info: NotifyInfo { request_url: String::from(""), @@ -169,7 +169,7 @@ impl Whip2RtmpRemuxerSession { info: sub_info, }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}", err); + log::error!("unsubscribe_whip err {}", err); } Ok(()) diff --git a/protocol/rtmp/src/session/client_session.rs b/protocol/rtmp/src/session/client_session.rs index b285cadd..cf212588 100644 --- a/protocol/rtmp/src/session/client_session.rs +++ b/protocol/rtmp/src/session/client_session.rs @@ -61,9 +61,9 @@ enum ClientSessionPublishState { } #[allow(dead_code)] #[derive(Clone, Debug, PartialEq)] -pub enum ClientType { - Play, - Publish, +pub enum ClientSessionType { + Pull, + Push, } pub struct ClientSession { io: Arc>>, @@ -77,7 +77,7 @@ pub struct ClientSession { raw_stream_name: String, stream_name: String, state: ClientSessionState, - client_type: ClientType, + client_type: ClientSessionType, sub_app_name: Option, sub_stream_name: Option, /*configure how many gops will be cached.*/ @@ -87,7 +87,7 @@ pub struct ClientSession { impl ClientSession { pub fn new( stream: TcpStream, - client_type: ClientType, + client_type: ClientSessionType, raw_domain_name: String, app_name: String, raw_stream_name: String, @@ -104,7 +104,7 @@ impl ClientSession { let tcp_io: Box = Box::new(TcpIO::new(stream)); let net_io = Arc::new(Mutex::new(tcp_io)); - let packetizer = if client_type == ClientType::Publish { + let packetizer = if client_type == ClientSessionType::Push { Some(ChunkPacketizer::new(Arc::clone(&net_io))) } else { None @@ -338,7 +338,7 @@ impl ClientSession { properties.app = Some(self.app_name.clone()); match self.client_type { - ClientType::Play => { + ClientSessionType::Pull => { properties.flash_ver = Some("LNX 9,0,124,2".to_string()); properties.tc_url = Some(url.clone()); properties.fpad = Some(false); @@ -347,7 +347,7 @@ impl ClientSession { properties.video_codecs = Some(252_f64); properties.video_function = Some(1_f64); } - ClientType::Publish => { + ClientSessionType::Push => { properties.pub_type = Some("nonprivate".to_string()); properties.flash_ver = Some("FMLE/3.0 (compatible; xiu)".to_string()); properties.fpad = Some(false); @@ -469,10 +469,10 @@ impl ClientSession { pub fn on_result_create_stream(&mut self) -> Result<(), SessionError> { match self.client_type { - ClientType::Play => { + ClientSessionType::Pull => { self.state = ClientSessionState::Play; } - ClientType::Publish => { + ClientSessionType::Push => { self.state = ClientSessionState::PublishingContent; } } @@ -518,11 +518,11 @@ impl ClientSession { (&self.sub_app_name, &self.sub_stream_name) { self.common - .subscribe_from_channels(app_name.clone(), stream_name.clone()) + .subscribe_from_stream_hub(app_name.clone(), stream_name.clone()) .await?; } else { self.common - .subscribe_from_channels( + .subscribe_from_stream_hub( self.app_name.clone(), self.stream_name.clone(), ) @@ -533,7 +533,7 @@ impl ClientSession { "NetStream.Play.Start" => { //pull from remote rtmp server and publish to local session self.common - .publish_to_channels( + .publish_to_stream_hub( self.app_name.clone(), self.stream_name.clone(), self.gop_num, diff --git a/protocol/rtmp/src/session/common.rs b/protocol/rtmp/src/session/common.rs index a2265544..8f63bb23 100644 --- a/protocol/rtmp/src/session/common.rs +++ b/protocol/rtmp/src/session/common.rs @@ -286,8 +286,8 @@ impl Common { }; let sub_type = match self.session_type { - SessionType::Client => SubscribeType::PublisherRtmp, - SessionType::Server => SubscribeType::PlayerRtmp, + SessionType::Client => SubscribeType::RtmpRelay, + SessionType::Server => SubscribeType::RtmpPull, }; SubscriberInfo { @@ -311,8 +311,8 @@ impl Common { }; let pub_type = match self.session_type { - SessionType::Client => PublishType::RelayRtmp, - SessionType::Server => PublishType::PushRtmp, + SessionType::Client => PublishType::RtmpRelay, + SessionType::Server => PublishType::RtmpPush, }; PublisherInfo { @@ -326,14 +326,14 @@ impl Common { } } - /*Subscribe from local channels and then send data to retmote common player or local RTMP relay push client*/ - pub async fn subscribe_from_channels( + /* Subscribe from stream hub and push stream data to players or other rtmp nodes */ + pub async fn subscribe_from_stream_hub( &mut self, app_name: String, stream_name: String, ) -> Result<(), SessionError> { log::info!( - "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}", + "subscribe_from_stream_hub, app_name: {} stream_name: {} subscribe_id: {}", app_name, stream_name, self.session_id @@ -369,7 +369,7 @@ impl Common { id: self.session_id, remote_addr: self.remote_addr.unwrap().to_string(), start_time: chrono::Local::now(), - sub_type: SubscribeType::PlayerRtmp, + sub_type: SubscribeType::RtmpPull, }; if let Err(err) = sender.send(statistic_subscriber) { log::error!("send statistic_subscriber err: {}", err); @@ -381,7 +381,7 @@ impl Common { Ok(()) } - pub async fn unsubscribe_from_channels( + pub async fn unsubscribe_from_stream_hub( &mut self, app_name: String, stream_name: String, @@ -396,14 +396,14 @@ impl Common { info: self.get_subscriber_info(), }; if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_channels err {}", err); + log::error!("unsubscribe_from_stream_hub err {}", err); } Ok(()) } - /*Begin to receive stream data from remote RTMP push client or local RTMP relay pull client*/ - pub async fn publish_to_channels( + /* Publish RTMP streams to stream hub, the streams can be pushed from remote or pulled from remote to local */ + pub async fn publish_to_stream_hub( &mut self, app_name: String, stream_name: String, @@ -451,13 +451,13 @@ impl Common { Ok(()) } - pub async fn unpublish_to_channels( + pub async fn unpublish_to_stream_hub( &mut self, app_name: String, stream_name: String, ) -> Result<(), SessionError> { log::info!( - "unpublish_to_channels, app_name:{}, stream_name:{}", + "unpublish_to_stream_hub, app_name:{}, stream_name:{}", app_name, stream_name ); @@ -472,7 +472,7 @@ impl Common { match self.event_producer.send(unpublish_event) { Err(_) => { log::error!( - "unpublish_to_channels error.app_name: {}, stream_name: {}", + "unpublish_to_stream_hub error.app_name: {}, stream_name: {}", app_name, stream_name ); @@ -482,7 +482,7 @@ impl Common { } _ => { log::info!( - "unpublish_to_channels successfully.app_name: {}, stream_name: {}", + "unpublish_to_stream_hub successfully.app_name: {}, stream_name: {}", app_name, stream_name ); @@ -577,10 +577,9 @@ impl TStreamHandler for RtmpStreamHandler { })?; } match sub_type { - SubscribeType::PlayerRtmp - | SubscribeType::PlayerHttpFlv - | SubscribeType::PlayerHls - | SubscribeType::GenerateHls => { + SubscribeType::RtmpPull + | SubscribeType::RtmpRemux2HttpFlv + | SubscribeType::RtmpRemux2Hls => { if let Some(gops_data) = cache.get_gops_data() { for gop in gops_data { for channel_data in gop.get_frame_data() { diff --git a/protocol/rtmp/src/session/server_session.rs b/protocol/rtmp/src/session/server_session.rs index 076209f0..65ad7792 100644 --- a/protocol/rtmp/src/session/server_session.rs +++ b/protocol/rtmp/src/session/server_session.rs @@ -164,7 +164,7 @@ impl ServerSession { } Err(err) => { self.common - .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) + .unpublish_to_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; return Err(SessionError { @@ -196,7 +196,7 @@ impl ServerSession { Err(err) => { if let UnpackErrorValue::CannotParse = err.value { self.common - .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) + .unpublish_to_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; return Err(err)?; } @@ -212,7 +212,7 @@ impl ServerSession { Ok(_) => {} Err(err) => { self.common - .unsubscribe_from_channels(self.app_name.clone(), self.stream_name.clone()) + .unsubscribe_from_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; return Err(err); } @@ -489,7 +489,7 @@ impl ServerSession { stream_id: &f64, ) -> Result<(), SessionError> { self.common - .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) + .unpublish_to_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); @@ -654,7 +654,7 @@ impl ServerSession { /*Now it can update the request url*/ self.common.request_url = self.get_request_url(raw_stream_name); self.common - .subscribe_from_channels(self.app_name.clone(), self.stream_name.clone()) + .subscribe_from_stream_hub(self.app_name.clone(), self.stream_name.clone()) .await?; self.state = ServerSessionState::Play; @@ -764,7 +764,7 @@ impl ServerSession { ); self.common - .publish_to_channels( + .publish_to_stream_hub( self.app_name.clone(), self.stream_name.clone(), self.gop_num, diff --git a/protocol/rtsp/CHANGELOG.md b/protocol/rtsp/CHANGELOG.md index bd6d8dbc..465dbae4 100644 --- a/protocol/rtsp/CHANGELOG.md +++ b/protocol/rtsp/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.3.0] - 2021-08-11 +- Support Rtsp pull client. + ## [0.2.3] - 2021-05-18 - Reference bytesio v0.3.3. diff --git a/protocol/rtsp/src/lib.rs b/protocol/rtsp/src/lib.rs index 8560801b..220c6cd0 100644 --- a/protocol/rtsp/src/lib.rs +++ b/protocol/rtsp/src/lib.rs @@ -1,9 +1,10 @@ pub mod global_trait; // pub mod http; +pub mod relay; pub mod rtp; pub mod rtsp; -pub mod rtsp_codec; pub mod rtsp_channel; +pub mod rtsp_codec; pub mod rtsp_range; pub mod rtsp_track; pub mod rtsp_transport; diff --git a/protocol/rtsp/src/relay/errors.rs b/protocol/rtsp/src/relay/errors.rs new file mode 100644 index 00000000..96067865 --- /dev/null +++ b/protocol/rtsp/src/relay/errors.rs @@ -0,0 +1,43 @@ +use { + failure::Fail, + std::{fmt, io::Error}, + tokio::sync::broadcast::error::RecvError, +}; + +#[derive(Debug)] +pub struct RelayError { + pub value: PushClientErrorValue, +} + +impl fmt::Display for RelayError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.value, f) + } +} + +#[derive(Debug, Fail)] +pub enum PushClientErrorValue { + #[fail(display = "receive error")] + ReceiveError(RecvError), + + #[fail(display = "send error")] + SendError, + #[fail(display = "io error")] + IOError(Error), +} + +impl From for RelayError { + fn from(error: Error) -> Self { + RelayError { + value: PushClientErrorValue::IOError(error), + } + } +} + +impl From for RelayError { + fn from(error: RecvError) -> Self { + RelayError { + value: PushClientErrorValue::ReceiveError(error), + } + } +} diff --git a/protocol/rtsp/src/relay/mod.rs b/protocol/rtsp/src/relay/mod.rs new file mode 100644 index 00000000..826df40f --- /dev/null +++ b/protocol/rtsp/src/relay/mod.rs @@ -0,0 +1,2 @@ +pub mod errors; +pub mod pull_client_manager; diff --git a/protocol/rtsp/src/relay/pull_client_manager.rs b/protocol/rtsp/src/relay/pull_client_manager.rs new file mode 100644 index 00000000..a377c34a --- /dev/null +++ b/protocol/rtsp/src/relay/pull_client_manager.rs @@ -0,0 +1,166 @@ +use { + super::errors::RelayError, + crate::{ + rtsp_transport::ProtocolType, + session::{client_session::RtspClientSession, define::ClientSessionType}, + }, + std::{ + collections::HashMap, + sync::{atomic::AtomicBool, Arc}, + }, + streamhub::{ + define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, + errors::{StreamHubError, StreamHubErrorValue}, + stream::StreamIdentifier, + }, + tokio::sync::Mutex, +}; + +pub struct RtspPullClientManager { + clients: HashMap>, + client_event_consumer: BroadcastEventReceiver, + channel_event_producer: StreamHubEventSender, +} + +impl RtspPullClientManager { + pub fn new(consumer: BroadcastEventReceiver, producer: StreamHubEventSender) -> Self { + Self { + clients: HashMap::new(), + client_event_consumer: consumer, + channel_event_producer: producer, + } + } + + pub async fn run(&mut self) -> Result<(), RelayError> { + log::info!("push client run..."); + + loop { + let val = self.client_event_consumer.recv().await?; + + match val { + BroadcastEvent::Subscribe { + id, + identifier, + server_address, + result_sender, + } => { + let sender = result_sender.unwrap(); + + if let StreamIdentifier::Rtsp { stream_path } = identifier { + if let Some(server_address) = server_address { + log::info!("publish stream_path: {}", stream_path.clone()); + + /* judge if the server address / stream path exists */ + if self.clients.get_mut(&id).is_some() { + log::warn!("the client session with id:{} exists", id); + + let err = Err(StreamHubError { + value: StreamHubErrorValue::RtspClientSessionError(format!( + "stream {} exists.", + stream_path + )), + }); + if let Err(send_err) = sender.send(err).await { + log::error!("sender error: {}", send_err); + } + continue; + } + + /* new and run a client, save the client handler for exit */ + match RtspClientSession::new( + server_address.clone(), + stream_path.clone(), + ProtocolType::TCP, + self.channel_event_producer.clone(), + ClientSessionType::Pull, + ) + .await + { + Ok(client_session) => { + self.clients.insert(id, client_session.is_running.clone()); + let arc_client_session = Arc::new(Mutex::new(client_session)); + + tokio::spawn(async move { + if let Err(err) = + arc_client_session.lock().await.run().await + { + log::error!( + "client_session as push client run error: {}", + err + ); + + //let err = Err(StreamHubError { + // value: StreamHubErrorValue::RtspClientSessionError( + // err.to_string(), + // ), + //}); + //if let Err(send_err) = sender.send(err).await { + // log::error!("sender error: {}", send_err); + //} + } + }); + + if let Err(send_err) = sender.send(Ok(())).await { + log::error!("sender error: {}", send_err); + } + } + Err(err) => { + log::error!("new client session err: {}", err); + + let err = Err(StreamHubError { + value: StreamHubErrorValue::RtspClientSessionError( + err.to_string(), + ), + }); + if let Err(send_err) = sender.send(err).await { + log::error!("sender error: {}", send_err); + } + continue; + } + } + } else { + log::error!( + "The Rtsp subscribe parameters does not contain server address: {}", + stream_path + ); + + let err = Err(StreamHubError { + value: StreamHubErrorValue::RtspClientSessionError(String::from( + "The Rtsp subscribe parameters does not contain server address", + )), + }); + if let Err(send_err) = sender.send(err).await { + log::error!("sender error: {}", send_err); + } + continue; + } + } + } + + BroadcastEvent::UnSubscribe { id, result_sender } => { + let sender = result_sender.unwrap(); + /* judge if the server address / stream path exists */ + if let Some(client) = self.clients.get_mut(&id) { + client.store(false, std::sync::atomic::Ordering::Release); + self.clients.remove(&id); + } else { + log::warn!("the client session with id:{} not exists", id); + + let err = Err(StreamHubError { + value: StreamHubErrorValue::RtspClientSessionError(String::from( + "the client session not exists", + )), + }); + if let Err(send_err) = sender.send(err).await { + log::error!("sender error: {}", send_err); + } + } + } + + _ => { + log::info!("push client receive other events"); + } + } + } + } +} diff --git a/protocol/rtsp/src/rtsp.rs b/protocol/rtsp/src/rtsp.rs index b0a049b3..68ed308a 100644 --- a/protocol/rtsp/src/rtsp.rs +++ b/protocol/rtsp/src/rtsp.rs @@ -1,6 +1,6 @@ use streamhub::define::StreamHubEventSender; -use super::session::RtspServerSession; +use super::session::server_session::RtspServerSession; use commonlib::auth::Auth; use std::net::SocketAddr; use tokio::io::Error; diff --git a/protocol/rtsp/src/rtsp_track.rs b/protocol/rtsp/src/rtsp_track.rs index 347388ef..792bf23b 100644 --- a/protocol/rtsp/src/rtsp_track.rs +++ b/protocol/rtsp/src/rtsp_track.rs @@ -62,6 +62,7 @@ impl RtspTrack { loop { match rtp_io.read().await { Ok(data) => { + //log::info!("read rtp data"); reader.extend_from_slice(&data[..]); if let Err(err) = rtp_channel_in.on_packet(&mut reader).await { log::error!("rtp_receive_loop on_packet error: {}", err); diff --git a/protocol/rtsp/src/sdp/mod.rs b/protocol/rtsp/src/sdp/mod.rs index cda2c1cf..aa7dc086 100644 --- a/protocol/rtsp/src/sdp/mod.rs +++ b/protocol/rtsp/src/sdp/mod.rs @@ -378,14 +378,14 @@ mod tests { } #[test] fn test_str() { - let fmts: Vec = vec![5]; - // fmts.push(6); - let fmts_str = fmts - .iter() - .map(|b| b.to_string()) - .collect::>() - .join(" "); - - println!("=={fmts_str}=="); + //let fmts: Vec = vec![5]; + //// fmts.push(6); + //let fmts_str = fmts + // .iter() + // .map(|b| b.to_string()) + // .collect::>() + // .join(" "); + + //println!("=={fmts_str}=="); } } diff --git a/protocol/rtsp/src/session/client_session.rs b/protocol/rtsp/src/session/client_session.rs new file mode 100644 index 00000000..2a6b240d --- /dev/null +++ b/protocol/rtsp/src/session/client_session.rs @@ -0,0 +1,607 @@ +use super::define; +use super::define::ClientSessionType; + +use crate::global_trait::Marshal; +use crate::global_trait::Unmarshal; +use crate::rtsp_codec; + +use crate::rtsp_transport::CastType; + +use super::server_session::InterleavedBinaryData; +use commonlib::http::HttpRequest as RtspRequest; +use commonlib::http::HttpResponse as RtspResponse; +use commonlib::http::Marshal as RtspMarshal; +use commonlib::http::Unmarshal as RtspUnmarshal; +use commonlib::http::Uri; +use streamhub::define::SubscriberInfo; + +use crate::rtp::RtpPacket; + +use crate::rtsp_codec::RtspCodecInfo; +use crate::rtsp_track::RtspTrack; +use crate::rtsp_track::TrackType; +use crate::rtsp_transport::ProtocolType; +use crate::rtsp_transport::RtspTransport; + +use bytes::BytesMut; +use bytesio::bytes_reader::BytesReader; +use bytesio::bytes_writer::AsyncBytesWriter; + +use super::errors::SessionError; +use super::errors::SessionErrorValue; + +use tokio::sync::oneshot; + +use crate::rtp::errors::UnPackerError; +use crate::sdp::Sdp; + +use super::define::rtsp_method_name; + +use bytesio::bytesio::TNetIO; +use bytesio::bytesio::TcpIO; + +use std::collections::HashMap; + +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +use super::define::USER_AGENT; + +use streamhub::{ + define::{ + FrameData, NotifyInfo, PublishType, PublisherInfo, StreamHubEvent, StreamHubEventSender, + SubscribeType, + }, + stream::StreamIdentifier, + utils::{RandomDigitCount, Uuid}, +}; +use tokio::net::TcpStream; +use tokio::sync::Mutex; + +use super::server_session::RtspStreamHandler; + +use bytesio::bytesio::new_udpio_pair; + +pub struct RtspClientSession { + address: String, + stream_name: String, + + io: Arc>>, + reader: BytesReader, + writer: AsyncBytesWriter, + + protocol_type: ProtocolType, + tracks: HashMap, + sdp: Sdp, + pub session_id: Option, + pub client_type: super::define::ClientSessionType, + cseq: u16, + stream_handler: Arc, + + event_producer: StreamHubEventSender, + pub is_running: Arc, +} + +impl RtspClientSession { + pub async fn new( + address: String, + stream_name: String, + protocol_type: ProtocolType, + event_producer: StreamHubEventSender, + client_type: ClientSessionType, + ) -> Result { + let stream = TcpStream::connect(address.clone()).await?; + + let net_io: Box = Box::new(TcpIO::new(stream)); + let io = Arc::new(Mutex::new(net_io)); + + Ok(Self { + address, + stream_name, + io: io.clone(), + reader: BytesReader::new(BytesMut::default()), + writer: AsyncBytesWriter::new(io), + protocol_type, + tracks: HashMap::new(), + sdp: Sdp::default(), + session_id: None, + client_type, + event_producer, + + cseq: 1, + + stream_handler: Arc::new(RtspStreamHandler::new()), + is_running: Arc::new(AtomicBool::new(true)), + }) + } + + //publish stream: OPTIONS->ANNOUNCE->SETUP->RECORD->TEARDOWN + //subscribe stream: OPTIONS->DESCRIBE->SETUP->PLAY->TEARDOWN + pub async fn run(&mut self) -> Result<(), SessionError> { + self.send_options().await?; + + match self.client_type { + ClientSessionType::Pull => { + self.send_describe().await?; + self.send_setup().await?; + self.send_play().await?; + } + ClientSessionType::Push => { + self.send_announce().await?; + self.send_setup().await?; + self.send_record().await?; + } + } + + while self.is_running.load(std::sync::atomic::Ordering::Acquire) { + while self.reader.len() < 4 { + let data = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data[..]); + } + + if let Ok(Some(a)) = InterleavedBinaryData::new(&mut self.reader) { + if self.reader.len() < a.length as usize { + let data = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data[..]); + } + self.on_rtp_over_rtsp_message(a.channel_identifier, a.length as usize) + .await?; + } + } + + self.send_teardown().await?; + + Ok(()) + } + + async fn on_rtp_over_rtsp_message( + &mut self, + channel_identifier: u8, + length: usize, + ) -> Result<(), SessionError> { + let mut cur_reader = BytesReader::new(self.reader.read_bytes(length)?); + + for track in self.tracks.values_mut() { + if let Some(interleaveds) = track.transport.interleaved { + let rtp_identifier = interleaveds[0]; + let rtcp_identifier = interleaveds[1]; + + if channel_identifier == rtp_identifier { + track.on_rtp(&mut cur_reader).await?; + } else if channel_identifier == rtcp_identifier { + track.on_rtcp(&mut cur_reader, self.io.clone()).await; + } + } + } + Ok(()) + } + async fn send_options(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_options"); + let uri_path = format!("rtsp://{}/{}", self.address, self.stream_name); + let request = self.gen_request(rtsp_method_name::OPTIONS, uri_path); + self.send_resquest(&request).await?; + self.receive_response(rtsp_method_name::OPTIONS).await + } + + async fn send_announce(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_announce"); + let uri_path = format!("rtsp://{}/{}", self.address, self.stream_name); + let request = self.gen_request(rtsp_method_name::ANNOUNCE, uri_path); + self.send_resquest(&request).await + } + + async fn send_describe(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_describe"); + let uri_path = format!("rtsp://{}/{}", self.address, self.stream_name); + let mut request = self.gen_request(rtsp_method_name::DESCRIBE, uri_path); + request + .headers + .insert("Accept".to_string(), "application/sdp".to_string()); + self.send_resquest(&request).await?; + self.receive_response(rtsp_method_name::DESCRIBE).await + } + + async fn send_setup(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_setup"); + let sdp_medias = self.sdp.medias.clone(); + + for media in sdp_medias { + let media_control = if let Some(media_control_val) = media.attributes.get("control") { + media_control_val.clone() + } else { + log::error!("cannot get media control!!"); + String::from("") + }; + + let uri_path = format!( + "rtsp://{}/{}/{}", + self.address, self.stream_name, media_control + ); + + let mut request = self.gen_request(rtsp_method_name::SETUP, uri_path); + + match self.protocol_type { + ProtocolType::TCP => { + let kv: Vec<&str> = media_control.trim().splitn(2, '=').collect(); + + let mut media_transport = RtspTransport::default(); + if let Ok(interleaved_idx) = kv[1].parse::() { + media_transport.interleaved = + Some([interleaved_idx * 2, interleaved_idx * 2 + 1]); + } else { + log::error!("cannot get interleaved_idx: {}", kv[1]); + } + + media_transport.protocol_type = ProtocolType::TCP; + media_transport.cast_type = CastType::Unicast; + request + .headers + .insert("Transport".to_string(), media_transport.marshal()); + + if media.media_type == "audio" { + if let Some(track) = self.tracks.get_mut(&TrackType::Audio) { + track.transport.interleaved = media_transport.interleaved; + } + } else if media.media_type == "video" { + if let Some(track) = self.tracks.get_mut(&TrackType::Video) { + track.transport.interleaved = media_transport.interleaved; + } + } + } + ProtocolType::UDP => { + if let Some((socket_rtp, socket_rtcp)) = new_udpio_pair().await { + let media_transport = RtspTransport { + protocol_type: ProtocolType::UDP, + cast_type: CastType::Unicast, + client_port: Some([ + socket_rtp.get_local_port().unwrap(), + socket_rtcp.get_local_port().unwrap(), + ]), + ..Default::default() + }; + + request + .headers + .insert("Transport".to_string(), media_transport.marshal()); + + if media.media_type == "audio" { + if let Some(track) = self.tracks.get_mut(&TrackType::Audio) { + let box_rtp_io: Box = + Box::new(socket_rtp); + track.rtp_receive_loop(box_rtp_io).await; + + let box_rtcp_io: Arc>> = + Arc::new(Mutex::new(Box::new(socket_rtcp))); + track.rtcp_receive_loop(box_rtcp_io).await; + } + } else if media.media_type == "video" { + if let Some(track) = self.tracks.get_mut(&TrackType::Video) { + let box_rtp_io: Box = + Box::new(socket_rtp); + track.rtp_receive_loop(box_rtp_io).await; + + let box_rtcp_io: Arc>> = + Arc::new(Mutex::new(Box::new(socket_rtcp))); + track.rtcp_receive_loop(box_rtcp_io).await; + } + } + } + } + } + + self.send_resquest(&request).await?; + self.receive_response(rtsp_method_name::SETUP).await?; + } + Ok(()) + } + + async fn send_play(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_play"); + let uri_path = format!("rtsp://{}/{}", self.address, self.stream_name); + let mut request = self.gen_request(rtsp_method_name::PLAY, uri_path); + request + .headers + .insert("Range".to_string(), "npt=0.000".to_string()); + + self.send_resquest(&request).await?; + self.receive_response(rtsp_method_name::PLAY).await?; + + Ok(()) + } + + async fn send_record(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_record"); + let uri_path = format!("rtsp://{}/{}", self.address, self.stream_name); + let mut request = self.gen_request(rtsp_method_name::RECORD, uri_path); + request + .headers + .insert("Transport".to_string(), "application/sdp".to_string()); + self.send_resquest(&request).await + } + + async fn send_teardown(&mut self) -> Result<(), SessionError> { + log::info!("rtsp client: send_teardown"); + let uri_path = format!("rtsp://{}/{}", self.address, self.stream_name); + let request = self.gen_request(rtsp_method_name::TEARDOWN, uri_path); + self.send_resquest(&request).await?; + self.exit() + } + + fn gen_request(&mut self, method_name: &str, uri_path: String) -> RtspRequest { + let uri = Uri::unmarshal(&uri_path).unwrap(); + + let mut request = RtspRequest { + method: method_name.to_string(), + uri, + version: "RTSP/1.0".to_string(), + ..Default::default() + }; + + request + .headers + .insert("CSeq".to_string(), self.cseq.to_string()); + self.cseq += 1; + request + .headers + .insert("User-Agent".to_string(), USER_AGENT.to_string()); + + if let Some(session_id) = self.session_id { + request + .headers + .insert("Session".to_string(), session_id.to_string()); + } + + request + } + + fn get_subscriber_info(&mut self) -> SubscriberInfo { + let id = if let Some(session_id) = &self.session_id { + *session_id + } else { + Uuid::new(RandomDigitCount::Zero) + }; + + SubscriberInfo { + id, + sub_type: SubscribeType::RtspRelay, + sub_data_type: streamhub::define::SubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + } + } + + fn get_publisher_info(&mut self) -> PublisherInfo { + let id = if let Some(session_id) = &self.session_id { + *session_id + } else { + Uuid::new(RandomDigitCount::Zero) + }; + + PublisherInfo { + id, + pub_type: PublishType::RtspRelay, + pub_data_type: streamhub::define::PubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + } + } + + fn new_tracks(&mut self) -> Result<(), SessionError> { + for media in &self.sdp.medias { + let media_control = if let Some(media_control_val) = media.attributes.get("control") { + media_control_val.clone() + } else { + String::from("") + }; + + let media_name = &media.media_type; + match media_name.as_str() { + "audio" => { + let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID + .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) + .unwrap() + .clone(); + let codec_info = RtspCodecInfo { + codec_id, + payload_type: media.rtpmap.payload_type as u8, + sample_rate: media.rtpmap.clock_rate, + channel_count: media.rtpmap.encoding_param.parse().unwrap(), + }; + + log::info!("audio codec info: {:?}", codec_info); + + let track = RtspTrack::new(TrackType::Audio, codec_info, media_control); + self.tracks.insert(TrackType::Audio, track); + } + "video" => { + let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID + .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) + .unwrap() + .clone(); + let codec_info = RtspCodecInfo { + codec_id, + payload_type: media.rtpmap.payload_type as u8, + sample_rate: media.rtpmap.clock_rate, + ..Default::default() + }; + log::info!("video codec info: {:?}", codec_info); + let track = RtspTrack::new(TrackType::Video, codec_info, media_control); + self.tracks.insert(TrackType::Video, track); + } + _ => {} + } + } + Ok(()) + } + + async fn send_resquest(&mut self, request: &RtspRequest) -> Result<(), SessionError> { + self.writer.write(request.marshal().as_bytes())?; + self.writer.flush().await?; + + Ok(()) + } + + async fn receive_response(&mut self, method_name: &str) -> Result<(), SessionError> { + let data = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data[..]); + + let mut retry_count = 0; + let rtsp_response; + + loop { + let data = self.reader.get_remaining_bytes(); + if let Some(rtsp_response_data) = RtspResponse::unmarshal(std::str::from_utf8(&data)?) { + // TCP packet sticking issue, if have content_length in header. + // should check the body + if let Some(content_length) = + rtsp_response_data.get_header(&String::from("Content-Length")) + { + if let Ok(uint_num) = content_length.parse::() { + if rtsp_response_data.body.is_none() + || uint_num > rtsp_response_data.body.clone().unwrap().len() + { + if retry_count >= 5 { + log::error!( + "corrupted rtsp message={}", + std::str::from_utf8(&data)? + ); + return Ok(()); + } + retry_count += 1; + let data_recv = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data_recv[..]); + continue; + } + } + } + rtsp_response = rtsp_response_data; + self.reader.extract_remaining_bytes(); + break; + } else { + log::error!("corrupted rtsp message={}", std::str::from_utf8(&data)?); + return Ok(()); + } + } + + if rtsp_response.status_code != http::StatusCode::OK { + log::error!("rtsp response error: {}", rtsp_response.marshal()); + return Err(SessionError { + value: SessionErrorValue::RtspResponseStatusError, + }); + } + + match method_name { + rtsp_method_name::OPTIONS => { + if let Some(public) = rtsp_response.get_header(&"Public".to_string()) { + log::info!("support methods: {}", public); + } + } + rtsp_method_name::ANNOUNCE => {} + rtsp_method_name::DESCRIBE => { + if let Some(request_body) = &rtsp_response.body { + if let Some(sdp) = Sdp::unmarshal(request_body) { + self.sdp = sdp.clone(); + self.stream_handler.set_sdp(sdp).await; + + self.new_tracks()?; + + let (event_result_sender, event_result_receiver) = oneshot::channel(); + let identifier = StreamIdentifier::Rtsp { + stream_path: self.stream_name.clone(), + }; + + let publish_event = StreamHubEvent::Publish { + identifier, + result_sender: event_result_sender, + info: self.get_publisher_info(), + stream_handler: self.stream_handler.clone(), + }; + + if self.event_producer.send(publish_event).is_err() { + return Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }); + } + + let sender = event_result_receiver.await??.0.unwrap(); + + for track in self.tracks.values_mut() { + let sender_out = sender.clone(); + + let mut rtp_channel_guard = track.rtp_channel.lock().await; + rtp_channel_guard.on_frame_handler(Box::new( + move |msg: FrameData| -> Result<(), UnPackerError> { + if let Err(err) = sender_out.send(msg) { + log::error!("send frame error: {}", err); + } + Ok(()) + }, + )); + + let rtcp_channel = Arc::clone(&track.rtcp_channel); + rtp_channel_guard.on_packet_for_rtcp_handler(Box::new( + move |packet: RtpPacket| { + let rtcp_channel_in = Arc::clone(&rtcp_channel); + Box::pin(async move { + rtcp_channel_in.lock().await.on_packet(packet); + }) + }, + )); + } + } + } + } + rtsp_method_name::SETUP => { + if self.session_id.is_none() { + if let Some(session_id) = rtsp_response.get_header(&"Session".to_string()) { + self.session_id = Uuid::from_str2(session_id); + } + } + + if let Some(transport_str) = rtsp_response.get_header(&"Transport".to_string()) { + log::info!("setup response: transport {}", transport_str); + } + } + rtsp_method_name::PLAY => {} + rtsp_method_name::RECORD => {} + _ => {} + } + Ok(()) + } + + pub fn exit(&mut self) -> Result<(), SessionError> { + let identifier = StreamIdentifier::Rtsp { + stream_path: self.stream_name.clone(), + }; + let event = match self.client_type { + define::ClientSessionType::Push => StreamHubEvent::UnSubscribe { + identifier, + info: self.get_subscriber_info(), + }, + define::ClientSessionType::Pull => StreamHubEvent::UnPublish { + identifier, + info: self.get_publisher_info(), + }, + }; + + let event_json_str = serde_json::to_string(&event).unwrap(); + + let rv = self.event_producer.send(event); + match rv { + Err(err) => { + log::error!("session exit: send event error: {err} for event: {event_json_str}"); + Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }) + } + Ok(()) => { + log::info!("session exit: send event success: {event_json_str}"); + Ok(()) + } + } + } +} diff --git a/protocol/rtsp/src/session/define.rs b/protocol/rtsp/src/session/define.rs index 8e13d9f5..1b57da3f 100644 --- a/protocol/rtsp/src/session/define.rs +++ b/protocol/rtsp/src/session/define.rs @@ -28,17 +28,24 @@ pub mod rtsp_method_name { ]; } -pub enum SessionType { - Client, - Server, +pub enum ServerSessionType { + Pull, + Push, } -impl fmt::Display for SessionType { +pub enum ClientSessionType { + Pull, + Push, +} + +impl fmt::Display for ServerSessionType { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let client_type = match self { - SessionType::Client => String::from("client"), - SessionType::Server => String::from("server"), + ServerSessionType::Pull => String::from("pull"), + ServerSessionType::Push => String::from("push"), }; write!(f, "{client_type}") } } + +pub const USER_AGENT: &str = "xiu 0.12.8"; diff --git a/protocol/rtsp/src/session/errors.rs b/protocol/rtsp/src/session/errors.rs index d594de6a..d359e6e4 100644 --- a/protocol/rtsp/src/session/errors.rs +++ b/protocol/rtsp/src/session/errors.rs @@ -5,6 +5,7 @@ use { commonlib::errors::AuthError, failure::{Backtrace, Fail}, std::fmt, + std::io::Error, std::str::Utf8Error, streamhub::errors::StreamHubError, tokio::sync::oneshot::error::RecvError, @@ -41,6 +42,10 @@ pub enum SessionErrorValue { AuthError(#[cause] AuthError), #[fail(display = "Channel receive error")] ChannelRecvError, + #[fail(display = "io error")] + IOError(#[cause] Error), + #[fail(display = "RTSP response status error")] + RtspResponseStatusError, } impl From for SessionError { @@ -115,6 +120,14 @@ impl From for SessionError { } } +impl From for SessionError { + fn from(error: Error) -> Self { + SessionError { + value: SessionErrorValue::IOError(error), + } + } +} + impl fmt::Display for SessionError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&self.value, f) diff --git a/protocol/rtsp/src/session/mod.rs b/protocol/rtsp/src/session/mod.rs index 5694ddb9..3092b9e2 100644 --- a/protocol/rtsp/src/session/mod.rs +++ b/protocol/rtsp/src/session/mod.rs @@ -1,910 +1,4 @@ +pub mod client_session; pub mod define; pub mod errors; -use super::rtsp_codec; -use crate::global_trait::Marshal; -use crate::global_trait::Unmarshal; - -use crate::rtp::define::ANNEXB_NALU_START_CODE; -use crate::rtp::utils::Marshal as RtpMarshal; - -use commonlib::auth::SecretCarrier; -use commonlib::http::HttpRequest as RtspRequest; -use commonlib::http::HttpResponse as RtspResponse; -use commonlib::http::Marshal as RtspMarshal; -use commonlib::http::Unmarshal as RtspUnmarshal; - -use crate::rtp::RtpPacket; -use crate::rtsp_range::RtspRange; - -use crate::sdp::fmtp::Fmtp; - -use crate::rtsp_codec::RtspCodecInfo; -use crate::rtsp_track::RtspTrack; -use crate::rtsp_track::TrackType; -use crate::rtsp_transport::ProtocolType; -use crate::rtsp_transport::RtspTransport; - -use byteorder::BigEndian; -use bytes::BytesMut; -use bytesio::bytes_reader::BytesReader; -use bytesio::bytes_writer::AsyncBytesWriter; - -use bytesio::bytes_writer::BytesWriter; -use bytesio::bytesio::UdpIO; -use errors::SessionError; -use errors::SessionErrorValue; -use http::StatusCode; -use streamhub::define::DataSender; -use streamhub::define::MediaInfo; -use streamhub::define::VideoCodecType; -use tokio::sync::oneshot; - -use super::rtp::errors::UnPackerError; -use super::sdp::Sdp; - -use async_trait::async_trait; -use bytesio::bytesio::TNetIO; -use bytesio::bytesio::TcpIO; -use define::rtsp_method_name; - -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::mpsc; - -use commonlib::auth::Auth; -use streamhub::{ - define::{ - FrameData, Information, InformationSender, NotifyInfo, PublishType, PublisherInfo, - StreamHubEvent, StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, - }, - errors::{StreamHubError, StreamHubErrorValue}, - statistics::StatisticsStream, - stream::StreamIdentifier, - utils::{RandomDigitCount, Uuid}, -}; -use tokio::net::TcpStream; -use tokio::sync::Mutex; - -pub struct RtspServerSession { - io: Arc>>, - reader: BytesReader, - writer: AsyncBytesWriter, - - tracks: HashMap, - sdp: Sdp, - pub session_id: Option, - pub session_type: define::SessionType, - - stream_handler: Arc, - event_producer: StreamHubEventSender, - - auth: Option, - - pub stream_identifier: Option, - pub is_normal_exit: bool, -} - -pub struct InterleavedBinaryData { - channel_identifier: u8, - length: u16, -} - -impl InterleavedBinaryData { - // 10.12 Embedded (Interleaved) Binary Data - // Stream data such as RTP packets is encapsulated by an ASCII dollar - // sign (24 hexadecimal), followed by a one-byte channel identifier, - // followed by the length of the encapsulated binary data as a binary, - // two-byte integer in network byte order - fn new(reader: &mut BytesReader) -> Result, SessionError> { - let is_dollar_sign = reader.advance_u8()? == 0x24; - log::debug!("dollar sign: {}", is_dollar_sign); - if is_dollar_sign { - reader.read_u8()?; - let channel_identifier = reader.read_u8()?; - log::debug!("channel_identifier: {}", channel_identifier); - let length = reader.read_u16::()?; - log::debug!("length: {}", length); - return Ok(Some(InterleavedBinaryData { - channel_identifier, - length, - })); - } - Ok(None) - } -} - -impl RtspServerSession { - pub fn new( - stream: TcpStream, - event_producer: StreamHubEventSender, - auth: Option, - ) -> Self { - // let remote_addr = if let Ok(addr) = stream.peer_addr() { - // log::info!("server session: {}", addr.to_string()); - // Some(addr) - // } else { - // None - // }; - - let net_io: Box = Box::new(TcpIO::new(stream)); - let io = Arc::new(Mutex::new(net_io)); - - Self { - io: io.clone(), - reader: BytesReader::new(BytesMut::default()), - writer: AsyncBytesWriter::new(io), - tracks: HashMap::new(), - sdp: Sdp::default(), - session_id: None, - session_type: define::SessionType::Server, - event_producer, - stream_handler: Arc::new(RtspStreamHandler::new()), - auth, - stream_identifier: None, - is_normal_exit: false, - } - } - - pub async fn run(&mut self) -> Result<(), SessionError> { - loop { - while self.reader.len() < 4 { - let data = self.io.lock().await.read().await?; - self.reader.extend_from_slice(&data[..]); - } - - if let Ok(data) = InterleavedBinaryData::new(&mut self.reader) { - match data { - Some(a) => { - if self.reader.len() < a.length as usize { - let data = self.io.lock().await.read().await?; - self.reader.extend_from_slice(&data[..]); - } - self.on_rtp_over_rtsp_message(a.channel_identifier, a.length as usize) - .await?; - } - None => { - self.on_rtsp_message().await?; - } - } - } - } - } - - async fn on_rtp_over_rtsp_message( - &mut self, - channel_identifier: u8, - length: usize, - ) -> Result<(), SessionError> { - let mut cur_reader = BytesReader::new(self.reader.read_bytes(length)?); - - for track in self.tracks.values_mut() { - if let Some(interleaveds) = track.transport.interleaved { - let rtp_identifier = interleaveds[0]; - let rtcp_identifier = interleaveds[1]; - - if channel_identifier == rtp_identifier { - track.on_rtp(&mut cur_reader).await?; - } else if channel_identifier == rtcp_identifier { - track.on_rtcp(&mut cur_reader, self.io.clone()).await; - } - } - } - Ok(()) - } - - //publish stream: OPTIONS->ANNOUNCE->SETUP->RECORD->TEARDOWN - //subscribe stream: OPTIONS->DESCRIBE->SETUP->PLAY->TEARDOWN - async fn on_rtsp_message(&mut self) -> Result<(), SessionError> { - let rtsp_request: RtspRequest; - let mut retry_count = 0; - loop { - // TODO(all) : shoud check if have '\r\n\r\n' firstly. - let data = self.reader.get_remaining_bytes(); - if let Some(rtsp_request_data) = RtspRequest::unmarshal(std::str::from_utf8(&data)?) { - // TCP packet sticking issue, if have content_length in header. - // should check the body - if let Some(content_length) = - rtsp_request_data.get_header(&String::from("Content-Length")) - { - if let Ok(uint_num) = content_length.parse::() { - if rtsp_request_data.body.is_none() - || uint_num > rtsp_request_data.body.clone().unwrap().len() - { - if retry_count >= 5 { - log::error!( - "corrupted rtsp message={}", - std::str::from_utf8(&data)? - ); - return Ok(()); - } - retry_count += 1; - let data_recv = self.io.lock().await.read().await?; - self.reader.extend_from_slice(&data_recv[..]); - continue; - } - } - } - rtsp_request = rtsp_request_data; - self.reader.extract_remaining_bytes(); - } else { - log::error!("corrupted rtsp message={}", std::str::from_utf8(&data)?); - return Ok(()); - } - break; - } - - match rtsp_request.method.as_str() { - rtsp_method_name::OPTIONS => { - self.handle_options(&rtsp_request).await?; - } - rtsp_method_name::DESCRIBE => { - self.handle_describe(&rtsp_request).await?; - } - rtsp_method_name::ANNOUNCE => { - self.handle_announce(&rtsp_request).await?; - } - rtsp_method_name::SETUP => { - self.handle_setup(&rtsp_request).await?; - } - rtsp_method_name::PLAY => { - if let Err(err) = self.handle_play(&rtsp_request).await { - log::info!("handle_play error: {}", err); - } - } - rtsp_method_name::RECORD => { - self.handle_record(&rtsp_request).await?; - } - rtsp_method_name::TEARDOWN => { - self.handle_teardown(&rtsp_request)?; - } - rtsp_method_name::PAUSE => {} - rtsp_method_name::GET_PARAMETER => {} - rtsp_method_name::SET_PARAMETER => {} - rtsp_method_name::REDIRECT => {} - - _ => {} - } - Ok(()) - } - - async fn handle_options(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - let status_code = http::StatusCode::OK; - let mut response = Self::gen_response(status_code, rtsp_request); - let public_str = rtsp_method_name::ARRAY.join(","); - response.headers.insert("Public".to_string(), public_str); - self.send_response(&response).await?; - - Ok(()) - } - - async fn handle_describe(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - let status_code = http::StatusCode::OK; - - // The sender is used for sending sdp information from the server session to client session - // receiver is used to receive the sdp information - let (sender, mut receiver) = mpsc::unbounded_channel(); - - let identifier = StreamIdentifier::Rtsp { - stream_path: rtsp_request.uri.path.clone(), - }; - self.stream_identifier = Some(identifier.clone()); - - let request_event = StreamHubEvent::Request { identifier, sender }; - - if self.event_producer.send(request_event).is_err() { - return Err(SessionError { - value: SessionErrorValue::StreamHubEventSendErr, - }); - } - - if let Some(Information::Sdp { data }) = receiver.recv().await { - if let Some(sdp) = Sdp::unmarshal(&data) { - self.sdp = sdp; - //it can new tracks when get the sdp information; - self.new_tracks()?; - } - } - - let mut response = Self::gen_response(status_code, rtsp_request); - let sdp = self.sdp.marshal(); - log::debug!("sdp: {}", sdp); - response.body = Some(sdp); - response - .headers - .insert("Content-Type".to_string(), "application/sdp".to_string()); - self.send_response(&response).await?; - - Ok(()) - } - - async fn handle_announce(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - if let Some(auth) = &self.auth { - let stream_name = rtsp_request.uri.path.clone(); - auth.authenticate( - &stream_name, - &rtsp_request - .uri - .query - .as_ref() - .map(|q| SecretCarrier::Query(q.to_string())), - false, - )?; - } - - if let Some(request_body) = &rtsp_request.body { - if let Some(sdp) = Sdp::unmarshal(request_body) { - self.sdp = sdp.clone(); - self.stream_handler.set_sdp(sdp).await; - } - } - - //new tracks for publish session - self.new_tracks()?; - - let (event_result_sender, event_result_receiver) = oneshot::channel(); - - let identifier = StreamIdentifier::Rtsp { - stream_path: rtsp_request.uri.path.clone(), - }; - self.stream_identifier = Some(identifier.clone()); - - let publish_event = StreamHubEvent::Publish { - identifier, - result_sender: event_result_sender, - info: self.get_publisher_info(), - stream_handler: self.stream_handler.clone(), - }; - - if self.event_producer.send(publish_event).is_err() { - return Err(SessionError { - value: SessionErrorValue::StreamHubEventSendErr, - }); - } - - let sender = event_result_receiver.await??.0.unwrap(); - - for track in self.tracks.values_mut() { - let sender_out = sender.clone(); - let mut rtp_channel_guard = track.rtp_channel.lock().await; - - rtp_channel_guard.on_frame_handler(Box::new( - move |msg: FrameData| -> Result<(), UnPackerError> { - if let Err(err) = sender_out.send(msg) { - log::error!("send frame error: {}", err); - } - Ok(()) - }, - )); - - let rtcp_channel = Arc::clone(&track.rtcp_channel); - rtp_channel_guard.on_packet_for_rtcp_handler(Box::new(move |packet: RtpPacket| { - let rtcp_channel_in = Arc::clone(&rtcp_channel); - Box::pin(async move { - rtcp_channel_in.lock().await.on_packet(packet); - }) - })); - } - - let status_code = http::StatusCode::OK; - let response = Self::gen_response(status_code, rtsp_request); - self.send_response(&response).await?; - - Ok(()) - } - - async fn handle_setup(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - let status_code = http::StatusCode::OK; - let mut response = Self::gen_response(status_code, rtsp_request); - - for track in self.tracks.values_mut() { - if !rtsp_request.uri.marshal().contains(&track.media_control) { - continue; - } - - if let Some(transport_data) = rtsp_request.get_header(&"Transport".to_string()) { - if self.session_id.is_none() { - self.session_id = Some(Uuid::new(RandomDigitCount::Zero)); - } - - let transport = RtspTransport::unmarshal(transport_data); - - if let Some(mut trans) = transport { - let mut rtp_server_port: Option = None; - let mut rtcp_server_port: Option = None; - - match trans.protocol_type { - ProtocolType::TCP => { - track.create_packer(self.io.clone()).await; - } - ProtocolType::UDP => { - let (rtp_port, rtcp_port) = - if let Some(client_ports) = trans.client_port { - (client_ports[0], client_ports[1]) - } else { - log::error!("should not be here!!"); - (0, 0) - }; - - let address = rtsp_request.uri.host.clone(); - if let Some(rtp_io) = UdpIO::new(address.clone(), rtp_port, 0).await { - rtp_server_port = rtp_io.get_local_port(); - - let box_udp_io: Box = Box::new(rtp_io); - //if mode is empty then it is a player session. - if trans.transport_mod.is_none() { - track.create_packer(Arc::new(Mutex::new(box_udp_io))).await; - } else { - track.rtp_receive_loop(box_udp_io).await; - } - } - - if let Some(rtcp_io) = - UdpIO::new(address.clone(), rtcp_port, rtp_server_port.unwrap() + 1) - .await - { - rtcp_server_port = rtcp_io.get_local_port(); - let box_rtcp_io: Arc>> = - Arc::new(Mutex::new(Box::new(rtcp_io))); - track.rtcp_receive_loop(box_rtcp_io).await; - } - } - } - - //tell client the udp ports of server side - let mut server_ports: [u16; 2] = [0, 0]; - if let Some(rtp_port) = rtp_server_port { - server_ports[0] = rtp_port; - } - if let Some(rtcp_server_port) = rtcp_server_port { - server_ports[1] = rtcp_server_port; - trans.server_port = Some(server_ports); - } - - let new_transport_data = trans.marshal(); - response - .headers - .insert("Transport".to_string(), new_transport_data); - response - .headers - .insert("Session".to_string(), self.session_id.unwrap().to_string()); - - track.set_transport(trans).await; - } - } - break; - } - - self.send_response(&response).await?; - - Ok(()) - } - - async fn handle_play(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - if let Some(auth) = &self.auth { - let stream_name = rtsp_request.uri.path.clone(); - auth.authenticate( - &stream_name, - &rtsp_request - .uri - .query - .as_ref() - .map(|q| SecretCarrier::Query(q.to_string())), - true, - )?; - } - - for track in self.tracks.values_mut() { - let protocol_type = track.transport.protocol_type.clone(); - - match protocol_type { - ProtocolType::TCP => { - let channel_identifer = if let Some(interleaveds) = track.transport.interleaved - { - interleaveds[0] - } else { - log::error!("handle_play:should not be here!!!"); - 0 - }; - - track.rtp_channel.lock().await.on_packet_handler(Box::new( - move |io: Arc>>, packet: RtpPacket| { - Box::pin(async move { - let msg = packet.marshal()?; - let mut bytes_writer = AsyncBytesWriter::new(io); - bytes_writer.write_u8(0x24)?; - bytes_writer.write_u8(channel_identifer)?; - bytes_writer.write_u16::(msg.len() as u16)?; - bytes_writer.write(&msg)?; - bytes_writer.flush().await?; - Ok(()) - }) - }, - )); - } - ProtocolType::UDP => { - track.rtp_channel.lock().await.on_packet_handler(Box::new( - move |io: Arc>>, packet: RtpPacket| { - Box::pin(async move { - let mut bytes_writer = AsyncBytesWriter::new(io); - - let msg = packet.marshal()?; - bytes_writer.write(&msg)?; - bytes_writer.flush().await?; - Ok(()) - }) - }, - )); - } - } - } - - let status_code = http::StatusCode::OK; - let response = Self::gen_response(status_code, rtsp_request); - - self.send_response(&response).await?; - - self.session_type = define::SessionType::Client; - - let (event_result_sender, event_result_receiver) = oneshot::channel(); - - let subscribe_event = StreamHubEvent::Subscribe { - identifier: StreamIdentifier::Rtsp { - stream_path: rtsp_request.uri.path.clone(), - }, - info: self.get_subscriber_info(), - result_sender: event_result_sender, - }; - - if self.event_producer.send(subscribe_event).is_err() { - return Err(SessionError { - value: SessionErrorValue::StreamHubEventSendErr, - }); - } - - let mut receiver = event_result_receiver.await??.0.frame_receiver.unwrap(); - - let mut retry_times = 0; - loop { - if let Some(frame_data) = receiver.recv().await { - match frame_data { - FrameData::Audio { - timestamp, - mut data, - } => { - if let Some(audio_track) = self.tracks.get_mut(&TrackType::Audio) { - audio_track - .rtp_channel - .lock() - .await - .on_frame(&mut data, timestamp) - .await?; - } - } - FrameData::Video { - timestamp, - mut data, - } => { - if let Some(video_track) = self.tracks.get_mut(&TrackType::Video) { - video_track - .rtp_channel - .lock() - .await - .on_frame(&mut data, timestamp) - .await?; - } - } - _ => {} - } - } else { - retry_times += 1; - log::info!( - "send_channel_data: no data receives ,retry {} times!", - retry_times - ); - - if retry_times > 10 { - return Err(SessionError { - value: SessionErrorValue::CannotReceiveFrameData, - }); - } - } - } - } - - async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - let status_code = http::StatusCode::OK; - let mut response = Self::gen_response(status_code, rtsp_request); - - //A stream published by gstreamer does not support the Range header - //https://github.com/harlanc/xiu/issues/135 - if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) { - if let Some(range) = RtspRange::unmarshal(range_str) { - response - .headers - .insert(String::from("Range"), range.marshal()); - } - } - - response - .headers - .insert("Session".to_string(), self.session_id.unwrap().to_string()); - - self.send_response(&response).await?; - - Ok(()) - } - - fn handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - let identifier = StreamIdentifier::Rtsp { - stream_path: rtsp_request.uri.path.clone(), - }; - log::info!("handle_teardown..."); - self.exit(identifier) - } - - pub fn exit(&mut self, identifier: StreamIdentifier) -> Result<(), SessionError> { - let event = match self.session_type { - define::SessionType::Client => StreamHubEvent::UnSubscribe { - identifier, - info: self.get_subscriber_info(), - }, - define::SessionType::Server => StreamHubEvent::UnPublish { - identifier, - info: self.get_publisher_info(), - }, - }; - - let event_json_str = serde_json::to_string(&event).unwrap(); - - let rv = self.event_producer.send(event); - match rv { - Err(err) => { - log::error!("session exit: send event error: {err} for event: {event_json_str}"); - Err(SessionError { - value: SessionErrorValue::StreamHubEventSendErr, - }) - } - Ok(()) => { - self.is_normal_exit = true; - log::info!("session exit: send event success: {event_json_str}"); - Ok(()) - } - } - } - - fn new_tracks(&mut self) -> Result<(), SessionError> { - for media in &self.sdp.medias { - let media_control = if let Some(media_control_val) = media.attributes.get("control") { - media_control_val.clone() - } else { - String::from("") - }; - - let media_name = &media.media_type; - log::info!("media_name: {}", media_name); - match media_name.as_str() { - "audio" => { - let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID - .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) - .unwrap() - .clone(); - let codec_info = RtspCodecInfo { - codec_id, - payload_type: media.rtpmap.payload_type as u8, - sample_rate: media.rtpmap.clock_rate, - channel_count: media.rtpmap.encoding_param.parse().unwrap(), - }; - - log::info!("audio codec info: {:?}", codec_info); - - let track = RtspTrack::new(TrackType::Audio, codec_info, media_control); - self.tracks.insert(TrackType::Audio, track); - } - "video" => { - let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID - .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) - .unwrap() - .clone(); - let codec_info = RtspCodecInfo { - codec_id, - payload_type: media.rtpmap.payload_type as u8, - sample_rate: media.rtpmap.clock_rate, - ..Default::default() - }; - let track = RtspTrack::new(TrackType::Video, codec_info, media_control); - self.tracks.insert(TrackType::Video, track); - } - _ => {} - } - } - Ok(()) - } - - fn gen_response(status_code: StatusCode, rtsp_request: &RtspRequest) -> RtspResponse { - let reason_phrase = if let Some(reason) = status_code.canonical_reason() { - reason.to_string() - } else { - "".to_string() - }; - - let mut response = RtspResponse { - version: "RTSP/1.0".to_string(), - status_code: status_code.as_u16(), - reason_phrase, - ..Default::default() - }; - - if let Some(cseq) = rtsp_request.headers.get("CSeq") { - response - .headers - .insert("CSeq".to_string(), cseq.to_string()); - } - - response - } - - fn get_subscriber_info(&mut self) -> SubscriberInfo { - let id = if let Some(session_id) = &self.session_id { - *session_id - } else { - Uuid::new(RandomDigitCount::Zero) - }; - - SubscriberInfo { - id, - sub_type: SubscribeType::PlayerRtsp, - sub_data_type: streamhub::define::SubDataType::Frame, - notify_info: NotifyInfo { - request_url: String::from(""), - remote_addr: String::from(""), - }, - } - } - - fn get_publisher_info(&mut self) -> PublisherInfo { - let id = if let Some(session_id) = &self.session_id { - *session_id - } else { - Uuid::new(RandomDigitCount::Zero) - }; - - PublisherInfo { - id, - pub_type: PublishType::PushRtsp, - pub_data_type: streamhub::define::PubDataType::Frame, - notify_info: NotifyInfo { - request_url: String::from(""), - remote_addr: String::from(""), - }, - } - } - - async fn send_response(&mut self, response: &RtspResponse) -> Result<(), SessionError> { - self.writer.write(response.marshal().as_bytes())?; - self.writer.flush().await?; - - Ok(()) - } -} - -#[derive(Default)] -pub struct RtspStreamHandler { - sdp: Mutex, -} - -impl RtspStreamHandler { - pub fn new() -> Self { - Self { - sdp: Mutex::new(Sdp::default()), - } - } - pub async fn set_sdp(&self, sdp: Sdp) { - *self.sdp.lock().await = sdp; - } -} - -#[async_trait] -impl TStreamHandler for RtspStreamHandler { - async fn send_prior_data( - &self, - data_sender: DataSender, - sub_type: SubscribeType, - ) -> Result<(), StreamHubError> { - let sender = match data_sender { - DataSender::Frame { sender } => sender, - DataSender::Packet { sender: _ } => { - return Err(StreamHubError { - value: StreamHubErrorValue::NotCorrectDataSenderType, - }); - } - }; - match sub_type { - SubscribeType::PlayerRtmp => { - let sdp_info = self.sdp.lock().await; - let mut video_clock_rate: u32 = 0; - let mut audio_clock_rate: u32 = 0; - - let mut vcodec: VideoCodecType = VideoCodecType::H264; - - for media in &sdp_info.medias { - let mut bytes_writer = BytesWriter::new(); - if let Some(fmtp) = &media.fmtp { - match fmtp { - Fmtp::H264(data) => { - bytes_writer.write(&ANNEXB_NALU_START_CODE)?; - bytes_writer.write(&data.sps)?; - bytes_writer.write(&ANNEXB_NALU_START_CODE)?; - bytes_writer.write(&data.pps)?; - - let frame_data = FrameData::Video { - timestamp: 0, - data: bytes_writer.extract_current_bytes(), - }; - if let Err(err) = sender.send(frame_data) { - log::error!("send sps/pps error: {}", err); - } - video_clock_rate = media.rtpmap.clock_rate; - } - Fmtp::H265(data) => { - bytes_writer.write(&ANNEXB_NALU_START_CODE)?; - bytes_writer.write(&data.sps)?; - bytes_writer.write(&ANNEXB_NALU_START_CODE)?; - bytes_writer.write(&data.pps)?; - bytes_writer.write(&ANNEXB_NALU_START_CODE)?; - bytes_writer.write(&data.vps)?; - - let frame_data = FrameData::Video { - timestamp: 0, - data: bytes_writer.extract_current_bytes(), - }; - if let Err(err) = sender.send(frame_data) { - log::error!("send sps/pps/vps error: {}", err); - } - - vcodec = VideoCodecType::H265; - } - Fmtp::Mpeg4(data) => { - let frame_data = FrameData::Audio { - timestamp: 0, - data: data.asc.clone(), - }; - - if let Err(err) = sender.send(frame_data) { - log::error!("send asc error: {}", err); - } - - audio_clock_rate = media.rtpmap.clock_rate; - } - } - } - } - - if let Err(err) = sender.send(FrameData::MediaInfo { - media_info: MediaInfo { - audio_clock_rate, - video_clock_rate, - - vcodec, - }, - }) { - log::error!("send media info error: {}", err); - } - } - SubscribeType::PlayerHls => {} - _ => {} - } - - Ok(()) - } - async fn get_statistic_data(&self) -> Option { - None - } - - async fn send_information(&self, sender: InformationSender) { - if let Err(err) = sender.send(Information::Sdp { - data: self.sdp.lock().await.marshal(), - }) { - log::error!("send_information of rtsp error: {}", err); - } - } -} +pub mod server_session; diff --git a/protocol/rtsp/src/session/server_session.rs b/protocol/rtsp/src/session/server_session.rs new file mode 100644 index 00000000..19c7dadb --- /dev/null +++ b/protocol/rtsp/src/session/server_session.rs @@ -0,0 +1,913 @@ +use crate::global_trait::Marshal; +use crate::global_trait::Unmarshal; +use crate::rtsp_codec; + +use crate::rtp::define::ANNEXB_NALU_START_CODE; +use crate::rtp::utils::Marshal as RtpMarshal; + +use commonlib::auth::SecretCarrier; +use commonlib::http::HttpRequest as RtspRequest; +use commonlib::http::HttpResponse as RtspResponse; +use commonlib::http::Marshal as RtspMarshal; +use commonlib::http::Unmarshal as RtspUnmarshal; + +use crate::rtp::RtpPacket; +use crate::rtsp_range::RtspRange; + +use crate::sdp::fmtp::Fmtp; + +use crate::rtsp_codec::RtspCodecInfo; +use crate::rtsp_track::RtspTrack; +use crate::rtsp_track::TrackType; +use crate::rtsp_transport::ProtocolType; +use crate::rtsp_transport::RtspTransport; + +use byteorder::BigEndian; +use bytes::BytesMut; +use bytesio::bytes_reader::BytesReader; +use bytesio::bytes_writer::AsyncBytesWriter; + +use super::errors::SessionError; +use super::errors::SessionErrorValue; +use bytesio::bytes_writer::BytesWriter; +use bytesio::bytesio::UdpIO; +use http::StatusCode; +use streamhub::define::DataSender; +use streamhub::define::MediaInfo; +use streamhub::define::VideoCodecType; +use tokio::sync::oneshot; + +use crate::rtp::errors::UnPackerError; +use crate::sdp::Sdp; + +use super::define; +use super::define::rtsp_method_name; +use async_trait::async_trait; +use bytesio::bytesio::TNetIO; +use bytesio::bytesio::TcpIO; + +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::mpsc; + +use commonlib::auth::Auth; +use streamhub::{ + define::{ + FrameData, Information, InformationSender, NotifyInfo, PublishType, PublisherInfo, + StreamHubEvent, StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, + }, + errors::{StreamHubError, StreamHubErrorValue}, + statistics::StatisticsStream, + stream::StreamIdentifier, + utils::{RandomDigitCount, Uuid}, +}; +use tokio::net::TcpStream; +use tokio::sync::Mutex; + +pub struct RtspServerSession { + io: Arc>>, + reader: BytesReader, + writer: AsyncBytesWriter, + + tracks: HashMap, + sdp: Sdp, + pub session_id: Option, + pub session_type: define::ServerSessionType, + + stream_handler: Arc, + event_producer: StreamHubEventSender, + + auth: Option, + + pub stream_identifier: Option, + pub is_normal_exit: bool, +} + +pub struct InterleavedBinaryData { + pub channel_identifier: u8, + pub length: u16, +} + +impl InterleavedBinaryData { + // 10.12 Embedded (Interleaved) Binary Data + // Stream data such as RTP packets is encapsulated by an ASCII dollar + // sign (24 hexadecimal), followed by a one-byte channel identifier, + // followed by the length of the encapsulated binary data as a binary, + // two-byte integer in network byte order + pub fn new(reader: &mut BytesReader) -> Result, SessionError> { + let is_dollar_sign = reader.advance_u8()? == 0x24; + log::debug!("dollar sign: {}", is_dollar_sign); + if is_dollar_sign { + reader.read_u8()?; + let channel_identifier = reader.read_u8()?; + log::debug!("channel_identifier: {}", channel_identifier); + let length = reader.read_u16::()?; + log::debug!("length: {}", length); + return Ok(Some(InterleavedBinaryData { + channel_identifier, + length, + })); + } + Ok(None) + } +} + +impl RtspServerSession { + pub fn new( + stream: TcpStream, + event_producer: StreamHubEventSender, + auth: Option, + ) -> Self { + // let remote_addr = if let Ok(addr) = stream.peer_addr() { + // log::info!("server session: {}", addr.to_string()); + // Some(addr) + // } else { + // None + // }; + + let net_io: Box = Box::new(TcpIO::new(stream)); + let io = Arc::new(Mutex::new(net_io)); + + Self { + io: io.clone(), + reader: BytesReader::new(BytesMut::default()), + writer: AsyncBytesWriter::new(io), + tracks: HashMap::new(), + sdp: Sdp::default(), + session_id: None, + session_type: define::ServerSessionType::Push, + event_producer, + stream_handler: Arc::new(RtspStreamHandler::new()), + auth, + stream_identifier: None, + is_normal_exit: false, + } + } + + pub async fn run(&mut self) -> Result<(), SessionError> { + loop { + while self.reader.len() < 4 { + let data = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data[..]); + } + // If delivering media data using RTP over RTSP(TCP), then it should use InterleavedBinaryData + // to distinguish RTP from RTSP messges; If delivering media data over UDP, it will establish + // separate udp channels for audio RTP data and video RTP data. + + // TODO: Here, some optimizations can be made since it's not necessary to use InterleavedBinaryData + // in all cases. + if let Ok(data) = InterleavedBinaryData::new(&mut self.reader) { + match data { + Some(a) => { + if self.reader.len() < a.length as usize { + let data = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data[..]); + } + self.on_rtp_over_rtsp_message(a.channel_identifier, a.length as usize) + .await?; + } + None => { + self.on_rtsp_message().await?; + } + } + } + } + } + + async fn on_rtp_over_rtsp_message( + &mut self, + channel_identifier: u8, + length: usize, + ) -> Result<(), SessionError> { + let mut cur_reader = BytesReader::new(self.reader.read_bytes(length)?); + + for track in self.tracks.values_mut() { + if let Some(interleaveds) = track.transport.interleaved { + let rtp_identifier = interleaveds[0]; + let rtcp_identifier = interleaveds[1]; + + if channel_identifier == rtp_identifier { + track.on_rtp(&mut cur_reader).await?; + } else if channel_identifier == rtcp_identifier { + track.on_rtcp(&mut cur_reader, self.io.clone()).await; + } + } + } + Ok(()) + } + + //publish stream: OPTIONS->ANNOUNCE->SETUP->RECORD->TEARDOWN + //subscribe stream: OPTIONS->DESCRIBE->SETUP->PLAY->TEARDOWN + async fn on_rtsp_message(&mut self) -> Result<(), SessionError> { + let rtsp_request: RtspRequest; + let mut retry_count = 0; + loop { + // TODO(all) : shoud check if have '\r\n\r\n' firstly. + let data = self.reader.get_remaining_bytes(); + if let Some(rtsp_request_data) = RtspRequest::unmarshal(std::str::from_utf8(&data)?) { + // TCP packet sticking issue, if have content_length in header. + // should check the body + if let Some(content_length) = + rtsp_request_data.get_header(&String::from("Content-Length")) + { + if let Ok(uint_num) = content_length.parse::() { + if rtsp_request_data.body.is_none() + || uint_num > rtsp_request_data.body.clone().unwrap().len() + { + if retry_count >= 5 { + log::error!( + "corrupted rtsp message={}", + std::str::from_utf8(&data)? + ); + return Ok(()); + } + retry_count += 1; + let data_recv = self.io.lock().await.read().await?; + self.reader.extend_from_slice(&data_recv[..]); + continue; + } + } + } + rtsp_request = rtsp_request_data; + self.reader.extract_remaining_bytes(); + } else { + log::error!("corrupted rtsp message={}", std::str::from_utf8(&data)?); + return Ok(()); + } + break; + } + + match rtsp_request.method.as_str() { + rtsp_method_name::OPTIONS => { + self.handle_options(&rtsp_request).await?; + } + rtsp_method_name::DESCRIBE => { + self.handle_describe(&rtsp_request).await?; + } + rtsp_method_name::ANNOUNCE => { + self.handle_announce(&rtsp_request).await?; + } + rtsp_method_name::SETUP => { + self.handle_setup(&rtsp_request).await?; + } + rtsp_method_name::PLAY => { + if let Err(err) = self.handle_play(&rtsp_request).await { + log::info!("handle_play error: {}", err); + } + } + rtsp_method_name::RECORD => { + self.handle_record(&rtsp_request).await?; + } + rtsp_method_name::TEARDOWN => { + self.handle_teardown(&rtsp_request)?; + } + rtsp_method_name::PAUSE => {} + rtsp_method_name::GET_PARAMETER => {} + rtsp_method_name::SET_PARAMETER => {} + rtsp_method_name::REDIRECT => {} + + _ => {} + } + Ok(()) + } + + async fn handle_options(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + let status_code = http::StatusCode::OK; + let mut response = Self::gen_response(status_code, rtsp_request); + let public_str = rtsp_method_name::ARRAY.join(","); + response.headers.insert("Public".to_string(), public_str); + self.send_response(&response).await?; + + Ok(()) + } + + async fn handle_describe(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + let status_code = http::StatusCode::OK; + + // The sender is used for sending sdp information from the server session to client session + // receiver is used to receive the sdp information + let (sender, mut receiver) = mpsc::unbounded_channel(); + + let identifier = StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), + }; + self.stream_identifier = Some(identifier.clone()); + + let request_event = StreamHubEvent::Request { identifier, sender }; + + if self.event_producer.send(request_event).is_err() { + return Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }); + } + + if let Some(Information::Sdp { data }) = receiver.recv().await { + if let Some(sdp) = Sdp::unmarshal(&data) { + self.sdp = sdp; + //it can new tracks when get the sdp information; + self.new_tracks()?; + } + } + + let mut response = Self::gen_response(status_code, rtsp_request); + let sdp = self.sdp.marshal(); + response.body = Some(sdp); + response + .headers + .insert("Content-Type".to_string(), "application/sdp".to_string()); + self.send_response(&response).await?; + + Ok(()) + } + + async fn handle_announce(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + if let Some(auth) = &self.auth { + let stream_name = rtsp_request.uri.path.clone(); + auth.authenticate( + &stream_name, + &rtsp_request + .uri + .query + .as_ref() + .map(|q| SecretCarrier::Query(q.to_string())), + false, + )?; + } + + if let Some(request_body) = &rtsp_request.body { + if let Some(sdp) = Sdp::unmarshal(request_body) { + self.sdp = sdp.clone(); + self.stream_handler.set_sdp(sdp).await; + } + } + + //new tracks for publish session + self.new_tracks()?; + + let (event_result_sender, event_result_receiver) = oneshot::channel(); + + let identifier = StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), + }; + self.stream_identifier = Some(identifier.clone()); + + let publish_event = StreamHubEvent::Publish { + identifier, + result_sender: event_result_sender, + info: self.get_publisher_info(), + stream_handler: self.stream_handler.clone(), + }; + + if self.event_producer.send(publish_event).is_err() { + return Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }); + } + + let sender = event_result_receiver.await??.0.unwrap(); + + for track in self.tracks.values_mut() { + let sender_out = sender.clone(); + let mut rtp_channel_guard = track.rtp_channel.lock().await; + + rtp_channel_guard.on_frame_handler(Box::new( + move |msg: FrameData| -> Result<(), UnPackerError> { + if let Err(err) = sender_out.send(msg) { + log::error!("send frame error: {}", err); + } + Ok(()) + }, + )); + + let rtcp_channel = Arc::clone(&track.rtcp_channel); + rtp_channel_guard.on_packet_for_rtcp_handler(Box::new(move |packet: RtpPacket| { + let rtcp_channel_in = Arc::clone(&rtcp_channel); + Box::pin(async move { + rtcp_channel_in.lock().await.on_packet(packet); + }) + })); + } + + let status_code = http::StatusCode::OK; + let response = Self::gen_response(status_code, rtsp_request); + self.send_response(&response).await?; + + Ok(()) + } + + async fn handle_setup(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + let status_code = http::StatusCode::OK; + let mut response = Self::gen_response(status_code, rtsp_request); + + for track in self.tracks.values_mut() { + if !rtsp_request.uri.marshal().contains(&track.media_control) { + continue; + } + + if let Some(transport_data) = rtsp_request.get_header(&"Transport".to_string()) { + if self.session_id.is_none() { + self.session_id = Some(Uuid::new(RandomDigitCount::Zero)); + } + + let transport = RtspTransport::unmarshal(transport_data); + + if let Some(mut trans) = transport { + let mut rtp_server_port: Option = None; + let mut rtcp_server_port: Option = None; + + match trans.protocol_type { + ProtocolType::TCP => { + track.create_packer(self.io.clone()).await; + } + ProtocolType::UDP => { + let (rtp_port, rtcp_port) = + if let Some(client_ports) = trans.client_port { + (client_ports[0], client_ports[1]) + } else { + log::error!("should not be here!!"); + (0, 0) + }; + + let address = rtsp_request.uri.host.clone(); + if let Some(rtp_io) = UdpIO::new(address.clone(), rtp_port, 0).await { + rtp_server_port = rtp_io.get_local_port(); + + let box_udp_io: Box = Box::new(rtp_io); + //if mode is empty then it is a player session. + if trans.transport_mod.is_none() { + track.create_packer(Arc::new(Mutex::new(box_udp_io))).await; + } else { + track.rtp_receive_loop(box_udp_io).await; + } + } + + if let Some(rtcp_io) = + UdpIO::new(address.clone(), rtcp_port, rtp_server_port.unwrap() + 1) + .await + { + rtcp_server_port = rtcp_io.get_local_port(); + let box_rtcp_io: Arc>> = + Arc::new(Mutex::new(Box::new(rtcp_io))); + track.rtcp_receive_loop(box_rtcp_io).await; + } + } + } + + //tell client the udp ports of server side + let mut server_ports: [u16; 2] = [0, 0]; + if let Some(rtp_port) = rtp_server_port { + server_ports[0] = rtp_port; + } + if let Some(rtcp_server_port) = rtcp_server_port { + server_ports[1] = rtcp_server_port; + trans.server_port = Some(server_ports); + } + + let new_transport_data = trans.marshal(); + response + .headers + .insert("Transport".to_string(), new_transport_data); + response + .headers + .insert("Session".to_string(), self.session_id.unwrap().to_string()); + + track.set_transport(trans).await; + } + } + break; + } + + self.send_response(&response).await?; + + Ok(()) + } + + async fn handle_play(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + if let Some(auth) = &self.auth { + let stream_name = rtsp_request.uri.path.clone(); + auth.authenticate( + &stream_name, + &rtsp_request + .uri + .query + .as_ref() + .map(|q| SecretCarrier::Query(q.to_string())), + true, + )?; + } + + for track in self.tracks.values_mut() { + let protocol_type = track.transport.protocol_type.clone(); + + match protocol_type { + ProtocolType::TCP => { + let channel_identifer = if let Some(interleaveds) = track.transport.interleaved + { + interleaveds[0] + } else { + log::error!("handle_play:should not be here!!!"); + 0 + }; + + track.rtp_channel.lock().await.on_packet_handler(Box::new( + move |io: Arc>>, packet: RtpPacket| { + Box::pin(async move { + let msg = packet.marshal()?; + let mut bytes_writer = AsyncBytesWriter::new(io); + bytes_writer.write_u8(0x24)?; + bytes_writer.write_u8(channel_identifer)?; + bytes_writer.write_u16::(msg.len() as u16)?; + bytes_writer.write(&msg)?; + bytes_writer.flush().await?; + Ok(()) + }) + }, + )); + } + ProtocolType::UDP => { + track.rtp_channel.lock().await.on_packet_handler(Box::new( + move |io: Arc>>, packet: RtpPacket| { + Box::pin(async move { + let mut bytes_writer = AsyncBytesWriter::new(io); + + let msg = packet.marshal()?; + bytes_writer.write(&msg)?; + bytes_writer.flush().await?; + Ok(()) + }) + }, + )); + } + } + } + + let status_code = http::StatusCode::OK; + let response = Self::gen_response(status_code, rtsp_request); + + self.send_response(&response).await?; + + self.session_type = define::ServerSessionType::Pull; + + let (event_result_sender, event_result_receiver) = oneshot::channel(); + + let subscribe_event = StreamHubEvent::Subscribe { + identifier: StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), + }, + info: self.get_subscriber_info(), + result_sender: event_result_sender, + }; + + if self.event_producer.send(subscribe_event).is_err() { + return Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }); + } + + let mut receiver = event_result_receiver.await??.0.frame_receiver.unwrap(); + + let mut retry_times = 0; + loop { + if let Some(frame_data) = receiver.recv().await { + match frame_data { + FrameData::Audio { + timestamp, + mut data, + } => { + if let Some(audio_track) = self.tracks.get_mut(&TrackType::Audio) { + audio_track + .rtp_channel + .lock() + .await + .on_frame(&mut data, timestamp) + .await?; + } + } + FrameData::Video { + timestamp, + mut data, + } => { + if let Some(video_track) = self.tracks.get_mut(&TrackType::Video) { + video_track + .rtp_channel + .lock() + .await + .on_frame(&mut data, timestamp) + .await?; + } + } + _ => {} + } + } else { + retry_times += 1; + log::info!( + "send_channel_data: no data receives ,retry {} times!", + retry_times + ); + + if retry_times > 10 { + return Err(SessionError { + value: SessionErrorValue::CannotReceiveFrameData, + }); + } + } + } + } + + async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + let status_code = http::StatusCode::OK; + let mut response = Self::gen_response(status_code, rtsp_request); + + //A stream published by gstreamer does not support the Range header + //https://github.com/harlanc/xiu/issues/135 + if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) { + if let Some(range) = RtspRange::unmarshal(range_str) { + response + .headers + .insert(String::from("Range"), range.marshal()); + } + } + + response + .headers + .insert("Session".to_string(), self.session_id.unwrap().to_string()); + + self.send_response(&response).await?; + + Ok(()) + } + + fn handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + let identifier = StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), + }; + log::info!("handle_teardown..."); + self.exit(identifier) + } + + pub fn exit(&mut self, identifier: StreamIdentifier) -> Result<(), SessionError> { + let event = match self.session_type { + define::ServerSessionType::Pull => StreamHubEvent::UnSubscribe { + identifier, + info: self.get_subscriber_info(), + }, + define::ServerSessionType::Push => StreamHubEvent::UnPublish { + identifier, + info: self.get_publisher_info(), + }, + }; + + let event_json_str = serde_json::to_string(&event).unwrap(); + + let rv = self.event_producer.send(event); + match rv { + Err(err) => { + log::error!("session exit: send event error: {err} for event: {event_json_str}"); + Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }) + } + Ok(()) => { + self.is_normal_exit = true; + log::info!("session exit: send event success: {event_json_str}"); + Ok(()) + } + } + } + + fn new_tracks(&mut self) -> Result<(), SessionError> { + for media in &self.sdp.medias { + let media_control = if let Some(media_control_val) = media.attributes.get("control") { + media_control_val.clone() + } else { + String::from("") + }; + + let media_name = &media.media_type; + log::info!("media_name: {}", media_name); + match media_name.as_str() { + "audio" => { + let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID + .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) + .unwrap() + .clone(); + let codec_info = RtspCodecInfo { + codec_id, + payload_type: media.rtpmap.payload_type as u8, + sample_rate: media.rtpmap.clock_rate, + channel_count: media.rtpmap.encoding_param.parse().unwrap(), + }; + + log::info!("audio codec info: {:?}", codec_info); + + let track = RtspTrack::new(TrackType::Audio, codec_info, media_control); + self.tracks.insert(TrackType::Audio, track); + } + "video" => { + let codec_id = rtsp_codec::RTSP_CODEC_NAME_2_ID + .get(&media.rtpmap.encoding_name.to_lowercase().as_str()) + .unwrap() + .clone(); + let codec_info = RtspCodecInfo { + codec_id, + payload_type: media.rtpmap.payload_type as u8, + sample_rate: media.rtpmap.clock_rate, + ..Default::default() + }; + let track = RtspTrack::new(TrackType::Video, codec_info, media_control); + self.tracks.insert(TrackType::Video, track); + } + _ => {} + } + } + Ok(()) + } + + fn gen_response(status_code: StatusCode, rtsp_request: &RtspRequest) -> RtspResponse { + let reason_phrase = if let Some(reason) = status_code.canonical_reason() { + reason.to_string() + } else { + "".to_string() + }; + + let mut response = RtspResponse { + version: "RTSP/1.0".to_string(), + status_code: status_code.as_u16(), + reason_phrase, + ..Default::default() + }; + + if let Some(cseq) = rtsp_request.headers.get("CSeq") { + response + .headers + .insert("CSeq".to_string(), cseq.to_string()); + } + + response + } + + fn get_subscriber_info(&mut self) -> SubscriberInfo { + let id = if let Some(session_id) = &self.session_id { + *session_id + } else { + Uuid::new(RandomDigitCount::Zero) + }; + + SubscriberInfo { + id, + sub_type: SubscribeType::RtspPull, + sub_data_type: streamhub::define::SubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + } + } + + fn get_publisher_info(&mut self) -> PublisherInfo { + let id = if let Some(session_id) = &self.session_id { + *session_id + } else { + Uuid::new(RandomDigitCount::Zero) + }; + + PublisherInfo { + id, + pub_type: PublishType::RtspPush, + pub_data_type: streamhub::define::PubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + } + } + + async fn send_response(&mut self, response: &RtspResponse) -> Result<(), SessionError> { + self.writer.write(response.marshal().as_bytes())?; + self.writer.flush().await?; + + Ok(()) + } +} + +#[derive(Default)] +pub struct RtspStreamHandler { + sdp: Mutex, +} + +impl RtspStreamHandler { + pub fn new() -> Self { + Self { + sdp: Mutex::new(Sdp::default()), + } + } + pub async fn set_sdp(&self, sdp: Sdp) { + *self.sdp.lock().await = sdp; + } +} + +#[async_trait] +impl TStreamHandler for RtspStreamHandler { + async fn send_prior_data( + &self, + data_sender: DataSender, + sub_type: SubscribeType, + ) -> Result<(), StreamHubError> { + let sender = match data_sender { + DataSender::Frame { sender } => sender, + DataSender::Packet { sender: _ } => { + return Err(StreamHubError { + value: StreamHubErrorValue::NotCorrectDataSenderType, + }); + } + }; + match sub_type { + SubscribeType::RtspRemux2Rtmp => { + let sdp_info = self.sdp.lock().await; + let mut video_clock_rate: u32 = 0; + let mut audio_clock_rate: u32 = 0; + + let mut vcodec: VideoCodecType = VideoCodecType::H264; + + for media in &sdp_info.medias { + let mut bytes_writer = BytesWriter::new(); + if let Some(fmtp) = &media.fmtp { + match fmtp { + Fmtp::H264(data) => { + bytes_writer.write(&ANNEXB_NALU_START_CODE)?; + bytes_writer.write(&data.sps)?; + bytes_writer.write(&ANNEXB_NALU_START_CODE)?; + bytes_writer.write(&data.pps)?; + + let frame_data = FrameData::Video { + timestamp: 0, + data: bytes_writer.extract_current_bytes(), + }; + if let Err(err) = sender.send(frame_data) { + log::error!("send sps/pps error: {}", err); + } + video_clock_rate = media.rtpmap.clock_rate; + } + Fmtp::H265(data) => { + bytes_writer.write(&ANNEXB_NALU_START_CODE)?; + bytes_writer.write(&data.sps)?; + bytes_writer.write(&ANNEXB_NALU_START_CODE)?; + bytes_writer.write(&data.pps)?; + bytes_writer.write(&ANNEXB_NALU_START_CODE)?; + bytes_writer.write(&data.vps)?; + + let frame_data = FrameData::Video { + timestamp: 0, + data: bytes_writer.extract_current_bytes(), + }; + if let Err(err) = sender.send(frame_data) { + log::error!("send sps/pps/vps error: {}", err); + } + + vcodec = VideoCodecType::H265; + } + Fmtp::Mpeg4(data) => { + let frame_data = FrameData::Audio { + timestamp: 0, + data: data.asc.clone(), + }; + + if let Err(err) = sender.send(frame_data) { + log::error!("send asc error: {}", err); + } + + audio_clock_rate = media.rtpmap.clock_rate; + } + } + } + } + + if let Err(err) = sender.send(FrameData::MediaInfo { + media_info: MediaInfo { + audio_clock_rate, + video_clock_rate, + + vcodec, + }, + }) { + log::error!("send media info error: {}", err); + } + } + SubscribeType::RtmpRemux2Hls => {} + _ => {} + } + + Ok(()) + } + async fn get_statistic_data(&self) -> Option { + None + } + + async fn send_information(&self, sender: InformationSender) { + if let Err(err) = sender.send(Information::Sdp { + data: self.sdp.lock().await.marshal(), + }) { + log::error!("send_information of rtsp error: {}", err); + } + } +} diff --git a/protocol/webrtc/CHANGELOG.md b/protocol/webrtc/CHANGELOG.md index ee422095..f47cecf7 100644 --- a/protocol/webrtc/CHANGELOG.md +++ b/protocol/webrtc/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.3.5] - 2021-08-11 +- Implement Authorization header support for WebRTC. + ## [0.3.4] - 2021-05-18 - Reference bytesio v0.3.3. diff --git a/protocol/webrtc/src/session/mod.rs b/protocol/webrtc/src/session/mod.rs index 79364488..d70cd567 100644 --- a/protocol/webrtc/src/session/mod.rs +++ b/protocol/webrtc/src/session/mod.rs @@ -488,7 +488,7 @@ impl WebRTCServerSession { SubscriberInfo { id, - sub_type: SubscribeType::PlayerWebrtc, + sub_type: SubscribeType::WhepPull, sub_data_type: streamhub::define::SubDataType::Packet, notify_info: NotifyInfo { request_url: String::from(""), @@ -506,7 +506,7 @@ impl WebRTCServerSession { PublisherInfo { id, - pub_type: PublishType::PushWebRTC, + pub_type: PublishType::WhipPush, pub_data_type: streamhub::define::PubDataType::Both, notify_info: NotifyInfo { request_url: String::from(""),