From 42cf7398571e5e6c6a2c970d414050b56a86bee1 Mon Sep 17 00:00:00 2001 From: Phineas Date: Sat, 23 Jul 2022 01:25:19 +0100 Subject: [PATCH 01/10] implement llhls chunking --- application/xiu/Cargo.toml | 2 +- protocol/hls/src/flv2hls.rs | 29 ++++++++++++++++++++++++++- protocol/hls/src/flv_data_receiver.rs | 2 +- protocol/hls/src/m3u8.rs | 16 ++++++++++++++- protocol/hls/src/server.rs | 10 ++++++++- protocol/hls/src/test_flv2hls.rs | 5 +++-- protocol/hls/src/ts.rs | 18 ++++++++++++++--- 7 files changed, 72 insertions(+), 10 deletions(-) diff --git a/application/xiu/Cargo.toml b/application/xiu/Cargo.toml index 717c2827..1e08a574 100644 --- a/application/xiu/Cargo.toml +++ b/application/xiu/Cargo.toml @@ -22,7 +22,7 @@ failure = "0.1.1" rtmp = "0.0.14"#{path = "../../protocol/rtmp/"}#"0.0.4" httpflv = "0.0.7"#{path = "../../protocol/httpflv/"} -hls = "0.0.10"#{path = "../../protocol/hls/"} +hls = {path = "../../protocol/hls/"} # rtmp = {path = "../../protocol/rtmp/"}#"0.0.4" # httpflv = {path = "../../protocol/httpflv/"} diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index a54cf42e..5e8fe132 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -18,13 +18,16 @@ pub struct Flv2HlsRemuxer { ts_muxer: TsMuxer, last_ts_dts: i64, + last_partial_ts_dts: i64, last_ts_pts: i64, last_dts: i64, last_pts: i64, duration: i64, + partial_seg_duration: i64, need_new_segment: bool, + need_new_partial_segment: bool, video_pid: u16, audio_pid: u16, @@ -33,7 +36,12 @@ pub struct Flv2HlsRemuxer { } impl Flv2HlsRemuxer { - pub fn new(duration: i64, app_name: String, stream_name: String) -> Self { + pub fn new( + duration: i64, + partial_seg_duration: i64, + app_name: String, + stream_name: String, + ) -> Self { let mut ts_muxer = TsMuxer::new(); let audio_pid = ts_muxer .add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new()) @@ -51,13 +59,16 @@ impl Flv2HlsRemuxer { ts_muxer, last_ts_dts: 0, + last_partial_ts_dts: 0, last_ts_pts: 0, last_dts: 0, last_pts: 0, duration, + partial_seg_duration, need_new_segment: false, + need_new_partial_segment: false, video_pid, audio_pid, @@ -114,6 +125,7 @@ impl Flv2HlsRemuxer { flv_demux_data: &FlvDemuxerData, ) -> Result<(), MediaError> { self.need_new_segment = false; + self.need_new_partial_segment = false; let pid: u16; let pts: i64; @@ -136,6 +148,8 @@ impl Flv2HlsRemuxer { flags = MPEG_FLAG_IDR_FRAME; if dts - self.last_ts_dts >= self.duration * 1000 { self.need_new_segment = true; + } else if self.last_ts_dts >= self.partial_seg_duration { + self.need_new_partial_segment = true; } } } @@ -152,6 +166,19 @@ impl Flv2HlsRemuxer { _ => return Ok(()), } + if self.need_new_partial_segment { + println!("nnps?: {}", self.need_new_partial_segment) + } + + if self.need_new_partial_segment { + let d = self.ts_muxer.get_data(); + + self.m3u8_handler + .add_partial_segment(dts - self.last_partial_ts_dts, d)?; + + self.last_partial_ts_dts = dts; + } + if self.need_new_segment { let mut discontinuity: bool = false; if dts > self.last_ts_dts + 15 * 1000 { diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index 41a955dd..48df4078 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -53,7 +53,7 @@ impl FlvDataReceiver { data_consumer, event_producer, - media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name), + media_processor: Flv2HlsRemuxer::new(duration, 500, app_name, stream_name), subscriber_id, } } diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index b2cca73c..e63faf47 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -46,6 +46,7 @@ pub struct M3u8 { live_ts_count: usize, segments: VecDeque, + partial_segments: VecDeque, is_header_generated: bool, m3u8_header: String, @@ -72,6 +73,7 @@ impl M3u8 { is_live: true, live_ts_count, segments: VecDeque::new(), + partial_segments: VecDeque::new(), is_header_generated: false, m3u8_folder, m3u8_header: String::new(), @@ -97,13 +99,25 @@ impl M3u8 { self.duration = std::cmp::max(duration, self.duration); - let (ts_name, ts_path) = self.ts_handler.write(ts_data)?; + let (ts_name, ts_path) = self.ts_handler.write(ts_data, false)?; let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof); self.segments.push_back(segment); Ok(()) } + pub fn add_partial_segment( + &mut self, + duration: i64, + ts_data: BytesMut, + ) -> Result<(), MediaError> { + let cur_seg = self.segments.len(); + let cur_partial_seg = self.partial_segments.len(); + + let (ts_name, ts_path) = self.ts_handler.write(ts_data, true)?; + Ok(()) + } + pub fn clear(&mut self) -> Result<(), MediaError> { //clear ts for segment in &self.segments { diff --git a/protocol/hls/src/server.rs b/protocol/hls/src/server.rs index 5dcf1fda..d100aa99 100644 --- a/protocol/hls/src/server.rs +++ b/protocol/hls/src/server.rs @@ -37,12 +37,14 @@ async fn handle_connection(req: Request) -> Result> { let (left, _) = path.split_at(ts_index); let rv: Vec<_> = left.split("/").collect(); + println!("{:?}", rv); let app_name = String::from(rv[1]); let stream_name = String::from(rv[2]); let ts_name = String::from(rv[3]); file_path = format!("./{}/{}/{}.ts", app_name, stream_name, ts_name); + println!("{}", file_path) } } @@ -63,7 +65,13 @@ async fn simple_file_send(filename: &str) -> Result> { if let Ok(file) = File::open(filename).await { let stream = FramedRead::new(file, BytesCodec::new()); let body = Body::wrap_stream(stream); - return Ok(Response::new(body)); + let r = Response::builder() + .status(200) + .header("Access-Control-Allow-Origin", "*") + .header("Access-Control-Allow-Methods", "*") + .header("Access-Control-Allow-Headers", "*") + .body(body); + return Ok(r.unwrap()); } Ok(not_found()) diff --git a/protocol/hls/src/test_flv2hls.rs b/protocol/hls/src/test_flv2hls.rs index fdcd74d7..0d4ff813 100644 --- a/protocol/hls/src/test_flv2hls.rs +++ b/protocol/hls/src/test_flv2hls.rs @@ -55,7 +55,7 @@ mod tests { #[allow(dead_code)] fn test_flv2hls() -> Result<(), MediaError> { let mut file = - File::open("/Users/zexu/github/xiu/protocol/hls/src/xgplayer_demo.flv").unwrap(); + File::open("/Users/phin/misc/xiu/protocol/hls/src/xgplayer_demo.flv").unwrap(); let mut contents = Vec::new(); file.read_to_end(&mut contents)?; @@ -67,7 +67,8 @@ mod tests { demuxer.read_flv_header()?; let start = Instant::now(); - let mut media_demuxer = Flv2HlsRemuxer::new(5, String::from("live"), String::from("test")); + let mut media_demuxer = + Flv2HlsRemuxer::new(5, 200, String::from("live"), String::from("test")); loop { let data_ = demuxer.read_flv_tag(); diff --git a/protocol/hls/src/ts.rs b/protocol/hls/src/ts.rs index 72a2bef5..ee528a68 100644 --- a/protocol/hls/src/ts.rs +++ b/protocol/hls/src/ts.rs @@ -6,6 +6,7 @@ use { pub struct Ts { ts_number: u32, + pts_number: u32, folder_name: String, } @@ -16,13 +17,24 @@ impl Ts { Self { ts_number: 0, + pts_number: 0, folder_name, } } - pub fn write(&mut self, data: BytesMut) -> Result<(String, String), MediaError> { - let ts_file_name = format!("{}.ts", self.ts_number); + pub fn write(&mut self, data: BytesMut, partial: bool) -> Result<(String, String), MediaError> { + let ts_file_name = format!( + "{}{}.ts", + self.ts_number, + if partial { + self.pts_number += 1; + format!("{}.", self.pts_number) + } else { + self.pts_number = 0; + self.ts_number += 1; + String::from("") + }, + ); let ts_file_path = format!("{}/{}", self.folder_name, ts_file_name); - self.ts_number += 1; let mut ts_file_handler = File::create(ts_file_path.clone())?; ts_file_handler.write_all(&data[..])?; From 1250369920e93ce51fd5e843ec1f114278644a9c Mon Sep 17 00:00:00 2001 From: Phineas Date: Sat, 23 Jul 2022 02:52:28 +0100 Subject: [PATCH 02/10] write llhls m3u8 directives --- protocol/hls/src/flv2hls.rs | 3 ++- protocol/hls/src/m3u8.rs | 36 ++++++++++++++++++++++++++++-------- protocol/hls/src/ts.rs | 4 ++-- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index 5e8fe132..f6c999b0 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -148,7 +148,7 @@ impl Flv2HlsRemuxer { flags = MPEG_FLAG_IDR_FRAME; if dts - self.last_ts_dts >= self.duration * 1000 { self.need_new_segment = true; - } else if self.last_ts_dts >= self.partial_seg_duration { + } else if dts - self.last_partial_ts_dts >= self.partial_seg_duration { self.need_new_partial_segment = true; } } @@ -175,6 +175,7 @@ impl Flv2HlsRemuxer { self.m3u8_handler .add_partial_segment(dts - self.last_partial_ts_dts, d)?; + self.m3u8_handler.refresh_playlist()?; self.last_partial_ts_dts = dts; } diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index e63faf47..e49d2e15 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -12,6 +12,7 @@ pub struct Segment { name: String, path: String, is_eof: bool, + is_partial: bool, } impl Segment { @@ -21,6 +22,7 @@ impl Segment { name: String, path: String, is_eof: bool, + is_partial: bool, ) -> Self { Self { duration, @@ -28,6 +30,7 @@ impl Segment { name, path, is_eof, + is_partial, } } } @@ -67,7 +70,7 @@ impl M3u8 { let m3u8_folder = format!("./{}/{}", app_name, stream_name); fs::create_dir_all(m3u8_folder.clone()).unwrap(); Self { - version: 3, + version: 6, sequence_no: 0, duration, is_live: true, @@ -100,7 +103,7 @@ impl M3u8 { self.duration = std::cmp::max(duration, self.duration); let (ts_name, ts_path) = self.ts_handler.write(ts_data, false)?; - let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof); + let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof, false); self.segments.push_back(segment); Ok(()) @@ -115,6 +118,9 @@ impl M3u8 { let cur_partial_seg = self.partial_segments.len(); let (ts_name, ts_path) = self.ts_handler.write(ts_data, true)?; + let segment = Segment::new(duration, false, ts_name, ts_path, false, true); + self.segments.push_back(segment); + Ok(()) } @@ -144,6 +150,11 @@ impl M3u8 { self.m3u8_header += format!("#EXT-X-VERSION:{}\n", self.version).as_str(); self.m3u8_header += format!("#EXT-X-TARGETDURATION:{}\n", (self.duration + 999) / 1000).as_str(); + self.m3u8_header += format!( + "#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK={}\n", + 1.5 + ) + .as_str(); self.m3u8_header += format!("#EXT-X-MEDIA-SEQUENCE:{}\n", self.sequence_no).as_str(); self.m3u8_header += playlist_type; self.m3u8_header += allow_cache; @@ -159,12 +170,21 @@ impl M3u8 { if segment.discontinuity { m3u8_content += "#EXT-X-DISCONTINUITY\n"; } - m3u8_content += format!( - "#EXTINF:{:.3}\n{}\n", - segment.duration as f64 / 1000.0, - segment.name - ) - .as_str(); + if segment.is_partial { + m3u8_content += format!( + "#EXT-X-PART:DURATION={:.3},URI=\"{}\"\n", + segment.duration as f64 / 1000.0, + segment.name + ) + .as_str(); + } else { + m3u8_content += format!( + "#EXTINF:{:.3}\n{}\n", + segment.duration as f64 / 1000.0, + segment.name + ) + .as_str(); + } if segment.is_eof { m3u8_content += "#EXT-X-ENDLIST\n"; diff --git a/protocol/hls/src/ts.rs b/protocol/hls/src/ts.rs index ee528a68..af0bd8c9 100644 --- a/protocol/hls/src/ts.rs +++ b/protocol/hls/src/ts.rs @@ -24,10 +24,10 @@ impl Ts { pub fn write(&mut self, data: BytesMut, partial: bool) -> Result<(String, String), MediaError> { let ts_file_name = format!( "{}{}.ts", - self.ts_number, + self.ts_number.clone(), if partial { self.pts_number += 1; - format!("{}.", self.pts_number) + format!(".{}", self.pts_number) } else { self.pts_number = 0; self.ts_number += 1; From 08eb84ed4e52ae7159c368e69d3ed0187143c015 Mon Sep 17 00:00:00 2001 From: Phineas Date: Sat, 23 Jul 2022 21:18:39 +0100 Subject: [PATCH 03/10] add hls event manager & dispatcher --- application/xiu/src/main.rs | 18 +++++-- protocol/hls/Cargo.toml | 4 +- protocol/hls/src/flv2hls.rs | 6 ++- protocol/hls/src/flv_data_receiver.rs | 11 ++++- protocol/hls/src/hls_event_manager.rs | 60 ++++++++++++++++++++++++ protocol/hls/src/lib.rs | 5 +- protocol/hls/src/m3u8.rs | 5 +- protocol/hls/src/rtmp_event_processor.rs | 31 ++++++++++-- protocol/hls/src/server.rs | 14 +++++- 9 files changed, 137 insertions(+), 17 deletions(-) create mode 100644 protocol/hls/src/hls_event_manager.rs diff --git a/application/xiu/src/main.rs b/application/xiu/src/main.rs index bec808c7..ebe73d76 100644 --- a/application/xiu/src/main.rs +++ b/application/xiu/src/main.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use { //https://rustcc.cn/article?id=6dcbf032-0483-4980-8bfe-c64a7dfb33c7 anyhow::Result, @@ -17,6 +18,7 @@ use { }; //use application::logger::logger; +use hls::hls_event_manager::HlsEventManager; use hls::rtmp_event_processor::RtmpEventProcessor; #[tokio::main] @@ -39,7 +41,7 @@ async fn main() -> Result<()> { if let Some(log_config_value) = &val.log { env::set_var("RUST_LOG", log_config_value.level.clone()); } else { - env::set_var("RUST_LOG", "info"); + env::set_var("RUST_LOG", "info"); } // let mut builder = Builder::from_default_env(); @@ -196,10 +198,16 @@ impl Service { return Ok(()); } + let hls_manager = HlsEventManager::new(); + let hls_dispatch = hls_manager.setup_dispatch_channel(); + let event_producer = channel.get_session_event_producer().clone(); - let cient_event_consumer = channel.get_client_event_consumer(); - let mut rtmp_event_processor = - RtmpEventProcessor::new(cient_event_consumer, event_producer); + let client_event_consumer = channel.get_client_event_consumer(); + let mut rtmp_event_processor = RtmpEventProcessor::new( + client_event_consumer, + event_producer, + hls_dispatch.clone(), + ); tokio::spawn(async move { if let Err(err) = rtmp_event_processor.run().await { @@ -211,7 +219,7 @@ impl Service { let port = hls_cfg_value.port; tokio::spawn(async move { - if let Err(err) = hls_server::run(port).await { + if let Err(err) = hls_server::run(port, hls_dispatch.clone()).await { //print!("push client error {}\n", err); log::error!("hls server error: {}\n", err); } diff --git a/protocol/hls/Cargo.toml b/protocol/hls/Cargo.toml index ec58661b..61eed703 100644 --- a/protocol/hls/Cargo.toml +++ b/protocol/hls/Cargo.toml @@ -23,11 +23,11 @@ rtmp = "0.0.14"#{path = "../rtmp/"}#"0.0.4" # rtmp = {path = "../rtmp/"}#"0.0.4" xmpegts = "0.0.3"#{path = "../../library/container/mpegts/"} - +url = "2.2.2" hyper = { version = "0.14", features = ["full"] } tokio-util = { version = "0.6.5", features = ["codec"] } [dependencies.tokio] -version = "1.4.0" +version = "1.20.0" default-features = false features = ["full"] \ No newline at end of file diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index f6c999b0..cf829c02 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -1,5 +1,7 @@ use { - super::{define::FlvDemuxerData, errors::MediaError, m3u8::M3u8}, + super::{ + define::FlvDemuxerData, errors::MediaError, hls_event_manager::HlsEventProducer, m3u8::M3u8, + }, bytes::BytesMut, xflv::{ define::{frame_type, FlvData}, @@ -37,6 +39,7 @@ pub struct Flv2HlsRemuxer { impl Flv2HlsRemuxer { pub fn new( + hls_event_tx: HlsEventProducer, duration: i64, partial_seg_duration: i64, app_name: String, @@ -74,6 +77,7 @@ impl Flv2HlsRemuxer { audio_pid, m3u8_handler: M3u8::new( + hls_event_tx, duration, 6, m3u8_name, diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index 48df4078..90e6bf67 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -2,6 +2,7 @@ use { super::{ errors::{HlsError, HlsErrorValue}, flv2hls::Flv2HlsRemuxer, + hls_event_manager::HlsEventProducer, }, rtmp::channels::define::{ ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer, @@ -29,7 +30,6 @@ use { pub struct FlvDataReceiver { app_name: String, stream_name: String, - event_producer: ChannelEventProducer, data_consumer: ChannelDataConsumer, media_processor: Flv2HlsRemuxer, @@ -41,6 +41,7 @@ impl FlvDataReceiver { app_name: String, stream_name: String, event_producer: ChannelEventProducer, + hls_event_tx: HlsEventProducer, duration: i64, ) -> Self { @@ -53,7 +54,13 @@ impl FlvDataReceiver { data_consumer, event_producer, - media_processor: Flv2HlsRemuxer::new(duration, 500, app_name, stream_name), + media_processor: Flv2HlsRemuxer::new( + hls_event_tx, + duration, + 500, + app_name, + stream_name, + ), subscriber_id, } } diff --git a/protocol/hls/src/hls_event_manager.rs b/protocol/hls/src/hls_event_manager.rs new file mode 100644 index 00000000..2c2aec58 --- /dev/null +++ b/protocol/hls/src/hls_event_manager.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::sync::{broadcast, mpsc, oneshot}; + +pub enum DispatchEvent { + CreateChannel { + stream_name: String, + channel: oneshot::Sender, + }, +} + +pub type DispatchEventProducer = mpsc::Sender; +pub type DispatchEventConsumer = mpsc::Receiver; + +#[derive(Debug, Clone)] +pub enum HlsEvent { + Init {}, + HlsSequenceIncr { sequence: u64 }, +} + +pub type HlsEventProducer = broadcast::Sender; +pub type HlsEventConsumer = broadcast::Receiver; + +pub struct HlsEventManager { + stream_to_producer: Arc>>, +} + +impl HlsEventManager { + pub fn new() -> HlsEventManager { + return HlsEventManager { + stream_to_producer: Arc::new(Mutex::new(HashMap::new())), + }; + } + + pub fn setup_dispatch_channel(&self) -> DispatchEventProducer { + let (tx, mut rx) = mpsc::channel(1); + + let stp = self.stream_to_producer.clone(); + + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + use DispatchEvent::*; + match cmd { + CreateChannel { + stream_name, + channel, + } => { + let (tx, rx) = broadcast::channel(2); + stp.lock().unwrap().insert(stream_name.to_owned(), rx); + + channel.send(tx).expect("Failed to send"); + } + } + } + }); + + tx + } +} diff --git a/protocol/hls/src/lib.rs b/protocol/hls/src/lib.rs index 62481e0f..04850061 100644 --- a/protocol/hls/src/lib.rs +++ b/protocol/hls/src/lib.rs @@ -1,9 +1,10 @@ pub mod define; pub mod errors; pub mod flv2hls; -pub mod m3u8; pub mod flv_data_receiver; +pub mod hls_event_manager; +pub mod m3u8; pub mod rtmp_event_processor; +pub mod server; mod test_flv2hls; pub mod ts; -pub mod server; diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index e49d2e15..f37ecbdb 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -1,5 +1,5 @@ use { - super::{errors::MediaError, ts::Ts}, + super::{errors::MediaError, hls_event_manager::HlsEventProducer, ts::Ts}, bytes::BytesMut, std::{collections::VecDeque, fs, fs::File, io::Write}, }; @@ -36,6 +36,7 @@ impl Segment { } pub struct M3u8 { + hls_event_tx: HlsEventProducer, version: u16, sequence_no: u64, /*What duration should media files be? @@ -61,6 +62,7 @@ pub struct M3u8 { impl M3u8 { pub fn new( + hls_event_tx: HlsEventProducer, duration: i64, live_ts_count: usize, name: String, @@ -70,6 +72,7 @@ impl M3u8 { let m3u8_folder = format!("./{}/{}", app_name, stream_name); fs::create_dir_all(m3u8_folder.clone()).unwrap(); Self { + hls_event_tx: hls_event_tx.clone(), version: 6, sequence_no: 0, duration, diff --git a/protocol/hls/src/rtmp_event_processor.rs b/protocol/hls/src/rtmp_event_processor.rs index 03059fda..85dad752 100644 --- a/protocol/hls/src/rtmp_event_processor.rs +++ b/protocol/hls/src/rtmp_event_processor.rs @@ -1,19 +1,28 @@ use super::errors::HlsError; use super::flv_data_receiver::FlvDataReceiver; +use super::hls_event_manager::{DispatchEvent, DispatchEventProducer}; +use crate::hls_event_manager::HlsEventProducer; use rtmp::channels::define::ChannelEventProducer; use rtmp::channels::define::ClientEvent; use rtmp::channels::define::ClientEventConsumer; +use tokio::sync::oneshot; pub struct RtmpEventProcessor { client_event_consumer: ClientEventConsumer, event_producer: ChannelEventProducer, + hls_manager_dispatcher: DispatchEventProducer, } impl RtmpEventProcessor { - pub fn new(consumer: ClientEventConsumer, event_producer: ChannelEventProducer) -> Self { + pub fn new( + consumer: ClientEventConsumer, + event_producer: ChannelEventProducer, + hls_manager_dispatcher: DispatchEventProducer, + ) -> Self { Self { client_event_consumer: consumer, event_producer, + hls_manager_dispatcher, } } @@ -25,8 +34,24 @@ impl RtmpEventProcessor { app_name, stream_name, } => { - let mut rtmp_subscriber = - FlvDataReceiver::new(app_name, stream_name, self.event_producer.clone(), 5); + let (resp_tx, resp_rx) = oneshot::channel(); + + let m = DispatchEvent::CreateChannel { + stream_name: stream_name.clone(), + channel: resp_tx, + }; + + self.hls_manager_dispatcher.send(m).await; + + let stream_channel_producer: HlsEventProducer = resp_rx.await.unwrap(); + + let mut rtmp_subscriber = FlvDataReceiver::new( + app_name, + stream_name, + self.event_producer.clone(), + stream_channel_producer, + 5, + ); tokio::spawn(async move { if let Err(err) = rtmp_subscriber.run().await { diff --git a/protocol/hls/src/server.rs b/protocol/hls/src/server.rs index d100aa99..a25f6b87 100644 --- a/protocol/hls/src/server.rs +++ b/protocol/hls/src/server.rs @@ -1,8 +1,11 @@ +use crate::hls_event_manager::DispatchEventProducer; use { + super::hls_event_manager::HlsEventManager, hyper::{ service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }, + std::collections::HashMap, tokio::fs::File, tokio_util::codec::{BytesCodec, FramedRead}, }; @@ -13,6 +16,15 @@ static NOTFOUND: &[u8] = b"Not Found"; async fn handle_connection(req: Request) -> Result> { let path = req.uri().path(); + let directives = req + .uri() + .query() + .map(|v| { + url::form_urlencoded::parse(v.as_bytes()) + .into_owned() + .collect() + }) + .unwrap_or_else(HashMap::new); let mut file_path: String = String::from(""); @@ -77,7 +89,7 @@ async fn simple_file_send(filename: &str) -> Result> { Ok(not_found()) } -pub async fn run(port: u32) -> Result<()> { +pub async fn run(port: u32, hls_dispatch: DispatchEventProducer) -> Result<()> { let listen_address = format!("0.0.0.0:{}", port); let sock_addr = listen_address.parse().unwrap(); From 3f024c6649a6989e4d1d015956b5a3918ed41d8d Mon Sep 17 00:00:00 2001 From: Phineas Date: Sun, 24 Jul 2022 03:47:18 +0100 Subject: [PATCH 04/10] HLS origin API & _HLS_msn impl --- application/xiu/src/main.rs | 2 +- protocol/hls/src/flv2hls.rs | 6 +- protocol/hls/src/hls_event_manager.rs | 81 ++++++----- protocol/hls/src/hls_request_handler.rs | 176 ++++++++++++++++++++++++ protocol/hls/src/lib.rs | 1 + protocol/hls/src/m3u8.rs | 14 +- protocol/hls/src/server.rs | 105 +++----------- 7 files changed, 252 insertions(+), 133 deletions(-) create mode 100644 protocol/hls/src/hls_request_handler.rs diff --git a/application/xiu/src/main.rs b/application/xiu/src/main.rs index ebe73d76..5f03bf14 100644 --- a/application/xiu/src/main.rs +++ b/application/xiu/src/main.rs @@ -219,7 +219,7 @@ impl Service { let port = hls_cfg_value.port; tokio::spawn(async move { - if let Err(err) = hls_server::run(port, hls_dispatch.clone()).await { + if let Err(err) = hls_server::run(port, hls_manager).await { //print!("push client error {}\n", err); log::error!("hls server error: {}\n", err); } diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index cf829c02..9ff56538 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -119,7 +119,7 @@ impl Flv2HlsRemuxer { true, data, )?; - self.m3u8_handler.refresh_playlist()?; + self.m3u8_handler.refresh_playlist(false)?; Ok(()) } @@ -179,7 +179,7 @@ impl Flv2HlsRemuxer { self.m3u8_handler .add_partial_segment(dts - self.last_partial_ts_dts, d)?; - self.m3u8_handler.refresh_playlist()?; + self.m3u8_handler.refresh_playlist(false)?; self.last_partial_ts_dts = dts; } @@ -193,7 +193,7 @@ impl Flv2HlsRemuxer { self.m3u8_handler .add_segment(dts - self.last_ts_dts, discontinuity, false, data)?; - self.m3u8_handler.refresh_playlist()?; + self.m3u8_handler.refresh_playlist(true)?; self.ts_muxer.reset(); self.last_ts_dts = dts; diff --git a/protocol/hls/src/hls_event_manager.rs b/protocol/hls/src/hls_event_manager.rs index 2c2aec58..b4d891c6 100644 --- a/protocol/hls/src/hls_event_manager.rs +++ b/protocol/hls/src/hls_event_manager.rs @@ -1,13 +1,13 @@ use std::collections::HashMap; use std::sync::Arc; -use std::sync::Mutex; +use std::sync::RwLock; use tokio::sync::{broadcast, mpsc, oneshot}; pub enum DispatchEvent { - CreateChannel { - stream_name: String, - channel: oneshot::Sender, - }, + CreateChannel { + stream_name: String, + channel: oneshot::Sender, + }, } pub type DispatchEventProducer = mpsc::Sender; @@ -15,46 +15,51 @@ pub type DispatchEventConsumer = mpsc::Receiver; #[derive(Debug, Clone)] pub enum HlsEvent { - Init {}, - HlsSequenceIncr { sequence: u64 }, + Init {}, + HlsSequenceIncr { sequence: u64 }, } pub type HlsEventProducer = broadcast::Sender; pub type HlsEventConsumer = broadcast::Receiver; +pub type StpMap = Arc>>; + pub struct HlsEventManager { - stream_to_producer: Arc>>, + pub stream_to_producer: StpMap, } impl HlsEventManager { - pub fn new() -> HlsEventManager { - return HlsEventManager { - stream_to_producer: Arc::new(Mutex::new(HashMap::new())), - }; - } - - pub fn setup_dispatch_channel(&self) -> DispatchEventProducer { - let (tx, mut rx) = mpsc::channel(1); - - let stp = self.stream_to_producer.clone(); - - tokio::spawn(async move { - while let Some(cmd) = rx.recv().await { - use DispatchEvent::*; - match cmd { - CreateChannel { - stream_name, - channel, - } => { - let (tx, rx) = broadcast::channel(2); - stp.lock().unwrap().insert(stream_name.to_owned(), rx); - - channel.send(tx).expect("Failed to send"); - } - } - } - }); - - tx - } + pub fn new() -> HlsEventManager { + return HlsEventManager { + stream_to_producer: Arc::new(RwLock::new(HashMap::new())), + }; + } + + pub fn setup_dispatch_channel(&self) -> DispatchEventProducer { + let (tx, mut rx) = mpsc::channel(1); + + let stp = self.stream_to_producer.clone(); + + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + use DispatchEvent::*; + match cmd { + CreateChannel { + stream_name, + channel, + } => { + let (tx, rx) = broadcast::channel(2); + let tx2 = tx.clone(); + stp.write() + .unwrap() + .insert(stream_name.to_owned(), (tx, rx)); + + channel.send(tx2).expect("Failed to send"); + } + } + } + }); + + tx + } } diff --git a/protocol/hls/src/hls_request_handler.rs b/protocol/hls/src/hls_request_handler.rs new file mode 100644 index 00000000..d0e421f3 --- /dev/null +++ b/protocol/hls/src/hls_request_handler.rs @@ -0,0 +1,176 @@ +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use hyper::{service::Service, Body, Request, Response, StatusCode}; +use tokio::fs::File; +use tokio_util::codec::{BytesCodec, FramedRead}; + +use crate::hls_event_manager::StpMap; + +type GenericError = Box; +static NOTFOUND: &[u8] = b"Not Found"; + +pub struct HlsHandler { + stp_map: StpMap, +} + +impl Service> for HlsHandler { + type Response = Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + // create a response in a future. + // let fut = async { + // Ok(resp) + // }; + + // // Return the response as an immediate future + // Box::pin(fut) + + let path = req.uri().path(); + let directives = req + .uri() + .query() + .map(|v| { + url::form_urlencoded::parse(v.as_bytes()) + .into_owned() + .collect() + }) + .unwrap_or_else(HashMap::new); + + let mut file_path: String = String::from(""); + + if path.ends_with(".m3u8") { + let msn = directives.get("_HLS_msn"); + let part = directives.get("_HLS_part"); + + if part.is_some() && msn.is_none() { + // Client sent an invalid request: https://datatracker.ietf.org/doc/html/draft-pantos-hls-rfc8216bis#section-6.2.5.2 + return Box::pin(async { Ok(bad_request()) }); + } + + //http://127.0.0.1/app_name/stream_name/stream_name.m3u8 + let m3u8_index = path.find(".m3u8").unwrap(); + + if m3u8_index > 0 { + let (left, _) = path.split_at(m3u8_index); + let rv: Vec<_> = left.split("/").collect(); + + let app_name = String::from(rv[1]); + let stream_name = String::from(rv[2]); + + if let Some(msn) = directives.get("_HLS_msn") { + // Client wants us to hold the request until media segment number msn is generated + + // TODO: unsafe + let hm_read = self.stp_map.read().unwrap(); + + let stream_event_channel = hm_read.get(&stream_name); + + if stream_event_channel.is_none() { + return Box::pin(async { Ok(not_found()) }); + } else if let Some((tx, _rx)) = stream_event_channel { + let mut rc = tx.clone().subscribe(); + + return Box::pin(async move { + loop { + let m = rc.recv().await; + file_path = + format!("./{}/{}/{}.m3u8", app_name, stream_name, stream_name); + + break; + } + + simple_file_send(file_path.as_str()).await + }); + } + } + + file_path = format!("./{}/{}/{}.m3u8", app_name, stream_name, stream_name); + } + } else if path.ends_with(".ts") { + //http://127.0.0.1/app_name/stream_name/ts_name.m3u8 + let ts_index = path.find(".ts").unwrap(); + + if ts_index > 0 { + let (left, _) = path.split_at(ts_index); + + let rv: Vec<_> = left.split("/").collect(); + println!("{:?}", rv); + + let app_name = String::from(rv[1]); + let stream_name = String::from(rv[2]); + let ts_name = String::from(rv[3]); + + file_path = format!("./{}/{}/{}.ts", app_name, stream_name, ts_name); + println!("{}", file_path); + } + } + let f = async move { simple_file_send(file_path.as_str()).await }; + + Box::pin(f) + } +} + +pub struct MakeHlsHandler { + pub stp_map: StpMap, +} + +impl Service for MakeHlsHandler { + type Response = HlsHandler; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: T) -> Self::Future { + let stp_map = self.stp_map.clone(); + let fut = async move { Ok(HlsHandler { stp_map }) }; + Box::pin(fut) + } +} + +/// HTTP status code 400 +fn bad_request() -> Response { + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(NOTFOUND.into()) + .unwrap() +} + +/// HTTP status code 404 +fn not_found() -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(NOTFOUND.into()) + .unwrap() +} + +async fn simple_file_send(filename: &str) -> Result, hyper::Error> { + // Serve a file by asynchronously reading it by chunks using tokio-util crate. + + if let Ok(file) = File::open(filename).await { + let stream = FramedRead::new(file, BytesCodec::new()); + let body = Body::wrap_stream(stream); + let r = Response::builder() + .status(200) + .header("Access-Control-Allow-Origin", "*") + .header("Access-Control-Allow-Methods", "*") + .header("Access-Control-Allow-Headers", "*") + .body(body); + return Ok(r.unwrap()); + } + + Ok(not_found()) +} diff --git a/protocol/hls/src/lib.rs b/protocol/hls/src/lib.rs index 04850061..44aafb22 100644 --- a/protocol/hls/src/lib.rs +++ b/protocol/hls/src/lib.rs @@ -3,6 +3,7 @@ pub mod errors; pub mod flv2hls; pub mod flv_data_receiver; pub mod hls_event_manager; +pub mod hls_request_handler; pub mod m3u8; pub mod rtmp_event_processor; pub mod server; diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index f37ecbdb..3efbb8f5 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -1,5 +1,9 @@ use { - super::{errors::MediaError, hls_event_manager::HlsEventProducer, ts::Ts}, + super::{ + errors::MediaError, + hls_event_manager::{HlsEvent, HlsEventProducer}, + ts::Ts, + }, bytes::BytesMut, std::{collections::VecDeque, fs, fs::File, io::Write}, }; @@ -165,7 +169,7 @@ impl M3u8 { Ok(()) } - pub fn refresh_playlist(&mut self) -> Result { + pub fn refresh_playlist(&mut self, broadcast_new_msn: bool) -> Result { self.generate_m3u8_header()?; let mut m3u8_content = self.m3u8_header.clone(); @@ -200,6 +204,12 @@ impl M3u8 { let mut file_handler = File::create(m3u8_path).unwrap(); file_handler.write(m3u8_content.as_bytes())?; + if broadcast_new_msn { + self.hls_event_tx.send(HlsEvent::HlsSequenceIncr { + sequence: self.sequence_no, + }); + } + Ok(m3u8_content) } } diff --git a/protocol/hls/src/server.rs b/protocol/hls/src/server.rs index a25f6b87..025e35cf 100644 --- a/protocol/hls/src/server.rs +++ b/protocol/hls/src/server.rs @@ -1,105 +1,32 @@ -use crate::hls_event_manager::DispatchEventProducer; +use std::sync::Arc; use { super::hls_event_manager::HlsEventManager, + super::hls_request_handler::MakeHlsHandler, hyper::{ service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }, - std::collections::HashMap, - tokio::fs::File, - tokio_util::codec::{BytesCodec, FramedRead}, }; -type GenericError = Box; -type Result = std::result::Result; -static NOTFOUND: &[u8] = b"Not Found"; - -async fn handle_connection(req: Request) -> Result> { - let path = req.uri().path(); - let directives = req - .uri() - .query() - .map(|v| { - url::form_urlencoded::parse(v.as_bytes()) - .into_owned() - .collect() - }) - .unwrap_or_else(HashMap::new); - - let mut file_path: String = String::from(""); - - if path.ends_with(".m3u8") { - //http://127.0.0.1/app_name/stream_name/stream_name.m3u8 - let m3u8_index = path.find(".m3u8").unwrap(); - - if m3u8_index > 0 { - let (left, _) = path.split_at(m3u8_index); - let rv: Vec<_> = left.split("/").collect(); - - let app_name = String::from(rv[1]); - let stream_name = String::from(rv[2]); - - file_path = format!("./{}/{}/{}.m3u8", app_name, stream_name, stream_name); - } - } else if path.ends_with(".ts") { - //http://127.0.0.1/app_name/stream_name/ts_name.m3u8 - let ts_index = path.find(".ts").unwrap(); - - if ts_index > 0 { - let (left, _) = path.split_at(ts_index); - - let rv: Vec<_> = left.split("/").collect(); - println!("{:?}", rv); - - let app_name = String::from(rv[1]); - let stream_name = String::from(rv[2]); - let ts_name = String::from(rv[3]); - - file_path = format!("./{}/{}/{}.ts", app_name, stream_name, ts_name); - println!("{}", file_path) - } - } - - return simple_file_send(file_path.as_str()).await; -} - -/// HTTP status code 404 -fn not_found() -> Response { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(NOTFOUND.into()) - .unwrap() -} - -async fn simple_file_send(filename: &str) -> Result> { - // Serve a file by asynchronously reading it by chunks using tokio-util crate. - - if let Ok(file) = File::open(filename).await { - let stream = FramedRead::new(file, BytesCodec::new()); - let body = Body::wrap_stream(stream); - let r = Response::builder() - .status(200) - .header("Access-Control-Allow-Origin", "*") - .header("Access-Control-Allow-Methods", "*") - .header("Access-Control-Allow-Headers", "*") - .body(body); - return Ok(r.unwrap()); - } - - Ok(not_found()) -} - -pub async fn run(port: u32, hls_dispatch: DispatchEventProducer) -> Result<()> { +pub async fn run(port: u32, hls_event_manager: HlsEventManager) -> Result<(), hyper::Error> { let listen_address = format!("0.0.0.0:{}", port); let sock_addr = listen_address.parse().unwrap(); - let new_service = make_service_fn(move |_| async { - Ok::<_, GenericError>(service_fn(move |req| handle_connection(req))) - }); + // let new_service = make_service_fn(move |_| { + // async move { + // Ok::<_, GenericError>(service_fn(move |req| { + // handle_connection(req, Arc::clone(&t)).await + // })) + // } + // }); - let server = Server::bind(&sock_addr).serve(new_service); + let t = Arc::clone(&hls_event_manager.stream_to_producer); + + let server = Server::bind(&sock_addr).serve(MakeHlsHandler { stp_map: t }); log::info!("Hls server listening on http://{}", sock_addr); - server.await?; + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } Ok(()) } From 87a53fa682c9a809898b8487acd1ef87060bc8a3 Mon Sep 17 00:00:00 2001 From: Phineas Date: Sun, 24 Jul 2022 05:03:40 +0100 Subject: [PATCH 05/10] hold requests until msn is generated --- protocol/hls/src/hls_request_handler.rs | 35 +++++++++++++++++++------ protocol/hls/src/server.rs | 16 ++--------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/protocol/hls/src/hls_request_handler.rs b/protocol/hls/src/hls_request_handler.rs index d0e421f3..269d3aa0 100644 --- a/protocol/hls/src/hls_request_handler.rs +++ b/protocol/hls/src/hls_request_handler.rs @@ -9,7 +9,7 @@ use hyper::{service::Service, Body, Request, Response, StatusCode}; use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; -use crate::hls_event_manager::StpMap; +use crate::hls_event_manager::{HlsEvent, StpMap}; type GenericError = Box; static NOTFOUND: &[u8] = b"Not Found"; @@ -68,9 +68,18 @@ impl Service> for HlsHandler { let app_name = String::from(rv[1]); let stream_name = String::from(rv[2]); - if let Some(msn) = directives.get("_HLS_msn") { + println!("msn: {:?}", msn); + if let Some(msn_s) = msn { // Client wants us to hold the request until media segment number msn is generated + let msn_ur: Result = msn_s.parse(); + + if msn_ur.is_err() { + return Box::pin(async { Ok(bad_request()) }); + } + + let msn_u = msn_ur.unwrap(); + // TODO: unsafe let hm_read = self.stp_map.read().unwrap(); @@ -82,15 +91,27 @@ impl Service> for HlsHandler { let mut rc = tx.clone().subscribe(); return Box::pin(async move { + let mut fp = String::from(""); + loop { let m = rc.recv().await; - file_path = - format!("./{}/{}/{}.m3u8", app_name, stream_name, stream_name); - break; + if let Ok(HlsEvent::HlsSequenceIncr { sequence: seq }) = m { + if seq != msn_u { + continue; + }; + + fp = format!( + "./{}/{}/{}.m3u8", + app_name, stream_name, stream_name + ); + break; + } else { + continue; + } } - simple_file_send(file_path.as_str()).await + simple_file_send(fp.as_str()).await }); } } @@ -105,14 +126,12 @@ impl Service> for HlsHandler { let (left, _) = path.split_at(ts_index); let rv: Vec<_> = left.split("/").collect(); - println!("{:?}", rv); let app_name = String::from(rv[1]); let stream_name = String::from(rv[2]); let ts_name = String::from(rv[3]); file_path = format!("./{}/{}/{}.ts", app_name, stream_name, ts_name); - println!("{}", file_path); } } let f = async move { simple_file_send(file_path.as_str()).await }; diff --git a/protocol/hls/src/server.rs b/protocol/hls/src/server.rs index 025e35cf..fa9c149c 100644 --- a/protocol/hls/src/server.rs +++ b/protocol/hls/src/server.rs @@ -1,25 +1,13 @@ use std::sync::Arc; use { - super::hls_event_manager::HlsEventManager, - super::hls_request_handler::MakeHlsHandler, - hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, - }, + super::hls_event_manager::HlsEventManager, super::hls_request_handler::MakeHlsHandler, + hyper::Server, }; pub async fn run(port: u32, hls_event_manager: HlsEventManager) -> Result<(), hyper::Error> { let listen_address = format!("0.0.0.0:{}", port); let sock_addr = listen_address.parse().unwrap(); - // let new_service = make_service_fn(move |_| { - // async move { - // Ok::<_, GenericError>(service_fn(move |req| { - // handle_connection(req, Arc::clone(&t)).await - // })) - // } - // }); - let t = Arc::clone(&hls_event_manager.stream_to_producer); let server = Server::bind(&sock_addr).serve(MakeHlsHandler { stp_map: t }); From 21e8889f44e1a9c3b6e73850b85586221da74afc Mon Sep 17 00:00:00 2001 From: Phineas Date: Sun, 24 Jul 2022 09:15:19 +0100 Subject: [PATCH 06/10] implement m3u8 mpsc --- protocol/hls/src/flv2hls.rs | 23 +++++++++----- protocol/hls/src/flv_data_receiver.rs | 4 +++ protocol/hls/src/hls_event_manager.rs | 22 ++++++++++--- protocol/hls/src/hls_request_handler.rs | 33 +++++++++++++++----- protocol/hls/src/m3u8.rs | 39 ++++++++++++++++++++---- protocol/hls/src/rtmp_event_processor.rs | 3 +- 6 files changed, 97 insertions(+), 27 deletions(-) diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index 9ff56538..827bfe11 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -1,3 +1,5 @@ +use crate::hls_event_manager::M3u8Consumer; + use { super::{ define::FlvDemuxerData, errors::MediaError, hls_event_manager::HlsEventProducer, m3u8::M3u8, @@ -40,6 +42,7 @@ pub struct Flv2HlsRemuxer { impl Flv2HlsRemuxer { pub fn new( hls_event_tx: HlsEventProducer, + m3u8_consumer: M3u8Consumer, duration: i64, partial_seg_duration: i64, app_name: String, @@ -55,6 +58,17 @@ impl Flv2HlsRemuxer { let m3u8_name = format!("{}.m3u8", stream_name); + let m3u8_handler = M3u8::new( + hls_event_tx, + duration, + 6, + m3u8_name, + app_name.clone(), + stream_name.clone(), + ); + + m3u8_handler.setup_m3u8_listener(m3u8_consumer); + Self { video_demuxer: FlvVideoTagDemuxer::new(), audio_demuxer: FlvAudioTagDemuxer::new(), @@ -76,14 +90,7 @@ impl Flv2HlsRemuxer { video_pid, audio_pid, - m3u8_handler: M3u8::new( - hls_event_tx, - duration, - 6, - m3u8_name, - app_name.clone(), - stream_name.clone(), - ), + m3u8_handler: m3u8_handler, } } diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index 90e6bf67..88b43383 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -1,3 +1,5 @@ +use crate::hls_event_manager::M3u8Consumer; + use { super::{ errors::{HlsError, HlsErrorValue}, @@ -42,6 +44,7 @@ impl FlvDataReceiver { stream_name: String, event_producer: ChannelEventProducer, hls_event_tx: HlsEventProducer, + m3u8_consumer: M3u8Consumer, duration: i64, ) -> Self { @@ -56,6 +59,7 @@ impl FlvDataReceiver { event_producer, media_processor: Flv2HlsRemuxer::new( hls_event_tx, + m3u8_consumer, duration, 500, app_name, diff --git a/protocol/hls/src/hls_event_manager.rs b/protocol/hls/src/hls_event_manager.rs index b4d891c6..e9f55540 100644 --- a/protocol/hls/src/hls_event_manager.rs +++ b/protocol/hls/src/hls_event_manager.rs @@ -3,16 +3,27 @@ use std::sync::Arc; use std::sync::RwLock; use tokio::sync::{broadcast, mpsc, oneshot}; +use crate::m3u8::M3u8PlaylistResponse; + pub enum DispatchEvent { CreateChannel { stream_name: String, - channel: oneshot::Sender, + channel: oneshot::Sender<(HlsEventProducer, M3u8Consumer)>, }, } pub type DispatchEventProducer = mpsc::Sender; pub type DispatchEventConsumer = mpsc::Receiver; +pub enum M3u8Event { + RequestPlaylist { + channel: oneshot::Sender, + }, +} + +pub type M3u8Producer = mpsc::Sender; +pub type M3u8Consumer = mpsc::Receiver; + #[derive(Debug, Clone)] pub enum HlsEvent { Init {}, @@ -22,7 +33,7 @@ pub enum HlsEvent { pub type HlsEventProducer = broadcast::Sender; pub type HlsEventConsumer = broadcast::Receiver; -pub type StpMap = Arc>>; +pub type StpMap = Arc>>; pub struct HlsEventManager { pub stream_to_producer: StpMap, @@ -50,11 +61,14 @@ impl HlsEventManager { } => { let (tx, rx) = broadcast::channel(2); let tx2 = tx.clone(); + + let (m3u8_tx, m3u8_rx) = mpsc::channel(1); + stp.write() .unwrap() - .insert(stream_name.to_owned(), (tx, rx)); + .insert(stream_name.to_owned(), (tx, rx, m3u8_tx)); - channel.send(tx2).expect("Failed to send"); + channel.send((tx2, m3u8_rx)).expect("Failed to send"); } } } diff --git a/protocol/hls/src/hls_request_handler.rs b/protocol/hls/src/hls_request_handler.rs index 269d3aa0..dec9435b 100644 --- a/protocol/hls/src/hls_request_handler.rs +++ b/protocol/hls/src/hls_request_handler.rs @@ -6,10 +6,13 @@ use std::{ }; use hyper::{service::Service, Body, Request, Response, StatusCode}; -use tokio::fs::File; +use tokio::{fs::File, sync::oneshot}; use tokio_util::codec::{BytesCodec, FramedRead}; -use crate::hls_event_manager::{HlsEvent, StpMap}; +use crate::{ + hls_event_manager::{HlsEvent, M3u8Event, StpMap}, + m3u8::M3u8PlaylistResponse, +}; type GenericError = Box; static NOTFOUND: &[u8] = b"Not Found"; @@ -87,11 +90,29 @@ impl Service> for HlsHandler { if stream_event_channel.is_none() { return Box::pin(async { Ok(not_found()) }); - } else if let Some((tx, _rx)) = stream_event_channel { + } else if let Some((tx, _rx, m3u8_prod)) = stream_event_channel { let mut rc = tx.clone().subscribe(); + let mut mp = m3u8_prod.clone(); + let fp = format!("./{}/{}/{}.m3u8", app_name, stream_name, stream_name); return Box::pin(async move { - let mut fp = String::from(""); + let (resp_tx, resp_rx) = oneshot::channel(); + + let q = M3u8Event::RequestPlaylist { channel: resp_tx }; + + mp.send(q).await; + + let M3u8PlaylistResponse { sequence_no: seq } = resp_rx.await.unwrap(); + + if seq > msn_u { + // sequence already exists + return simple_file_send(fp.as_str()).await; + } + + if msn_u > seq + 2 { + // sequence too far in future + return Ok(bad_request()); + } loop { let m = rc.recv().await; @@ -101,10 +122,6 @@ impl Service> for HlsHandler { continue; }; - fp = format!( - "./{}/{}/{}.m3u8", - app_name, stream_name, stream_name - ); break; } else { continue; diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index 3efbb8f5..c19e21f9 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -1,3 +1,7 @@ +use std::{ops::Add, sync::Arc}; + +use crate::hls_event_manager::{HlsEventConsumer, M3u8Consumer, M3u8Event}; + use { super::{ errors::MediaError, @@ -39,10 +43,14 @@ impl Segment { } } +pub struct M3u8PlaylistResponse { + pub sequence_no: u64, +} + pub struct M3u8 { hls_event_tx: HlsEventProducer, version: u16, - sequence_no: u64, + sequence_no: Arc, /*What duration should media files be? A duration of 10 seconds of media per file seems to strike a reasonable balance for most broadcast content. http://devimages.apple.com/iphone/samples/bipbop/bipbopall.m3u8*/ @@ -75,10 +83,11 @@ impl M3u8 { ) -> Self { let m3u8_folder = format!("./{}/{}", app_name, stream_name); fs::create_dir_all(m3u8_folder.clone()).unwrap(); + Self { hls_event_tx: hls_event_tx.clone(), version: 6, - sequence_no: 0, + sequence_no: Arc::new(0), duration, is_live: true, live_ts_count, @@ -92,6 +101,22 @@ impl M3u8 { } } + pub fn setup_m3u8_listener(&self, mut m3u8_consumer: M3u8Consumer) { + let seq = Arc::clone(&self.sequence_no); + + tokio::spawn(async move { + while let Some(cmd) = m3u8_consumer.recv().await { + use M3u8Event::*; + match cmd { + RequestPlaylist { channel: c } => { + c.send(M3u8PlaylistResponse { sequence_no: *seq }) + .unwrap_or_default(); + } + } + } + }); + } + pub fn add_segment( &mut self, duration: i64, @@ -104,7 +129,7 @@ impl M3u8 { if self.is_live && segment_count >= self.live_ts_count { let segment = self.segments.pop_front().unwrap(); self.ts_handler.delete(segment.path); - self.sequence_no += 1; + self.sequence_no.add(1); } self.duration = std::cmp::max(duration, self.duration); @@ -205,9 +230,11 @@ impl M3u8 { file_handler.write(m3u8_content.as_bytes())?; if broadcast_new_msn { - self.hls_event_tx.send(HlsEvent::HlsSequenceIncr { - sequence: self.sequence_no, - }); + self.hls_event_tx + .send(HlsEvent::HlsSequenceIncr { + sequence: *self.sequence_no, + }) + .unwrap_or_default(); } Ok(m3u8_content) diff --git a/protocol/hls/src/rtmp_event_processor.rs b/protocol/hls/src/rtmp_event_processor.rs index 85dad752..c86be526 100644 --- a/protocol/hls/src/rtmp_event_processor.rs +++ b/protocol/hls/src/rtmp_event_processor.rs @@ -43,13 +43,14 @@ impl RtmpEventProcessor { self.hls_manager_dispatcher.send(m).await; - let stream_channel_producer: HlsEventProducer = resp_rx.await.unwrap(); + let (stream_channel_producer, m3u8_consumer) = resp_rx.await.unwrap(); let mut rtmp_subscriber = FlvDataReceiver::new( app_name, stream_name, self.event_producer.clone(), stream_channel_producer, + m3u8_consumer, 5, ); From df24e7e6fecb5308efa83376703a7474e24410c4 Mon Sep 17 00:00:00 2001 From: Phineas Date: Mon, 25 Jul 2022 02:04:04 +0100 Subject: [PATCH 07/10] almost working LLHLS! --- protocol/hls/src/flv2hls.rs | 21 ++++- protocol/hls/src/flv_data_receiver.rs | 2 +- protocol/hls/src/m3u8.rs | 128 ++++++++++++++++++++------ 3 files changed, 117 insertions(+), 34 deletions(-) diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index 827bfe11..69d94d99 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -19,6 +19,7 @@ pub struct Flv2HlsRemuxer { video_demuxer: FlvVideoTagDemuxer, audio_demuxer: FlvAudioTagDemuxer, + partial_ts_muxer: TsMuxer, ts_muxer: TsMuxer, last_ts_dts: i64, @@ -56,6 +57,14 @@ impl Flv2HlsRemuxer { .add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new()) .unwrap(); + let mut partial_ts_muxer = TsMuxer::new(); + let audio_pid = partial_ts_muxer + .add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new()) + .unwrap(); + let video_pid = partial_ts_muxer + .add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new()) + .unwrap(); + let m3u8_name = format!("{}.m3u8", stream_name); let m3u8_handler = M3u8::new( @@ -74,6 +83,7 @@ impl Flv2HlsRemuxer { audio_demuxer: FlvAudioTagDemuxer::new(), ts_muxer, + partial_ts_muxer, last_ts_dts: 0, last_partial_ts_dts: 0, @@ -159,9 +169,9 @@ impl Flv2HlsRemuxer { flags = MPEG_FLAG_IDR_FRAME; if dts - self.last_ts_dts >= self.duration * 1000 { self.need_new_segment = true; - } else if dts - self.last_partial_ts_dts >= self.partial_seg_duration { - self.need_new_partial_segment = true; } + } else if dts - self.last_partial_ts_dts >= self.partial_seg_duration { + self.need_new_partial_segment = true; } } FlvDemuxerData::Audio { data } => { @@ -182,13 +192,15 @@ impl Flv2HlsRemuxer { } if self.need_new_partial_segment { - let d = self.ts_muxer.get_data(); + let d = self.partial_ts_muxer.get_data(); self.m3u8_handler .add_partial_segment(dts - self.last_partial_ts_dts, d)?; self.m3u8_handler.refresh_playlist(false)?; + self.partial_ts_muxer.reset(); self.last_partial_ts_dts = dts; + self.need_new_partial_segment = false; } if self.need_new_segment { @@ -212,6 +224,9 @@ impl Flv2HlsRemuxer { self.last_pts = pts; self.ts_muxer + .write(pid, pts * 90, dts * 90, flags, payload.clone())?; + + self.partial_ts_muxer .write(pid, pts * 90, dts * 90, flags, payload)?; Ok(()) diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index 88b43383..3ae3efb5 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -61,7 +61,7 @@ impl FlvDataReceiver { hls_event_tx, m3u8_consumer, duration, - 500, + 1000, app_name, stream_name, ), diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index c19e21f9..67a8cc23 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -1,4 +1,7 @@ -use std::{ops::Add, sync::Arc}; +use std::{ + ops::Add, + sync::{Arc, RwLock}, +}; use crate::hls_event_manager::{HlsEventConsumer, M3u8Consumer, M3u8Event}; @@ -12,6 +15,13 @@ use { std::{collections::VecDeque, fs, fs::File, io::Write}, }; +#[derive(Clone)] +pub struct PartialSegment { + duration: i64, + name: String, +} + +#[derive(Clone)] pub struct Segment { /*ts duration*/ duration: i64, @@ -20,7 +30,10 @@ pub struct Segment { name: String, path: String, is_eof: bool, - is_partial: bool, + is_complete: bool, + + // LLHLS partial segments + partials: Vec, } impl Segment { @@ -30,7 +43,7 @@ impl Segment { name: String, path: String, is_eof: bool, - is_partial: bool, + is_complete: bool, ) -> Self { Self { duration, @@ -38,9 +51,18 @@ impl Segment { name, path, is_eof, - is_partial, + is_complete, + partials: vec![], } } + + pub fn set_complete(&mut self) { + self.is_complete = true; + } + + pub fn add_partial(&mut self, seg: PartialSegment) { + self.partials.push(seg); + } } pub struct M3u8PlaylistResponse { @@ -50,7 +72,7 @@ pub struct M3u8PlaylistResponse { pub struct M3u8 { hls_event_tx: HlsEventProducer, version: u16, - sequence_no: Arc, + sequence_no: Arc>, /*What duration should media files be? A duration of 10 seconds of media per file seems to strike a reasonable balance for most broadcast content. http://devimages.apple.com/iphone/samples/bipbop/bipbopall.m3u8*/ @@ -62,7 +84,6 @@ pub struct M3u8 { live_ts_count: usize, segments: VecDeque, - partial_segments: VecDeque, is_header_generated: bool, m3u8_header: String, @@ -87,12 +108,11 @@ impl M3u8 { Self { hls_event_tx: hls_event_tx.clone(), version: 6, - sequence_no: Arc::new(0), + sequence_no: Arc::new(RwLock::new(0)), duration, is_live: true, live_ts_count, segments: VecDeque::new(), - partial_segments: VecDeque::new(), is_header_generated: false, m3u8_folder, m3u8_header: String::new(), @@ -109,8 +129,10 @@ impl M3u8 { use M3u8Event::*; match cmd { RequestPlaylist { channel: c } => { - c.send(M3u8PlaylistResponse { sequence_no: *seq }) - .unwrap_or_default(); + c.send(M3u8PlaylistResponse { + sequence_no: *seq.read().unwrap(), + }) + .unwrap_or_default(); } } } @@ -128,15 +150,19 @@ impl M3u8 { if self.is_live && segment_count >= self.live_ts_count { let segment = self.segments.pop_front().unwrap(); - self.ts_handler.delete(segment.path); - self.sequence_no.add(1); + // self.ts_handler.delete(segment.path); } + let mut s = self.sequence_no.write().unwrap(); + *s += 1; + self.duration = std::cmp::max(duration, self.duration); - let (ts_name, ts_path) = self.ts_handler.write(ts_data, false)?; - let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof, false); - self.segments.push_back(segment); + self.ts_handler.write(ts_data, false)?; + // let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof, false); + self.segments.back_mut().unwrap().set_complete(); + + // self.segments.push_back(segment); Ok(()) } @@ -146,12 +172,48 @@ impl M3u8 { duration: i64, ts_data: BytesMut, ) -> Result<(), MediaError> { - let cur_seg = self.segments.len(); - let cur_partial_seg = self.partial_segments.len(); - let (ts_name, ts_path) = self.ts_handler.write(ts_data, true)?; - let segment = Segment::new(duration, false, ts_name, ts_path, false, true); - self.segments.push_back(segment); + + let cur_seg = self.segments.back_mut(); + + match cur_seg { + None + | Some(Segment { + is_complete: true, .. + }) => { + // needs new segment + + let mut seg = Segment::new( + duration, + false, + format!("{}.ts", self.sequence_no.read().unwrap()), + ts_path, + false, + false, + ); + + let partial = PartialSegment { + duration, + name: ts_name.to_owned(), + }; + + seg.add_partial(partial); + + &self.segments.push_back(seg); + } + Some(seg) => { + // add partial to existing segment + + let partial = PartialSegment { + duration, + name: ts_name.to_owned(), + }; + + println!("partial add {}", &partial.name); + + seg.add_partial(partial); + } + } Ok(()) } @@ -187,7 +249,11 @@ impl M3u8 { 1.5 ) .as_str(); - self.m3u8_header += format!("#EXT-X-MEDIA-SEQUENCE:{}\n", self.sequence_no).as_str(); + self.m3u8_header += format!( + "#EXT-X-MEDIA-SEQUENCE:{}\n", + self.sequence_no.read().unwrap() + ) + .as_str(); self.m3u8_header += playlist_type; self.m3u8_header += allow_cache; @@ -202,16 +268,18 @@ impl M3u8 { if segment.discontinuity { m3u8_content += "#EXT-X-DISCONTINUITY\n"; } - if segment.is_partial { - m3u8_content += format!( - "#EXT-X-PART:DURATION={:.3},URI=\"{}\"\n", - segment.duration as f64 / 1000.0, - segment.name - ) - .as_str(); + if !segment.is_complete { + for partial in &segment.partials { + m3u8_content += format!( + "#EXT-X-PART:DURATION={:.3},URI=\"{}\"\n", + partial.duration as f64 / 1000.0, + partial.name + ) + .as_str(); + } } else { m3u8_content += format!( - "#EXTINF:{:.3}\n{}\n", + "#EXTINF:{:.3},\n{}\n", segment.duration as f64 / 1000.0, segment.name ) @@ -232,7 +300,7 @@ impl M3u8 { if broadcast_new_msn { self.hls_event_tx .send(HlsEvent::HlsSequenceIncr { - sequence: *self.sequence_no, + sequence: *self.sequence_no.read().unwrap(), }) .unwrap_or_default(); } From f2cf25ca2fd0bad76904cc826aa8a5b59d68b8df Mon Sep 17 00:00:00 2001 From: Phineas Date: Mon, 25 Jul 2022 05:06:52 +0100 Subject: [PATCH 08/10] denote idr frames --- protocol/hls/src/flv2hls.rs | 41 +++++++++++++++++-------- protocol/hls/src/hls_request_handler.rs | 9 +++--- protocol/hls/src/m3u8.rs | 13 ++++++-- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index 69d94d99..a42095d0 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -33,10 +33,14 @@ pub struct Flv2HlsRemuxer { partial_seg_duration: i64, need_new_segment: bool, need_new_partial_segment: bool, + segment_has_idr: bool, video_pid: u16, audio_pid: u16, + p_video_pid: u16, + p_audio_pid: u16, + m3u8_handler: M3u8, } @@ -58,10 +62,10 @@ impl Flv2HlsRemuxer { .unwrap(); let mut partial_ts_muxer = TsMuxer::new(); - let audio_pid = partial_ts_muxer + let p_audio_pid = partial_ts_muxer .add_stream(epsi_stream_type::PSI_STREAM_AAC, BytesMut::new()) .unwrap(); - let video_pid = partial_ts_muxer + let p_video_pid = partial_ts_muxer .add_stream(epsi_stream_type::PSI_STREAM_H264, BytesMut::new()) .unwrap(); @@ -96,10 +100,14 @@ impl Flv2HlsRemuxer { partial_seg_duration, need_new_segment: false, need_new_partial_segment: false, + segment_has_idr: false, video_pid, audio_pid, + p_video_pid, + p_audio_pid, + m3u8_handler: m3u8_handler, } } @@ -149,6 +157,7 @@ impl Flv2HlsRemuxer { self.need_new_partial_segment = false; let pid: u16; + let p_pid: u16; let pts: i64; let dts: i64; let mut flags: u16 = 0; @@ -163,13 +172,17 @@ impl Flv2HlsRemuxer { pts = data.pts; dts = data.dts; pid = self.video_pid; + p_pid = self.video_pid; + payload.extend_from_slice(&data.data[..]); if data.frame_type == frame_type::KEY_FRAME { flags = MPEG_FLAG_IDR_FRAME; - if dts - self.last_ts_dts >= self.duration * 1000 { - self.need_new_segment = true; - } + self.segment_has_idr = true; + } + + if dts - self.last_ts_dts >= self.duration * 1000 { + self.need_new_segment = true; } else if dts - self.last_partial_ts_dts >= self.partial_seg_duration { self.need_new_partial_segment = true; } @@ -182,25 +195,28 @@ impl Flv2HlsRemuxer { pts = data.pts; dts = data.dts; pid = self.audio_pid; + p_pid = self.audio_pid; payload.extend_from_slice(&data.data[..]); } _ => return Ok(()), } - if self.need_new_partial_segment { - println!("nnps?: {}", self.need_new_partial_segment) - } - if self.need_new_partial_segment { let d = self.partial_ts_muxer.get_data(); - self.m3u8_handler - .add_partial_segment(dts - self.last_partial_ts_dts, d)?; + println!(" f: {}", flags); + + self.m3u8_handler.add_partial_segment( + dts - self.last_partial_ts_dts, + d, + self.segment_has_idr, + )?; self.m3u8_handler.refresh_playlist(false)?; self.partial_ts_muxer.reset(); self.last_partial_ts_dts = dts; self.need_new_partial_segment = false; + self.segment_has_idr = false; } if self.need_new_segment { @@ -218,6 +234,7 @@ impl Flv2HlsRemuxer { self.last_ts_dts = dts; self.last_ts_pts = pts; self.need_new_segment = false; + self.segment_has_idr = false; } self.last_dts = dts; @@ -227,7 +244,7 @@ impl Flv2HlsRemuxer { .write(pid, pts * 90, dts * 90, flags, payload.clone())?; self.partial_ts_muxer - .write(pid, pts * 90, dts * 90, flags, payload)?; + .write(p_pid, pts * 90, dts * 90, flags, payload)?; Ok(()) } diff --git a/protocol/hls/src/hls_request_handler.rs b/protocol/hls/src/hls_request_handler.rs index dec9435b..5ba38baa 100644 --- a/protocol/hls/src/hls_request_handler.rs +++ b/protocol/hls/src/hls_request_handler.rs @@ -14,7 +14,6 @@ use crate::{ m3u8::M3u8PlaylistResponse, }; -type GenericError = Box; static NOTFOUND: &[u8] = b"Not Found"; pub struct HlsHandler { @@ -109,10 +108,10 @@ impl Service> for HlsHandler { return simple_file_send(fp.as_str()).await; } - if msn_u > seq + 2 { - // sequence too far in future - return Ok(bad_request()); - } + // if msn_u > seq + 2 { + // // sequence too far in future + // return Ok(bad_request()); + // } loop { let m = rc.recv().await; diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index 67a8cc23..f9f5ffc3 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -19,6 +19,7 @@ use { pub struct PartialSegment { duration: i64, name: String, + independent: bool, } #[derive(Clone)] @@ -171,6 +172,7 @@ impl M3u8 { &mut self, duration: i64, ts_data: BytesMut, + independent: bool, ) -> Result<(), MediaError> { let (ts_name, ts_path) = self.ts_handler.write(ts_data, true)?; @@ -195,6 +197,7 @@ impl M3u8 { let partial = PartialSegment { duration, name: ts_name.to_owned(), + independent, }; seg.add_partial(partial); @@ -207,6 +210,7 @@ impl M3u8 { let partial = PartialSegment { duration, name: ts_name.to_owned(), + independent, }; println!("partial add {}", &partial.name); @@ -271,9 +275,14 @@ impl M3u8 { if !segment.is_complete { for partial in &segment.partials { m3u8_content += format!( - "#EXT-X-PART:DURATION={:.3},URI=\"{}\"\n", + "#EXT-X-PART:DURATION={:.3},URI=\"{}\"{}\n", partial.duration as f64 / 1000.0, - partial.name + partial.name, + if partial.independent { + ",INDEPENDENT=YES" + } else { + "" + } ) .as_str(); } From 13ca2c2dfd337b62a19070d00f067bcb3fa83763 Mon Sep 17 00:00:00 2001 From: Phineas Date: Mon, 25 Jul 2022 07:03:18 +0100 Subject: [PATCH 09/10] correctly deliver _HLS_msn _HLS_part responses --- protocol/hls/src/flv2hls.rs | 2 - protocol/hls/src/hls_event_manager.rs | 8 +++- protocol/hls/src/hls_request_handler.rs | 56 +++++++++++++++++-------- protocol/hls/src/m3u8.rs | 33 ++++++++++----- protocol/hls/src/ts.rs | 32 +++++++++----- 5 files changed, 89 insertions(+), 42 deletions(-) diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index a42095d0..d4783242 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -204,8 +204,6 @@ impl Flv2HlsRemuxer { if self.need_new_partial_segment { let d = self.partial_ts_muxer.get_data(); - println!(" f: {}", flags); - self.m3u8_handler.add_partial_segment( dts - self.last_partial_ts_dts, d, diff --git a/protocol/hls/src/hls_event_manager.rs b/protocol/hls/src/hls_event_manager.rs index e9f55540..d71ddc35 100644 --- a/protocol/hls/src/hls_event_manager.rs +++ b/protocol/hls/src/hls_event_manager.rs @@ -27,7 +27,13 @@ pub type M3u8Consumer = mpsc::Receiver; #[derive(Debug, Clone)] pub enum HlsEvent { Init {}, - HlsSequenceIncr { sequence: u64 }, + HlsSequenceIncr { + sequence: u32, + }, + HlsPartialSegIncr { + parent_seg_num: u32, + partial_seg_num: u32, + }, } pub type HlsEventProducer = broadcast::Sender; diff --git a/protocol/hls/src/hls_request_handler.rs b/protocol/hls/src/hls_request_handler.rs index 5ba38baa..3ffe7941 100644 --- a/protocol/hls/src/hls_request_handler.rs +++ b/protocol/hls/src/hls_request_handler.rs @@ -70,11 +70,11 @@ impl Service> for HlsHandler { let app_name = String::from(rv[1]); let stream_name = String::from(rv[2]); - println!("msn: {:?}", msn); if let Some(msn_s) = msn { // Client wants us to hold the request until media segment number msn is generated - let msn_ur: Result = msn_s.parse(); + let msn_ur: Result = msn_s.parse(); + let msn_p: u32 = part.unwrap_or(&"0".to_owned()).parse().unwrap(); if msn_ur.is_err() { return Box::pin(async { Ok(bad_request()) }); @@ -101,29 +101,51 @@ impl Service> for HlsHandler { mp.send(q).await; - let M3u8PlaylistResponse { sequence_no: seq } = resp_rx.await.unwrap(); + let M3u8PlaylistResponse { + sequence_no: _seq, + ts_number: tsn, + pts_number: ptsn, + } = resp_rx.await.unwrap(); - if seq > msn_u { - // sequence already exists + println!( + "S_tsn: {} R_tsn: {}, S_ptsn: {}, R_ptsn: {}", + tsn, msn_u, ptsn, msn_p + ); + + if tsn > msn_u || (tsn >= msn_u && (msn_p < 1 || msn_p <= ptsn)) { + // segment already exists return simple_file_send(fp.as_str()).await; } - // if msn_u > seq + 2 { - // // sequence too far in future - // return Ok(bad_request()); - // } + if msn_u > tsn + 2 { + // sequence too far in future + return Ok(bad_request()); + } loop { let m = rc.recv().await; - if let Ok(HlsEvent::HlsSequenceIncr { sequence: seq }) = m { - if seq != msn_u { - continue; - }; - - break; - } else { - continue; + match m { + Ok(HlsEvent::HlsSequenceIncr { sequence: seq }) => { + if seq != msn_u { + continue; + }; + + break; + } + Ok(HlsEvent::HlsPartialSegIncr { + parent_seg_num, + partial_seg_num, + }) => { + if msn_p < 1 { + continue; + } else if partial_seg_num >= msn_p + && parent_seg_num == msn_u + { + break; + } + } + _ => continue, } } diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index f9f5ffc3..6c800c8c 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -57,7 +57,8 @@ impl Segment { } } - pub fn set_complete(&mut self) { + pub fn set_complete(&mut self, duration: i64) { + self.duration = duration; self.is_complete = true; } @@ -68,6 +69,8 @@ impl Segment { pub struct M3u8PlaylistResponse { pub sequence_no: u64, + pub ts_number: u32, + pub pts_number: u32, } pub struct M3u8 { @@ -124,6 +127,8 @@ impl M3u8 { pub fn setup_m3u8_listener(&self, mut m3u8_consumer: M3u8Consumer) { let seq = Arc::clone(&self.sequence_no); + let tsn = Arc::clone(&self.ts_handler.ts_number); + let ptsn = Arc::clone(&self.ts_handler.pts_number); tokio::spawn(async move { while let Some(cmd) = m3u8_consumer.recv().await { @@ -132,6 +137,8 @@ impl M3u8 { RequestPlaylist { channel: c } => { c.send(M3u8PlaylistResponse { sequence_no: *seq.read().unwrap(), + ts_number: *tsn.read().unwrap(), + pts_number: *ptsn.read().unwrap(), }) .unwrap_or_default(); } @@ -151,17 +158,16 @@ impl M3u8 { if self.is_live && segment_count >= self.live_ts_count { let segment = self.segments.pop_front().unwrap(); - // self.ts_handler.delete(segment.path); + self.ts_handler.delete(segment.path); + let mut s = self.sequence_no.write().unwrap(); + *s += 1; } - let mut s = self.sequence_no.write().unwrap(); - *s += 1; - self.duration = std::cmp::max(duration, self.duration); self.ts_handler.write(ts_data, false)?; // let segment = Segment::new(duration, discontinuity, ts_name, ts_path, is_eof, false); - self.segments.back_mut().unwrap().set_complete(); + self.segments.back_mut().unwrap().set_complete(duration); // self.segments.push_back(segment); @@ -174,7 +180,7 @@ impl M3u8 { ts_data: BytesMut, independent: bool, ) -> Result<(), MediaError> { - let (ts_name, ts_path) = self.ts_handler.write(ts_data, true)?; + let (ts_name, ts_path, ts_num) = self.ts_handler.write(ts_data, true)?; let cur_seg = self.segments.back_mut(); @@ -188,7 +194,7 @@ impl M3u8 { let mut seg = Segment::new( duration, false, - format!("{}.ts", self.sequence_no.read().unwrap()), + format!("{}.ts", ts_num), ts_path, false, false, @@ -213,8 +219,6 @@ impl M3u8 { independent, }; - println!("partial add {}", &partial.name); - seg.add_partial(partial); } } @@ -309,7 +313,14 @@ impl M3u8 { if broadcast_new_msn { self.hls_event_tx .send(HlsEvent::HlsSequenceIncr { - sequence: *self.sequence_no.read().unwrap(), + sequence: *self.ts_handler.ts_number.read().unwrap(), + }) + .unwrap_or_default(); + } else { + self.hls_event_tx + .send(HlsEvent::HlsPartialSegIncr { + parent_seg_num: *self.ts_handler.ts_number.read().unwrap(), + partial_seg_num: *self.ts_handler.pts_number.read().unwrap(), }) .unwrap_or_default(); } diff --git a/protocol/hls/src/ts.rs b/protocol/hls/src/ts.rs index af0bd8c9..7a9af553 100644 --- a/protocol/hls/src/ts.rs +++ b/protocol/hls/src/ts.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, RwLock}; + use { super::errors::MediaError, bytes::BytesMut, @@ -5,8 +7,8 @@ use { }; pub struct Ts { - ts_number: u32, - pts_number: u32, + pub ts_number: Arc>, + pub pts_number: Arc>, folder_name: String, } @@ -16,21 +18,29 @@ impl Ts { fs::create_dir_all(folder_name.clone()).unwrap(); Self { - ts_number: 0, - pts_number: 0, + ts_number: Arc::new(RwLock::new(0)), + pts_number: Arc::new(RwLock::new(0)), folder_name, } } - pub fn write(&mut self, data: BytesMut, partial: bool) -> Result<(String, String), MediaError> { + pub fn write( + &mut self, + data: BytesMut, + partial: bool, + ) -> Result<(String, String, u32), MediaError> { + let mut pts_number = self.pts_number.write().unwrap(); + let mut ts_number = self.ts_number.write().unwrap(); + let ts_file_name = format!( "{}{}.ts", - self.ts_number.clone(), + (*ts_number).clone(), if partial { - self.pts_number += 1; - format!(".{}", self.pts_number) + *pts_number += 1; + format!(".{}", pts_number) } else { - self.pts_number = 0; - self.ts_number += 1; + *pts_number = 0; + + *ts_number += 1; String::from("") }, ); @@ -39,7 +49,7 @@ impl Ts { let mut ts_file_handler = File::create(ts_file_path.clone())?; ts_file_handler.write_all(&data[..])?; - Ok((ts_file_name, ts_file_path)) + Ok((ts_file_name, ts_file_path, *ts_number)) } pub fn delete(&mut self, ts_file_name: String) { fs::remove_file(ts_file_name).unwrap(); From 9a21270541f7fb015aab99739e6d115eb3bad4c2 Mon Sep 17 00:00:00 2001 From: Phineas Date: Mon, 25 Jul 2022 07:28:00 +0100 Subject: [PATCH 10/10] EXT-X-PRELOAD-HINT. WORKING LLHLS!!! --- protocol/hls/src/flv2hls.rs | 38 ++++++++++++++++++------------------- protocol/hls/src/m3u8.rs | 10 ++++++++++ 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index d4783242..89c28692 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -179,11 +179,13 @@ impl Flv2HlsRemuxer { if data.frame_type == frame_type::KEY_FRAME { flags = MPEG_FLAG_IDR_FRAME; self.segment_has_idr = true; + + if dts - self.last_ts_dts >= self.duration * 1000 { + self.need_new_segment = true; + } } - if dts - self.last_ts_dts >= self.duration * 1000 { - self.need_new_segment = true; - } else if dts - self.last_partial_ts_dts >= self.partial_seg_duration { + if dts - self.last_partial_ts_dts >= self.partial_seg_duration { self.need_new_partial_segment = true; } } @@ -201,22 +203,6 @@ impl Flv2HlsRemuxer { _ => return Ok(()), } - if self.need_new_partial_segment { - let d = self.partial_ts_muxer.get_data(); - - self.m3u8_handler.add_partial_segment( - dts - self.last_partial_ts_dts, - d, - self.segment_has_idr, - )?; - self.m3u8_handler.refresh_playlist(false)?; - - self.partial_ts_muxer.reset(); - self.last_partial_ts_dts = dts; - self.need_new_partial_segment = false; - self.segment_has_idr = false; - } - if self.need_new_segment { let mut discontinuity: bool = false; if dts > self.last_ts_dts + 15 * 1000 { @@ -233,6 +219,20 @@ impl Flv2HlsRemuxer { self.last_ts_pts = pts; self.need_new_segment = false; self.segment_has_idr = false; + } else if self.need_new_partial_segment { + let d = self.partial_ts_muxer.get_data(); + + self.m3u8_handler.add_partial_segment( + dts - self.last_partial_ts_dts, + d, + self.segment_has_idr, + )?; + self.m3u8_handler.refresh_playlist(false)?; + + self.partial_ts_muxer.reset(); + self.last_partial_ts_dts = dts; + self.need_new_partial_segment = false; + self.segment_has_idr = false; } self.last_dts = dts; diff --git a/protocol/hls/src/m3u8.rs b/protocol/hls/src/m3u8.rs index 6c800c8c..4d26f8df 100644 --- a/protocol/hls/src/m3u8.rs +++ b/protocol/hls/src/m3u8.rs @@ -24,6 +24,7 @@ pub struct PartialSegment { #[derive(Clone)] pub struct Segment { + seq: u32, /*ts duration*/ duration: i64, discontinuity: bool, @@ -39,6 +40,7 @@ pub struct Segment { impl Segment { pub fn new( + seq: u32, duration: i64, discontinuity: bool, name: String, @@ -47,6 +49,7 @@ impl Segment { is_complete: bool, ) -> Self { Self { + seq, duration, discontinuity, name, @@ -192,6 +195,7 @@ impl M3u8 { // needs new segment let mut seg = Segment::new( + ts_num, duration, false, format!("{}.ts", ts_num), @@ -290,6 +294,12 @@ impl M3u8 { ) .as_str(); } + + m3u8_content += format!( + "#EXT-X-PRELOAD-HINT:TYPE=PART,URI=\"{}\"\n", + format!("{}.{}.ts", segment.seq, &segment.partials.len() + 1) + ) + .as_str(); } else { m3u8_content += format!( "#EXTINF:{:.3},\n{}\n",