diff --git a/Cargo.lock b/Cargo.lock index 90d26efc..79a7982c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1127,6 +1127,35 @@ dependencies = [ "slab", ] +[[package]] +name = "gb28181" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "axum", + "base64 0.21.2", + "byteorder", + "bytes", + "bytesio 0.3.0", + "chrono", + "failure", + "hex", + "http", + "indexmap", + "lazy_static", + "log", + "rand 0.8.5", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "streamhub 0.1.1", + "tokio", + "xmpegts", + "xrtsp", +] + [[package]] name = "generic-array" version = "0.14.6" @@ -1982,8 +2011,8 @@ dependencies = [ "clap", "env_logger", "log", - "rtmp 0.4.0", - "streamhub 0.1.0", + "rtmp 0.4.2", + "streamhub 0.1.2", "tokio", ] @@ -2224,17 +2253,16 @@ dependencies = [ [[package]] name = "rtmp" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93e619cfc092d254ad1b5f08f99b88af0721a10ab257822109adc4f7a16e0db1" +version = "0.4.1" dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytesio 0.3.0", "chrono", "failure", - "h264-decoder 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "h264-decoder 0.2.0", + "hex", "hmac 0.11.0", "indexmap", "log", @@ -2243,23 +2271,24 @@ dependencies = [ "serde", "serde_json", "sha2 0.9.9", - "streamhub 0.1.0", + "streamhub 0.1.1", "tokio", - "xflv 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "xflv 0.3.0", ] [[package]] name = "rtmp" -version = "0.4.1" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a416cf84b0982b7356a4af93f02bf1ea46b4e0b57cd8601a9da73a0bdfeb0dc7" dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.0", + "bytesio 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono", "failure", - "h264-decoder 0.2.0", - "hex", + "h264-decoder 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "hmac 0.11.0", "indexmap", "log", @@ -2268,9 +2297,9 @@ dependencies = [ "serde", "serde_json", "sha2 0.9.9", - "streamhub 0.1.1", + "streamhub 0.1.2", "tokio", - "xflv 0.3.0", + "xflv 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2593,14 +2622,12 @@ dependencies = [ [[package]] name = "streamhub" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2ed21d266e3a637f461a706bb1bfa6eb01b7acf57c1b42f8c0aa5bb05f00d71" +version = "0.1.1" dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytesio 0.3.0", "chrono", "failure", "indexmap", @@ -2610,17 +2637,19 @@ dependencies = [ "serde", "serde_json", "tokio", - "xflv 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "xflv 0.3.0", ] [[package]] name = "streamhub" -version = "0.1.1" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "011995bde7e1273e01650a117f13e3d8ae0e7532a19b1ccf54d56919601a4251" dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.0", + "bytesio 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono", "failure", "indexmap", @@ -2630,7 +2659,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "xflv 0.3.0", + "xflv 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3695,6 +3724,7 @@ dependencies = [ "clap", "env_logger_extend", "failure", + "gb28181", "hls", "httpflv", "libc", @@ -3719,6 +3749,7 @@ dependencies = [ "bytes", "bytesio 0.3.0", "failure", + "log", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 622e62f8..a79f7574 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "protocol/hls", "protocol/rtsp", "protocol/webrtc", + "protocol/gb28181", "library/bytesio", "application/xiu", "application/http-server", diff --git a/application/pprtmp/Cargo.toml b/application/pprtmp/Cargo.toml index 64009460..7af702b7 100644 --- a/application/pprtmp/Cargo.toml +++ b/application/pprtmp/Cargo.toml @@ -11,7 +11,7 @@ log = "0.4.0" env_logger = "0.10.0" clap = "4.1.4" -rtmp = "0.4.0" +rtmp = "0.4.1" streamhub = "0.1.0" [dependencies.tokio] diff --git a/application/xiu/Cargo.toml b/application/xiu/Cargo.toml index 52f4e828..0b9c4652 100644 --- a/application/xiu/Cargo.toml +++ b/application/xiu/Cargo.toml @@ -31,6 +31,7 @@ streamhub = { path = "../../library/streamhub/" } rtmp = { path = "../../protocol/rtmp/" } xrtsp = { path = "../../protocol/rtsp/" } xwebrtc = { path = "../../protocol/webrtc/" } +gb28181 = { path = "../../protocol/gb28181/" } httpflv = { path = "../../protocol/httpflv/" } hls = { path = "../../protocol/hls/" } diff --git a/application/xiu/src/config/mod.rs b/application/xiu/src/config/mod.rs index a2757f54..7e91aae8 100644 --- a/application/xiu/src/config/mod.rs +++ b/application/xiu/src/config/mod.rs @@ -10,6 +10,7 @@ pub struct Config { pub rtmp: Option, pub rtsp: Option, pub webrtc: Option, + pub gb28181: Option, pub httpflv: Option, pub hls: Option, pub httpapi: Option, @@ -22,6 +23,7 @@ impl Config { rtmp_port: usize, rtsp_port: usize, webrtc_port: usize, + gb28181_port: usize, httpflv_port: usize, hls_port: usize, log_level: String, @@ -53,6 +55,14 @@ impl Config { }); } + let mut gb28181_config: Option = None; + if gb28181_port > 0 { + gb28181_config = Some(GB28181Config { + enabled: true, + api_port: gb28181_port, + }); + } + let mut httpflv_config: Option = None; if httpflv_port > 0 { httpflv_config = Some(HttpFlvConfig { @@ -79,6 +89,7 @@ impl Config { rtmp: rtmp_config, rtsp: rtsp_config, webrtc: webrtc_config, + gb28181: gb28181_config, httpflv: httpflv_config, hls: hls_config, httpapi: None, @@ -121,6 +132,12 @@ pub struct WebRTCConfig { pub port: usize, } +#[derive(Debug, Deserialize, Clone)] +pub struct GB28181Config { + pub enabled: bool, + pub api_port: usize, +} + #[derive(Debug, Deserialize, Clone)] pub struct HttpFlvConfig { pub enabled: bool, @@ -184,9 +201,7 @@ fn test_toml_parse() { Err(err) => println!("{}", err), } - let str = fs::read_to_string( - "./src/config/config.toml", - ); + let str = fs::read_to_string("./src/config/config.toml"); match str { Ok(val) => { diff --git a/application/xiu/src/main.rs b/application/xiu/src/main.rs index a1a7b645..08b22aa9 100644 --- a/application/xiu/src/main.rs +++ b/application/xiu/src/main.rs @@ -54,6 +54,15 @@ async fn main() -> Result<()> { .value_parser(value_parser!(usize)) .conflicts_with("config_file_path"), ) + .arg( + Arg::new("gb28181") + .long("gb28181") + .short('g') + .value_name("port") + .help("Specify the gb28181 listening port.(e.g.:3000)") + .value_parser(value_parser!(usize)) + .conflicts_with("config_file_path"), + ) .arg( Arg::new("httpflv") .long("httpflv") @@ -119,8 +128,13 @@ async fn main() -> Result<()> { let rtmp_port_o = matches.get_one::("rtmp"); let rtsp_port_o = matches.get_one::("rtsp"); let webrtc_port_o = matches.get_one::("webrtc"); + let gb28181_port_o = matches.get_one::("gb28181"); - if rtmp_port_o.is_none() && rtsp_port_o.is_none() && webrtc_port_o.is_none() { + if rtmp_port_o.is_none() + && rtsp_port_o.is_none() + && webrtc_port_o.is_none() + && gb28181_port_o.is_none() + { println!("If you do not specify the config Options, you must enable at least one protocol from RTSP and RTMP."); return Ok(()); } @@ -140,6 +154,11 @@ async fn main() -> Result<()> { None => 0, }; + let gb28181_port = match gb28181_port_o { + Some(val) => *val, + None => 0, + }; + let httpflv_port = match matches.get_one::("httpflv") { Some(val) => *val, None => 0, @@ -157,6 +176,7 @@ async fn main() -> Result<()> { rtmp_port, rtsp_port, webrtc_port, + gb28181_port, httpflv_port, hls_port, log_level, diff --git a/application/xiu/src/service.rs b/application/xiu/src/service.rs index 298d7238..4e5923d9 100644 --- a/application/xiu/src/service.rs +++ b/application/xiu/src/service.rs @@ -5,6 +5,7 @@ use { super::config::Config, //https://rustcc.cn/article?id=6dcbf032-0483-4980-8bfe-c64a7dfb33c7 anyhow::Result, + gb28181::api_service, hls::remuxer::HlsRemuxer, hls::server as hls_server, httpflv::server as httpflv_server, @@ -50,6 +51,7 @@ impl Service { self.start_rtmp(&mut stream_hub).await?; self.start_rtsp(&mut stream_hub).await?; self.start_webrtc(&mut stream_hub).await?; + self.start_gb28181(&mut stream_hub).await?; self.start_http_api_server(&mut stream_hub).await?; self.start_rtmp_remuxer(&mut stream_hub).await?; @@ -158,14 +160,21 @@ impl Service { } async fn start_rtmp_remuxer(&mut self, stream_hub: &mut StreamsHub) -> Result<()> { - //The remuxer now is used for rtsp2rtmp, so both rtsp/rtmp cfg need to be enabled. + //The remuxer now is used for rtsp2rtmp/gb281812rtmp, so both rtsp(or gb28181)/rtmp cfg need to be enabled. let mut rtsp_enabled = false; if let Some(rtsp_cfg_value) = &self.cfg.rtsp { if rtsp_cfg_value.enabled { rtsp_enabled = true; } } - if !rtsp_enabled { + + let mut gb28181_enabled = false; + if let Some(gb28181_cfg_value) = &self.cfg.gb28181 { + if gb28181_cfg_value.enabled { + gb28181_enabled = true; + } + } + if !rtsp_enabled && !gb28181_enabled { return Ok(()); } @@ -240,6 +249,24 @@ impl Service { Ok(()) } + async fn start_gb28181(&mut self, stream_hub: &mut StreamsHub) -> Result<()> { + let gb28181_cfg = &self.cfg.gb28181; + + if let Some(gb28181_cfg_value) = gb28181_cfg { + if !gb28181_cfg_value.enabled { + return Ok(()); + } + + let producer = stream_hub.get_hub_event_sender(); + let listen_port = gb28181_cfg_value.api_port; + tokio::spawn(async move { + api_service::run(producer, listen_port).await; + }); + } + + Ok(()) + } + async fn start_httpflv(&mut self, stream_hub: &mut StreamsHub) -> Result<()> { let httpflv_cfg = &self.cfg.httpflv; diff --git a/library/bytesio/src/bits_reader.rs b/library/bytesio/src/bits_reader.rs index dbcfb5fa..8912e3d9 100644 --- a/library/bytesio/src/bits_reader.rs +++ b/library/bytesio/src/bits_reader.rs @@ -51,6 +51,14 @@ impl BitsReader { Ok((self.cur_byte >> self.cur_bit_left) & 0x01) } + pub fn advance_bit(&mut self) -> Result { + if self.cur_bit_left == 0 { + self.cur_byte = self.reader.read_u8()?; + self.cur_bit_left = 8; + } + Ok((self.cur_byte >> self.cur_bit_left) & 0x01) + } + pub fn read_n_bits(&mut self, n: usize) -> Result { let mut result: u64 = 0; for _ in 0..n { diff --git a/library/bytesio/src/bytes_reader.rs b/library/bytesio/src/bytes_reader.rs index 19c2a16e..7739f889 100644 --- a/library/bytesio/src/bytes_reader.rs +++ b/library/bytesio/src/bytes_reader.rs @@ -11,10 +11,14 @@ use { pub struct BytesReader { buffer: BytesMut, + backup: BytesMut, } impl BytesReader { pub fn new(input: BytesMut) -> Self { - Self { buffer: input } + Self { + buffer: input, + backup: BytesMut::new(), + } } pub fn extend_from_slice(&mut self, extend: &[u8]) { @@ -144,9 +148,18 @@ impl BytesReader { pub fn extract_remaining_bytes(&mut self) -> BytesMut { self.buffer.split_to(self.buffer.len()) } + pub fn get_remaining_bytes(&self) -> BytesMut { self.buffer.clone() } + + pub fn backup(&mut self) { + self.backup = self.buffer.clone(); + } + + pub fn restore(&mut self) { + self.buffer = self.backup.clone(); + } } pub struct AsyncBytesReader { pub bytes_reader: BytesReader, diff --git a/library/bytesio/src/bytesio.rs b/library/bytesio/src/bytesio.rs index 99862519..0a27ba1d 100644 --- a/library/bytesio/src/bytesio.rs +++ b/library/bytesio/src/bytesio.rs @@ -32,19 +32,27 @@ pub struct UdpIO { } impl UdpIO { - pub async fn new(remote_domain: String, remote_port: u16, local_port: u16) -> Option { - let remote_address = format!("{remote_domain}:{remote_port}"); - log::info!("remote address: {}", remote_address); + pub async fn new(local_port: u16, remote_address: Option) -> Option { let local_address = format!("0.0.0.0:{local_port}"); - if let Ok(local_socket) = UdpSocket::bind(local_address).await { - if let Ok(remote_socket_addr) = remote_address.parse::() { - if let Err(err) = local_socket.connect(remote_socket_addr).await { - log::info!("connect to remote udp socket error: {}", err); + match UdpSocket::bind(local_address).await { + Ok(local_socket) => { + if let Some(remote_address_value) = remote_address { + log::info!("remote address: {}", remote_address_value); + + if let Ok(remote_socket_addr) = remote_address_value.parse::() { + if let Err(err) = local_socket.connect(remote_socket_addr).await { + log::error!("connect to remote udp socket error: {}", err); + } + } } + + return Some(Self { + socket: local_socket, + }); + } + Err(err) => { + log::error!("bind udp socket error: {}", err); } - return Some(Self { - socket: local_socket, - }); } None diff --git a/library/container/mpegts/Cargo.toml b/library/container/mpegts/Cargo.toml index da5ede52..bda1ae4b 100644 --- a/library/container/mpegts/Cargo.toml +++ b/library/container/mpegts/Cargo.toml @@ -14,4 +14,5 @@ edition = "2018" byteorder = "1.4.2" bytes = "1.0.0" failure = "0.1.1" +log = "0.4" bytesio = { path = "../../bytesio/" } diff --git a/library/container/mpegts/src/define.rs b/library/container/mpegts/src/define.rs index 9c1bc548..2d76d831 100644 --- a/library/container/mpegts/src/define.rs +++ b/library/container/mpegts/src/define.rs @@ -12,16 +12,43 @@ pub mod epsi_stream_type { pub const PSI_STREAM_MP3: u8 = 0x04; // ISO/IEC 13818-3 Audio pub const PSI_STREAM_PRIVATE_DATA: u8 = 0x06; pub const PSI_STREAM_H264: u8 = 0x1b; // H.264 + pub const PSI_STREAM_H265: u8 = 0x24; pub const PSI_STREAM_AAC: u8 = 0x0f; pub const PSI_STREAM_MPEG4_AAC: u8 = 0x1c; + pub const PSI_STREAM_AUDIO_G711A: u8 = 0x90; // GBT 25724-2010 SVAC(2014) + pub const PSI_STREAM_AUDIO_G711U: u8 = 0x91; pub const PSI_STREAM_AUDIO_OPUS: u8 = 0x9c; } pub mod epes_stream_id { + pub const PES_SID_EXTENSION: u8 = 0xB7; // PS system_header extension(p81) + pub const PES_SID_END: u8 = 0xB9; // MPEG_program_end_code + pub const PES_SID_START: u8 = 0xBA; // Pack start code + pub const PES_SID_SYS: u8 = 0xBB; // System header start code + + pub const PES_SID_PSM: u8 = 0xBC; // program_stream_map + pub const PES_SID_PRIVATE_1: u8 = 0xBD; // private_stream_1 + pub const PES_SID_PADDING: u8 = 0xBE; // padding_stream + pub const PES_SID_PRIVATE_2: u8 = 0xBF; // private_stream_2 pub const PES_SID_AUDIO: u8 = 0xC0; // ISO/IEC 13818-3/11172-3/13818-7/14496-3 audio stream '110x xxxx' pub const PES_SID_VIDEO: u8 = 0xE0; // H.262 | H.264 | H.265 | ISO/IEC 13818-2/11172-2/14496-2/14496-10 video stream '1110 xxxx' - pub const PES_SID_PRIVATE_1: u8 = 0xBD; // private_stream_1 + pub const PES_SID_ECM: u8 = 0xF0; // ECM_stream + pub const PES_SID_EMM: u8 = 0xF1; // EMM_stream + pub const PES_SID_DSMCC: u8 = 0xF2; // H.222.0 | ISO/IEC 13818-1/13818-6_DSMCC_stream + pub const PES_SID_13522: u8 = 0xF3; // ISO/IEC_13522_stream + pub const PES_SID_H222_A: u8 = 0xF4; // Rec. ITU-T H.222.1 type A + pub const PES_SID_H222_B: u8 = 0xF5; // Rec. ITU-T H.222.1 type B + pub const PES_SID_H222_C: u8 = 0xF6; // Rec. ITU-T H.222.1 type C + pub const PES_SID_H222_D: u8 = 0xF7; // Rec. ITU-T H.222.1 type D + pub const PES_SID_H222_E: u8 = 0xF8; // Rec. ITU-T H.222.1 type E + pub const PES_SID_ANCILLARY: u8 = 0xF9; // ancillary_stream + pub const PES_SID_MPEG4_SL: u8 = 0xFA; // ISO/IEC 14496-1_SL_packetized_stream + pub const PES_SID_MPEG4_FLEX: u8 = 0xFB; // ISO/IEC 14496-1_FlexMux_stream + pub const PES_SID_META: u8 = 0xFC; // metadata stream + pub const PES_SID_EXTEND: u8 = 0xFD; // extended_stream_id + pub const PES_SID_RESERVED: u8 = 0xFE; // reserved data stream + pub const PES_SID_PSD: u8 = 0xFF; // program_stream_directory } pub const AF_FLAG_PCR: u8 = 0x10; @@ -38,4 +65,4 @@ pub const TS_PACKET_SIZE: usize = 188; pub const MPEG_FLAG_IDR_FRAME: u16 = 0x0001; pub const MPEG_FLAG_H264_H265_WITH_AUD: u16 = 0x8000; -pub const PAT_PERIOD: i64 = 400 * 90; +pub const PAT_PERIOD: u64 = 400 * 90; diff --git a/library/container/mpegts/src/errors.rs b/library/container/mpegts/src/errors.rs index 6602b8a4..f93b514b 100644 --- a/library/container/mpegts/src/errors.rs +++ b/library/container/mpegts/src/errors.rs @@ -1,3 +1,5 @@ +use crate::ps::errors::MpegPsError; + use { bytesio::bytes_errors::{BytesReadError, BytesWriteError}, failure::{Backtrace, Fail}, @@ -6,8 +8,8 @@ use { }; #[derive(Debug, Fail)] -pub enum MpegTsErrorValue { - #[fail(display = "bytes read error")] +pub enum MpegErrorValue { + #[fail(display = "bytes read error\n")] BytesReadError(BytesReadError), #[fail(display = "bytes write error")] @@ -27,43 +29,54 @@ pub enum MpegTsErrorValue { #[fail(display = "stream not found")] StreamNotFound, + + #[fail(display = "mpeg ps error\n")] + MpegPsError(MpegPsError), } #[derive(Debug)] -pub struct MpegTsError { - pub value: MpegTsErrorValue, +pub struct MpegError { + pub value: MpegErrorValue, } -impl From for MpegTsError { +impl From for MpegError { fn from(error: BytesReadError) -> Self { - MpegTsError { - value: MpegTsErrorValue::BytesReadError(error), + MpegError { + value: MpegErrorValue::BytesReadError(error), } } } -impl From for MpegTsError { +impl From for MpegError { fn from(error: BytesWriteError) -> Self { - MpegTsError { - value: MpegTsErrorValue::BytesWriteError(error), + MpegError { + value: MpegErrorValue::BytesWriteError(error), } } } -impl From for MpegTsError { +impl From for MpegError { fn from(error: Error) -> Self { - MpegTsError { - value: MpegTsErrorValue::IOError(error), + MpegError { + value: MpegErrorValue::IOError(error), + } + } +} + +impl From for MpegError { + fn from(error: MpegPsError) -> Self { + MpegError { + value: MpegErrorValue::MpegPsError(error), } } } -impl fmt::Display for MpegTsError { +impl fmt::Display for MpegError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&self.value, f) } } -impl Fail for MpegTsError { +impl Fail for MpegError { fn cause(&self) -> Option<&dyn Fail> { self.value.cause() } diff --git a/library/container/mpegts/src/lib.rs b/library/container/mpegts/src/lib.rs index 4a76f6f2..35ea3914 100644 --- a/library/container/mpegts/src/lib.rs +++ b/library/container/mpegts/src/lib.rs @@ -1,8 +1,9 @@ pub mod crc32; pub mod define; pub mod errors; -pub mod ts; pub mod pat; pub mod pes; pub mod pmt; +pub mod ps; +pub mod ts; pub mod utils; diff --git a/library/container/mpegts/src/pat.rs b/library/container/mpegts/src/pat.rs index 4a21ecf4..765c7882 100644 --- a/library/container/mpegts/src/pat.rs +++ b/library/container/mpegts/src/pat.rs @@ -1,5 +1,5 @@ use { - super::{crc32, define::epat_pid, errors::MpegTsError, pmt}, + super::{crc32, define::epat_pid, errors::MpegError, pmt}, byteorder::{BigEndian, LittleEndian}, bytes::BytesMut, bytesio::bytes_writer::BytesWriter, @@ -49,7 +49,7 @@ impl PatMuxer { } } - pub fn write(&mut self, pat: Pat) -> Result { + pub fn write(&mut self, pat: Pat) -> Result { /*table id*/ self.bytes_writer.write_u8(epat_pid::PAT_TID_PAS as u8)?; diff --git a/library/container/mpegts/src/pes.rs b/library/container/mpegts/src/pes.rs index b76cecb4..52908493 100644 --- a/library/container/mpegts/src/pes.rs +++ b/library/container/mpegts/src/pes.rs @@ -1,54 +1,368 @@ +use crate::ps::errors::{MpegPsError, MpegPsErrorValue}; +use byteorder::BigEndian; use { - super::{define, errors::MpegTsError}, - bytes::BytesMut, + super::define, super::errors::MpegError, bytes::BytesMut, bytesio::bytes_reader::BytesReader, bytesio::bytes_writer::BytesWriter, }; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] #[allow(dead_code)] pub struct Pes { - pub program_number: u16, + pub stream_id: u8, //8 + pub pes_packet_length: u16, //16 + pub pes_scrambling_control: u8, //2 + pub pes_priority: u8, //2 + pub data_alignment_indicator: u8, //1 + pub copyright: u8, //1 + pub original_or_copy: u8, //1 + pub pts_dts_flags: u8, //2 + pub escr_flag: u8, //1 + pub es_rate_flag: u8, //1 + pub dsm_trick_mode_flag: u8, //1 + pub additional_copy_info_flag: u8, //1 + + pub pes_crc_flag: u8, //1 + pub pes_extension_flag: u8, //1 + pub pes_header_data_length: u8, //8 + + pub pts: u64, + pub dts: u64, + escr_base: u64, + escr_extension: u32, + es_rate: u32, + + pub trick_mode_control: u8, + pub trick_value: u8, + pub additional_copy_info: u8, + pub previous_pes_packet_crc: u16, + pub payload: BytesMut, + pub pid: u16, - pub stream_id: u8, pub codec_id: u8, pub continuity_counter: u8, - pub esinfo: BytesMut, - pub esinfo_length: usize, +} - pub data_alignment_indicator: u8, //1 +impl Pes { + // T-REC-H.222.0-201703-S!!PDF-E.pdf Table 2-21 P37 + // PES_packet() { + // packet_start_code_prefix 24 bslbf + // stream_id 8 uimsbf + // PES_packet_length 16 uimsbf + + // if (stream_id != program_stream_map + // && stream_id != padding_stream + // && stream_id != private_stream_2 + // && stream_id != ECM + // && stream_id != EMM + // && stream_id != program_stream_directory + // && stream_id != DSMCC_stream + // && stream_id != ITU-T Rec. H.222.1 type E stream) { + // '10' 2 bslbf + // PES_scrambling_control 2 bslbf + // PES_priority 1 bslbf + // data_alignment_indicator 1 bslbf + // copyright 1 bslbf + // original_or_copy 1 bslbf + // PTS_DTS_flags 2 bslbf + // ESCR_flag 1 bslbf + // ES_rate_flag 1 bslbf + // DSM_trick_mode_flag 1 bslbf + // additional_copy_info_flag 1 bslbf + // PES_CRC_flag 1 bslbf + // PES_extension_flag 1 bslbf + // PES_header_data_length 8 uimsbf + + // if (PTS_DTS_flags == '10') { + // '0010' 4 bslbf + // PTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // PTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // PTS [14..0] 15 bslbf + // marker_bit 1 bslbf + // } + + // if (PTS_DTS_flags == '11') { + // '0011' 4 bslbf + // PTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // PTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // PTS [14..0] 15 bslbf + // marker_bit 1 bslbf + // '0001' 4 bslbf + // DTS [32..30] 3 bslbf + // marker_bit 1 bslbf + // DTS [29..15] 15 bslbf + // marker_bit 1 bslbf + // DTS [14..0] 15 bslbf + // marker_bit 1 bslbf + // } + + // if (ESCR_flag == '1') { + // reserved 2 bslbf + // ESCR_base[32..30] 3 bslbf + // marker_bit 1 bslbf + // ESCR_base[29..15] 15 bslbf + // marker_bit 1 bslbf + // ESCR_base[14..0] 15 bslbf + // marker_bit 1 bslbf + // ESCR_extension 9 uimsbf + // marker_bit 1 bslbf + // } + + // if (ES_rate_flag == '1') { + // marker_bit 1 bslbf + // ES_rate 22 uimsbf + // marker_bit 1 bslbf + // } + + // if (DSM_trick_mode_flag == '1') { + // trick_mode_control 3 uimsbf + // if ( trick_mode_control == fast_forward ) { + // field_id 2 bslbf + // intra_slice_refresh 1 bslbf + // frequency_truncation 2 bslbf + // } + // else if ( trick_mode_control == slow_motion ) { + // rep_cntrl 5 uimsbf + // } + // else if ( trick_mode_control == freeze_frame ) { + // field_id 2 uimsbf + // reserved 3 bslbf + // } + // else if ( trick_mode_control == fast_reverse ) { + // field_id 2 bslbf + // intra_slice_refresh 1 bslbf + // frequency_truncation 2 bslbf + // else if ( trick_mode_control == slow_reverse ) { + // rep_cntrl 5 uimsbf + // } + // else + // reserved 5 bslbf + // } + + // if ( additional_copy_info_flag == '1') { + // marker_bit 1 bslbf + // additional_copy_info 7 bslbf + // } + + // if ( PES_CRC_flag == '1') { + // previous_PES_packet_CRC 16 bslbf + // } + + // if ( PES_extension_flag == '1') { + // PES_private_data_flag 1 bslbf + // pack_header_field_flag 1 bslbf + // program_packet_sequence_counter_flag 1 bslbf + // P-STD_buffer_flag 1 bslbf + // reserved 3 bslbf + // PES_extension_flag_2 1 bslbf + // if ( PES_private_data_flag == '1') { + // PES_private_data 128 bslbf + // } + // if (pack_header_field_flag == '1') { + // pack_field_length 8 uimsbf + // pack_header() + // } + // if (program_packet_sequence_counter_flag == '1') { + // marker_bit 1 bslbf + // program_packet_sequence_counter 7 uimsbf + // marker_bit 1 bslbf + // MPEG1_MPEG2_identifier 1 bslbf + // original_stuff_length 6 uimsbf + // } + + // if ( P-STD_buffer_flag == '1') { + // '01' 2 bslbf + // P-STD_buffer_scale 1 bslbf + // P-STD_buffer_size 13 uimsbf + // } + + // if ( PES_extension_flag_2 == '1') { + // marker_bit 1 bslbf + // PES_extension_field_length 7 uimsbf + // stream_id_extension_flag 1 bslbf + // if ( stream_id_extension_flag == '0') { + // stream_id_extension 7 uimsbf + // } else { + // reserved 6 bslbf + // tref_extension_flag 1 bslbf + // if ( tref_extension_flag  '0' ) { + // reserved 4 bslbf + // TREF[32..30] 3 bslbf + // marker_bit 1 bslbf + // TREF[29..15] 15 bslbf + // marker_bit 1 bslbf + // TREF[14..0] 15 bslbf + // marker_bit 1 bslbf + // } + // } + + // for ( i  0; i  N3; i) { + // reserved 8 bslbf + // } + // } + // } + // for (i < 0; i < N1; i++) { + // stuffing_byte 8 bslbf + // } + // for (i < 0; i < N2; i++) { + // PES_packet_data_byte 8 bslbf + // } + // } + pub fn parse_mpeg2(&mut self, bytes_reader: &mut BytesReader) -> Result<(), MpegError> { + bytes_reader.backup(); + // log::info!("parse 0 : length: {}", bytes_reader.len()); + bytes_reader.read_bytes(3)?; + self.stream_id = bytes_reader.read_u8()?; + self.pes_packet_length = bytes_reader.read_u16::()?; + + if self.pes_packet_length as usize > bytes_reader.len() { + bytes_reader.restore(); + let not_enouth_bytes_err = MpegPsError { + value: MpegPsErrorValue::NotEnoughBytes, + }; + return Err(MpegError { + value: crate::errors::MpegErrorValue::MpegPsError(not_enouth_bytes_err), + }); + } - pub pts: i64, - pub dts: i64, - escr_base: u64, - escr_extension: u32, - es_rate: u32, -} + let bytes_5 = bytes_reader.read_u8()?; + assert!(bytes_5 >> 6 == 0b10); + self.pes_scrambling_control = (bytes_5 >> 4) & 0x03; + self.pes_priority = (bytes_5 >> 3) & 0x01; + self.data_alignment_indicator = (bytes_5 >> 2) & 0x01; + self.copyright = (bytes_5 >> 1) & 0x01; + self.original_or_copy = bytes_5 & 0x01; + // log::info!("parse 1"); + let bytes_6 = bytes_reader.read_u8()?; + self.pts_dts_flags = (bytes_6 >> 6) & 0x03; + self.escr_flag = (bytes_6 >> 5) & 0x01; + self.es_rate_flag = (bytes_6 >> 4) & 0x01; + self.dsm_trick_mode_flag = (bytes_6 >> 3) & 0x01; + self.additional_copy_info_flag = (bytes_6 >> 2) & 0x01; + self.pes_crc_flag = (bytes_6 >> 1) & 0x01; + self.pes_extension_flag = bytes_6 & 0x01; + + self.pes_header_data_length = bytes_reader.read_u8()?; + // log::info!("parse 2: {}", self.pes_header_data_length); + let cur_bytes_len = bytes_reader.len(); + + if self.pts_dts_flags == 0x02 { + let next_byte = bytes_reader.read_u8()?; + assert!(next_byte >> 4 == 0b0010); + self.pts = (next_byte as u64 >> 1) & 0x07; + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + } else if self.pts_dts_flags == 0x03 { + let next_byte = bytes_reader.read_u8()?; + assert!(next_byte >> 4 == 0b0011); + self.pts = (next_byte as u64 >> 1) & 0x07; + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + + let next_byte_1 = bytes_reader.read_u8()?; + assert!(next_byte_1 >> 4 == 0b0011); + self.dts = (next_byte_1 as u64 >> 1) & 0x07; + self.dts = (self.dts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + self.dts = (self.dts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + } + // log::info!("parse 3"); + if self.escr_flag == 0x01 { + let next_byte = bytes_reader.read_u8()?; + self.escr_base = (next_byte as u64 >> 3) & 0x07; + self.escr_base = (self.escr_base << 2) | (next_byte as u64 & 0x03); + + let next_2_bytes = bytes_reader.read_u16::()? as u64; + self.escr_base = (self.escr_base << 13) | (next_2_bytes >> 3); + self.escr_base = (self.escr_base << 2) | (next_2_bytes & 0x03); + + let next_2_bytes_2 = bytes_reader.read_u16::()? as u64; + self.escr_base = (self.escr_base << 13) | (next_2_bytes_2 >> 3); + + self.escr_extension = next_2_bytes as u32 & 0x03; + self.escr_extension = + (self.escr_extension << 7) | (bytes_reader.read_u8()? as u32 >> 1); + } + // log::info!("parse 4"); + if self.es_rate_flag == 0x01 { + self.es_rate = (bytes_reader.read_u24::()? >> 1) & 0x3FFFFF; + } -impl Default for Pes { - fn default() -> Self { - Self::new() + if self.dsm_trick_mode_flag == 0x01 { + let next_byte = bytes_reader.read_u8()?; + self.trick_mode_control = next_byte >> 5; + } + // log::info!("parse 5"); + if self.additional_copy_info_flag == 0x01 { + self.additional_copy_info = bytes_reader.read_u8()? & 0x7F; + } + + if self.pes_crc_flag == 0x01 { + self.previous_pes_packet_crc = bytes_reader.read_u16::()?; + } + + if self.pes_extension_flag == 0x01 { + //todo + } + + let left_pes_header_len = + self.pes_header_data_length as usize - (cur_bytes_len - bytes_reader.len()); + //log::info!("parse 6: {}", left_pes_header_len); + if left_pes_header_len > 0 { + bytes_reader.read_bytes(left_pes_header_len)?; + } + + let payload_len = + self.pes_packet_length as usize - self.pes_header_data_length as usize - 3; + // log::info!("parse 7 : {}", payload_len); + self.payload = bytes_reader.read_bytes(payload_len)?; + + // log::info!("pes pts: {},dts: {}", self.pts / 90, self.dts / 90); + + Ok(()) } -} -impl Pes { - pub fn new() -> Self { - Self { - program_number: 0, - pid: 0, - stream_id: 0, - codec_id: 0, - continuity_counter: 0, - esinfo: BytesMut::new(), - esinfo_length: 0, - - data_alignment_indicator: 0, //1 - - pts: 0, - dts: 0, - escr_base: 0, - escr_extension: 0, - es_rate: 0, + pub fn parse_mpeg1(&mut self, bytes_reader: &mut BytesReader) -> Result<(), MpegError> { + bytes_reader.read_bytes(3)?; + self.stream_id = bytes_reader.read_u8()?; + self.pes_packet_length = bytes_reader.read_u16::()?; + + let cur_bytes_len = bytes_reader.len(); + + while bytes_reader.advance_u8()? == 0xFF { + bytes_reader.read_u8()?; + } + + if (bytes_reader.advance_u8()? >> 6) == 0x01 { + bytes_reader.read_u16::()?; + } + + let next_byte = bytes_reader.read_u8()?; + let first_4_bits = next_byte >> 4; + + if first_4_bits == 0x02 { + self.pts = (next_byte as u64 >> 1) & 0x07; + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + } else if first_4_bits == 0x03 { + self.pts = (next_byte as u64 >> 1) & 0x07; + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + self.pts = (self.pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + + let next_byte_2 = bytes_reader.read_u8()?; + self.dts = (next_byte_2 as u64 >> 1) & 0x07; + self.dts = (self.dts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + self.dts = (self.dts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + } else { + assert_eq!(next_byte, 0x0F); } + + let payload_len = self.pes_packet_length as usize - (cur_bytes_len - bytes_reader.len()); + self.payload = bytes_reader.read_bytes(payload_len)?; + + Ok(()) } } @@ -83,7 +397,7 @@ impl PesMuxer { payload_data_length: usize, stream_data: &Pes, h264_h265_with_aud: bool, - ) -> Result<(), MpegTsError> { + ) -> Result<(), MpegError> { /*pes start code 3 bytes*/ self.bytes_writer.write_u8(0x00)?; //0 self.bytes_writer.write_u8(0x00)?; //1 @@ -105,12 +419,12 @@ impl PesMuxer { let mut flags: u8 = 0x00; let mut length: u8 = 0x00; - if define::PTS_NO_VALUE != stream_data.pts { + if define::PTS_NO_VALUE != stream_data.pts as i64 { flags |= 0x80; length += 5; } - if define::PTS_NO_VALUE != stream_data.dts && stream_data.dts != stream_data.pts { + if define::PTS_NO_VALUE != stream_data.dts as i64 && stream_data.dts != stream_data.pts { flags |= 0x40; length += 5; } diff --git a/library/container/mpegts/src/pmt.rs b/library/container/mpegts/src/pmt.rs index 17a2d614..c3bd727d 100644 --- a/library/container/mpegts/src/pmt.rs +++ b/library/container/mpegts/src/pmt.rs @@ -2,7 +2,7 @@ use { super::{ crc32, define::{epat_pid, epsi_stream_type}, - errors::MpegTsError, + errors::MpegError, pes, }, byteorder::{BigEndian, LittleEndian}, @@ -57,7 +57,7 @@ impl PmtMuxer { } } - pub fn write(&mut self, pmt: &Pmt) -> Result { + pub fn write(&mut self, pmt: &Pmt) -> Result { /*table id*/ self.bytes_writer.write_u8(epat_pid::PAT_TID_PMS as u8)?; @@ -108,7 +108,7 @@ impl PmtMuxer { Ok(self.bytes_writer.extract_current_bytes()) } - pub fn write_descriptor(&mut self) -> Result<(), MpegTsError> { + pub fn write_descriptor(&mut self) -> Result<(), MpegError> { Ok(()) } } diff --git a/library/container/mpegts/src/ps/errors.rs b/library/container/mpegts/src/ps/errors.rs new file mode 100644 index 00000000..500f7663 --- /dev/null +++ b/library/container/mpegts/src/ps/errors.rs @@ -0,0 +1,80 @@ +use { + bytesio::bits_errors::BitError, + bytesio::bytes_errors::{BytesReadError, BytesWriteError}, + failure::{Backtrace, Fail}, + std::fmt, + std::io::Error, +}; + +#[derive(Debug, Fail)] +pub enum MpegPsErrorValue { + #[fail(display = "bytes read error\n")] + BytesReadError(BytesReadError), + + #[fail(display = "bytes write error\n")] + BytesWriteError(BytesWriteError), + + #[fail(display = "bits error\n")] + BitError(BitError), + + #[fail(display = "io error\n")] + IOError(Error), + + #[fail(display = "start code not correct.\n")] + StartCodeNotCorrect, + + #[fail(display = "not enough bytes\n")] + NotEnoughBytes, +} +#[derive(Debug)] +pub struct MpegPsError { + pub value: MpegPsErrorValue, +} + +impl From for MpegPsError { + fn from(error: BytesReadError) -> Self { + MpegPsError { + value: MpegPsErrorValue::BytesReadError(error), + } + } +} + +impl From for MpegPsError { + fn from(error: BytesWriteError) -> Self { + MpegPsError { + value: MpegPsErrorValue::BytesWriteError(error), + } + } +} + +impl From for MpegPsError { + fn from(error: BitError) -> Self { + MpegPsError { + value: MpegPsErrorValue::BitError(error), + } + } +} + +impl From for MpegPsError { + fn from(error: Error) -> Self { + MpegPsError { + value: MpegPsErrorValue::IOError(error), + } + } +} + +impl fmt::Display for MpegPsError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.value, f) + } +} + +impl Fail for MpegPsError { + fn cause(&self) -> Option<&dyn Fail> { + self.value.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.value.backtrace() + } +} diff --git a/library/container/mpegts/src/ps/mod.rs b/library/container/mpegts/src/ps/mod.rs new file mode 100644 index 00000000..6186bd7c --- /dev/null +++ b/library/container/mpegts/src/ps/mod.rs @@ -0,0 +1,6 @@ +pub mod errors; +pub mod pack_header; +pub mod psd; +pub mod psm; +pub mod system_header; +pub mod ps_demuxer; diff --git a/library/container/mpegts/src/ps/pack_header.rs b/library/container/mpegts/src/ps/pack_header.rs new file mode 100644 index 00000000..da9a7ad6 --- /dev/null +++ b/library/container/mpegts/src/ps/pack_header.rs @@ -0,0 +1,76 @@ +use super::errors::MpegPsError; +use crate::define::epes_stream_id::PES_SID_START; +use byteorder::BigEndian; +use bytesio::bytes_reader::BytesReader; + +#[derive(Default)] +pub enum MpegType { + Mpeg1, + #[default] + Mpeg2, + Unknown, +} +#[derive(Default)] +pub struct PsPackHeader { + pub mpeg_type: MpegType, + system_clock_reference_base: u64, + system_clock_reference_extension: u16, + program_mux_rate: u32, + pack_stuffing_length: u8, +} + +impl PsPackHeader { + pub fn parse(&mut self, bytes_reader: &mut BytesReader) -> Result<(), MpegPsError> { + let start = bytes_reader.read_bytes(4)?; + + if start.to_vec() != [0x00, 0x00, 0x01, PES_SID_START] { + return Err(MpegPsError { + value: super::errors::MpegPsErrorValue::StartCodeNotCorrect, + }); + } + let byte_5 = bytes_reader.read_u8()?; + + //mpeg1 + if (byte_5 >> 4) == 0b0010 { + self.mpeg_type = MpegType::Mpeg1; + + self.system_clock_reference_base = (byte_5 as u64 >> 1) & 0x07; + self.system_clock_reference_base = (self.system_clock_reference_base << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + self.system_clock_reference_base = (self.system_clock_reference_base << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + + self.system_clock_reference_extension = 1; + + let byte_10 = bytes_reader.read_u8()?; + self.program_mux_rate = (byte_10 as u32) >> 1; + self.program_mux_rate = + (self.program_mux_rate << 15) | (bytes_reader.read_u16::()? as u32 >> 1); + } + //mpeg2 + else if (byte_5 >> 6) == 0b01 { + self.mpeg_type = MpegType::Mpeg2; + + self.system_clock_reference_base = (byte_5 as u64 >> 3) & 0x07; + self.system_clock_reference_base = + (self.system_clock_reference_base << 2) | (byte_5 as u64 & 0x03); + let next_two_bytes = bytes_reader.read_u16::()?; + self.system_clock_reference_base = + (self.system_clock_reference_base << 13) | (next_two_bytes as u64 >> 3); + self.system_clock_reference_base = + (self.system_clock_reference_base << 2) | (next_two_bytes as u64 & 0x03); + let next_two_bytes_2 = bytes_reader.read_u16::()?; + self.system_clock_reference_base = + (self.system_clock_reference_base << 13) | (next_two_bytes_2 as u64 >> 3); + + self.system_clock_reference_extension = next_two_bytes_2 & 0x03; + self.system_clock_reference_extension = (self.system_clock_reference_extension << 7) + | (bytes_reader.read_u8()? as u16 >> 1); + + self.program_mux_rate = bytes_reader.read_u24::()? >> 2; //bits_reader.read_n_bits(22)? as u32; + self.pack_stuffing_length = bytes_reader.read_u8()? & 0x07; + } + + Ok(()) + } +} diff --git a/library/container/mpegts/src/ps/ps_demuxer.rs b/library/container/mpegts/src/ps/ps_demuxer.rs new file mode 100644 index 00000000..0268a367 --- /dev/null +++ b/library/container/mpegts/src/ps/ps_demuxer.rs @@ -0,0 +1,216 @@ +use super::{ + errors::MpegPsError, + pack_header::{MpegType, PsPackHeader}, + psd::ProgramStreamDirectory, + psm::ProgramStreamMap, + system_header::PsSystemHeader, +}; +use byteorder::BigEndian; +use std::collections::HashMap; +use {bytes::BytesMut, bytesio::bytes_reader::BytesReader}; + +use crate::{ + define::{ + epes_stream_id::{self}, + epsi_stream_type, + }, + errors::MpegError, + pes::Pes, +}; +//(pts: u64,dts:u64, stream_type: u8, payload: BytesMut) +pub type OnFrameFn = Box Result<(), MpegPsError> + Send + Sync>; + +#[derive(Default)] +pub struct AVStream { + pub stream_id: u8, + stream_type: u8, + pts: u64, + dts: u64, + buffer: BytesMut, +} + +pub struct PsDemuxer { + reader: BytesReader, + pack_header: PsPackHeader, + psm: ProgramStreamMap, + psd: ProgramStreamDirectory, + system_header: PsSystemHeader, + pes: Pes, + streams: HashMap, + on_frame_handler: OnFrameFn, +} + +pub fn find_start_code(nalus: &[u8]) -> Option { + let pattern = [0x00, 0x00, 0x01]; + nalus.windows(pattern.len()).position(|w| w == pattern) +} + +impl PsDemuxer { + pub fn new(on_frame_handler: OnFrameFn) -> Self { + Self { + reader: BytesReader::new(BytesMut::default()), + pack_header: PsPackHeader::default(), + psm: ProgramStreamMap::default(), + psd: ProgramStreamDirectory::default(), + system_header: PsSystemHeader::default(), + pes: Pes::default(), + streams: HashMap::default(), + on_frame_handler, + } + } + pub fn demux(&mut self, data: BytesMut) -> Result<(), MpegError> { + self.reader.extend_from_slice(&data[..]); + //log::info!("demux: {}", self.reader.len()); + + while !self.reader.is_empty() { + let prefix_code = self.reader.advance_bytes(4)?; + + if prefix_code[0] != 0x00 || prefix_code[1] != 0x00 || prefix_code[2] != 0x01 { + self.reader.read_u8()?; + continue; + } + + match prefix_code[3] { + epes_stream_id::PES_SID_START => { + log::trace!(" epes_stream_id::PES_SID_START"); + self.pack_header.parse(&mut self.reader)?; + } + epes_stream_id::PES_SID_SYS => { + log::trace!(" epes_stream_id::PES_SID_SYS"); + self.system_header.parse(&mut self.reader)?; + } + epes_stream_id::PES_SID_PSM => { + log::trace!(" epes_stream_id::PES_SID_PSM"); + self.psm.parse(&mut self.reader)?; + for stream in &self.psm.stream_map { + self.streams + .entry(stream.elementary_stream_id) + .or_insert(AVStream { + stream_id: stream.elementary_stream_id, + stream_type: stream.stream_type, + ..Default::default() + }); + } + } + epes_stream_id::PES_SID_PSD => { + log::trace!(" epes_stream_id::PES_SID_PSD"); + self.psd.parse(&mut self.reader)?; + } + epes_stream_id::PES_SID_PRIVATE_1 + | epes_stream_id::PES_SID_PADDING + | epes_stream_id::PES_SID_PRIVATE_2 + | epes_stream_id::PES_SID_ECM + | epes_stream_id::PES_SID_EMM + | epes_stream_id::PES_SID_DSMCC + | epes_stream_id::PES_SID_13522 + | epes_stream_id::PES_SID_H222_A + | epes_stream_id::PES_SID_H222_B + | epes_stream_id::PES_SID_H222_C + | epes_stream_id::PES_SID_H222_D + | epes_stream_id::PES_SID_H222_E + | epes_stream_id::PES_SID_ANCILLARY + | epes_stream_id::PES_SID_MPEG4_SL + | epes_stream_id::PES_SID_MPEG4_FLEX => { + self.parse_packet()?; + } + + epes_stream_id::PES_SID_AUDIO | epes_stream_id::PES_SID_VIDEO => { + // if prefix_code[3] != 224 { + // log::info!("code;{}", prefix_code[3]); + // } + //log::info!("stream_id: {}", prefix_code[3]); + match self.pack_header.mpeg_type { + MpegType::Mpeg1 => { + // log::info!("mpeg1"); + self.pes.parse_mpeg1(&mut self.reader)?; + } + MpegType::Mpeg2 => { + // log::info!("mpeg2: {:?}",self.reader.get_remaining_bytes()); + self.pes.parse_mpeg2(&mut self.reader)?; + } + MpegType::Unknown => { + log::warn!("unknow mpeg type"); + } + } + self.parse_avstream()?; + } + + _ => {} + } + } + + Ok(()) + } + + fn parse_packet(&mut self) -> Result<(), MpegPsError> { + //start code + stream_id + self.reader.read_bytes(4)?; + let packet_length = self.reader.read_u16::()?; + self.reader.read_bytes(packet_length as usize)?; + + Ok(()) + } + + fn parse_avstream(&mut self) -> Result<(), MpegPsError> { + if let Some(stream) = self.streams.get_mut(&self.pes.stream_id) { + match stream.stream_type { + epsi_stream_type::PSI_STREAM_H264 | epsi_stream_type::PSI_STREAM_H265 => { + stream.buffer.extend_from_slice(&self.pes.payload[..]); + + while !stream.buffer.is_empty() { + /* 0x02,...,0x00,0x00,0x01,0x02..,0x00,0x00,0x01 */ + /* | | | | */ + /* ----------- -------- */ + /* first_pos distance_to_first_pos */ + if let Some(first_pos) = find_start_code(&stream.buffer[..]) { + let mut nalu_with_start_code = if let Some(distance_to_first_pos) = + find_start_code(&stream.buffer[first_pos + 3..]) + { + let mut second_pos = first_pos + 3 + distance_to_first_pos; + //judge if the start code is [0x00,0x00,0x00,0x01] + if second_pos > 0 && stream.buffer[second_pos - 1] == 0 { + second_pos -= 1; + } + stream.buffer.split_to(second_pos) + } else { + break; + }; + + let nalu = nalu_with_start_code.split_off(first_pos + 3); + (self.on_frame_handler)( + stream.pts, + stream.dts, + stream.stream_type, + nalu, + )?; + } else { + break; + } + } + } + epsi_stream_type::PSI_STREAM_AAC => { + log::info!(" receive aac"); + if stream.dts != self.pes.dts && !stream.buffer.is_empty() { + (self.on_frame_handler)( + stream.pts, + stream.dts, + stream.stream_type, + self.pes.payload.clone(), + )?; + log::info!(" receive aac 2"); + stream.buffer.clear(); + } + + stream.buffer.extend_from_slice(&self.pes.payload[..]); + } + _ => { + log::error!("unprocessed codec type: {}", stream.stream_type); + } + } + stream.pts = self.pes.pts; + stream.dts = self.pes.dts; + } + + Ok(()) + } +} diff --git a/library/container/mpegts/src/ps/psd.rs b/library/container/mpegts/src/ps/psd.rs new file mode 100644 index 00000000..27409432 --- /dev/null +++ b/library/container/mpegts/src/ps/psd.rs @@ -0,0 +1,147 @@ +use super::errors::MpegPsError; +use crate::define::epes_stream_id::PES_SID_PSD; +use byteorder::BigEndian; +use bytesio::bytes_reader::BytesReader; + +// Syntax No. of bits Mnemonic +// directory_PES_packet(){ +// packet_start_code_prefix 24 bslbf +// directory_stream_id 8 uimsbf +// PES_packet_length 16 uimsbf +// number_of_access_units 15 uimsbf +// marker_bit 1 bslbf +// prev_directory_offset[44..30] 15 uimsbf +// marker_bit 1 bslbf +// prev_directory_offset[29..15] 15 uimsbf +// marker_bit 1 bslbf +// prev_directory_offset[14..0] 15 uimsbf +// marker_bit 1 bslbf +// next_directory_offset[44..30] 15 uimsbf +// marker_bit 1 bslbf +// next_directory_offset[29..15] 15 uimsbf +// marker_bit 1 bslbf +// next_directory_offset[14..0] 15 uimsbf +// marker_bit 1 bslbf +// for (i = 0; i < number_of_access_units; i++) { +// packet_stream_id 8 uimsbf +// PES_header_position_offset_sign 1 tcimsbf +// PES_header_position_offset[43..30] 14 uimsbf +// marker_bit 1 bslbf +// PES_header_position_offset[29..15] 15 uimsbf +// marker_bit 1 bslbf +// PES_header_position_offset[14..0] 15 uimsbf +// marker_bit 1 bslbf +// reference_offset 16 uimsbf + +// marker_bit 1 bslbf +// reserved 3 bslbf +// PTS[32..30] 3 uimsbf +// marker_bit 1 bslbf + +// PTS[29..15] 15 uimsbf +// marker_bit 1 bslbf + +// PTS[14..0] 15 uimsbf +// marker_bit 1 bslbf + +// bytes_to_read[22..8] 15 uimsbf +// marker_bit 1 bslbf + +// bytes_to_read[7..0] 8 uimsbf + +// marker_bit 1 bslbf +// intra_coded_indicator 1 bslbf +// coding_parameters_indicator 2 bslbf +// reserved 4 bslbf +// } +// } + +#[derive(Default)] +pub struct AccessUnit { + pub packet_stream_id: u8, + pub pes_header_position_offset_sign: u8, + pub pes_header_position_offset: u64, + pub reference_offset: u16, + + pub pts: u64, + pub bytes_to_read: u32, + pub intra_coded_indicator: u8, + pub coding_parameters_indicator: u8, +} + +#[derive(Default)] +pub struct ProgramStreamDirectory { + directory_stream_id: u8, + pes_packet_length: u16, + number_of_access_units: u16, + prev_directory_offset: u64, + next_directory_offset: u64, + access_units: Vec, +} + +impl ProgramStreamDirectory { + pub fn parse(&mut self, bytes_reader: &mut BytesReader) -> Result<(), MpegPsError> { + let start = bytes_reader.read_bytes(4)?; + + if start.to_vec() != [0x00, 0x00, 0x01, PES_SID_PSD] { + return Err(MpegPsError { + value: super::errors::MpegPsErrorValue::StartCodeNotCorrect, + }); + } + + self.directory_stream_id = PES_SID_PSD; + self.pes_packet_length = bytes_reader.read_u16::()?; + self.number_of_access_units = bytes_reader.read_u16::()? >> 1; + + self.prev_directory_offset = bytes_reader.read_u16::()? as u64 >> 1; + self.prev_directory_offset = (self.prev_directory_offset << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + self.prev_directory_offset = (self.prev_directory_offset << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + + self.next_directory_offset = bytes_reader.read_u16::()? as u64 >> 1; + self.next_directory_offset = (self.next_directory_offset << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + self.next_directory_offset = (self.next_directory_offset << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + + for _ in 0..self.number_of_access_units { + let packet_stream_id = bytes_reader.read_u8()?; + + let next_2_bytes = bytes_reader.read_u16::()?; + let pes_header_position_offset_sign = (next_2_bytes >> 15) as u8; + //0b11 1111 1111 1111; + let mut pes_header_position_offset = (next_2_bytes >> 1) as u64 & 0x3FFF; + pes_header_position_offset = (pes_header_position_offset << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + pes_header_position_offset = (pes_header_position_offset << 15) + | (bytes_reader.read_u16::()? as u64 >> 1); + + let reference_offset = bytes_reader.read_u16::()?; + + let mut pts = (bytes_reader.read_u8()? as u64 >> 1) & 0x07; + pts = (pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + pts = (pts << 15) | (bytes_reader.read_u16::()? as u64 >> 1); + + let mut bytes_to_read = bytes_reader.read_u16::()? as u32 >> 1; + bytes_to_read = (bytes_to_read << 15) | bytes_reader.read_u8()? as u32; + + let next_byte = bytes_reader.read_u8()?; + let intra_coded_indicator = (next_byte >> 6) & 0x01; + let coding_parameters_indicator = (next_byte >> 4) & 0x03; + + self.access_units.push(AccessUnit { + packet_stream_id, + pes_header_position_offset_sign, + pes_header_position_offset, + reference_offset, + pts, + bytes_to_read, + intra_coded_indicator, + coding_parameters_indicator, + }); + } + + Ok(()) + } +} diff --git a/library/container/mpegts/src/ps/psm.rs b/library/container/mpegts/src/ps/psm.rs new file mode 100644 index 00000000..20ccf4df --- /dev/null +++ b/library/container/mpegts/src/ps/psm.rs @@ -0,0 +1,197 @@ +use super::errors::MpegPsError; +use crate::define::epes_stream_id::PES_SID_PSM; +use byteorder::BigEndian; +use {bytes::BytesMut, bytesio::bytes_reader::BytesReader}; + +#[derive(Default)] +pub struct ElementaryStreamMap { + pub stream_type: u8, + pub elementary_stream_id: u8, + pub elementary_stream_info_length: u16, + pub pseudo_descriptor_tag: u8, + pub pseudo_descriptor_length: u8, + pub elementary_stream_id_extension: u8, +} +//T-REC-H.222.0-201703-S!!PDF-E.pdf Table 2-41 P69 +// program_stream_map() { +// packet_start_code_prefix 24 bslbf +// map_stream_id 8 uimsbf +// program_stream_map_length 16 uimsbf +// current_next_indicator 1 bslbf +// single_extension_stream_flag 1 bslbf +// reserved 1 bslbf +// program_stream_map_version 5 uimsbf +// reserved 7 bslbf +// marker_bit 1 bslbf +// program_stream_info_length 16 uimsbf +// for (i = 0; i < N; i++) { +// descriptor() +// } +// elementary_stream_map_length 16 uimsbf +// for (i = 0; i < N1; i++) { +// stream_type 8 uimsbf +// elementary_stream_id 8 uimsbf +// elementary_stream_info_length 16 Uimsbf +// if ( elementary_stream_id = = 0xFD && +// single_extension_stream_flag == 0) { +// pseudo_descriptor_tag 8 Uimsbf +// pseudo_descriptor_length 8 Uimsbf +// marker_bit 1 Bslbf +// elementary_stream_id_extension 7 Uimsbf +// for (i = 3; i < N2; i++) { +// descriptor() +// } +// } +// else { +// for (i = 0; i < N2; i++) { +// descriptor() +// } +// } +// } +// CRC_32 32 rpchof +// } + +#[derive(Default)] +pub struct ProgramStreamMap { + map_stream_id: u8, + program_stream_map_length: u16, + current_next_indicator: u8, + single_extension_stream_flag: u8, + + program_stream_map_version: u8, + program_stream_info_length: u16, + elementary_stream_map_length: u16, + pub stream_map: Vec, +} + +pub fn print(data: BytesMut) { + println!("==========={}", data.len()); + let mut idx = 0; + for i in data { + print!("{i:02X} "); + idx += 1; + if idx % 16 == 0 { + println!() + } + } + + println!("===========") +} + +impl ProgramStreamMap { + pub fn parse(&mut self, bytes_reader: &mut BytesReader) -> Result<(), MpegPsError> { + // let psm_length = (bytes_reader.get(4)? as usize) << 8 | bytes_reader.get(5)? as usize; + // log::info!( + // "current pes packet length: {} : {}", + // psm_length, + // bytes_reader.len() - 6 + // ); + // print(bytes_reader.get_remaining_bytes()); + // if psm_length > bytes_reader.len() - 6 { + // return Err(MpegPsError { + // value: crate::ps::errors::MpegPsErrorValue::NotEnoughBytes, + // }); + // } + + // 00 00 01 BC + // 00 12 + // E0 + // FF 00 00 00 08 1B E0 00 00 + // 90 C0 00 00 00 00 00 00 + + // 00 00 01 E0 00 1D 84 80 + // 05 21 00 55 D4 79 00 00 00 01 67 42 C0 16 DA 82 + // 80 F4 9A 81 01 01 03 C2 01 0A 80 00 00 01 E0 00 + // 10 84 80 05 21 00 55 D4 79 00 00 00 01 68 CE 3C + // 80 + + bytes_reader.backup(); + + let start = bytes_reader.read_bytes(4)?; + + // log::info!("psm start"); + + if start.to_vec() != [0x00, 0x00, 0x01, PES_SID_PSM] { + return Err(MpegPsError { + value: super::errors::MpegPsErrorValue::StartCodeNotCorrect, + }); + } + // log::info!("psm start1"); + self.map_stream_id = PES_SID_PSM; + self.program_stream_map_length = bytes_reader.read_u16::()?; + + if self.program_stream_map_length as usize > bytes_reader.len() { + bytes_reader.restore(); + return Err(MpegPsError { + value: crate::ps::errors::MpegPsErrorValue::NotEnoughBytes, + }); + } + + let byte_7 = bytes_reader.read_u8()?; + self.current_next_indicator = byte_7 >> 7; + self.single_extension_stream_flag = (byte_7 >> 6) & 0x01; + self.program_stream_map_version = byte_7 & 0x1F; + bytes_reader.read_u8()?; + + self.program_stream_info_length = bytes_reader.read_u16::()?; + // log::info!("psm start2 : {}", self.program_stream_info_length); + if self.program_stream_info_length as usize + 2 > bytes_reader.len() { + bytes_reader.restore(); + return Err(MpegPsError { + value: crate::ps::errors::MpegPsErrorValue::NotEnoughBytes, + }); + } + + bytes_reader.read_bytes(self.program_stream_info_length as usize)?; + + self.elementary_stream_map_length = bytes_reader.read_u16::()?; + + // log::info!( + // "elementary_stream_map_length: {}", + // self.elementary_stream_map_length + // ); + + if self.elementary_stream_map_length as usize + 4 > bytes_reader.len() { + bytes_reader.restore(); + return Err(MpegPsError { + value: crate::ps::errors::MpegPsErrorValue::NotEnoughBytes, + }); + } + + let remaining_bytes = bytes_reader.len() - self.elementary_stream_map_length as usize; + + while bytes_reader.len() > remaining_bytes { + let stream_type = bytes_reader.read_u8()?; + let elementary_stream_id = bytes_reader.read_u8()?; + let elementary_stream_info_length = bytes_reader.read_u16::()?; + + let (pseudo_descriptor_tag, pseudo_descriptor_length, elementary_stream_id_extension) = + if elementary_stream_id == 0xFD && self.single_extension_stream_flag == 0 { + let pseudo_descriptor_tag = bytes_reader.read_u8()?; + let pseudo_descriptor_length = bytes_reader.read_u8()?; + let elementary_stream_id_extension = bytes_reader.read_u8()? & 0x7F; + bytes_reader.read_bytes(elementary_stream_info_length as usize - 3)?; + ( + pseudo_descriptor_tag, + pseudo_descriptor_length, + elementary_stream_id_extension, + ) + } else { + bytes_reader.read_bytes(elementary_stream_info_length as usize)?; + (0, 0, 0) + }; + + self.stream_map.push(ElementaryStreamMap { + stream_type, + elementary_stream_id, + elementary_stream_info_length, + pseudo_descriptor_tag, + pseudo_descriptor_length, + elementary_stream_id_extension, + }); + } + bytes_reader.read_bytes(4)?; + // log::info!("psm end"); + Ok(()) + } +} diff --git a/library/container/mpegts/src/ps/system_header.rs b/library/container/mpegts/src/ps/system_header.rs new file mode 100644 index 00000000..8c764407 --- /dev/null +++ b/library/container/mpegts/src/ps/system_header.rs @@ -0,0 +1,152 @@ +use super::errors::MpegPsError; +use crate::define::epes_stream_id::PES_SID_SYS; +use byteorder::BigEndian; +use bytesio::bytes_reader::BytesReader; + +#[derive(Default)] +pub struct PsStream { + pub stream_id: u8, + pub stream_id_extension: u8, + pub buffer_bound_scale: u8, + pub buffer_size_bound: u16, +} + +#[derive(Default)] +pub struct PsSystemHeader { + header_length: u16, + rate_bound: u32, + audio_bound: u8, + fixed_flag: u8, + csps_flag: u8, + system_audio_lock_flag: u8, + system_video_lock_flag: u8, + video_bound: u8, + packet_rate_restriction_flag: u8, + streams: Vec, +} + +impl PsSystemHeader { + //T-REC-H.222.0-201703-S!!PDF-E.pdf Table 2-40 P66 + // system_header () { + // system_header_start_code 32 bslbf + // header_length 16 uimsbf + // marker_bit 1 bslbf + // rate_bound 22 uimsbf + // marker_bit 1 bslbf + // audio_bound 6 uimsbf + // fixed_flag 1 bslbf + // CSPS_flag 1 bslbf + // system_audio_lock_flag 1 bslbf + // system_video_lock_flag 1 bslbf + // marker_bit 1 bslbf + // video_bound 5 uimsbf + // packet_rate_restriction_flag 1 bslbf + // reserved_bits 7 bslbf + // while (nextbits () == '1') { + // stream_id 8 uimsbf + // if (stream_id == '1011 0111') { + // '11' 2 bslbf + // '000 0000' 7 bslbf + // stream_id_extension 7 uimsbf + // '1011 0110' 8 bslbf + // '11' 2 bslbf + // P-STD_buffer_bound_scale 1 bslbf + // P-STD_buffer_size_bound 13 uimsbf + // } + // else { + // '11' 2 bslbf + // P-STD_buffer_bound_scale 1 bslbf + // P-STD_buffer_size_bound 13 uimsbf + // } + // } + // } + pub fn parse(&mut self, bytes_reader: &mut BytesReader) -> Result<(), MpegPsError> { + let start = bytes_reader.read_bytes(4)?; + + if start.to_vec() != [0x00, 0x00, 0x01, PES_SID_SYS] { + return Err(MpegPsError { + value: super::errors::MpegPsErrorValue::StartCodeNotCorrect, + }); + } + + self.header_length = bytes_reader.read_u16::()?; + self.rate_bound = (bytes_reader.read_u24::()? & 0x7FFFFF) >> 1; + + let byte_10 = bytes_reader.read_u8()?; + self.audio_bound = byte_10 >> 2; + self.fixed_flag = (byte_10 >> 1) & 0x01; + self.csps_flag = byte_10 & 0x01; + + let byte_11 = bytes_reader.read_u8()?; + self.system_audio_lock_flag = byte_11 >> 7; + self.system_video_lock_flag = (byte_11 >> 6) & 0x01; + self.video_bound = byte_11 & 0x1F; + + let byte_12 = bytes_reader.read_u8()?; + self.packet_rate_restriction_flag = byte_12 >> 7; + + while !bytes_reader.is_empty() && (bytes_reader.advance_u8()? >> 7) == 0x01 { + let stream_id = bytes_reader.read_u8()?; + + let stream_id_extension = if stream_id == 0xB7 { + let next_byte = bytes_reader.read_u8()?; + assert!(next_byte >> 6 == 0b11); + assert!(next_byte & 0x3F == 0b0); + + let next_byte_2 = bytes_reader.read_u8()?; + assert!(next_byte_2 >> 7 == 0b0); + let stream_id_extension = next_byte_2 & 0x7F; + + let next_byte_3 = bytes_reader.read_u8()?; + assert!(next_byte_3 == 0b10110110); + stream_id_extension + } else { + 0 + }; + + let next_2bytes = bytes_reader.read_u16::()?; + assert!(next_2bytes >> 14 == 0b11); + let buffer_bound_scale = (next_2bytes >> 13) as u8 & 0x01; + let buffer_size_bound = next_2bytes & 0x1FFF; + + self.streams.push(PsStream { + stream_id, + stream_id_extension, + buffer_bound_scale, + buffer_size_bound, + }); + } + + Ok(()) + } +} +#[cfg(test)] +mod tests { + use byteorder::BigEndian; + + use { + bytes::BytesMut, + bytesio::{bits_reader::BitsReader, bytes_reader::BytesReader}, + }; + + #[test] + pub fn test_bytes_reader() { + let v = [0xFF, 0x01, 0x02]; + let mut b = BytesMut::new(); + b.extend_from_slice(&v); + let reader1 = BytesReader::new(b.clone()); + + let mut read2 = BytesReader::new(b); + + let mut bits_reader = BitsReader::new(reader1); + + println!("{}", bits_reader.read_bit().unwrap()); + + println!("{}", bits_reader.read_n_bits(22).unwrap()); + println!("{}", bits_reader.read_bit().unwrap()); + + let aa = read2.read_u24::().unwrap(); + + println!("{}", (aa & 0x7FFFFF) >> 1); + } +} diff --git a/library/container/mpegts/src/ts.rs b/library/container/mpegts/src/ts.rs index af6507af..d7bcb8b7 100644 --- a/library/container/mpegts/src/ts.rs +++ b/library/container/mpegts/src/ts.rs @@ -2,7 +2,7 @@ use { super::{ define, define::{epat_pid, epes_stream_id, ts}, - errors::{MpegTsError, MpegTsErrorValue}, + errors::{MpegError, MpegErrorValue}, pat, pes, pes::PesMuxer, pmt, utils, @@ -17,7 +17,7 @@ pub struct TsMuxer { pmt_continuity_counter: u8, h264_h265_with_aud: bool, pid: u16, - pat_period: i64, + pat_period: u64, pcr_period: i64, pcr_clock: i64, pat: pat::Pat, @@ -65,11 +65,11 @@ impl TsMuxer { pub fn write( &mut self, pid: u16, - pts: i64, - dts: i64, + pts: u64, + dts: u64, flags: u16, payload: BytesMut, - ) -> Result<(), MpegTsError> { + ) -> Result<(), MpegError> { self.h264_h265_with_aud = (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0; //print!("pes payload length {}\n", payload.len()); @@ -137,7 +137,7 @@ impl TsMuxer { pid: u16, payload: BytesMut, continuity_counter: u8, - ) -> Result<(), MpegTsError> { + ) -> Result<(), MpegError> { /*sync byte*/ self.bytes_writer.write_u8(0x47)?; //0 /*PID 13 bits*/ @@ -161,7 +161,7 @@ impl TsMuxer { Ok(()) } //2.4.3.6 PES packet P35 - pub fn write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError> { + pub fn write_pes(&mut self, payload: BytesMut) -> Result<(), MpegError> { let mut is_start: bool = true; let mut payload_reader = BytesReader::new(payload); @@ -208,7 +208,7 @@ impl TsMuxer { pes_header_length: usize, payload_data_length: usize, is_start: bool, - ) -> Result { + ) -> Result { let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap(); let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap(); @@ -241,7 +241,7 @@ impl TsMuxer { if (stream_data.pid == pcr_pid) || ((stream_data.data_alignment_indicator > 0) - && define::PTS_NO_VALUE != stream_data.pts) + && define::PTS_NO_VALUE != stream_data.pts as i64) { /*adaption field control*/ ts_header.or_u8_at(3, 0x20)?; @@ -256,7 +256,7 @@ impl TsMuxer { /*adaption field flags*/ ts_header.or_u8_at(5, define::AF_FLAG_PCR)?; - let pcr = if define::PTS_NO_VALUE == stream_data.dts { + let pcr = if define::PTS_NO_VALUE == stream_data.dts as i64 { stream_data.pts } else { stream_data.dts @@ -269,7 +269,7 @@ impl TsMuxer { } if (stream_data.data_alignment_indicator > 0) - && define::PTS_NO_VALUE != stream_data.pts + && define::PTS_NO_VALUE != stream_data.pts as i64 { /*adaption field flags*/ ts_header.or_u8_at(5, define::AF_FLAG_RANDOM_ACCESS_INDICATOR)?; @@ -319,7 +319,7 @@ impl TsMuxer { Ok(payload_data_length) } - pub fn find_stream(&mut self, pid: u16) -> Result<(), MpegTsError> { + pub fn find_stream(&mut self, pid: u16) -> Result<(), MpegError> { // let mut pmt_index: usize = 0; let mut stream_index: usize = 0; @@ -348,34 +348,29 @@ impl TsMuxer { // pmt_index += 1; // } - Err(MpegTsError { - value: MpegTsErrorValue::StreamNotFound, + Err(MpegError { + value: MpegErrorValue::StreamNotFound, }) } - pub fn add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result { + pub fn add_stream(&mut self, codecid: u8) -> Result { if self.pat.pmt.is_empty() { self.add_program(1, BytesMut::new())?; } - self.pmt_add_stream(0, codecid, extra_data) + self.pmt_add_stream(0, codecid) } - pub fn pmt_add_stream( - &mut self, - pmt_index: usize, - codecid: u8, - extra_data: BytesMut, - ) -> Result { + pub fn pmt_add_stream(&mut self, pmt_index: usize, codecid: u8) -> Result { let pmt = &mut self.pat.pmt[pmt_index]; if pmt.streams.len() == 4 { - return Err(MpegTsError { - value: MpegTsErrorValue::StreamCountExeceed, + return Err(MpegError { + value: MpegErrorValue::StreamCountExeceed, }); } - let mut cur_stream = pes::Pes::new(); //&mut pmt.streams[pmt.stream_count]; + let mut cur_stream = pes::Pes::default(); //&mut pmt.streams[pmt.stream_count]; cur_stream.codec_id = codecid; cur_stream.pid = self.pid; @@ -389,9 +384,9 @@ impl TsMuxer { cur_stream.stream_id = epes_stream_id::PES_SID_PRIVATE_1; } - if !extra_data.is_empty() { - cur_stream.esinfo.put(extra_data); - } + // if !extra_data.is_empty() { + // cur_stream.esinfo.put(extra_data); + // } pmt.streams.push(cur_stream); pmt.version_number = (pmt.version_number + 1) % 32; @@ -401,18 +396,18 @@ impl TsMuxer { Ok(self.pid - 1) } - pub fn add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError> { + pub fn add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegError> { for cur_pmt in self.pat.pmt.iter() { if cur_pmt.program_number == program_number { - return Err(MpegTsError { - value: MpegTsErrorValue::ProgramNumberExists, + return Err(MpegError { + value: MpegErrorValue::ProgramNumberExists, }); } } if self.pat.pmt.len() == 4 { - return Err(MpegTsError { - value: MpegTsErrorValue::PmtCountExeceed, + return Err(MpegError { + value: MpegErrorValue::PmtCountExeceed, }); } let mut cur_pmt = pmt::Pmt::new(); //&mut self.pat.pmt[self.pat.pmt_count]; diff --git a/library/container/mpegts/src/utils.rs b/library/container/mpegts/src/utils.rs index e73d3715..d92d09d0 100644 --- a/library/container/mpegts/src/utils.rs +++ b/library/container/mpegts/src/utils.rs @@ -3,9 +3,9 @@ use { bytesio::{bytes_errors::BytesWriteError, bytes_writer::BytesWriter}, }; -pub fn pcr_write(pcr_result: &mut BytesWriter, pcr: i64) -> Result<(), BytesWriteError> { - let pcr_base: i64 = pcr / 300; - let pcr_ext: i64 = pcr % 300; +pub fn pcr_write(pcr_result: &mut BytesWriter, pcr: u64) -> Result<(), BytesWriteError> { + let pcr_base: u64 = pcr / 300; + let pcr_ext: u64 = pcr % 300; pcr_result.write_u8((pcr_base >> 25) as u8)?; pcr_result.write_u8((pcr_base >> 17) as u8)?; diff --git a/library/streamhub/src/define.rs b/library/streamhub/src/define.rs index a389c961..da702ceb 100644 --- a/library/streamhub/src/define.rs +++ b/library/streamhub/src/define.rs @@ -50,6 +50,8 @@ pub enum PublishType { PushWebRTC, /* It used for publishing raw rtp data of rtsp/whbrtc(whip) */ PushRtp, + /* Receive ps stream from remote push client(GB28181), */ + PushPsStream, } #[derive(Debug, Serialize, Clone)] diff --git a/library/streamhub/src/stream.rs b/library/streamhub/src/stream.rs index 38321c1d..346b1353 100644 --- a/library/streamhub/src/stream.rs +++ b/library/streamhub/src/stream.rs @@ -16,6 +16,9 @@ pub enum StreamIdentifier { app_name: String, stream_name: String, }, + GB28181 { + stream_name: String, + }, } impl fmt::Display for StreamIdentifier { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -40,6 +43,9 @@ impl fmt::Display for StreamIdentifier { "WebRTC - app_name: {app_name}, stream_name: {stream_name}" ) } + StreamIdentifier::GB28181 { stream_name } => { + write!(f, "GB28181 - stream_name: {stream_name}") + } StreamIdentifier::Unkonwn => { write!(f, "Unkonwn") } diff --git a/protocol/gb28181/Cargo.toml b/protocol/gb28181/Cargo.toml new file mode 100644 index 00000000..05388237 --- /dev/null +++ b/protocol/gb28181/Cargo.toml @@ -0,0 +1,51 @@ +[package] +name = "gb28181" +version = "0.1.0" +description = "gb28181 library." +edition = "2021" +authors = ["HarlanC "] +license = "MIT" +repository = "https://github.com/harlanc/xiu" +categories = ["multimedia", "multimedia::video", 'multimedia::audio'] +keywords = ["gb28181", "video", "streaming"] +readme = "README.md" + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rand = "0.8" +byteorder = "1.4.2" +tokio = "1.4.0" +bytes = "1.0.0" +log = "0.4" +failure = "0.1.1" +http = "0.2.9" +indexmap = "1.9.3" +lazy_static = "1.4.0" +chrono = "0.4" +async-trait = "0.1.70" +base64 = "0.21.2" +hex = "0.4.3" +axum = "0.6.10" +serde_derive = "1.0" +serde = { version = "1.0.101", optional = true, features = ["derive"] } +anyhow = "^1.0" +serde_json = { version = "1", default-features = false, features = [ + "alloc", + "raw_value", + "std", +] } +reqwest = "0.11.14" + + +bytesio = { path = "../../library/bytesio/" } +streamhub = { path = "../../library/streamhub/" } +xmpegts = { path = "../../library/container/mpegts/" } +xrtsp = { path = "../rtsp/" } + +[features] +default = ["std"] +std = ["serde"] + + diff --git a/protocol/gb28181/README.md b/protocol/gb28181/README.md new file mode 100644 index 00000000..20efa0d2 --- /dev/null +++ b/protocol/gb28181/README.md @@ -0,0 +1,35 @@ +A gb28181 library. + + + +- 启动 RTMP 和 GB28181 api server + + xiu -r 1935 -g 3000 + +- 使用如下命令开启UDP端口,创建UDP server socket,接收PS数据,并自动转封装成RTMP + + curl -X POST -d "stream_id=test&secret=xiu&port=30000" "http://localhost:3000/index/api/openRtpServer" + + 会返回本地upd端口: + + {"code":0,"port":30000} + + 所以port参数也可以传递0,server端会随机选择一个端口并返回。 + + 可以开启dump录制文件,用于重现问题。 + + curl -X POST -d "stream_id=test&secret=xiu&port=30000&need_dump=true" "http://localhost:3000/index/api/openRtpServer" + +- 使用如下命令关闭udp socket + + curl -X POST -d "stream_id=test&secret=xiu" "http://localhost:3000/index/api/clostRtpServer" + + +- RTMP拉流地址: + + rtmp://localhost:1935/gb28181/{stream_id} + +比如上面的例子就是 : + + rtmp://localhost:1935/gb28181/test + diff --git a/protocol/gb28181/src/api_service.rs b/protocol/gb28181/src/api_service.rs new file mode 100644 index 00000000..a1e3a876 --- /dev/null +++ b/protocol/gb28181/src/api_service.rs @@ -0,0 +1,258 @@ +use tokio::sync::Mutex; + +use { + super::gb28181::GB28181Server, + anyhow::Result, + axum::{routing::post, Json, Router}, + serde::Deserialize, + std::sync::Arc, + streamhub::define::StreamHubEventSender, + tokio, +}; + +use axum::http::StatusCode; +use serde::Serialize; + +#[derive(Deserialize)] +pub struct RequestFormData { + secret: String, + pub schema: Option, + stream_id: Option, + pub re_use_port: Option, + port: Option, + pub tcp_mode: Option, + need_dump: Option, //vhost : String, +} + +#[derive(Serialize)] +struct HttpResponse { + code: i16, + changed: Option, + stream_id: Option, + schema: Option, +} + +struct ApiService { + secret: String, + gb_server: GB28181Server, +} + +impl ApiService { + async fn get_server_config( + &self, + request_data: axum::extract::Form, + ) -> Result { + let response_body = if request_data.secret == self.secret { + HttpResponse { + code: 0, + changed: None, + stream_id: None, + schema: None, + } + } else { + HttpResponse { + code: -1, + changed: None, + stream_id: None, + schema: None, + } + }; + + Ok(serde_json::to_string(&response_body)?) + } + + async fn set_server_config( + &self, + request_data: axum::extract::Form, + ) -> Json { + let response_body = if request_data.secret == self.secret { + serde_json::json!({ + "code": 0, + "changed": 0, + }) + } else { + serde_json::json!({ + "code": -1, + }) + }; + + Json(response_body) + } + + async fn get_media_list( + &self, + request_data: axum::extract::Form, + ) -> (StatusCode, Json) { + let response_body = if request_data.secret == self.secret { + HttpResponse { + code: 0, + changed: None, + stream_id: None, + schema: None, + } + } else { + HttpResponse { + code: -1, + changed: None, + stream_id: None, + schema: None, + } + }; + + (StatusCode::OK, Json(response_body)) + } + + async fn get_rtp_info( + &self, + request_data: axum::extract::Form, + ) -> Json { + let response_body = if request_data.secret == self.secret { + serde_json::json!({ + "code": 0, + "exist":false + }) + } else { + serde_json::json!({ + "code": -1, + }) + }; + + Json(response_body) + } + + async fn open_rtp_server( + &mut self, + request_data: axum::extract::Form, + ) -> Json { + log::info!("open rtp server"); + if request_data.secret != self.secret { + return Json(serde_json::json!({ + "code": -1, + "err_msg": "The secret is not correct" + })); + }; + + if request_data.stream_id.is_none() { + return Json(serde_json::json!({ + "code": -2, + "err_msg": "The stream name should not be empty" + })); + } + + let local_port = if let Some(port) = request_data.port { + port + } else { + 0 + }; + + let need_dump = if let Some(dump) = request_data.need_dump { + dump + } else { + false + }; + + match self + .gb_server + .start_session( + local_port, + request_data.stream_id.clone().unwrap(), + need_dump, + ) + .await + { + Ok(port) => { + let response_body = serde_json::json!({ + "code": 0, + "port":port, + }); + + Json(response_body) + } + Err(err) => { + let response_body = serde_json::json!({ + "code": -3, + "err_msg": format!("{:?}",err) + }); + + Json(response_body) + } + } + } + + async fn close_rtp_server( + &mut self, + request_data: axum::extract::Form, + ) -> Json { + let response_body = if request_data.secret == self.secret { + serde_json::json!({ + "code": 0, + }) + } else { + serde_json::json!({ + "code": -1, + }) + }; + + self.gb_server + .stop_session(request_data.stream_id.clone().unwrap()) + .await; + + Json(response_body) + } +} + +pub async fn run(producer: StreamHubEventSender, port: usize) { + let api = Arc::new(Mutex::new(ApiService { + secret: String::from("xiu"), + gb_server: GB28181Server::new(producer), + })); + + let api_0 = api.clone(); + let get_server_config = move |request_data: axum::extract::Form| async move { + match api_0.lock().await.get_server_config(request_data).await { + Ok(response) => response, + Err(_) => "error".to_owned(), + } + }; + + let api_1 = api.clone(); + let set_server_config = move |request_data: axum::extract::Form| async move { + api_1.lock().await.set_server_config(request_data).await + }; + + let api_2 = api.clone(); + let get_media_list = move |request_data: axum::extract::Form| async move { + api_2.lock().await.get_media_list(request_data).await + }; + + let api_3 = api.clone(); + let get_rtp_info = move |request_data: axum::extract::Form| async move { + api_3.lock().await.get_rtp_info(request_data).await + }; + + let api_4 = api.clone(); + let open_rtp_server = move |request_data: axum::extract::Form| async move { + api_4.lock().await.open_rtp_server(request_data).await + }; + + let api_5 = api.clone(); + let close_rtp_server = move |request_data: axum::extract::Form| async move { + api_5.lock().await.close_rtp_server(request_data).await + }; + + let app = Router::new() + .route("/index/api/getServerConfig", post(get_server_config)) + .route("/index/api/setServerConfig", post(set_server_config)) + .route("/index/api/getMediaList", post(get_media_list)) + .route("/index/api/getRtpInfo", post(get_rtp_info)) + .route("/index/api/openRtpServer", post(open_rtp_server)) + .route("/index/api/clostRtpServer", post(close_rtp_server)); + + log::info!("GB28181 api server listening on http://:{}", port); + axum::Server::bind(&([127, 0, 0, 1], port as u16).into()) + .serve(app.into_make_service()) + .await + .unwrap(); + + log::info!("GB28181 api server end..."); +} diff --git a/protocol/gb28181/src/errors.rs b/protocol/gb28181/src/errors.rs new file mode 100644 index 00000000..d8fbf912 --- /dev/null +++ b/protocol/gb28181/src/errors.rs @@ -0,0 +1,33 @@ +use { + failure::{Backtrace, Fail}, + std::fmt, +}; + +#[derive(Debug)] +pub struct Gb28181Error { + pub value: Gb28181ErrorValue, +} + +#[derive(Debug, Fail)] +pub enum Gb28181ErrorValue { + #[fail(display = "The session name alreay exists.")] + SessionExists, + #[fail(display = "New server session failed.")] + NewSessionFailed, +} + +impl fmt::Display for Gb28181Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.value, f) + } +} + +impl Fail for Gb28181Error { + fn cause(&self) -> Option<&dyn Fail> { + self.value.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.value.backtrace() + } +} diff --git a/protocol/gb28181/src/gb28181.rs b/protocol/gb28181/src/gb28181.rs new file mode 100644 index 00000000..3b18f607 --- /dev/null +++ b/protocol/gb28181/src/gb28181.rs @@ -0,0 +1,119 @@ +use super::errors::Gb28181Error; +use super::session::GB28181ServerSession; +use std::collections::HashMap; +use streamhub::define::StreamHubEventSender; +use tokio::sync::mpsc::UnboundedSender; + +pub type SessionExistSender = UnboundedSender<()>; + +pub struct GB28181Server { + event_producer: StreamHubEventSender, + session_name_2_exist_sender: HashMap, +} + +impl GB28181Server { + pub fn new(event_producer: StreamHubEventSender) -> Self { + Self { + event_producer, + session_name_2_exist_sender: HashMap::new(), + } + } + + pub async fn start_session( + &mut self, + local_port: u16, + stream_name: String, + need_dump: bool, + ) -> Result { + if self.session_name_2_exist_sender.contains_key(&stream_name) { + return Err(Gb28181Error { + value: crate::errors::Gb28181ErrorValue::SessionExists, + }); + } + + if let Some(mut session) = GB28181ServerSession::new( + local_port, + self.event_producer.clone(), + stream_name.clone(), + need_dump, + ) + .await + { + let local_port = session.local_port; + let exist_sender = session.exit_sender.clone(); + + log::info!("GB28181 server session listening on udp://{}", local_port); + tokio::spawn(async move { + if let Err(err) = session.run().await { + log::error!("session run error, err: {}", err); + } + }); + + self.session_name_2_exist_sender + .insert(stream_name, exist_sender); + return Ok(local_port); + } + + Err(Gb28181Error { + value: crate::errors::Gb28181ErrorValue::NewSessionFailed, + }) + } + + pub async fn stop_session(&mut self, stream_name: String) { + if let Some(exist_sender) = self.session_name_2_exist_sender.get_mut(&stream_name) { + if let Err(err) = exist_sender.send(()) { + log::error!("exist sender send error: {:?}", err); + } + self.session_name_2_exist_sender.remove(&stream_name); + } else { + log::warn!("The session with stream name: {stream_name} does not exist.") + } + } +} + +#[cfg(test)] +mod tests { + + #[test] + fn send_dump_file() { + // let file_path = ""; // 替换为实际的文件路径 + // let mut file = File::open(file_path).unwrap(); + + // // 创建 UDP socket + // let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); // 绑定到任意可用端口 + + // loop { + // // let time_delta = match file.read_u16::() { + // // Ok(value) => value, + // // Err(err) => { + // // log::error!("file read error: {}", err); + // // break; + // // } // 文件已读取完毕或发生错误 + // // }; + // // sleep(Duration::from_millis(time_delta as u64)); + + // // 读取 10 个字节 + // // 读取 4 个字节作为大端 u32 + // let length = match file.read_u16::() { + // Ok(value) => value, + // Err(err) => { + // log::error!("file read error: {}", err); + // break; + // } // 文件已读取完毕或发生错误 + // }; + // println!("length:{}", length); + + // // 读取指定长度的字节 + // let mut buffer = vec![0u8; length as usize]; + // if let Err(err) = file.read_exact(&mut buffer) { + // log::error!("read file err: {err}"); + // } + + // // 发送数据到 UDP 端口 + // let addr = "127.0.0.1:30000"; // UDP 目标地址 + // let _sent_bytes = socket.send_to(&buffer, addr).unwrap(); + // // println!("Sent {} bytes to {}: {:?}", sent_bytes, addr, buffer); + // thread::sleep(Duration::from_millis(2)); + // } + } +} diff --git a/protocol/gb28181/src/lib.rs b/protocol/gb28181/src/lib.rs new file mode 100644 index 00000000..57e67c66 --- /dev/null +++ b/protocol/gb28181/src/lib.rs @@ -0,0 +1,5 @@ +#![recursion_limit = "2560"] +pub mod api_service; +pub mod errors; +pub mod gb28181; +pub mod session; diff --git a/protocol/gb28181/src/session/errors.rs b/protocol/gb28181/src/session/errors.rs new file mode 100644 index 00000000..7c35df4e --- /dev/null +++ b/protocol/gb28181/src/session/errors.rs @@ -0,0 +1,109 @@ +use { + bytesio::bytes_errors::BytesReadError, + bytesio::{bytes_errors::BytesWriteError, bytesio_errors::BytesIOError}, + failure::{Backtrace, Fail}, + std::fmt, + std::str::Utf8Error, + streamhub::errors::ChannelError, + tokio::sync::oneshot::error::RecvError, + xmpegts::errors::MpegError, +}; + +#[derive(Debug)] +pub struct SessionError { + pub value: SessionErrorValue, +} + +#[derive(Debug, Fail)] +pub enum SessionErrorValue { + #[fail(display = "net io error: {}", _0)] + BytesIOError(#[cause] BytesIOError), + #[fail(display = "bytes read error: {}", _0)] + BytesReadError(#[cause] BytesReadError), + #[fail(display = "bytes write error: {}", _0)] + BytesWriteError(#[cause] BytesWriteError), + #[fail(display = "Utf8Error: {}", _0)] + Utf8Error(#[cause] Utf8Error), + #[fail(display = "MpegError: {}", _0)] + MpegError(#[cause] MpegError), + #[fail(display = "event execute error: {}", _0)] + ChannelError(#[cause] ChannelError), + #[fail(display = "tokio: oneshot receiver err: {}", _0)] + RecvError(#[cause] RecvError), + #[fail(display = "stream hub event send error\n")] + StreamHubEventSendErr, + #[fail(display = "cannot receive frame data from stream hub\n")] + CannotReceiveFrameData, +} + +impl From for SessionError { + fn from(error: BytesIOError) -> Self { + SessionError { + value: SessionErrorValue::BytesIOError(error), + } + } +} + +impl From for SessionError { + fn from(error: BytesReadError) -> Self { + SessionError { + value: SessionErrorValue::BytesReadError(error), + } + } +} + +impl From for SessionError { + fn from(error: BytesWriteError) -> Self { + SessionError { + value: SessionErrorValue::BytesWriteError(error), + } + } +} + +impl From for SessionError { + fn from(error: MpegError) -> Self { + SessionError { + value: SessionErrorValue::MpegError(error), + } + } +} + +impl From for SessionError { + fn from(error: Utf8Error) -> Self { + SessionError { + value: SessionErrorValue::Utf8Error(error), + } + } +} + +impl From for SessionError { + fn from(error: ChannelError) -> Self { + SessionError { + value: SessionErrorValue::ChannelError(error), + } + } +} + +impl From for SessionError { + fn from(error: RecvError) -> Self { + SessionError { + value: SessionErrorValue::RecvError(error), + } + } +} + +impl fmt::Display for SessionError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.value, f) + } +} + +impl Fail for SessionError { + fn cause(&self) -> Option<&dyn Fail> { + self.value.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.value.backtrace() + } +} diff --git a/protocol/gb28181/src/session/mod.rs b/protocol/gb28181/src/session/mod.rs new file mode 100644 index 00000000..346cc607 --- /dev/null +++ b/protocol/gb28181/src/session/mod.rs @@ -0,0 +1,323 @@ +pub mod errors; + +use streamhub::{ + define::{ + FrameData, FrameDataSender, InformationSender, NotifyInfo, PublishType, PublisherInfo, + StreamHubEvent, StreamHubEventSender, SubscribeType, TStreamHandler, + }, + errors::ChannelError, + statistics::StreamStatistics, + stream::StreamIdentifier, + utils::{RandomDigitCount, Uuid}, +}; + +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +use bytesio::bytesio::UdpIO; +use errors::SessionError; +use errors::SessionErrorValue; + +use std::fs::File; +use std::{sync::Arc, time::SystemTime}; +use streamhub::define::DataSender; +use tokio::sync::oneshot; + +use tokio::sync::mpsc; + +use async_trait::async_trait; +use bytesio::bytesio::TNetIO; + +use bytes::BytesMut; +use bytesio::bytes_reader::BytesReader; +use xmpegts::ps::errors::MpegPsError; +use xmpegts::ps::errors::MpegPsErrorValue; +use xmpegts::ps::ps_demuxer::PsDemuxer; +use xmpegts::{define::epsi_stream_type, errors::MpegErrorValue}; + +use std::io::prelude::*; +use xrtsp::rtp::RtpPacket; +use xrtsp::rtp::{rtp_queue::RtpQueue, utils::Unmarshal}; + +pub struct GB28181ServerSession { + pub session_id: Uuid, + pub local_port: u16, + stream_name: String, + io: Box, + event_sender: StreamHubEventSender, + stream_handler: Arc, + dump_file: Option, + dump_last_recv_timestamp: u64, + pub exit_sender: UnboundedSender<()>, + exit_receiver: UnboundedReceiver<()>, +} + +pub fn print(data: BytesMut) { + println!("==========={}", data.len()); + let mut idx = 0; + for i in data { + print!("{i:02X} "); + idx += 1; + if idx % 16 == 0 { + println!() + } + } + println!("===========") +} + +pub fn current_time() -> u64 { + let duration = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH); + + match duration { + Ok(result) => (result.as_millis()) as u64, + _ => 0, + } +} + +impl GB28181ServerSession { + pub async fn new( + // stream: UdpIO, + local_port: u16, + event_sender: StreamHubEventSender, + stream_name: String, + need_dump: bool, + ) -> Option { + let stream_handler = Arc::new(GB28181StreamHandler::new()); + let session_id = Uuid::new(RandomDigitCount::Zero); + + let dump_file = if need_dump { + let file_handler = File::create(format!("./{stream_name}.dump")).unwrap(); + Some(file_handler) + } else { + None + }; + + if let Some(udp_io) = UdpIO::new(local_port, None).await { + let local_port = udp_io.get_local_port().unwrap(); + let io: Box = Box::new(udp_io); + + let (exit_sender, exit_receiver) = mpsc::unbounded_channel::<()>(); + + return Some(Self { + local_port, + session_id, + io, + stream_name, + event_sender, + stream_handler, + dump_file, + dump_last_recv_timestamp: 0, + exit_sender, + exit_receiver, + }); + } + None + } + + pub fn dump(&mut self, data: &BytesMut) { + if let Some(f) = &mut self.dump_file { + let cur_time_delta = if self.dump_last_recv_timestamp == 0 { + self.dump_last_recv_timestamp = current_time(); + 0 + } else { + let cur_time = current_time(); + let cur_time_delta = (cur_time - self.dump_last_recv_timestamp) as u16; + self.dump_last_recv_timestamp = cur_time; + cur_time_delta + }; + + if let Err(err) = f.write_all(&cur_time_delta.to_be_bytes()) { + log::error!("dump time delta err: {}", err); + } + + let length = data.len() as u16; + log::trace!("dump length: {}", length); + if let Err(err) = f.write_all(&length.to_be_bytes()) { + log::error!("dump length err: {}", err); + } + + if let Err(err) = f.write_all(&data[..]) { + log::error!("dump data err: {}", err); + } + } + } + + pub async fn run(&mut self) -> Result<(), SessionError> { + let sender = self.publish_to_stream_hub().await?; + let mut ps_demuxer = self.new_ps_demuxer(sender); + + let mut bytes_reader = BytesReader::new(BytesMut::default()); + let mut rtp_queue = RtpQueue::new(200); + + loop { + tokio::select! { + rv = self.io.read() => { + let data = rv?; + self.dump(&data); + bytes_reader.extend_from_slice(&data[..]); + + let rtp_packet = RtpPacket::unmarshal(&mut bytes_reader)?; + rtp_queue.write_queue(rtp_packet); + + while let Some(rtp_packet) = rtp_queue.read_queue() { + if let Err(err) = ps_demuxer.demux(rtp_packet.payload) { + match err.value { + MpegErrorValue::MpegPsError(ps_err) => match ps_err.value { + MpegPsErrorValue::NotEnoughBytes => { + continue; + } + _ => { + return Err(SessionError { + value: SessionErrorValue::MpegError(ps_err.into()), + }); + } + }, + _ => { + return Err(SessionError { + value: SessionErrorValue::MpegError(err), + }); + } + } + } + } + } + _ = self.exit_receiver.recv()=>{ + self.unpublish_to_stream_hub()?; + break; + } + } + } + + Ok(()) + } + + fn new_ps_demuxer(&self, sender: UnboundedSender) -> PsDemuxer { + let handler = Box::new( + move |pts: u64, + _dts: u64, + stream_type: u8, + payload: BytesMut| + -> Result<(), MpegPsError> { + match stream_type { + epsi_stream_type::PSI_STREAM_H264 | epsi_stream_type::PSI_STREAM_H265 => { + let video_frame_data = FrameData::Video { + timestamp: pts as u32, + data: payload, + }; + log::trace!("receive video data"); + if let Err(err) = sender.send(video_frame_data) { + log::error!("send video frame err: {}", err); + } + } + epsi_stream_type::PSI_STREAM_AAC => { + let audio_frame_data = FrameData::Audio { + timestamp: pts as u32, + data: payload, + }; + log::trace!("receive audio data"); + if let Err(err) = sender.send(audio_frame_data) { + log::error!("send audio frame err: {}", err); + } + } + _ => {} + } + Ok(()) + }, + ); + + PsDemuxer::new(handler) + } + + pub async fn publish_to_stream_hub(&mut self) -> Result { + let publisher_info = PublisherInfo { + id: self.session_id, + pub_type: PublishType::PushPsStream, + pub_data_type: streamhub::define::PubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + }; + + let (event_result_sender, event_result_receiver) = oneshot::channel(); + + let publish_event = StreamHubEvent::Publish { + identifier: StreamIdentifier::GB28181 { + stream_name: self.stream_name.clone(), + }, + result_sender: event_result_sender, + info: publisher_info, + stream_handler: self.stream_handler.clone(), + }; + + if self.event_sender.send(publish_event).is_err() { + return Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }); + } + + let sender = event_result_receiver.await??.0.unwrap(); + Ok(sender) + } + + pub fn unpublish_to_stream_hub(&self) -> Result<(), SessionError> { + let unpublish_event = StreamHubEvent::UnPublish { + identifier: StreamIdentifier::GB28181 { + stream_name: self.stream_name.clone(), + }, + info: PublisherInfo { + id: self.session_id, + pub_type: PublishType::PushPsStream, + pub_data_type: streamhub::define::PubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + }, + }; + + let rv = self.event_sender.send(unpublish_event); + match rv { + Err(_) => { + log::error!( + "unpublish_to_stream_hub error.stream_name: {}", + self.stream_name + ); + Err(SessionError { + value: SessionErrorValue::StreamHubEventSendErr, + }) + } + Ok(()) => { + log::info!( + "unpublish_to_stream_hub successfully.stream name: {}", + self.stream_name + ); + Ok(()) + } + } + } +} + +#[derive(Default)] +pub struct GB28181StreamHandler {} + +impl GB28181StreamHandler { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl TStreamHandler for GB28181StreamHandler { + async fn send_prior_data( + &self, + _sender: DataSender, + _sub_type: SubscribeType, + ) -> Result<(), ChannelError> { + Ok(()) + } + async fn get_statistic_data(&self) -> Option { + None + } + + async fn send_information(&self, _sender: InformationSender) {} +} diff --git a/protocol/hls/src/errors.rs b/protocol/hls/src/errors.rs index 8f1695f7..88c2a54e 100644 --- a/protocol/hls/src/errors.rs +++ b/protocol/hls/src/errors.rs @@ -8,7 +8,7 @@ use { tokio::sync::broadcast::error::RecvError, tokio::sync::oneshot::error::RecvError as OneshotRecvError, xflv::errors::FlvDemuxerError, - xmpegts::errors::MpegTsError, + xmpegts::errors::MpegError, }; #[derive(Debug)] @@ -40,9 +40,9 @@ pub enum MediaErrorValue { MetadataError(#[cause] MetadataError), #[fail(display = "flv demuxer error:{}", _0)] FlvDemuxerError(#[cause] FlvDemuxerError), - #[fail(display = "mpegts error:{}", _0)] - MpegTsError(#[cause] MpegTsError), - #[fail(display = "write file error:{}", _0)] + #[fail(display = "mpegts error:{}\n", _0)] + MpegTsError(#[cause] MpegError), + #[fail(display = "write file error:{}\n", _0)] IOError(#[cause] std::io::Error), } @@ -62,8 +62,8 @@ impl From for MediaError { } } -impl From for MediaError { - fn from(error: MpegTsError) -> Self { +impl From for MediaError { + fn from(error: MpegError) -> Self { MediaError { value: MediaErrorValue::MpegTsError(error), } diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index c9e6f5c5..7182795f 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -36,10 +36,10 @@ impl Flv2HlsRemuxer { pub fn new(duration: i64, app_name: String, stream_name: String, need_record: bool) -> Self { let mut ts_muxer = TsMuxer::new(); let audio_pid = ts_muxer - .add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new()) + .add_stream(epsi_stream_type::PSI_STREAM_AAC) .unwrap(); let video_pid = ts_muxer - .add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new()) + .add_stream(epsi_stream_type::PSI_STREAM_H264) .unwrap(); Self { @@ -162,7 +162,7 @@ impl Flv2HlsRemuxer { self.last_pts = pts; self.ts_muxer - .write(pid, pts * 90, dts * 90, flags, payload)?; + .write(pid, pts as u64 * 90, dts as u64 * 90, flags, payload)?; Ok(()) } diff --git a/protocol/httpflv/src/server.rs b/protocol/httpflv/src/server.rs index 3cb66da7..be9f03b0 100644 --- a/protocol/httpflv/src/server.rs +++ b/protocol/httpflv/src/server.rs @@ -49,6 +49,8 @@ async fn handle_connection( let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer)); resp.headers_mut() .insert("Access-Control-Allow-Origin", "*".parse().unwrap()); + resp.headers_mut() + .insert("Transfer-Encoding", "chunked".parse().unwrap()); Ok(resp) } diff --git a/protocol/rtmp/src/chunk/packetizer.rs b/protocol/rtmp/src/chunk/packetizer.rs index 4bf416b4..adea4baf 100644 --- a/protocol/rtmp/src/chunk/packetizer.rs +++ b/protocol/rtmp/src/chunk/packetizer.rs @@ -208,6 +208,7 @@ impl ChunkPacketizer { } } } + self.writer.flush().await?; Ok(()) diff --git a/protocol/rtmp/src/remuxer/gb281812rtmp.rs b/protocol/rtmp/src/remuxer/gb281812rtmp.rs new file mode 100644 index 00000000..70dab158 --- /dev/null +++ b/protocol/rtmp/src/remuxer/gb281812rtmp.rs @@ -0,0 +1,297 @@ +use super::{ + errors::{RtmpRemuxerError, RtmpRemuxerErrorValue}, + rtmp_cooker::RtmpCooker, +}; +use crate::session::define::SessionType; +use bytes::BytesMut; +use bytesio::bytes_reader::BytesReader; +use h264_decoder::sps::SpsParser; +use streamhub::define::VideoCodecType; +use tokio::sync::oneshot; +use xflv::define::h264_nal_type::{H264_NAL_IDR, H264_NAL_PPS, H264_NAL_SPS}; + +use { + crate::session::common::Common, + std::time::Duration, + streamhub::{ + define::{ + FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender, + SubscribeType, SubscriberInfo, + }, + stream::StreamIdentifier, + utils::{RandomDigitCount, Uuid}, + }, + tokio::{sync::mpsc, time::sleep}, +}; +pub struct GB281812RtmpRemuxerSession { + event_producer: StreamHubEventSender, + //RTMP + app_name: String, + stream_name: String, + + publishe_id: Uuid, + //GB28181 + data_receiver: FrameDataReceiver, + subscribe_id: Uuid, + video_clock_rate: u32, + audio_clock_rate: u32, + base_video_timestamp: u32, + base_audio_timestamp: u32, + rtmp_handler: Common, + rtmp_cooker: RtmpCooker, + sps: Option, + pps: Option, + is_seq_header_generated: bool, +} + +pub fn find_start_code(nalus: &[u8]) -> Option { + let pattern = [0x00, 0x00, 0x01]; + nalus.windows(pattern.len()).position(|w| w == pattern) +} + +impl GB281812RtmpRemuxerSession { + pub fn new(stream_name: String, event_producer: StreamHubEventSender) -> Self { + let (_, data_consumer) = mpsc::unbounded_channel(); + + Self { + app_name: String::from("gb28181"), + stream_name, + data_receiver: data_consumer, + event_producer: event_producer.clone(), + subscribe_id: Uuid::new(RandomDigitCount::Four), + publishe_id: Uuid::new(RandomDigitCount::Four), + video_clock_rate: 90 * 1000, + audio_clock_rate: 90 * 1000, + base_audio_timestamp: 0, + base_video_timestamp: 0, + rtmp_handler: Common::new(None, event_producer, SessionType::Server, None), + rtmp_cooker: RtmpCooker::default(), + sps: None, + pps: None, + is_seq_header_generated: false, + } + } + + pub async fn run(&mut self) -> Result<(), RtmpRemuxerError> { + self.publish_rtmp().await?; + self.subscribe_gb28181().await?; + self.receive_gb28181_data().await?; + + Ok(()) + } + + pub async fn publish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { + self.rtmp_handler + .publish_to_channels( + self.app_name.clone(), + self.stream_name.clone(), + self.publishe_id, + 0, + ) + .await?; + Ok(()) + } + + pub async fn unpublish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { + self.rtmp_handler + .unpublish_to_channels( + self.app_name.clone(), + self.stream_name.clone(), + self.publishe_id, + ) + .await?; + Ok(()) + } + + pub async fn subscribe_gb28181(&mut self) -> Result<(), RtmpRemuxerError> { + let (event_result_sender, event_result_receiver) = oneshot::channel(); + + let sub_info = SubscriberInfo { + id: self.subscribe_id, + sub_type: SubscribeType::PlayerRtmp, + sub_data_type: streamhub::define::SubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + }; + + let subscribe_event = StreamHubEvent::Subscribe { + identifier: StreamIdentifier::GB28181 { + stream_name: self.stream_name.clone(), + }, + info: sub_info, + result_sender: event_result_sender, + }; + + if self.event_producer.send(subscribe_event).is_err() { + return Err(RtmpRemuxerError { + value: RtmpRemuxerErrorValue::StreamHubEventSendErr, + }); + } + + let receiver = event_result_receiver.await??; + self.data_receiver = receiver.frame_receiver.unwrap(); + Ok(()) + } + + pub async fn unsubscribe_gb28181(&mut self) -> Result<(), RtmpRemuxerError> { + let sub_info = SubscriberInfo { + id: self.subscribe_id, + sub_type: SubscribeType::PlayerRtmp, + sub_data_type: streamhub::define::SubDataType::Frame, + notify_info: NotifyInfo { + request_url: String::from(""), + remote_addr: String::from(""), + }, + }; + + let subscribe_event = StreamHubEvent::UnSubscribe { + identifier: StreamIdentifier::GB28181 { + stream_name: self.stream_name.clone(), + }, + info: sub_info, + }; + if let Err(err) = self.event_producer.send(subscribe_event) { + log::error!("unsubscribe_from_channels err {}\n", err); + } + + Ok(()) + } + + pub async fn receive_gb28181_data(&mut self) -> Result<(), RtmpRemuxerError> { + let mut retry_count = 0; + + loop { + if let Some(data) = self.data_receiver.recv().await { + match data { + FrameData::Audio { timestamp, data } => { + self.on_gb28181_audio(&data, timestamp).await? + } + FrameData::Video { timestamp, data } => { + self.on_gb28181_video(data, timestamp).await?; + } + FrameData::MediaInfo { media_info } => { + self.video_clock_rate = media_info.video_clock_rate; + self.audio_clock_rate = media_info.audio_clock_rate; + log::info!( + "audio clock rate: {} video clock rate: {}", + self.audio_clock_rate, + self.video_clock_rate + ); + + if media_info.vcodec == VideoCodecType::H265 { + log::warn!( + "h265 rtsp to rtmp is not supported now!!! will come soon!!" + ); + break; + } + } + _ => continue, + }; + retry_count = 0; + } else { + sleep(Duration::from_millis(100)).await; + retry_count += 1; + } + + if retry_count > 10 { + break; + } + } + + self.unsubscribe_gb28181().await?; + self.unpublish_rtmp().await + } + + async fn on_gb28181_audio( + &mut self, + audio_data: &BytesMut, + timestamp: u32, + ) -> Result<(), RtmpRemuxerError> { + if self.base_audio_timestamp == 0 { + self.base_audio_timestamp = timestamp; + } + let audio_frame = self.rtmp_cooker.gen_audio_frame_data(audio_data)?; + + let timestamp_adjust = + (timestamp - self.base_audio_timestamp) / (self.audio_clock_rate / 1000); + self.rtmp_handler + .on_audio_data(&audio_frame, ×tamp_adjust) + .await?; + + Ok(()) + } + + async fn on_gb28181_video( + &mut self, + nalu: BytesMut, + timestamp: u32, + ) -> Result<(), RtmpRemuxerError> { + if self.base_video_timestamp == 0 { + self.base_video_timestamp = timestamp; + } + + let mut contains_idr = false; + let mut nalu_reader = BytesReader::new(nalu.clone()); + + let nalu_type = nalu_reader.read_u8()?; + let mut is_av = true; + match nalu_type & 0x1F { + H264_NAL_SPS => { + self.sps = Some(nalu.clone()); + is_av = false; + } + H264_NAL_PPS => { + self.pps = Some(nalu.clone()); + is_av = false; + } + H264_NAL_IDR => { + contains_idr = true; + } + _ => {} + } + + // the first sps + pps + idr frame compose the SEQ header + if self.sps.is_some() && self.pps.is_some() && !self.is_seq_header_generated { + let width: u32; + let height: u32; + + let mut sps_parser = SpsParser::new(BytesReader::new(self.sps.clone().unwrap())); + (width, height) = if let Ok((width, height)) = sps_parser.parse() { + (width, height) + } else { + (0, 0) + }; + + log::info!("receive SPS...width:{}x{}", width, height); + let level = sps_parser.sps.level_idc; + let profile = sps_parser.sps.profile_idc; + let mut meta_data = self.rtmp_cooker.gen_meta_data(width, height)?; + self.rtmp_handler.on_meta_data(&mut meta_data, &0).await?; + + let mut seq_header = self.rtmp_cooker.gen_video_seq_header( + self.sps.clone().unwrap(), + self.pps.clone().unwrap(), + profile, + level, + )?; + self.sps = None; + self.pps = None; + self.rtmp_handler.on_video_data(&mut seq_header, &0).await?; + self.is_seq_header_generated = true; + } else if is_av { + let mut frame_data = self + .rtmp_cooker + .gen_video_frame_data(vec![nalu], contains_idr)?; + + let timestamp_adjust = + (timestamp - self.base_video_timestamp) / (self.video_clock_rate / 1000); + self.rtmp_handler + .on_video_data(&mut frame_data, ×tamp_adjust) + .await?; + } + + Ok(()) + } +} diff --git a/protocol/rtmp/src/remuxer/mod.rs b/protocol/rtmp/src/remuxer/mod.rs index 94c2cc85..80058989 100644 --- a/protocol/rtmp/src/remuxer/mod.rs +++ b/protocol/rtmp/src/remuxer/mod.rs @@ -1,4 +1,6 @@ pub mod errors; +pub mod gb281812rtmp; +pub mod rtmp_cooker; pub mod rtsp2rtmp; use streamhub::{ @@ -6,6 +8,8 @@ use streamhub::{ stream::StreamIdentifier, }; +use crate::remuxer::gb281812rtmp::GB281812RtmpRemuxerSession; + use self::{errors::RtmpRemuxerError, rtsp2rtmp::Rtsp2RtmpRemuxerSession}; //Receive publish event from stream hub and @@ -29,8 +33,8 @@ impl RtmpRemuxer { let val = self.receiver.recv().await?; log::info!("{:?}", val); match val { - BroadcastEvent::Publish { identifier } => { - if let StreamIdentifier::Rtsp { stream_path } = identifier { + BroadcastEvent::Publish { identifier } => match identifier { + StreamIdentifier::Rtsp { stream_path } => { let mut session = Rtsp2RtmpRemuxerSession::new(stream_path, self.event_producer.clone()); tokio::spawn(async move { @@ -39,7 +43,19 @@ impl RtmpRemuxer { } }); } - } + StreamIdentifier::GB28181 { stream_name } => { + let mut session = GB281812RtmpRemuxerSession::new( + stream_name, + self.event_producer.clone(), + ); + tokio::spawn(async move { + if let Err(err) = session.run().await { + log::error!("gb281812rtmp session error: {}\n", err); + } + }); + } + _ => {} + }, _ => { log::trace!("other infos..."); } diff --git a/protocol/rtmp/src/remuxer/rtmp_cooker.rs b/protocol/rtmp/src/remuxer/rtmp_cooker.rs new file mode 100644 index 00000000..e0b208f1 --- /dev/null +++ b/protocol/rtmp/src/remuxer/rtmp_cooker.rs @@ -0,0 +1,124 @@ +use bytes::BytesMut; +use bytesio::bytes_writer::BytesWriter; +use indexmap::IndexMap; + +use xflv::{ + flv_tag_header::{AudioTagHeader, VideoTagHeader}, + mpeg4_avc::{Mpeg4Avc, Mpeg4AvcProcessor, Pps, Sps}, + Marshal, +}; + +use super::errors::RtmpRemuxerError; +use crate::amf0::{amf0_writer::Amf0Writer, Amf0ValueType}; + +#[derive(Default)] +pub struct RtmpCooker {} + +impl RtmpCooker { + pub fn gen_meta_data(&self, width: u32, height: u32) -> Result { + let mut amf_writer = Amf0Writer::new(); + amf_writer.write_string(&String::from("@setDataFrame"))?; + amf_writer.write_string(&String::from("onMetaData"))?; + + let mut properties = IndexMap::new(); + properties.insert(String::from("width"), Amf0ValueType::Number(width as f64)); + properties.insert(String::from("height"), Amf0ValueType::Number(height as f64)); + properties.insert(String::from("videocodecid"), Amf0ValueType::Number(7.)); + properties.insert(String::from("audiocodecid"), Amf0ValueType::Number(10.)); + amf_writer.write_eacm_array(&properties)?; + + Ok(amf_writer.extract_current_bytes()) + } + pub fn gen_video_seq_header( + &self, + sps: BytesMut, + pps: BytesMut, + profile: u8, + level: u8, + ) -> Result { + let video_tag_header = VideoTagHeader { + frame_type: 1, + codec_id: 7, + avc_packet_type: 0, + composition_time: 0, + }; + let tag_header_data = video_tag_header.marshal()?; + + let mut processor = Mpeg4AvcProcessor { + mpeg4_avc: Mpeg4Avc { + profile, + compatibility: 0, + level, + nalu_length: 4, + nb_pps: 1, + sps: vec![Sps { data: sps }], + nb_sps: 1, + pps: vec![Pps { data: pps }], + ..Default::default() + }, + }; + let mpegavc_data = processor.decoder_configuration_record_save()?; + + let mut writer = BytesWriter::new(); + writer.write(&tag_header_data)?; + writer.write(&mpegavc_data)?; + + Ok(writer.extract_current_bytes()) + } + + pub fn gen_video_frame_data( + &self, + nalus: Vec, + contains_idr: bool, + ) -> Result { + let frame_type = if contains_idr { 1 } else { 2 }; + let video_tag_header = VideoTagHeader { + frame_type, + codec_id: 7, + avc_packet_type: 1, + composition_time: 0, + }; + let tag_header_data = video_tag_header.marshal()?; + + let mut processor = Mpeg4AvcProcessor { + mpeg4_avc: Mpeg4Avc { + nalu_length: 4, + ..Default::default() + }, + }; + let mpegavc_data = processor.nalus_to_mpeg4avc(nalus)?; + + let mut writer = BytesWriter::new(); + writer.write(&tag_header_data)?; + writer.write(&mpegavc_data)?; + + Ok(writer.extract_current_bytes()) + } + //generate audio rtmp frame (including seq header and common frame) + pub fn gen_audio_frame_data( + &self, + audio_data: &BytesMut, + ) -> Result { + let mut aac_packet_type: u8 = 0; + + if audio_data.len() > 5 { + aac_packet_type = 1; + } + + let audio_tag_header = AudioTagHeader { + sound_format: 10, + sound_rate: 3, + sound_size: 1, + sound_type: 1, + aac_packet_type, + }; + + let tag_header_data = audio_tag_header.marshal()?; + + let mut writer = BytesWriter::new(); + writer.write(&tag_header_data)?; + writer.write(audio_data)?; + + Ok(writer.extract_current_bytes()) + } +} diff --git a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs index 25d7c595..466e57f2 100644 --- a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs +++ b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs @@ -1,22 +1,16 @@ use bytes::BytesMut; -use bytesio::{bytes_reader::BytesReader, bytes_writer::BytesWriter}; +use bytesio::bytes_reader::BytesReader; use h264_decoder::sps::SpsParser; -use indexmap::IndexMap; use streamhub::define::VideoCodecType; use tokio::sync::oneshot; -use xflv::{ - define::h264_nal_type::{H264_NAL_IDR, H264_NAL_PPS, H264_NAL_SPS}, - flv_tag_header::{AudioTagHeader, VideoTagHeader}, - mpeg4_avc::{Mpeg4Avc, Mpeg4AvcProcessor, Pps, Sps}, - Marshal, -}; +use xflv::define::h264_nal_type::{H264_NAL_IDR, H264_NAL_PPS, H264_NAL_SPS}; -use crate::{ - amf0::{amf0_writer::Amf0Writer, Amf0ValueType}, - session::define::SessionType, -}; +use crate::session::define::SessionType; -use super::errors::{RtmpRemuxerError, RtmpRemuxerErrorValue}; +use super::{ + errors::{RtmpRemuxerError, RtmpRemuxerErrorValue}, + rtmp_cooker::RtmpCooker, +}; use { crate::session::common::Common, @@ -36,7 +30,7 @@ pub struct Rtsp2RtmpRemuxerSession { //RTMP app_name: String, stream_name: String, - rtmp_handler: Common, + publishe_id: Uuid, //RTSP data_receiver: FrameDataReceiver, @@ -46,6 +40,9 @@ pub struct Rtsp2RtmpRemuxerSession { audio_clock_rate: u32, base_video_timestamp: u32, base_audio_timestamp: u32, + + rtmp_handler: Common, + rtmp_cooker: RtmpCooker, } pub fn find_start_code(nalus: &[u8]) -> Option { @@ -74,13 +71,15 @@ impl Rtsp2RtmpRemuxerSession { stream_name, data_receiver: data_consumer, event_producer: event_producer.clone(), - rtmp_handler: Common::new(None, event_producer, SessionType::Server, None), + subscribe_id: Uuid::new(RandomDigitCount::Four), publishe_id: Uuid::new(RandomDigitCount::Four), video_clock_rate: 1000, audio_clock_rate: 1000, base_audio_timestamp: 0, base_video_timestamp: 0, + rtmp_handler: Common::new(None, event_producer, SessionType::Server, None), + rtmp_cooker: RtmpCooker::default(), } } @@ -177,10 +176,9 @@ impl Rtsp2RtmpRemuxerSession { loop { if let Some(data) = self.data_receiver.recv().await { match data { - FrameData::Audio { - timestamp, - mut data, - } => self.on_rtsp_audio(&mut data, timestamp).await?, + FrameData::Audio { timestamp, data } => { + self.on_rtsp_audio(&data, timestamp).await? + } FrameData::Video { timestamp, mut data, @@ -222,34 +220,19 @@ impl Rtsp2RtmpRemuxerSession { async fn on_rtsp_audio( &mut self, - audio_data: &mut BytesMut, + audio_data: &BytesMut, timestamp: u32, ) -> Result<(), RtmpRemuxerError> { if self.base_audio_timestamp == 0 { self.base_audio_timestamp = timestamp; } - let mut audio_tag_header = AudioTagHeader { - sound_format: 10, - sound_rate: 3, - sound_size: 1, - sound_type: 1, - aac_packet_type: 0, - }; - - if audio_data.len() > 5 { - audio_tag_header.aac_packet_type = 1; - } - - let tag_header_data = audio_tag_header.marshal()?; - let mut writer = BytesWriter::new(); - writer.write(&tag_header_data)?; - writer.write(audio_data)?; + let audio_frame = self.rtmp_cooker.gen_audio_frame_data(audio_data)?; let timestamp_adjust = (timestamp - self.base_audio_timestamp) / (self.audio_clock_rate / 1000); self.rtmp_handler - .on_audio_data(&mut writer.extract_current_bytes(), ×tamp_adjust) + .on_audio_data(&audio_frame, ×tamp_adjust) .await?; Ok(()) @@ -320,14 +303,20 @@ impl Rtsp2RtmpRemuxerSession { } if sps.is_some() && pps.is_some() { - let mut meta_data = self.gen_rtmp_meta_data(width, height)?; + let mut meta_data = self.rtmp_cooker.gen_meta_data(width, height)?; self.rtmp_handler.on_meta_data(&mut meta_data, &0).await?; - let mut seq_header = - self.gen_rtmp_video_seq_header(sps.unwrap(), pps.unwrap(), profile, level)?; + let mut seq_header = self.rtmp_cooker.gen_video_seq_header( + sps.unwrap(), + pps.unwrap(), + profile, + level, + )?; self.rtmp_handler.on_video_data(&mut seq_header, &0).await?; } else { - let mut frame_data = self.gen_rtmp_video_frame_data(nalu_vec, contains_idr)?; + let mut frame_data = self + .rtmp_cooker + .gen_video_frame_data(nalu_vec, contains_idr)?; let timestamp_adjust = (timestamp - self.base_video_timestamp) / (self.video_clock_rate / 1000); @@ -338,84 +327,4 @@ impl Rtsp2RtmpRemuxerSession { Ok(()) } - - fn gen_rtmp_meta_data(&self, width: u32, height: u32) -> Result { - let mut amf_writer = Amf0Writer::new(); - amf_writer.write_string(&String::from("@setDataFrame"))?; - amf_writer.write_string(&String::from("onMetaData"))?; - - let mut properties = IndexMap::new(); - properties.insert(String::from("width"), Amf0ValueType::Number(width as f64)); - properties.insert(String::from("height"), Amf0ValueType::Number(height as f64)); - properties.insert(String::from("videocodecid"), Amf0ValueType::Number(7.)); - properties.insert(String::from("audiocodecid"), Amf0ValueType::Number(10.)); - amf_writer.write_eacm_array(&properties)?; - - Ok(amf_writer.extract_current_bytes()) - } - fn gen_rtmp_video_seq_header( - &self, - sps: BytesMut, - pps: BytesMut, - profile: u8, - level: u8, - ) -> Result { - let video_tag_header = VideoTagHeader { - frame_type: 1, - codec_id: 7, - avc_packet_type: 0, - composition_time: 0, - }; - let tag_header_data = video_tag_header.marshal()?; - - let mut processor = Mpeg4AvcProcessor { - mpeg4_avc: Mpeg4Avc { - profile, - compatibility: 0, - level, - nalu_length: 4, - nb_pps: 1, - sps: vec![Sps { data: sps }], - nb_sps: 1, - pps: vec![Pps { data: pps }], - ..Default::default() - }, - }; - let mpegavc_data = processor.decoder_configuration_record_save()?; - - let mut writer = BytesWriter::new(); - writer.write(&tag_header_data)?; - writer.write(&mpegavc_data)?; - - Ok(writer.extract_current_bytes()) - } - - fn gen_rtmp_video_frame_data( - &self, - nalus: Vec, - contains_idr: bool, - ) -> Result { - let frame_type = if contains_idr { 1 } else { 2 }; - let video_tag_header = VideoTagHeader { - frame_type, - codec_id: 7, - avc_packet_type: 1, - composition_time: 0, - }; - let tag_header_data = video_tag_header.marshal()?; - - let mut processor = Mpeg4AvcProcessor { - mpeg4_avc: Mpeg4Avc { - nalu_length: 4, - ..Default::default() - }, - }; - let mpegavc_data = processor.nalus_to_mpeg4avc(nalus)?; - - let mut writer = BytesWriter::new(); - writer.write(&tag_header_data)?; - writer.write(&mpegavc_data)?; - - Ok(writer.extract_current_bytes()) - } } diff --git a/protocol/rtmp/src/session/common.rs b/protocol/rtmp/src/session/common.rs index a7cb24ec..f463e285 100644 --- a/protocol/rtmp/src/session/common.rs +++ b/protocol/rtmp/src/session/common.rs @@ -194,7 +194,7 @@ impl Common { pub async fn on_audio_data( &mut self, - data: &mut BytesMut, + data: &BytesMut, timestamp: &u32, ) -> Result<(), SessionError> { let channel_data = FrameData::Audio { diff --git a/protocol/rtsp/src/rtp/mod.rs b/protocol/rtsp/src/rtp/mod.rs index afdf1bf3..0842036c 100644 --- a/protocol/rtsp/src/rtp/mod.rs +++ b/protocol/rtsp/src/rtp/mod.rs @@ -5,6 +5,7 @@ pub mod rtp_aac; pub mod rtp_h264; pub mod rtp_h265; pub mod rtp_header; +pub mod rtp_queue; pub mod utils; use byteorder::BigEndian; diff --git a/protocol/rtsp/src/rtp/rtcp/rtcp_context.rs b/protocol/rtsp/src/rtp/rtcp/rtcp_context.rs index f7fbc2ea..7563eca8 100644 --- a/protocol/rtsp/src/rtp/rtcp/rtcp_context.rs +++ b/protocol/rtsp/src/rtp/rtcp/rtcp_context.rs @@ -15,11 +15,7 @@ use super::{ //the new is 2 and old is 65534, the distance between 2 and 65534 is 4 which is //65535 - 65534 + 2 + 1.(65533,65534,65535,0,1,2) pub fn distance(new: u16, old: u16) -> u16 { - if new < old { - 65535 - old + new + 1 - } else { - new - old - } + new.wrapping_sub(old) } const MIN_SEQUENTIAL: u32 = 2; @@ -248,3 +244,19 @@ impl RtcpContext { self.last_rtp_timestamp = pkt.header.timestamp; } } + +#[cfg(test)] +mod tests { + + use super::distance; + #[test] + fn test_distance() { + assert_eq!(distance(0, 0), 0); + assert_eq!(distance(2, 0), 2); + assert_eq!(distance(32767, 0), 32767); + assert_eq!(distance(65535, 0), 65535); + + assert_eq!(distance(0, 65535), 1); + assert_eq!(distance(0, 2), 65534); + } +} diff --git a/protocol/rtsp/src/rtp/rtp_queue.rs b/protocol/rtsp/src/rtp/rtp_queue.rs new file mode 100644 index 00000000..d57821f7 --- /dev/null +++ b/protocol/rtsp/src/rtp/rtp_queue.rs @@ -0,0 +1,249 @@ +use super::RtpPacket; +use std::collections::VecDeque; + +const MIN_SEQUENTIAL: usize = 2; +const RTP_SEQ_MOD: u32 = 1 << 16; +const MAX_DROPOUT: u16 = 3000; +const MAX_MISORDER: u16 = 100; + +pub struct RtpQueue { + cycles: u32, /* shifted count of seq. number cycles */ + bad_seq: u16, /* last 'bad' seq number + 1 */ + probation: usize, /* sequ. packets till source is valid */ + + first_ordered_seq: u16, /* the first ordered sequence number needs to be sent out */ + max_cache_size: usize, + cache: VecDeque, + bad_cache: Vec, +} + +impl RtpQueue { + pub fn new(max_cache_size: usize) -> Self { + RtpQueue { + cycles: 0, + bad_seq: 0, + probation: MIN_SEQUENTIAL, + first_ordered_seq: 0, + max_cache_size, + cache: VecDeque::new(), + bad_cache: Vec::new(), + } + } + fn front_seq(&self) -> u16 { + if let Some(pkt) = self.cache.front() { + pkt.header.seq_number + } else { + 0 + } + } + + fn back_seq(&self) -> u16 { + if let Some(pkt) = self.cache.back() { + pkt.header.seq_number + } else { + 0 + } + } + + fn clear_cache(&mut self) { + self.cache.clear(); + self.probation = MIN_SEQUENTIAL; + } + + fn clear_bad_cache(&mut self) { + self.bad_cache.clear(); + self.bad_seq = 0; + } + fn insert(&mut self, packet: RtpPacket) { + let cur_seq_number = packet.header.seq_number; + + let cur_cache_size = self.cache.len(); + + for (index, item) in self.cache.iter_mut().rev().enumerate() { + // let delta = cur_seq_number.wrapping_sub(item.header.seq_number) as i16; + // if delta == 0 { + // break; + // } else if delta > 0 { + // self.cache.insert(cur_cache_size - index, packet); + // break; + // } + + match cur_seq_number.wrapping_sub(item.header.seq_number) as i16 { + 0 => { + break; + } + 1.. => { + self.cache.insert(cur_cache_size - index, packet); + break; + } + + _ => {} + } + } + } + + fn get_seqs(&self) -> String { + let mut res: String = String::from(""); + for ele in &self.cache { + res += ele.header.seq_number.to_string().as_str(); + res += ","; + } + + res + } + pub fn write_queue(&mut self, packet: RtpPacket) { + let cur_seq_number = packet.header.seq_number; + + log::debug!( + "write queue: {}, cache size:{}, queue: {}", + cur_seq_number, + self.cache.len(), + self.get_seqs() + ); + + if self.probation > 0 { + if self.cache.is_empty() { + self.cache.push_back(packet); + return; + } + + if packet.header.seq_number == self.back_seq().wrapping_add(1) { + self.probation -= 1; + if self.probation == 0 { + if let Some(pkt) = self.cache.front() { + self.first_ordered_seq = pkt.header.seq_number; + } + } + } else { + self.clear_cache(); + } + + self.cache.push_back(packet); + } else { + let delta = cur_seq_number.wrapping_sub(self.back_seq()); + + if delta == 0 { + log::debug!("duplicate"); + //duplicate + return; + } else if delta < MAX_DROPOUT { + log::debug!("with permissible gap"); + /* in order, with permissible gap */ + if cur_seq_number < self.back_seq() { + /* + * Sequence number wrapped - count another 64K cycle. + */ + self.cycles += RTP_SEQ_MOD; + } + self.cache.push_back(packet); + } else if self.back_seq().wrapping_sub(cur_seq_number) + < self.back_seq().wrapping_sub(self.front_seq()) + { + log::debug!("reordered packet"); + //reordered packet + self.insert(packet); + } else if self.front_seq().wrapping_sub(cur_seq_number) < MAX_MISORDER { + log::debug!("mis order"); + //too late + return; + } else { + log::debug!("bad"); + if self.bad_cache.is_empty() || cur_seq_number == self.bad_seq { + self.bad_cache.push(packet); + self.bad_seq = cur_seq_number.wrapping_add(1); + } else { + self.clear_bad_cache(); + } + + // Two sequential packets -- assume that the other side + // restarted without telling us so just re-sync + // (i.e., pretend this was the first packet). + if self.bad_cache.len() >= MIN_SEQUENTIAL { + self.cache.extend(self.bad_cache.to_owned()); + self.clear_bad_cache(); + } + return; + } + self.clear_bad_cache(); + } + } + + pub fn read_queue(&mut self) -> Option { + if self.cache.is_empty() || self.probation > 0 { + return None; + } + + let first_packet = self.cache.front().unwrap().to_owned(); + if self.first_ordered_seq == first_packet.header.seq_number { + self.first_ordered_seq = self.first_ordered_seq.wrapping_add(1); + } else { + if self.cache.len() < self.max_cache_size { + return None; + } + self.first_ordered_seq = first_packet.header.seq_number.wrapping_add(1); + } + + self.cache.pop_front(); + Some(first_packet) + } +} + +#[cfg(test)] +mod tests { + use super::RtpQueue; + use crate::rtp::{rtp_header::RtpHeader, RtpPacket}; + use rand::Rng; + + #[test] + pub fn test_seqnumber() { + let aa: u16 = 32768; + let bb: u16 = 65535; + + println!("{}", aa.wrapping_sub(bb) as i16); + println!("{}", bb.wrapping_sub(aa) as i16); + } + + #[test] + pub fn test_read_write_queue() { + let mut rng = rand::thread_rng(); + let mut rtp_queue = RtpQueue::new(100); + + for i in 0..3 { + let mut rtp_packet = RtpPacket::new(RtpHeader::default()); + rtp_packet.header.seq_number = i; + rtp_queue.write_queue(rtp_packet); + } + + for _ in 0..90 { + let random_number: u16 = rng.gen_range(0..90); + let mut rtp_packet = RtpPacket::new(RtpHeader::default()); + rtp_packet.header.seq_number = random_number; + rtp_queue.write_queue(rtp_packet); + } + + while let Some(packet) = rtp_queue.read_queue() { + println!("rtp packet number: {}", packet.header.seq_number); + } + + // aa.saturating_sub(rhs) + } + + #[test] + pub fn test_match() { + let aa = -1; + + match aa { + 0 => { + println!("0") + } + 1.. => { + println!("bigger than 0") + } + _ => { + println!("smaller than 0") + } + } + + // aa.saturating_sub(rhs) + } +} diff --git a/protocol/rtsp/src/rtp/utils.rs b/protocol/rtsp/src/rtp/utils.rs index 6f9ac3e0..bf5cfea3 100644 --- a/protocol/rtsp/src/rtp/utils.rs +++ b/protocol/rtsp/src/rtp/utils.rs @@ -91,7 +91,8 @@ pub async fn split_annexb_and_process( let mut nalu_with_start_code = if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) { let mut second_pos = first_pos + 3 + distance_to_first_pos; - while second_pos > 0 && nalus[second_pos - 1] == 0 { + //judge if the start code is [0x00,0x00,0x00,0x01] + if second_pos > 0 && nalus[second_pos - 1] == 0 { second_pos -= 1; } nalus.split_to(second_pos) @@ -131,8 +132,8 @@ mod tests { pub fn test_annexb_split() { let mut nalus = BytesMut::new(); nalus.extend_from_slice(&[ - 0x00, 0x00, 0x01, 0x02, 0x03, 0x05, 0x06, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04, - 0x00, 0x00, 0x01, 0x02, 0x03, + 0x00, 0x00, 0x01, 0x02, 0x03, 0x05, 0x06, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03, + 0x04, 0x00, 0x00, 0x01, 0x02, 0x03, ]); while !nalus.is_empty() { @@ -145,7 +146,7 @@ mod tests { if let Some(distance_to_first_pos) = find_start_code(&nalus[first_pos + 3..]) { let mut second_pos = first_pos + 3 + distance_to_first_pos; println!("left: {first_pos} right: {distance_to_first_pos}"); - while second_pos > 0 && nalus[second_pos - 1] == 0 { + if second_pos > 0 && nalus[second_pos - 1] == 0 { second_pos -= 1; } // while nalus[pos_right ] diff --git a/protocol/rtsp/src/session/mod.rs b/protocol/rtsp/src/session/mod.rs index 7c835ff4..4b7faf66 100644 --- a/protocol/rtsp/src/session/mod.rs +++ b/protocol/rtsp/src/session/mod.rs @@ -357,7 +357,9 @@ impl RtspServerSession { }; let address = rtsp_request.address.clone(); - if let Some(rtp_io) = UdpIO::new(address.clone(), rtp_port, 0).await { + + let remote_address = format!("{}:{rtp_port}", address.clone()); + if let Some(rtp_io) = UdpIO::new(0, Some(remote_address)).await { rtp_server_port = rtp_io.get_local_port(); let box_udp_io: Box = Box::new(rtp_io); @@ -369,8 +371,9 @@ impl RtspServerSession { } } + let rtcp_remote_address = format!("{}:{rtcp_port}", address.clone()); if let Some(rtcp_io) = - UdpIO::new(address.clone(), rtcp_port, rtp_server_port.unwrap() + 1) + UdpIO::new(rtp_server_port.unwrap() + 1, Some(rtcp_remote_address)) .await { rtcp_server_port = rtcp_io.get_local_port();