From 8d18a1d2355d0031eb2b05fd81dad59f9542ae63 Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:45:49 +0700 Subject: [PATCH] feat: convert record to separated media files and push to s3 (#351) * convert raw record to independent audio and video tracks with summary * upload to s3 * fix missing track ended event in record * allow convert to local or s3 * fixing typos * update deps --- .github/workflows/release.yml | 8 +- Cargo.lock | 118 +++++------ packages/media_core/src/endpoint/internal.rs | 2 +- packages/media_record/Cargo.toml | 11 +- packages/media_record/bin/convert_record.rs | 190 ++++++++++++++++++ packages/media_record/bin/convert_webm.rs | 84 -------- packages/media_record/src/lib.rs | 4 +- packages/media_record/src/media.rs | 90 +++++---- packages/media_record/src/media/vpx_writer.rs | 79 +++++--- packages/media_record/src/storage/mod.rs | 2 +- 10 files changed, 364 insertions(+), 224 deletions(-) create mode 100644 packages/media_record/bin/convert_record.rs delete mode 100644 packages/media_record/bin/convert_webm.rs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6737fbce..703b27a0 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -217,7 +217,7 @@ jobs: with: use-cross: ${{ matrix.cross }} command: build - args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_webm + args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_record - name: Rename server if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }} @@ -227,7 +227,7 @@ jobs: - name: Rename record if: ${{ matrix.build_record_tool && matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }} run: | - mv ./target/${{ matrix.target }}/release/convert_webm${{ matrix.extension }} convert_webm-${{ matrix.target }}${{ matrix.extension }} + mv ./target/${{ matrix.target }}/release/convert_record${{ matrix.extension }} convert_record-${{ matrix.target }}${{ matrix.extension }} - name: Upload Artifact to Summary if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }} @@ -252,8 +252,8 @@ jobs: uses: svenstaro/upload-release-action@v2 with: repo_token: ${{ secrets.GITHUB_TOKEN }} - file: convert_webm-${{ matrix.target }}${{ matrix.extension }} - asset_name: convert_webm-${{ matrix.target }}${{ matrix.extension }} + file: convert_record-${{ matrix.target }}${{ matrix.extension }} + asset_name: convert_record-${{ matrix.target }}${{ matrix.extension }} tag: ${{ github.ref }} overwrite: true diff --git a/Cargo.lock b/Cargo.lock index 767ec767..3b33bd3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,9 +264,9 @@ dependencies = [ [[package]] name = "async-executor" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8828ec6e544c02b0d6691d21ed9f9218d0384a82542855073c2a3f58304aaf0" +checksum = "d7ebdfa2ebdab6b1760375fa7d6f382b9f486eac35fc994625a00e89280bdbb7" dependencies = [ "async-task", "concurrent-queue", @@ -394,7 +394,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -411,7 +411,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -673,7 +673,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.70", + "syn 2.0.71", "which", ] @@ -772,7 +772,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "syn_derive", ] @@ -843,13 +843,12 @@ checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "cc" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" +checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" dependencies = [ "jobserver", "libc", - "once_cell", ] [[package]] @@ -983,7 +982,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1315,7 +1314,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1339,7 +1338,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1350,7 +1349,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1427,7 +1426,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1800,7 +1799,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2174,9 +2173,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes 1.6.1", "http 1.1.0", @@ -2401,7 +2400,7 @@ dependencies = [ "libflate", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2428,7 +2427,7 @@ checksum = "0122b7114117e64a63ac49f752a5ca4624d534c7b1c7de796ac196381cd2d947" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2953,6 +2952,7 @@ dependencies = [ "rtp", "rusty-s3", "serde", + "serde_json", "surf", "tokio", "tokio-util", @@ -3086,7 +3086,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3374,7 +3374,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3421,7 +3421,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3482,7 +3482,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3539,7 +3539,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.2", + "redox_syscall 0.5.3", "smallvec", "windows-targets 0.52.6", ] @@ -3615,7 +3615,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3694,7 +3694,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3799,7 +3799,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3841,7 +3841,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.70", + "syn 2.0.71", "thiserror", ] @@ -3950,7 +3950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4037,7 +4037,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.70", + "syn 2.0.71", "tempfile", ] @@ -4051,7 +4051,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4290,9 +4290,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags 2.6.0", ] @@ -4520,7 +4520,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.70", + "syn 2.0.71", "walkdir", ] @@ -4724,7 +4724,7 @@ dependencies = [ [[package]] name = "rusty-s3" version = "0.5.0" -source = "git+https://github.com/giangndm/rusty-s3.git?branch=main#d56dfb3eb48765716b99a908dbf52231842df24a" +source = "git+https://github.com/giangndm/rusty-s3.git?branch=main#cc9ecec126930d362dbe17be4900595d8dbce81f" dependencies = [ "base64 0.21.7", "hmac 0.12.1", @@ -4832,7 +4832,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4890,7 +4890,7 @@ dependencies = [ "proc-macro2", "quote", "sea-bae", - "syn 2.0.70", + "syn 2.0.71", "unicode-ident", ] @@ -4954,7 +4954,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "thiserror", ] @@ -5003,9 +5003,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -5017,9 +5017,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -5073,7 +5073,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5732,9 +5732,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.70" +version = "2.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -5750,7 +5750,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5846,22 +5846,22 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5960,9 +5960,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ "backtrace", "bytes 1.6.1", @@ -5985,7 +5985,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6097,7 +6097,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6469,7 +6469,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-shared", ] @@ -6503,7 +6503,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6874,7 +6874,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6894,5 +6894,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index 1d8ea75b..ffdcf639 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -244,6 +244,7 @@ impl EndpointInternal { } TransportState::Disconnected(err) => { log::info!("[EndpointInternal] disconnected {:?}", err); + self.leave_room(now); self.queue.push_back(InternalOutput::PeerEvent( now, peer_event::Event::Disconnected(peer_event::Disconnected { duration_ms: 0, reason: 0 }), //TODO provide correct reason @@ -251,7 +252,6 @@ impl EndpointInternal { if self.cfg.record { self.queue.push_back(InternalOutput::RecordEvent(now, SessionRecordEvent::Disconnected)); } - self.leave_room(now); self.queue.push_back(InternalOutput::Destroy); } } diff --git a/packages/media_record/Cargo.toml b/packages/media_record/Cargo.toml index a9dccc0f..6440fea3 100644 --- a/packages/media_record/Cargo.toml +++ b/packages/media_record/Cargo.toml @@ -20,15 +20,16 @@ webm = { version = "1.1.2", optional = true } rtp = { version = "0.11.0", optional = true } clap = { version = "4.5", features = ["env", "derive"], optional = true } serde = { version = "1.0", features = ["derive"], optional = true } +serde_json = "1.0.120" [features] -default = ["convert_webm"] -convert_webm = ["tokio/full", "tracing-subscriber", "webm", "rtp", "clap", "serde"] +default = ["convert_record"] +convert_record = ["tokio/full", "tracing-subscriber", "webm", "rtp", "clap", "serde"] [dev-dependencies] tokio = { version = "1", features = ["full"] } [[bin]] -name = "convert_webm" -path = "./bin/convert_webm.rs" -required-features = ["convert_webm"] +name = "convert_record" +path = "./bin/convert_record.rs" +required-features = ["convert_record"] diff --git a/packages/media_record/bin/convert_record.rs b/packages/media_record/bin/convert_record.rs new file mode 100644 index 00000000..0c7f3442 --- /dev/null +++ b/packages/media_record/bin/convert_record.rs @@ -0,0 +1,190 @@ +//! +//! Convert record util download all raw record chunks and convert to some independent track video files. +//! The file is created at local at first then upload to s3, after upload to s3 successfully, it will be removed in local. +//! TODO: avoid using local file, may be we have way to do-it in-memory buffer then upload in-air to s3. +//! + +use std::{collections::HashMap, time::Duration}; + +use clap::Parser; +use media_server_record::{RoomReader, SessionMediaWriter}; +use media_server_utils::CustomUri; +use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle}; +use serde::{Deserialize, Serialize}; +use surf::Body; +use tokio::sync::mpsc::channel; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +#[derive(Deserialize, Clone)] +struct S3Options { + pub path_style: Option, + pub region: Option, +} + +/// Record file converter for atm0s-media-server. +/// This tool allow convert room raw record to multiple webm files. +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// S3 Source + #[arg(env, long)] + in_s3: String, + + /// S3 Dest + #[arg(env, long)] + out_s3: Option, + + /// Folder Dest + #[arg(env, long)] + out_path: Option, +} + +fn convert_s3_uri(uri: &str) -> (Bucket, Credentials, String) { + let s3_endpoint = CustomUri::::try_from(uri).expect("Should parse s3 uri"); + let url_style = if s3_endpoint.query.path_style == Some(true) { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }; + + let s3_bucket = s3_endpoint.path[0].clone(); + let s3_sub_folder = s3_endpoint.path[1..].join("/"); + let s3 = Bucket::new(s3_endpoint.endpoint.parse().unwrap(), url_style, s3_bucket, s3_endpoint.query.region.unwrap_or("".to_string())).unwrap(); + let credentials = Credentials::new(s3_endpoint.username.expect("Should have s3 accesskey"), s3_endpoint.password.expect("Should have s3 secretkey")); + (s3, credentials, s3_sub_folder) +} + +#[derive(Serialize)] +struct TrackTimeline { + path: String, + start: u64, + end: Option, +} + +#[derive(Default, Serialize)] +struct TrackSummary { + timeline: Vec, +} + +#[derive(Default, Serialize)] +struct SessionSummary { + track: HashMap, +} + +#[derive(Default, Serialize)] +struct PeerSummary { + sessions: HashMap, +} + +#[derive(Default, Serialize)] +struct RecordSummary { + peers: HashMap, +} + +#[tokio::main] +async fn main() { + if std::env::var_os("RUST_LOG").is_none() { + std::env::set_var("RUST_LOG", "info"); + } + if std::env::var_os("RUST_BACKTRACE").is_none() { + std::env::set_var("RUST_BACKTRACE", "1"); + } + let args: Args = Args::parse(); + tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init(); + let (s3, credentials, s3_sub_folder) = convert_s3_uri(&args.in_s3); + + let temp_folder_str = args.out_path.unwrap_or_default(); + let temp_folder = std::path::Path::new(&temp_folder_str); + std::fs::create_dir_all(temp_folder).expect("Should create output folder"); + let mut record_summary = RecordSummary { peers: HashMap::new() }; + let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder); + let peers = room_reader.peers().await.unwrap(); + //we use channel to wait all sessions + let (tx, mut rx) = channel(1); + for peer in peers { + let peer_id = peer.peer(); + log::info!("got peer {peer_id}"); + let sessions = peer.sessions().await.unwrap(); + for mut session in sessions { + let peer_id = peer_id.clone(); + let session_id = session.id(); + let session_folder = temp_folder.join(format!("{}-{}-", peer_id, session_id)); + log::info!("got session {session_id}"); + let tx = tx.clone(); + tokio::spawn(async move { + log::info!("start session {session_id} loop"); + let mut media = SessionMediaWriter::new(session_folder.to_str().expect("Should convert path to str")); + session.connect().await.expect("Should connect session record folder"); + while let Some(row) = session.recv().await { + log::debug!("push session {session_id} pkt {}", row.ts); + if let Some(event) = media.push(row) { + tx.send((peer_id.clone(), session_id, event)).await.expect("Should send to main"); + } + } + log::info!("end session {session_id} loop"); + }); + } + } + drop(tx); + + while let Some((peer_id, session_id, event)) = rx.recv().await { + let peer = record_summary.peers.entry(peer_id).or_default(); + let session = peer.sessions.entry(session_id).or_default(); + match event { + media_server_record::Event::TrackStart(name, ts, path) => { + let track = session.track.entry(name.0).or_default(); + track.timeline.push(TrackTimeline { path, start: ts, end: None }); + } + media_server_record::Event::TrackStop(name, ts) => { + if let Some(track) = session.track.get_mut(&name.0) { + if let Some(timeline) = track.timeline.last_mut() { + if timeline.end.is_none() { + timeline.end = Some(ts); + } else { + log::warn!("timeline end not empty"); + } + } else { + log::warn!("track stop but timeline not found"); + } + } else { + log::warn!("track stop but track not found"); + } + } + } + } + + let summary_json = serde_json::to_string(&record_summary).expect("Should convert to json"); + + if let Some(out_s3) = args.out_s3 { + let (s3, credentials, s3_sub_folder) = convert_s3_uri(&out_s3); + let out_folder = std::path::Path::new(&s3_sub_folder); + + let summary_path = out_folder.join("summary.json"); + let summary_key = summary_path.to_str().expect("Should convert"); + let summary_put_obj = s3.put_object(Some(&credentials), summary_key); + let summary_put_url = summary_put_obj.sign(Duration::from_secs(3600)); + surf::put(summary_put_url).body(Body::from_string(summary_json)).await.expect("Should upload summary to s3"); + + for (_, peer) in record_summary.peers { + for (_, session) in peer.sessions { + for (_, track) in session.track { + for timeline in track.timeline { + let path = out_folder.join(&timeline.path); + let key = path.to_str().expect("Should convert"); + let put_obj = s3.put_object(Some(&credentials), key); + let put_url = put_obj.sign(Duration::from_secs(3600)); + surf::put(put_url) + .body(Body::from_file(&timeline.path).await.expect("Should open file")) + .await + .expect("Should upload to s3"); + //remove file after upload success + tokio::fs::remove_file(&timeline.path).await.expect("Should remove file after upload"); + } + } + } + } + } else { + let summary_out = temp_folder.join("summary.json"); + std::fs::write(summary_out.to_str().expect("Should convert path to str"), &summary_json).expect("Should write summary.json file"); + } +} diff --git a/packages/media_record/bin/convert_webm.rs b/packages/media_record/bin/convert_webm.rs deleted file mode 100644 index b2708fd7..00000000 --- a/packages/media_record/bin/convert_webm.rs +++ /dev/null @@ -1,84 +0,0 @@ -use clap::Parser; -use media_server_record::{RoomReader, SessionMediaWriter}; -use media_server_utils::CustomUri; -use rusty_s3::{Bucket, Credentials, UrlStyle}; -use serde::Deserialize; -use tokio::sync::mpsc::channel; -use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; - -#[derive(Deserialize, Clone)] -struct S3Options { - pub path_style: Option, - pub region: Option, -} - -/// Record file converter for atm0s-media-server. -/// This tool allow convert room raw record to multiple webm files. -#[derive(Parser, Debug)] -#[command(version, about, long_about = None)] -struct Args { - /// Http port - #[arg(env, long)] - uri: String, -} - -fn convert_s3_uri(uri: &str) -> (Bucket, Credentials, String) { - let s3_endpoint = CustomUri::::try_from(uri).expect("Should parse s3 uri"); - let url_style = if s3_endpoint.query.path_style == Some(true) { - UrlStyle::Path - } else { - UrlStyle::VirtualHost - }; - - let s3_bucket = s3_endpoint.path[0].clone(); - let s3_sub_folder = s3_endpoint.path[1..].join("/"); - let s3 = Bucket::new(s3_endpoint.endpoint.parse().unwrap(), url_style, s3_bucket, s3_endpoint.query.region.unwrap_or("".to_string())).unwrap(); - let credentials = Credentials::new(s3_endpoint.username.expect("Should have s3 accesskey"), s3_endpoint.password.expect("Should have s3 secretkey")); - (s3, credentials, s3_sub_folder) -} - -#[tokio::main] -async fn main() { - if std::env::var_os("RUST_LOG").is_none() { - std::env::set_var("RUST_LOG", "info"); - } - if std::env::var_os("RUST_BACKTRACE").is_none() { - std::env::set_var("RUST_BACKTRACE", "1"); - } - let args: Args = Args::parse(); - tracing_subscriber::registry().with(fmt::layer()).with(EnvFilter::from_default_env()).init(); - let (s3, credentials, s3_sub_folder) = convert_s3_uri(&args.uri); - - let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder); - let peers = room_reader.peers().await.unwrap(); - //we use channel to wait all sessions - let (tx, mut rx) = channel(1); - for peer in peers { - let peer_id = peer.peer(); - log::info!("got peer {peer_id}"); - let sessions = peer.sessions().await.unwrap(); - for mut session in sessions { - let peer_id = peer_id.clone(); - let session_id = session.id(); - log::info!("got session {session_id}"); - let tx = tx.clone(); - tokio::spawn(async move { - log::info!("start session {session_id} loop"); - let mut media = SessionMediaWriter::new(&format!("{}-{}-", peer_id, session_id)); - session.connect().await.expect("Should connect session record folder"); - while let Some(row) = session.recv().await { - log::debug!("push session {session_id} pkt {}", row.ts); - media.push(row); - } - - tx.send(session.path()).await.expect("Should send to main"); - log::info!("end session {session_id} loop"); - }); - } - } - drop(tx); - - while let Some(session) = rx.recv().await { - log::info!("done {session}"); - } -} diff --git a/packages/media_record/src/lib.rs b/packages/media_record/src/lib.rs index cb54aca8..2351d4e6 100644 --- a/packages/media_record/src/lib.rs +++ b/packages/media_record/src/lib.rs @@ -9,14 +9,14 @@ use storage::{memory::MemoryFile, FileId, RecordFile}; use tokio::sync::mpsc::Sender; use worker::UploadWorker; -#[cfg(feature="convert_webm")] +#[cfg(feature = "convert_record")] mod media; mod raw_record; mod session; mod storage; mod worker; -#[cfg(feature="convert_webm")] +#[cfg(feature = "convert_record")] pub use media::*; pub use raw_record::*; diff --git a/packages/media_record/src/media.rs b/packages/media_record/src/media.rs index 78efc8a3..7dbca098 100644 --- a/packages/media_record/src/media.rs +++ b/packages/media_record/src/media.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, fs::File}; use media_server_protocol::{ + endpoint::{TrackMeta, TrackName}, + media::MediaPacket, record::{SessionRecordEvent, SessionRecordRow}, transport::RemoteTrackId, }; @@ -9,20 +11,19 @@ use vpx_writer::VpxWriter; mod vpx_demuxer; mod vpx_writer; -struct TrackWriter { - writer: usize, +trait TrackWriter { + fn push_media(&mut self, pkt_ms: u64, pkt: MediaPacket); } -struct WriterContainer { - writer: VpxWriter, - audio_inuse: bool, - video_inuse: bool, +pub enum Event { + TrackStart(TrackName, u64, String), + TrackStop(TrackName, u64), } pub struct SessionMediaWriter { path: String, - writers: Vec, - tracks: HashMap, + tracks_meta: HashMap, + tracks_writer: HashMap>, } impl SessionMediaWriter { @@ -30,35 +31,23 @@ impl SessionMediaWriter { log::info!("new session media writer {path}"); Self { path: path.to_string(), - writers: vec![], - tracks: HashMap::new(), + tracks_meta: HashMap::new(), + tracks_writer: HashMap::new(), } } - fn get_free_writer_for(&mut self, ts: u64, is_audio: bool) -> usize { - for (index, writer) in self.writers.iter().enumerate() { - if (is_audio && !writer.audio_inuse) || (!is_audio && !writer.video_inuse) { - return index; - } - } - let index = self.writers.len(); - let path = format!("{}{}-{}.webm", self.path, index, ts); - let writer = VpxWriter::new(File::create(path).expect("Should open file"), ts); - self.writers.push(WriterContainer { - writer, - audio_inuse: false, - video_inuse: false, - }); - index - } - - pub fn push(&mut self, event: SessionRecordRow) { + pub fn push(&mut self, event: SessionRecordRow) -> Option { match event.event { SessionRecordEvent::TrackStarted(id, name, meta) => { log::info!("track {:?} started, name {name} meta {:?}", id, meta); + self.tracks_meta.insert(id, (name, meta)); + None } SessionRecordEvent::TrackStopped(id) => { log::info!("track {:?} stopped", id); + let (name, _) = self.tracks_meta.remove(&id)?; + self.tracks_writer.remove(&id)?; + Some(Event::TrackStop(name, event.ts)) } SessionRecordEvent::TrackMedia(id, media) => { // We allow clippy::map_entry because the suggestion provided by clippy has a bug: @@ -66,24 +55,41 @@ impl SessionMediaWriter { // There is a open Issue on the Rust Clippy GitHub Repo: // https://github.com/rust-lang/rust-clippy/issues/11976 #[allow(clippy::map_entry)] - if !self.tracks.contains_key(&id) { - let writer = self.get_free_writer_for(event.ts, media.meta.is_audio()); - if media.meta.is_audio() { - self.writers[writer].audio_inuse = true; + let out = if !self.tracks_writer.contains_key(&id) { + if let Some((name, _meta)) = self.tracks_meta.get(&id) { + let (file_path, writer): (String, Box) = match &media.meta { + media_server_protocol::media::MediaMeta::Opus { .. } => { + let file_path = format!("{}-opus-{}-{}.webm", self.path, name.0, event.ts); + let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts)); + (file_path, writer) + } + media_server_protocol::media::MediaMeta::H264 { .. } => todo!(), + media_server_protocol::media::MediaMeta::Vp8 { .. } => { + let file_path = format!("{}-vp8-{}-{}.webm", self.path, name.0, event.ts); + let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts)); + (file_path, writer) + } + media_server_protocol::media::MediaMeta::Vp9 { .. } => { + let file_path = format!("{}-vp9-{}-{}.webm", self.path, name.0, event.ts); + let writer = Box::new(VpxWriter::new(File::create(&file_path).unwrap(), event.ts)); + (file_path, writer) + } + }; + log::info!("create writer for track {name}"); + self.tracks_writer.insert(id, writer); + Some(Event::TrackStart(name.clone(), event.ts, file_path)) } else { - self.writers[writer].video_inuse = true; + log::warn!("missing track info for pkt form track {:?}", id); + return None; } - log::info!("write track {:?} to writer {writer}", id); - self.tracks.insert(id, TrackWriter { writer }); - } - let track = self.tracks.get_mut(&id).expect("Should have track here"); - if media.meta.is_audio() { - self.writers[track.writer].writer.push_opus(event.ts, media); } else { - self.writers[track.writer].writer.push_vpx(event.ts, media); - } + None + }; + let writer = self.tracks_writer.get_mut(&id).expect("Should have track here"); + writer.push_media(event.ts, media); + out } - _ => {} + _ => None, } } } diff --git a/packages/media_record/src/media/vpx_writer.rs b/packages/media_record/src/media/vpx_writer.rs index 89d1387a..3c5248ae 100644 --- a/packages/media_record/src/media/vpx_writer.rs +++ b/packages/media_record/src/media/vpx_writer.rs @@ -3,47 +3,74 @@ use std::io::{Seek, Write}; use media_server_protocol::media::MediaPacket; use webm::mux::{AudioCodecId, AudioTrack, Segment, Track, VideoCodecId, VideoTrack, Writer}; -use super::vpx_demuxer::VpxDemuxer; +use super::{vpx_demuxer::VpxDemuxer, TrackWriter}; pub struct VpxWriter { - webm: Segment>, + webm: Option>>, audio: Option, video: Option<(VideoTrack, VpxDemuxer)>, start_ts: u64, + last_ts: u64, } impl VpxWriter { pub fn new(writer: W, start_ts: u64) -> Self { - let mut webm = Segment::new(Writer::new(writer)).expect("Should create webm"); - //We must have audio before video - let audio = Some(webm.add_audio_track(48000, 2, None, AudioCodecId::Opus)); - Self { webm, audio, video: None, start_ts } - } - - pub fn push_opus(&mut self, pkt_ms: u64, pkt: MediaPacket) { - let delta_ts = pkt_ms - self.start_ts; - if self.audio.is_none() { - self.audio = Some(self.webm.add_audio_track(48000, 2, None, AudioCodecId::Opus)); + let webm = Segment::new(Writer::new(writer)).expect("Should create webm"); + Self { + webm: Some(webm), + audio: None, + video: None, + start_ts, + last_ts: start_ts, } - let audio = self.audio.as_mut().expect("Should have audio"); - audio.add_frame(&pkt.data, delta_ts * 1_000_000, true); } +} - pub fn push_vpx(&mut self, pkt_ms: u64, pkt: MediaPacket) { +impl TrackWriter for VpxWriter { + fn push_media(&mut self, pkt_ms: u64, pkt: MediaPacket) { let delta_ts = pkt_ms - self.start_ts; - if self.video.is_none() { - let codec = match &pkt.meta { - media_server_protocol::media::MediaMeta::Vp8 { .. } => VideoCodecId::VP8, - media_server_protocol::media::MediaMeta::Vp9 { .. } => VideoCodecId::VP9, - _ => panic!("Wrong codec, should be vp8 or vp9"), - }; - let demuxer = VpxDemuxer::new(); - self.video = Some((self.webm.add_video_track(0, 0, None, codec), demuxer)); + self.last_ts = pkt_ms; + if pkt.meta.is_audio() { + if self.audio.is_none() { + if let Some(webm) = &mut self.webm { + self.audio = Some(webm.add_audio_track(48000, 2, None, AudioCodecId::Opus)); + } else { + log::warn!("Webm instant destroyed"); + return; + } + } + let audio = self.audio.as_mut().expect("Should have audio"); + audio.add_frame(&pkt.data, delta_ts * 1_000_000, true); + } else { + if self.video.is_none() { + let codec = match &pkt.meta { + media_server_protocol::media::MediaMeta::Vp8 { .. } => VideoCodecId::VP8, + media_server_protocol::media::MediaMeta::Vp9 { .. } => VideoCodecId::VP9, + _ => panic!("Wrong codec, should be vp8 or vp9"), + }; + let demuxer = VpxDemuxer::new(); + if let Some(webm) = &mut self.webm { + self.video = Some((webm.add_video_track(100, 100, None, codec), demuxer)); + } else { + log::warn!("Webm instant destroyed"); + return; + } + } + + let (video, demuxer) = self.video.as_mut().expect("Should have video"); + if let Some((key, data)) = demuxer.push(pkt) { + video.add_frame(&data, delta_ts * 1_000_000, key); + } } + } +} - let (video, demuxer) = self.video.as_mut().expect("Should have video"); - if let Some((key, data)) = demuxer.push(pkt) { - video.add_frame(&data, delta_ts * 1_000_000, key); +impl Drop for VpxWriter { + fn drop(&mut self) { + if let Some(webm) = self.webm.take() { + if let Err(_e) = webm.try_finalize(Some(self.last_ts - self.start_ts)) { + log::error!("Close VpxWriter failed"); + } } } } diff --git a/packages/media_record/src/storage/mod.rs b/packages/media_record/src/storage/mod.rs index 87d91f44..30f5bc9f 100644 --- a/packages/media_record/src/storage/mod.rs +++ b/packages/media_record/src/storage/mod.rs @@ -92,7 +92,7 @@ impl Storage for HybridStorage { match file { HybridFile::Mem(file) => { if self.mem.can_push(file.len()) { - log::warn!("[HybridStorage] push {:?} to memory", file_id); + log::info!("[HybridStorage] push {:?} to memory", file_id); self.mem.push(file).await; } else if self.disk.can_push(file.len()) { log::warn!("[HybridStorage] memory storage full => fallback to disk with file {:?}", file_id);