Skip to content

Commit

Permalink
BREAKING CHANGE: switching to sans-io-runtime (#257)
Browse files Browse the repository at this point in the history
This PR switched to use sans-io architecture with sans-io-runtime instead of async-runtime. The main reason is for performance and simplicity in data flow, everything will be sync now, make it easier for predicting time consume
  • Loading branch information
giangndm authored Apr 23, 2024
1 parent a1453a0 commit 7911144
Show file tree
Hide file tree
Showing 287 changed files with 6,049 additions and 28,398 deletions.
5,541 changes: 1,441 additions & 4,100 deletions Cargo.lock

Large diffs are not rendered by default.

35 changes: 14 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
[workspace]
members = [
"packages/cluster",
"packages/endpoint",
"packages/transport",
"packages/audio-mixer",
"packages/media-utils",
"bin",
"packages/protocol",
"transports/webrtc",
"transports/rtmp",
"transports/sip",
"servers/media-server"
"packages/media_utils",
"packages/media_core",
"packages/media_runner",
"packages/transport_webrtc",
]

[workspace.dependencies]
futures = "0.3"
async-trait = "0.1"
async-std = { version = "1.12", features = ["tokio1", "unstable", "attributes"] }
async-notify = "0.3"
parking_lot = "0.12"
log = { version = "0.4" }
env_logger = { version = "0.11" }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
prost = "0.12"
poem-openapi = { version = "4.0" }
derive_more = { version = "0.99" }
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "9d16b9c224c2d1cd3015c9a2e60f51914c9bd880" }
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "3c2945754926bd014c36e8ec20c228051a45603f" }
convert-enum = "0.1"
num_enum = "0.7"
log = "0.4"
smallmap = "1.4"
derivative = "2.2"
derive_more = "0.99"
rand = "0.8"
59 changes: 27 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,9 @@ For a deep dive into the technical aspects of network architecture, please refer

(Above is a demo video of the version used by Bluesea Network)

## Project Status: Alpha
## Project Status: Refactoring

This project is currently in the alpha stage of development. It is actively being developed and may undergo significant changes, including changes to the API. The primary focus at this stage is to make the project stable and reliable.

Our goals for this project can be summarized as follows:

- **Goal 1**: Cluster: Create a global decentralized media server cluster with multiple zones, support both SDKs for flexible and mainstream protocols (RTMP, SIP, SRT) for compatibility.
- **Goal 2**: Market: Develop a sharing marketplace for the media server, enabling resource sharing and monetization. This will help scale the media server cluster during peak times and reduce costs during off-peak times. In the feature marketplace fees will be used to fund the development of the project.
- **Goal 3**: P2P Network: Establish a network between users, where servers only act as fallbacks. This approach aims to significantly reduce infrastructure costs and scale to infinity.

