Skip to content

Commit

Permalink
support rtsp client
Browse files Browse the repository at this point in the history
  • Loading branch information
harlanc committed Aug 9, 2024
1 parent 85c7a87 commit a61950c
Show file tree
Hide file tree
Showing 30 changed files with 2,376 additions and 1,079 deletions.
134 changes: 126 additions & 8 deletions application/xiu/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
serde_json::Value,
std::sync::Arc,
streamhub::{
define::{self, StreamHubEventSender},
define::{self, RelayType, StreamHubEventSender},
stream::StreamIdentifier,
utils::Uuid,
},
Expand All @@ -25,7 +25,7 @@ struct ApiResponse<T> {

// the input to our `KickOffClient` handler
#[derive(Deserialize)]
struct KickOffClient {
struct KickOffClientParams {
uuid: String,
}

Expand All @@ -36,12 +36,21 @@ struct QueryWholeStreamsParams {
}

#[derive(Deserialize)]
struct QueryStream {
struct QueryStreamParams {
identifier: StreamIdentifier,
// if specify uuid, then query the stream by uuid and filter no used data.
uuid: Option<String>,
}

#[derive(Deserialize)]
struct RelayStreamParams {
//guaranteed by the user to be unique
id: String,
identifier: Option<StreamIdentifier>,
server_address: Option<String>,
relay_type: RelayType,
}

#[derive(Clone)]
struct ApiService {
channel_event_producer: StreamHubEventSender,
Expand Down Expand Up @@ -93,7 +102,7 @@ impl ApiService {
}
}

async fn query_stream(&self, stream: QueryStream) -> Json<ApiResponse<Value>> {
async fn query_stream(&self, stream: QueryStreamParams) -> Json<ApiResponse<Value>> {
let uuid = if let Some(uid) = stream.uuid {
Uuid::from_str2(&uid)
} else {
Expand Down Expand Up @@ -132,7 +141,7 @@ impl ApiService {
}
}

async fn kick_off_client(&self, id: KickOffClient) -> Result<String> {
async fn kick_off_client(&self, id: KickOffClientParams) -> Result<String> {
let id_result = Uuid::from_str2(&id.uuid);

if let Some(id) = id_result {
Expand All @@ -145,6 +154,103 @@ impl ApiService {

Ok(String::from("ok"))
}

async fn start_relay_stream(&self, relay_info: RelayStreamParams) -> Json<ApiResponse<Value>> {
if relay_info.identifier.is_none() || relay_info.server_address.is_none() {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("identifier or server_address is none"),
data: Value::Null,
};
return Json(api_response);
}

let (result_sender, result_receiver) = oneshot::channel();

let hub_event = define::StreamHubEvent::ApiStartRelayStream {
id: relay_info.id,
identifier: relay_info.identifier.unwrap(),
server_address: relay_info.server_address.unwrap(),
relay_type: relay_info.relay_type,
result_sender,
};

if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api relay_stream event error: {}", err);
}

match result_receiver.await {
Ok(val) => match val {
Ok(()) => {
let api_response = ApiResponse {
error_code: 0,
desp: String::from("succ"),
data: Value::Null,
};
Json(api_response)
}
Err(err) => {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
},
Err(err) => {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
}
}

async fn stop_relay_stream(&self, relay_info: RelayStreamParams) -> Json<ApiResponse<Value>> {
let (result_sender, result_receiver) = oneshot::channel();

let hub_event = define::StreamHubEvent::ApiStopRelayStream {
id: relay_info.id,
relay_type: relay_info.relay_type,
result_sender,
};

if let Err(err) = self.channel_event_producer.send(hub_event) {
log::error!("send api relay_stream event error: {}", err);
}

match result_receiver.await {
Ok(val) => match val {
Ok(()) => {
let api_response = ApiResponse {
error_code: 0,
desp: String::from("succ"),
data: Value::Null,
};
Json(api_response)
}
Err(err) => {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
},
Err(err) => {
let api_response = ApiResponse {
error_code: -1,
desp: String::from("failed"),
data: serde_json::json!(err.to_string()),
};
Json(api_response)
}
}
}
}

pub async fn run(producer: StreamHubEventSender, port: usize) {
Expand All @@ -161,23 +267,35 @@ pub async fn run(producer: StreamHubEventSender, port: usize) {
};

let api_query_stream = api.clone();
let query_stream = move |Json(stream): Json<QueryStream>| async move {
let query_stream = move |Json(stream): Json<QueryStreamParams>| async move {
api_query_stream.query_stream(stream).await
};

let api_kick_off = api.clone();
let kick_off = move |Json(id): Json<KickOffClient>| async move {
let kick_off = move |Json(id): Json<KickOffClientParams>| async move {
match api_kick_off.kick_off_client(id).await {
Ok(response) => response,
Err(_) => "error".to_owned(),
}
};

let api_start_relay_stream = api.clone();
let start_relay_stream = move |Json(params): Json<RelayStreamParams>| async move {
api_start_relay_stream.start_relay_stream(params).await
};

let api_stop_relay_stream = api.clone();
let stop_relay_stream = move |Json(params): Json<RelayStreamParams>| async move {
api_stop_relay_stream.stop_relay_stream(params).await
};

let app = Router::new()
.route("/", get(root))
.route("/api/query_whole_streams", get(query_streams))
.route("/api/query_stream", post(query_stream))
.route("/api/kick_off_client", post(kick_off));
.route("/api/kick_off_client", post(kick_off))
.route("/api/start_relay_stream", post(start_relay_stream))
.route("/api/stop_relay_stream", post(stop_relay_stream));

log::info!("Http api server listening on http://0.0.0.0:{}", port);
axum::Server::bind(&([0, 0, 0, 0], port as u16).into())
Expand Down
6 changes: 4 additions & 2 deletions application/xiu/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl Config {
if rtsp_port > 0 {
rtsp_config = Some(RtspConfig {
enabled: true,
relay_enabled: false,
port: rtsp_port,
auth: None,
});
Expand Down Expand Up @@ -123,6 +124,7 @@ pub struct RtspConfig {
pub enabled: bool,
pub port: usize,
pub auth: Option<AuthConfig>,
pub relay_enabled: bool,
}

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -187,7 +189,7 @@ pub struct HttpNotifierConfig {
pub struct AuthSecretConfig {
pub key: String,
pub password: String,
pub push_password: Option<String>
pub push_password: Option<String>,
}

#[derive(Debug, Deserialize, Clone, Default)]
Expand All @@ -211,7 +213,7 @@ fn test_toml_parse() {
Err(err) => println!("{}", err),
}

let str = fs::read_to_string("./src/config/config.toml");
let str = fs::read_to_string("./src/config/examples/config1.toml");

match str {
Ok(val) => {
Expand Down
18 changes: 16 additions & 2 deletions application/xiu/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::config::{AuthConfig, AuthSecretConfig};
use commonlib::auth::AuthType;
use rtmp::remuxer::RtmpRemuxer;
use std::sync::Arc;
use crate::config::{AuthConfig, AuthSecretConfig};
use xrtsp::relay::pull_client_manager::RtspPullClientManager;

use {
super::api,
Expand All @@ -16,7 +17,7 @@ use {
relay::{pull_client::PullClient, push_client::PushClient},
rtmp::RtmpServer,
},
streamhub::{notify::Notifier, notify::http::HttpNotifier, StreamsHub},
streamhub::{notify::http::HttpNotifier, notify::Notifier, StreamsHub},
tokio,
xrtsp::rtsp::RtspServer,
xwebrtc::webrtc::WebRTCServer,
Expand Down Expand Up @@ -255,6 +256,19 @@ impl Service {
log::error!("rtsp server error: {}", err);
}
});

if rtsp_cfg_value.relay_enabled {
let mut rtsp_relay_manager = RtspPullClientManager::new(
stream_hub.get_client_event_consumer(),
stream_hub.get_hub_event_sender(),
);

tokio::spawn(async move {
if let Err(err) = rtsp_relay_manager.run().await {
log::error!("rtsp relay manager error: {}", err);
}
});
}
}

Ok(())
Expand Down
Loading

0 comments on commit a61950c

Please sign in to comment.