Skip to content

Commit

Permalink
record: update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongcha98 committed Dec 2, 2024
1 parent 68a0d85 commit 47054a5
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 32 deletions.
85 changes: 84 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions libs/api/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ pub fn strategy() -> String {
pub fn record(stream: &str) -> String {
format!("/api/record/{}", stream)
}

pub fn record_file(stream: &str, dir: &str, file: &str) -> String {
format!("/api/record/{}/{}/{}", stream, dir, file)
}
3 changes: 2 additions & 1 deletion liveion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crate-type = ["lib"]
[dependencies]
api = { path = "../libs/api" }
auth = { path = "../libs/auth" }
http-log = { path = "../libs/http-log" }
# http-log = { path = "../libs/http-log" }
libwish = { path = "../libs/libwish" }

net4mqtt = { path = "../libs/net4mqtt", optional = true }
Expand Down Expand Up @@ -46,6 +46,7 @@ reqwest = { version = "0.12", features = [
rust-embed = { version = "8.4", features = ["axum-ex"], optional = true }
which = "7.0.0"
portpicker = "0.1.1"
opendal = { version = "0.50.2", features = ["services-fs"] }

[features]
webui = ["dep:rust-embed"]
Expand Down
63 changes: 38 additions & 25 deletions liveion/src/forward/internal.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::borrow::ToOwned;
use std::fs;
use std::io::Cursor;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;

use chrono::Utc;
use libwish::Client;
use opendal::{services, Operator};
use portpicker::pick_unused_port;
use tokio::io::AsyncWriteExt;
use tokio::net::UdpSocket;
Expand Down Expand Up @@ -652,7 +652,7 @@ impl PeerForwardInternal {

// record
impl PeerForwardInternal {
pub(crate) async fn record(&self) -> Result<()> {
pub(crate) async fn record(&self, endpoint: String) -> Result<()> {
if which::which("ffmpeg").is_err() {
return Err(AppError::throw("not support record"));
}
Expand Down Expand Up @@ -680,38 +680,49 @@ impl PeerForwardInternal {
.map(|track| (audio_port, track.subscribe()));
let (sdp, codecs) = Self::get_sdp_and_codecs(publish.peer.clone()).await?;
let sdp = Self::build_sdp(&sdp, codecs, video_port, audio_port)?.marshal();
let dir = format!("{}/{}", self.stream, Utc::now().timestamp_millis());
std::fs::create_dir_all(dir.clone())?;
let output = format!("{}/record.mpd", dir);
let dir = format!("{}/{}/", self.stream, Utc::now().timestamp_millis());
let output = format!("{}/{}record.mpd", endpoint, dir);
let has_video = video.is_some();
let mut builder = services::Fs::default();
builder = builder.root(".");
let op = Operator::new(builder)?.finish();
op.create_dir(&dir).await?;
tokio::spawn(Self::do_record(sdp, video, audio, output.clone()));
// Keyframe
if has_video {
let peer_weak = Arc::downgrade(&publish.peer);
tokio::spawn(async move {
let check_path = format!("{}/init-stream0.m4s", dir);
while !fs::exists(check_path.clone()).unwrap_or(false) {
match peer_weak.upgrade() {
Some(pc) => {
let _ = pc
.write_rtcp(&[RtcpMessage::PictureLossIndication
.to_rtcp_packet(video_ssrc.unwrap())])
.await;
}
None => {
break;
}
}
sleep(Duration::from_millis(500)).await;
}
});
let mut builder = services::Fs::default();
builder = builder.root(&dir);
let op = Operator::new(builder)?.finish();
let pc = Arc::downgrade(&publish.peer);
tokio::spawn(Self::record_pli(op, pc, video_ssrc.unwrap()));
}
Ok(())
}
None => Err(AppError::throw("publish is none")),
}
}

// Keyframe
async fn record_pli(op: Operator, pc: Weak<RTCPeerConnection>, ssrc: u32) {
while let Ok(list) = op.list("").await {
let list = list.iter().filter(|e| e.path() != "/").count();
if list > 0 {
break;
}
match pc.upgrade() {
Some(pc) => {
let _ = pc
.write_rtcp(&[RtcpMessage::PictureLossIndication.to_rtcp_packet(ssrc)])
.await;
}
None => {
break;
}
}
sleep(Duration::from_millis(500)).await;
}
}

async fn do_record(
sdp: String,
video: Option<(u16, Receiver<Arc<webrtc::rtp::packet::Packet>>)>,
Expand All @@ -721,13 +732,15 @@ impl PeerForwardInternal {
let mut args = vec![
"ffmpeg",
"-protocol_whitelist",
"rtp,file,udp,pipe,http",
"rtp,udp,file,pipe,http",
"-f",
"sdp",
"-i",
"pipe:0",
"-f",
"dash",
"-chunked_post",
"0",
&output,
];
info!("record cli : {:?}", args);
Expand Down
4 changes: 2 additions & 2 deletions liveion/src/forward/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ impl PeerForward {
}
}

pub async fn record(&self) -> Result<()> {
self.internal.record().await
pub async fn record(&self, endpoint: String) -> Result<()> {
self.internal.record(endpoint).await
}
}

Expand Down
2 changes: 2 additions & 0 deletions liveion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{future::Future, sync::Arc};

use axum::{extract::Request, middleware, response::IntoResponse, routing::get, Router};
use http::{StatusCode, Uri};
use route::record;
use tokio::net::TcpListener;
use tower_http::{
cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer,
Expand Down Expand Up @@ -55,6 +56,7 @@ where
.layer(middleware::from_fn(access_middleware))
.layer(auth_layer),
)
.merge(record::route())
.route(api::path::METRICS, get(metrics))
.with_state(app_state.clone())
.layer(if cfg.http.cors {
Expand Down
3 changes: 2 additions & 1 deletion liveion/src/route/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async fn record(
Path(stream): Path<String>,
Json(_body): Json<api::request::Record>,
) -> Result<String> {
state.stream_manager.record(stream).await?;
let endpoint = format!("http://{}/api/record", state.config.http.listen);
state.stream_manager.record(stream, endpoint).await?;
Ok("".to_string())
}
1 change: 1 addition & 0 deletions liveion/src/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::config::{Config, IceServer};
use crate::stream::manager::Manager;

pub mod admin;
pub mod record;
pub mod session;
pub mod strategy;
pub mod stream;
Expand Down
30 changes: 30 additions & 0 deletions liveion/src/route/record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::result::Result;
use crate::AppState;
use axum::extract::{Path, Request, State};
use axum::routing::post;
use axum::Router;
use http::StatusCode;
use opendal::{services, Operator};
use tokio_stream::StreamExt;
pub fn route() -> Router<AppState> {
Router::new().route(
&api::path::record_file(":stream", ":dir", ":file"),
post(record_file),
)
}

async fn record_file(
State(_state): State<AppState>,
Path((stream, dir, file)): Path<(String, String, String)>,
req: Request,
) -> Result<StatusCode> {
let mut data = req.into_body().into_data_stream();
let mut builder = services::Fs::default();
builder = builder.root(&format!("./{}/{}", stream, dir));
let op = Operator::new(builder)?.finish();
let mut writer = op.writer(&file).await?;
while let Some(Ok(data)) = data.next().await {
writer.write(data).await?;
}
Ok(StatusCode::OK)
}
4 changes: 2 additions & 2 deletions liveion/src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,12 @@ impl Manager {
Ok(recv)
}

pub async fn record(&self, stream: String) -> Result<()> {
pub async fn record(&self, stream: String, endpoint: String) -> Result<()> {
let streams = self.stream_map.read().await;
let forward = streams.get(&stream).cloned();
drop(streams);
if let Some(forward) = forward {
forward.record().await
forward.record(endpoint).await
} else {
Err(AppError::stream_not_found("stream not exists"))
}
Expand Down

0 comments on commit 47054a5

Please sign in to comment.