Our primary focus is currently on Goal 1. For details on the current phase plan, please refer to our [Roadmap](https://github.com/8xFF/atm0s-media-server/issues/69).

Goals 2 and 3 are in the research phase. For more information, you can visit this [Repository](https://github.com/giangndm/decentralized-p2p-streaming).
We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).

## Features

Expand All @@ -57,27 +47,32 @@ Goals 2 and 3 are in the research phase. For more information, you can visit thi
- Support encodings: H264, Vp8, Vp9, H265 (Coming soon), AV1 (Coming soon)
- Cross-platform: Linux, macOS, Windows.
- Decentralized WebRTC SFU (Selective Forwarding Unit)
- Modern, full-featured client SDKs
- [x] [Vanilla JavaScript](https://github.com/8xFF/atm0s-media-sdk-js)
- [x] [Rust](WIP)
- [x] [React](https://github.com/8xFF/atm0s-media-sdk-react)
- [x] [React Native](WIP)
- [ ] Flutter
- [ ] iOS Native
- [ ] Android Native
- Easy to deploy: single binary, Docker, or Kubernetes
- Advanced features including:
- [x] Audio Mix-Minus (WIP)
- [x] Simulcast/SVC
- [x] SFU
- [x] SFU Cascading (each stream is a global PubSub channel, similar to [Cloudflare interconnected network](https://blog.cloudflare.com/announcing-cloudflare-calls/))
- [ ] Recording
- [x] RTMP
- [x] SIP (WIP)
- [x] WebRTC
- [x] Whip/Whep

## Quick Start
- Easy to scale: global pubsub network, similar to [Cloudflare interconnected network](https://blog.cloudflare.com/announcing-cloudflare-calls/))

| Feature | Description | Status |
| ------------------- | ------------------------------------------------------------------------------------ | ------ |
| Multi-zones Cluster | Implement with global pubsub network [RFC-0003](https://github.com/8xFF/rfcs/pull/3) | 🚧 |
| Whip | Whip Protocol | 🚧 |
| Whep | Whep Protocol | 🚧 |
| WebRTC-SDK | Webrtc-SDK Protocol [RFC-0005](https://github.com/8xFF/rfcs/pull/5) ||
| RTMP | RTMP Protocol ||
| RTMP-Transcode | RTMP with Transcode ||
| SIP | SIP calls ||
| MoQ | Media-over-Quic ||
| Monitoring | Dashboard for monitoring ||
| Recording | Record stream ||
| Gateway | External gateway [RFC-0003](https://github.com/8xFF/rfcs/pull/3) ||
| Connector | External event handling ||

Status:

- ❌: Not started
- 🚧: In progress
- 🚀: In review/testing
- ✅: Completed

## Quick Start (not ready yet)

### Prebuild or build from source

Expand Down
19 changes: 19 additions & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "bin"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
clap = { version = "4.5", features = ["env", "derive"] }
log = { workspace = true }
rand = { workspace = true }
poem = { version = "3.0", features = ["static-files"] }
poem-openapi = { version = "5.0", features = ["swagger-ui"] }
tokio = { version = "1.37", features = ["full"] }
sans-io-runtime = { workspace = true }
atm0s-sdn = { workspace = true }
media-server-protocol = { path = "../packages/protocol" }
media-server-runner = { path = "../packages/media_runner" }
10 changes: 10 additions & 0 deletions bin/public/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<html>
<body>
<div>
<a href="whip/index.html">Whip broadcast</a>
</div>
<div>
<a href="whep/index.html">Whep viewer</a>
</div>
</body>
</html>
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ window.start = async () => {
pc.addTransceiver("audio", { direction: 'recvonly' });
pc.addTransceiver("video", { direction: 'recvonly' });

pc.ontrack = (event) =>
{
console.log(event)
if (event.track.kind == "video")
document.querySelector("video").srcObject = event.streams[0];
let stream = new MediaStream();
document.querySelector("video").srcObject = stream;
pc.ontrack = (event) => {
stream.addTrack(event.track);
}

//Create whep client
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ window.start = async () => {
pc.addTransceiver(track, {
direction: "sendonly",
streams: [stream],
sendEncodings: [
{ rid: "0", active: true, scaleResolutionDownBy: 2},
{ rid: "1", active: true, scaleResolutionDownBy: 2},
{ rid: "2", active: true },
],
// sendEncodings: [
// { rid: "0", active: true, scaleResolutionDownBy: 2},
// { rid: "1", active: true, scaleResolutionDownBy: 2},
// { rid: "2", active: true },
// ],
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

//import { EventEmitter } from "events";

export class WHIPClient
Expand Down
70 changes: 70 additions & 0 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::net::SocketAddr;

use media_server_protocol::endpoint::ClusterConnId;
use media_server_protocol::transport::{RpcReq, RpcRes};
use poem::endpoint::StaticFilesEndpoint;
use poem::{listener::TcpListener, middleware::Cors, EndpointExt, Route, Server};
use poem_openapi::types::{ToJSON, Type};
use poem_openapi::OpenApiService;
use poem_openapi::{types::ParseFromJSON, Object};
use tokio::sync::mpsc::Sender;

#[derive(Debug, Default, Object)]
pub struct Response<T: ParseFromJSON + ToJSON + Type + Send + Sync> {
pub status: bool,
#[oai(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[oai(skip_serializing_if = "Option::is_none")]
pub data: Option<T>,
}

pub struct Rpc<Req, Res> {
pub req: Req,
pub answer_tx: tokio::sync::oneshot::Sender<Res>,
}

impl<Req, Res> Rpc<Req, Res> {
pub fn new(req: Req) -> (Self, tokio::sync::oneshot::Receiver<Res>) {
let (answer_tx, answer_rx) = tokio::sync::oneshot::channel();
(Self { req, answer_tx }, answer_rx)
}

pub fn res(self, res: Res) {
let _ = self.answer_tx.send(res);
}
}

mod api_connector;
mod api_media;
mod utils;

pub async fn run_gateway_http_server(sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>) -> Result<(), Box<dyn std::error::Error>> {
let api_service: OpenApiService<_, ()> = OpenApiService::new(api_media::MediaApis, "Media Gateway APIs", env!("CARGO_PKG_VERSION")).server("/");
let ui = api_service.swagger_ui();
let spec = api_service.spec();
let route = Route::new()
.nest("/", api_service)
.nest("/ui", ui)
.at("/spec", poem::endpoint::make_sync(move |_| spec.clone()))
.with(Cors::new())
.data(api_media::MediaServerCtx { sender });

Server::new(TcpListener::bind("0.0.0.0:3000")).run(route).await?;
Ok(())
}

pub async fn run_media_http_server(port: u16, sender: Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>) -> Result<(), Box<dyn std::error::Error>> {
let api_service: OpenApiService<_, ()> = OpenApiService::new(api_media::MediaApis, "Media Server APIs", env!("CARGO_PKG_VERSION")).server("/");
let ui = api_service.swagger_ui();
let spec = api_service.spec();
let route = Route::new()
.nest("/", api_service)
.nest("/samples", StaticFilesEndpoint::new("./public").index_file("index.html"))
.nest("/ui", ui)
.at("/spec", poem::endpoint::make_sync(move |_| spec.clone()))
.with(Cors::new())
.data(api_media::MediaServerCtx { sender });

Server::new(TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), port))).run(route).await?;
Ok(())
}
1 change: 1 addition & 0 deletions bin/src/http/api_connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Loading

0 comments on commit 7911144

Please sign in to comment.