Skip to content

Commit

Permalink
fix record compose
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Nov 19, 2024
1 parent 4363429 commit 5080279
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 25 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions packages/media_connector/src/sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ impl ConnectorSqlStorage {
log::info!("[ConnectorSqlStorage] use path style");
s3.use_path_style();
}

let signed_url = s3.put("aaa.mp4", 10000).unwrap();
log::info!("[ConnectorSqlStorage] signed_url: {:?}", signed_url);
let res = reqwest::Client::new().put(signed_url).body(vec![1; 10000]).send().await.unwrap();
assert_eq!(res.status().as_u16(), 200);

let s3_sub_folder = s3_endpoint.path[1..].join("/");

Self {
Expand Down
71 changes: 58 additions & 13 deletions packages/media_record/bin/convert_record_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use poem_openapi::{
use rusty_s3::S3Action;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

macro_rules! try_opt {
macro_rules! try_validate_app {
($self:ident, $token:ident, $err:expr) => {
match $self.apps.validate_app(&$token.token) {
Some(app) => app,
Expand All @@ -42,6 +42,21 @@ macro_rules! try_opt {
};
}

macro_rules! try_opt {
($opt:expr, $err:expr) => {
match $opt {
Some(o) => o,
None => {
return Json(Response {
status: false,
error: Some($err.to_owned()),
data: None,
});
}
}
};
}

#[derive(SecurityScheme)]
#[oai(rename = "Token Authorization", ty = "bearer", key_in = "header", key_name = "Authorization")]
pub struct TokenAuthorization(pub Bearer);
Expand Down Expand Up @@ -187,14 +202,29 @@ struct HttpApis {
impl HttpApis {
#[oai(path = "/convert/job", method = "post")]
async fn create_job(&self, TokenAuthorization(token): TokenAuthorization, Json(body): Json<ConvertJobRequest>) -> Json<Response<ConvertJobResponse>> {
let app = try_opt!(self, token, "Invalid token");
let app = try_validate_app!(self, token, "Invalid token");
let job_id = rand::random::<u64>().to_string();
let input_s3 = format!("{}/{}/?path_style=true", self.input_s3_uri, body.record_path);
let transmux_s3_uri = self.transmux_s3_uri.clone();
let input_s3 = try_opt!(concat_s3_uri_path(&self.input_s3_uri, &body.record_path), "Invalid input_s3_uri");
let compose_s3_uri = self.compose_s3_uri.clone();
let job_id_c = job_id.clone();
let hook = self.hook.clone();

// get yyyy/mm/dd with chrono
let current_date_path = chrono::Utc::now().format("%Y/%m/%d").to_string();
let transmux = if let Some(t) = body.transmux {
if let Some(custom_s3) = t.custom_s3 {
Some(RecordConvertOutputLocation::S3(custom_s3))
} else {
let s3 = try_opt!(
concat_s3_uri_path(&self.transmux_s3_uri, &format!("{}/transmux/{current_date_path}/{job_id_c}", app.app)),
"Invalid transmux_s3_uri"
);
Some(RecordConvertOutputLocation::S3(s3))
}
} else {
None
};

tokio::spawn(async move {
log::info!("Convert job {job_id_c} started");
hook.on_event(
Expand All @@ -210,16 +240,9 @@ impl HttpApis {
},
);

// get yyyy/mm/dd with chrono
let current_date_path = chrono::Utc::now().format("%Y/%m/%d").to_string();
let converter = RecordConvert::new(RecordConvertConfig {
in_s3: input_s3,
transmux: body.transmux.map(|t| {
let uri = t
.custom_s3
.unwrap_or_else(|| format!("{transmux_s3_uri}/{}/transmux/{current_date_path}/{job_id_c}?path_style=true", app.app));
RecordConvertOutputLocation::S3(uri)
}),
transmux,
compose: body.compose.map(|c| {
let (uri, relative) = c
.custom_s3
Expand All @@ -228,7 +251,6 @@ impl HttpApis {
(u, relative)
})
.unwrap_or_else(|| {
let compose_s3_uri = format!("{compose_s3_uri}?path_style=true");
let (s3, credentials, s3_sub_folder) = convert_s3_uri(&compose_s3_uri).expect("should convert compose_s3_uri");
let relative = format!("{}/compose/{current_date_path}/{job_id_c}.webm", app.app);
let path = PathBuf::from(s3_sub_folder).join(&relative);
Expand Down Expand Up @@ -281,3 +303,26 @@ impl HttpApis {
})
}
}

fn concat_s3_uri_path(s3_uri: &str, path: &str) -> Option<String> {
fn ensure_last_slash(s: String) -> String {
if s.ends_with('/') {
s
} else {
s + "/"
}
}

let parts = s3_uri.split('?').collect::<Vec<_>>();
if parts.len() == 2 {
let first = PathBuf::from(parts[0]).join(path).to_str()?.to_string();
let first = ensure_last_slash(first);
log::info!("first: {}", first);
Some(first + "?" + parts[1])
} else {
let first = PathBuf::from(s3_uri).join(path).to_str()?.to_string();
let first = ensure_last_slash(first);
log::info!("first: {}", first);
Some(first)
}
}
4 changes: 0 additions & 4 deletions packages/media_record/bin/run_worker.sh

This file was deleted.

2 changes: 2 additions & 0 deletions packages/media_record/src/convert/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ impl RecordComposer {

let room_reader = RoomReader::new(s3, credentials, &s3_sub_folder);
let peers = room_reader.peers().await.map_err(|e| e.to_string())?;
log::info!("check room peers {:?}", peers.iter().map(|p| p.peer()).collect::<Vec<_>>());
//we use channel to wait all sessions
for peer in peers {
let sessions = peer.sessions().await.map_err(|e| e.to_string())?;
log::info!("check peer {} sessions {:?}", peer.peer(), sessions.iter().map(|s| s.id()).collect::<Vec<_>>());
for mut session in sessions {
session.connect().await.map_err(|e| e.to_string())?;
let id = session.id();
Expand Down
1 change: 1 addition & 0 deletions packages/media_utils/src/uri.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::de::DeserializeOwned;
use serde_querystring::DuplicateQS;

#[derive(Debug, Clone)]
pub struct CustomUri<Q> {
pub username: Option<String>,
pub password: Option<String>,
Expand Down

0 comments on commit 5080279

Please sign in to comment.