Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into webrtc2rtmp
Browse files Browse the repository at this point in the history
# Conflicts:
#	Cargo.lock
  • Loading branch information
harlanc committed Jan 31, 2024
2 parents ae363d1 + bc21472 commit 40c7681
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 84 deletions.
251 changes: 214 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion confs/local/hls.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
log = "0.4"
hyper = { version = "0.14", features = ["full"] }
axum = { version = "0.7.4" }
tokio-util = { version = "0.6.5", features = ["codec"] }

streamhub = { path = "../../library/streamhub/" }
Expand Down
2 changes: 1 addition & 1 deletion confs/local/httpflv.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
log = "0.4"
hyper = { version = "0.14", features = ["full"] }
axum = { version = "0.7.4" }
futures = "0.3"

streamhub = { path = "../../library/streamhub/" }
Expand Down
2 changes: 1 addition & 1 deletion confs/online/hls.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
log = "0.4"
hyper = { version = "0.14", features = ["full"] }
axum = { version = "0.7.4" }
tokio-util = { version = "0.6.5", features = ["codec"] }

streamhub = "0.1.2"
Expand Down
2 changes: 1 addition & 1 deletion confs/online/httpflv.Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
log = "0.4"
hyper = { version = "0.14", features = ["full"] }
axum = { version = "0.7.4" }
futures = "0.3"

streamhub = "0.1.2"
Expand Down
2 changes: 1 addition & 1 deletion protocol/hls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ keywords = ["hls", "video", "streaming"]
edition = "2018"

[dependencies]
axum = { version = "0.7.4" }
byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
log = "0.4"
hyper = { version = "0.14", features = ["full"] }
tokio-util = { version = "0.6.5", features = ["codec"] }

streamhub = { path = "../../library/streamhub/" }
Expand Down
29 changes: 14 additions & 15 deletions protocol/hls/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use {
hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
axum::{
body::Body, extract::Request, handler::HandlerWithoutStateExt, http::StatusCode,
response::Response,
},
tokio::fs::File,
std::net::SocketAddr,
tokio::{fs::File, net::TcpListener},
tokio_util::codec::{BytesCodec, FramedRead},
};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
type Result<T> = std::result::Result<T, GenericError>;
static NOTFOUND: &[u8] = b"Not Found";

async fn handle_connection(req: Request<Body>) -> Result<Response<Body>> {
async fn handle_connection(req: Request<Body>) -> Response<Body> {
let path = req.uri().path();

let mut file_path: String = String::from("");
Expand Down Expand Up @@ -45,7 +46,6 @@ async fn handle_connection(req: Request<Body>) -> Result<Response<Body>> {
file_path = format!("./{app_name}/{stream_name}/{ts_name}.ts");
}
}

simple_file_send(file_path.as_str()).await
}

Expand All @@ -57,28 +57,27 @@ fn not_found() -> Response<Body> {
.unwrap()
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
async fn simple_file_send(filename: &str) -> Response<Body> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
return Ok(Response::new(body));
let body = Body::from_stream(stream);
return Response::new(body);
}

Ok(not_found())
not_found()
}

pub async fn run(port: usize) -> Result<()> {
let listen_address = format!("0.0.0.0:{port}");
let sock_addr = listen_address.parse().unwrap();
let sock_addr: SocketAddr = listen_address.parse().unwrap();

let new_service =
make_service_fn(move |_| async { Ok::<_, GenericError>(service_fn(handle_connection)) });
let listener = TcpListener::bind(sock_addr).await?;

let server = Server::bind(&sock_addr).serve(new_service);
log::info!("Hls server listening on http://{}", sock_addr);
server.await?;

axum::serve(listener, handle_connection.into_service()).await?;

Ok(())
}
2 changes: 1 addition & 1 deletion protocol/httpflv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ byteorder = "1.4.2"
bytes = "1.0.0"
failure = "0.1.1"
log = "0.4"
hyper = { version = "0.14", features = ["full"] }
axum = { version = "0.7.4" }
futures = "0.3"

streamhub = { path = "../../library/streamhub/" }
Expand Down
1 change: 0 additions & 1 deletion protocol/httpflv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
extern crate hyper;
extern crate rtmp;

pub mod define;
Expand Down
49 changes: 24 additions & 25 deletions protocol/httpflv/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
use {
super::httpflv::HttpFlv,
futures::channel::mpsc::unbounded,
hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
axum::{
body::Body,
extract::{ConnectInfo, Request, State},
handler::Handler,
http::StatusCode,
response::Response,
},
futures::channel::mpsc::unbounded,
std::net::SocketAddr,
streamhub::define::StreamHubEventSender,
tokio::net::TcpListener,
};

type GenericError = Box<dyn std::error::Error + Send + Sync>;
type Result<T> = std::result::Result<T, GenericError>;
static NOTFOUND: &[u8] = b"Not Found";

async fn handle_connection(
State(event_producer): State<StreamHubEventSender>, // event_producer: ChannelEventProducer
ConnectInfo(remote_addr): ConnectInfo<SocketAddr>,
req: Request<Body>,
event_producer: StreamHubEventSender, // event_producer: ChannelEventProducer
remote_addr: SocketAddr,
) -> Result<Response<Body>> {
) -> Response<Body> {
let path = req.uri().path();

match path.find(".flv") {
Expand Down Expand Up @@ -46,39 +49,35 @@ async fn handle_connection(
}
});

let mut resp = Response::new(Body::wrap_stream(http_response_data_consumer));
let mut resp = Response::new(Body::from_stream(http_response_data_consumer));
resp.headers_mut()
.insert("Access-Control-Allow-Origin", "*".parse().unwrap());

Ok(resp)
resp
}

_ => Ok(Response::builder()
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(NOTFOUND.into())
.unwrap()),
.unwrap(),
}
}

pub async fn run(event_producer: StreamHubEventSender, port: usize) -> Result<()> {
let listen_address = format!("0.0.0.0:{port}");
let sock_addr = listen_address.parse().unwrap();

let new_service = make_service_fn(move |socket: &AddrStream| {
let remote_addr = socket.remote_addr();
let flv_copy = event_producer.clone();
async move {
Ok::<_, GenericError>(service_fn(move |req| {
handle_connection(req, flv_copy.clone(), remote_addr)
}))
}
});
let sock_addr: SocketAddr = listen_address.parse().unwrap();

let server = Server::bind(&sock_addr).serve(new_service);
let listener = TcpListener::bind(sock_addr).await?;

log::info!("Httpflv server listening on http://{}", sock_addr);

server.await?;
let handle_connection = handle_connection.with_state(event_producer.clone());

axum::serve(
listener,
handle_connection.into_make_service_with_connect_info::<SocketAddr>(),
)
.await?;

Ok(())
}

0 comments on commit 40c7681

Please sign in to comment.