From d042e1545ff2e02ead37f63d067975d07642aa37 Mon Sep 17 00:00:00 2001 From: Giang Minh Date: Mon, 15 Jul 2024 15:53:34 +0700 Subject: [PATCH] convert raw record to independent audio and video tracks with summary --- .github/workflows/release.yml | 8 +- Cargo.lock | 1 + packages/media_record/Cargo.toml | 11 +-- .../{convert_webm.rs => convert_record.rs} | 68 ++++++++++++-- packages/media_record/src/lib.rs | 4 +- packages/media_record/src/media.rs | 90 ++++++++++--------- packages/media_record/src/media/vpx_writer.rs | 79 ++++++++++------ 7 files changed, 176 insertions(+), 85 deletions(-) rename packages/media_record/bin/{convert_webm.rs => convert_record.rs} (56%) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6737fbce..703b27a0 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -217,7 +217,7 @@ jobs: with: use-cross: ${{ matrix.cross }} command: build - args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_webm + args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_record - name: Rename server if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }} @@ -227,7 +227,7 @@ jobs: - name: Rename record if: ${{ matrix.build_record_tool && matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }} run: | - mv ./target/${{ matrix.target }}/release/convert_webm${{ matrix.extension }} convert_webm-${{ matrix.target }}${{ matrix.extension }} + mv ./target/${{ matrix.target }}/release/convert_record${{ matrix.extension }} convert_record-${{ matrix.target }}${{ matrix.extension }} - name: Upload Artifact to Summary if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }} @@ -252,8 +252,8 @@ jobs: uses: svenstaro/upload-release-action@v2 with: repo_token: ${{ secrets.GITHUB_TOKEN }} - file: convert_webm-${{ matrix.target }}${{ matrix.extension }} - asset_name: convert_webm-${{ matrix.target }}${{ matrix.extension }} + file: convert_record-${{ matrix.target }}${{ matrix.extension }} + asset_name: convert_record-${{ matrix.target }}${{ matrix.extension }} tag: ${{ github.ref }} overwrite: true diff --git a/Cargo.lock b/Cargo.lock index 2784cb1b..3237bbae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2953,6 +2953,7 @@ dependencies = [ "rtp", "rusty-s3", "serde", + "serde_json", "surf", "tokio", "tokio-util", diff --git a/packages/media_record/Cargo.toml b/packages/media_record/Cargo.toml index a9dccc0f..6440fea3 100644 --- a/packages/media_record/Cargo.toml +++ b/packages/media_record/Cargo.toml @@ -20,15 +20,16 @@ webm = { version = "1.1.2", optional = true } rtp = { version = "0.11.0", optional = true } clap = { version = "4.5", features = ["env", "derive"], optional = true } serde = { version = "1.0", features = ["derive"], optional = true } +serde_json = "1.0.120" [features] -default = ["convert_webm"] -convert_webm = ["tokio/full", "tracing-subscriber", "webm", "rtp", "clap", "serde"] +default = ["convert_record"] +convert_record = ["tokio/full", "tracing-subscriber", "webm", "rtp", "clap", "serde"] [dev-dependencies] tokio = { version = "1", features = ["full"] } [[bin]] -name = "convert_webm" -path = "./bin/convert_webm.rs" -required-features = ["convert_webm"] +name = "convert_record" +path = "./bin/convert_record.rs" +required-features = ["convert_record"] diff --git a/packages/media_record/bin/convert_webm.rs b/packages/media_record/bin/convert_record.rs similarity index 56% rename from packages/media_record/bin/convert_webm.rs rename to packages/media_record/bin/convert_record.rs index b2708fd7..2186caed 100644 --- a/packages/media_record/bin/convert_webm.rs +++ b/packages/media_record/bin/convert_record.rs @@ -1,8 +1,10 @@ +use std::{collections::HashMap, io::Write}; + use clap::Parser; use media_server_record::{RoomReader, SessionMediaWriter}; use media_server_utils::CustomUri; use rusty_s3::{Bucket, Credentials, UrlStyle}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::channel; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; @@ -37,6 +39,33 @@ fn convert_s3_uri(uri: &str) -> (Bucket, Credentials, String) { (s3, credentials, s3_sub_folder) } +#[derive(Serialize)] +struct TrackTimeline { + path: String, + start: u64, + end: Option, +} + +#[derive(Default, Serialize)] +struct TrackSummary { + timeline: Vec, +} + +#[derive(Default, Serialize)] +struct SessionSummary { + track: HashMap, +} + +#[derive(Default, Serialize)] +struct PeerSummary { + sessions: HashMap, +} + +#[derive(Default, Serialize)] +struct RecordSummary { + peers: HashMap, +} + #[tokio::main] async fn main() { if std::env::var_os("RUST_LOG").is_none() { @@ -49,6 +78,7 @@ async fn main() { tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init(); let (s3, credentials, s3_sub_folder) = convert_s3_uri(&args.uri); + let mut record_summary = RecordSummary { peers: HashMap::new() }; let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder); let peers = room_reader.peers().await.unwrap(); //we use channel to wait all sessions @@ -68,17 +98,43 @@ async fn main() { session.connect().await.expect("Should connect session record folder"); while let Some(row) = session.recv().await { log::debug!("push session {session_id} pkt {}", row.ts); - media.push(row); + if let Some(event) = media.push(row) { + tx.send((peer_id.clone(), session_id, event)).await.expect("Should send to main"); + } } - - tx.send(session.path()).await.expect("Should send to main"); log::info!("end session {session_id} loop"); }); } } drop(tx); - while let Some(session) = rx.recv().await { - log::info!("done {session}"); + while let Some((peer_id, session_id, event)) = rx.recv().await { + let peer = record_summary.peers.entry(peer_id).or_default(); + let session = peer.sessions.entry(session_id).or_default(); + match event { + media_server_record::Event::TrackStart(name, ts, path) => { + let track = session.track.entry(name.0).or_default(); + track.timeline.push(TrackTimeline { path, start: ts, end: None }); + } + media_server_record::Event::TrackStop(name, ts) => { + if let Some(track) = session.track.get_mut(&name.0) { + if let Some(timeline) = track.timeline.last_mut() { + if timeline.end.is_none() { + timeline.end = Some(ts); + } else { + log::warn!("timeline end not empty"); + } + } else { + log::warn!("track stop but timeline not found"); + } + } else { + log::warn!("track stop but track not found"); + } + } + } } + + let summary_json = serde_json::to_string(&record_summary).expect("Should convert to json"); + let mut summary_fs = std::fs::File::create("./meta.json").expect("Should create file"); + summary_fs.write_all(summary_json.as_bytes()).expect("Should write meta to file"); } diff --git a/packages/media_record/src/lib.rs b/packages/media_record/src/lib.rs index cb54aca8..d5657454 100644 --- a/packages/media_record/src/lib.rs +++ b/packages/media_record/src/lib.rs @@ -9,14 +9,14 @@ use storage::{memory::MemoryFile, FileId, RecordFile}; use tokio::sync::mpsc::Sender; use worker::UploadWorker; -#[cfg(feature="convert_webm")] +#[cfg(feature="convert_record")] mod media; mod raw_record; mod session; mod storage; mod worker; -#[cfg(feature="convert_webm")] +#[cfg(feature="convert_record")] pub use media::*; pub use raw_record::*; diff --git a/packages/media_record/src/media.rs b/packages/media_record/src/media.rs index bb7c5b34..ee5a0020 100644 --- a/packages/media_record/src/media.rs +++ b/packages/media_record/src/media.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, fs::File}; use media_server_protocol::{ + endpoint::{TrackMeta, TrackName}, + media::MediaPacket, record::{SessionRecordEvent, SessionRecordRow}, transport::RemoteTrackId, }; @@ -9,20 +11,19 @@ use vpx_writer::VpxWriter; mod vpx_demuxer; mod vpx_writer; -struct TrackWriter { - writer: usize, +trait TrackWriter { + fn push_media(&mut self, pkt_ms: u64, pkt: MediaPacket); } -struct WriterContainer { - writer: VpxWriter, - audio_inuse: bool, - video_inuse: bool, +pub enum Event { + TrackStart(TrackName, u64, String), + TrackStop(TrackName, u64), } pub struct SessionMediaWriter { path: String, - writers: Vec, - tracks: HashMap, + tracks_meta: HashMap, + tracks_writer: HashMap>, } impl SessionMediaWriter { @@ -30,55 +31,60 @@ impl SessionMediaWriter { log::info!("new session media writer {path}"); Self { path: path.to_string(), - writers: vec![], - tracks: HashMap::new(), + tracks_meta: HashMap::new(), + tracks_writer: HashMap::new(), } } - fn get_free_writer_for(&mut self, ts: u64, is_audio: bool) -> usize { - for (index, writer) in self.writers.iter().enumerate() { - if (is_audio && !writer.audio_inuse) || (!is_audio && !writer.video_inuse) { - return index; - } - } - let index = self.writers.len(); - let path = format!("{}{}-{}.webm", self.path, index, ts); - let writer = VpxWriter::new(File::create(&path).expect("Should open file"), ts); - self.writers.push(WriterContainer { - writer, - audio_inuse: false, - video_inuse: false, - }); - index - } - - pub fn push(&mut self, event: SessionRecordRow) { + pub fn push(&mut self, event: SessionRecordRow) -> Option { match event.event { SessionRecordEvent::TrackStarted(id, name, meta) => { log::info!("track {:?} started, name {name} meta {:?}", id, meta); + self.tracks_meta.insert(id, (name, meta)); + None } SessionRecordEvent::TrackStopped(id) => { log::info!("track {:?} stopped", id); + let (name, _) = self.tracks_meta.remove(&id)?; + self.tracks_writer.remove(&id)?; + Some(Event::TrackStop(name, event.ts)) } SessionRecordEvent::TrackMedia(id, media) => { - if !self.tracks.contains_key(&id) { - let writer = self.get_free_writer_for(event.ts, media.meta.is_audio()); - if media.meta.is_audio() { - self.writers[writer].audio_inuse = true; + let out = if !self.tracks_writer.contains_key(&id) { + if let Some((name, _meta)) = self.tracks_meta.get(&id) { + let (file_path, writer): (String, Box) = match &media.meta { + media_server_protocol::media::MediaMeta::Opus { .. } => { + let file_path = format!("{}-opus-{}-{}.webm", self.path, name.0, event.ts); + let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts)); + (file_path, writer) + } + media_server_protocol::media::MediaMeta::H264 { .. } => todo!(), + media_server_protocol::media::MediaMeta::Vp8 { .. } => { + let file_path = format!("{}-vp8-{}-{}.webm", self.path, name.0, event.ts); + let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts)); + (file_path, writer) + } + media_server_protocol::media::MediaMeta::Vp9 { .. } => { + let file_path = format!("{}-vp9-{}-{}.webm", self.path, name.0, event.ts); + let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts)); + (file_path, writer) + } + }; + log::info!("create writer for track {name}"); + self.tracks_writer.insert(id, writer); + Some(Event::TrackStart(name.clone(), event.ts, file_path)) } else { - self.writers[writer].video_inuse = true; + log::warn!("missing track info for pkt form track {:?}", id); + return None; } - log::info!("write track {:?} to writer {writer}", id); - self.tracks.insert(id, TrackWriter { writer }); - } - let track = self.tracks.get_mut(&id).expect("Should have track here"); - if media.meta.is_audio() { - self.writers[track.writer].writer.push_opus(event.ts, media); } else { - self.writers[track.writer].writer.push_vpx(event.ts, media); - } + None + }; + let writer = self.tracks_writer.get_mut(&id).expect("Should have track here"); + writer.push_media(event.ts, media); + out } - _ => {} + _ => None, } } } diff --git a/packages/media_record/src/media/vpx_writer.rs b/packages/media_record/src/media/vpx_writer.rs index 140ee4f7..fda6be07 100644 --- a/packages/media_record/src/media/vpx_writer.rs +++ b/packages/media_record/src/media/vpx_writer.rs @@ -3,47 +3,74 @@ use std::io::{Seek, Write}; use media_server_protocol::media::MediaPacket; use webm::mux::{AudioCodecId, AudioTrack, Segment, Track, VideoCodecId, VideoTrack, Writer}; -use super::vpx_demuxer::VpxDemuxer; +use super::{vpx_demuxer::VpxDemuxer, TrackWriter}; pub struct VpxWriter { - webm: Segment>, + webm: Option>>, audio: Option, video: Option<(VideoTrack, VpxDemuxer)>, start_ts: u64, + last_ts: u64, } impl VpxWriter { pub fn new(writer: W, start_ts: u64) -> Self { - let mut webm = Segment::new(Writer::new(writer)).expect("Should create webm"); - //We must have audio before video - let audio = Some(webm.add_audio_track(48000, 2, None, AudioCodecId::Opus)); - Self { webm, audio, video: None, start_ts } - } - - pub fn push_opus(&mut self, pkt_ms: u64, pkt: MediaPacket) { - let delta_ts = pkt_ms - self.start_ts; - if self.audio.is_none() { - self.audio = Some(self.webm.add_audio_track(48000, 2, None, AudioCodecId::Opus)); + let webm = Segment::new(Writer::new(writer)).expect("Should create webm"); + Self { + webm: Some(webm), + audio: None, + video: None, + start_ts, + last_ts: start_ts, } - let audio = self.audio.as_mut().expect("Should have audio"); - audio.add_frame(&pkt.data, delta_ts * 1000_000, true); } +} - pub fn push_vpx(&mut self, pkt_ms: u64, pkt: MediaPacket) { +impl TrackWriter for VpxWriter { + fn push_media(&mut self, pkt_ms: u64, pkt: MediaPacket) { let delta_ts = pkt_ms - self.start_ts; - if self.video.is_none() { - let codec = match &pkt.meta { - media_server_protocol::media::MediaMeta::Vp8 { .. } => VideoCodecId::VP8, - media_server_protocol::media::MediaMeta::Vp9 { .. } => VideoCodecId::VP9, - _ => panic!("Wrong codec, should be vp8 or vp9"), - }; - let demuxer = VpxDemuxer::new(); - self.video = Some((self.webm.add_video_track(0, 0, None, codec), demuxer)); + self.last_ts = pkt_ms; + if pkt.meta.is_audio() { + if self.audio.is_none() { + if let Some(webm) = &mut self.webm { + self.audio = Some(webm.add_audio_track(48000, 2, None, AudioCodecId::Opus)); + } else { + log::warn!("Webm instant destroyed"); + return; + } + } + let audio = self.audio.as_mut().expect("Should have audio"); + audio.add_frame(&pkt.data, delta_ts * 1000_000, true); + } else { + if self.video.is_none() { + let codec = match &pkt.meta { + media_server_protocol::media::MediaMeta::Vp8 { .. } => VideoCodecId::VP8, + media_server_protocol::media::MediaMeta::Vp9 { .. } => VideoCodecId::VP9, + _ => panic!("Wrong codec, should be vp8 or vp9"), + }; + let demuxer = VpxDemuxer::new(); + if let Some(webm) = &mut self.webm { + self.video = Some((webm.add_video_track(100, 100, None, codec), demuxer)); + } else { + log::warn!("Webm instant destroyed"); + return; + } + } + + let (video, demuxer) = self.video.as_mut().expect("Should have video"); + if let Some((key, data)) = demuxer.push(pkt) { + video.add_frame(&data, delta_ts * 1000_000, key); + } } + } +} - let (video, demuxer) = self.video.as_mut().expect("Should have video"); - if let Some((key, data)) = demuxer.push(pkt) { - video.add_frame(&data, delta_ts * 1000_000, key); +impl Drop for VpxWriter { + fn drop(&mut self) { + if let Some(webm) = self.webm.take() { + if let Err(_e) = webm.try_finalize(Some(self.last_ts - self.start_ts)) { + log::error!("Close VpxWriter failed"); + } } } }