Merge branch 'master' of github.com:8xFF/atm0s-media-server into fix-crash-assert-on-destroy
giangndm committed Nov 9, 2024
2 parents e06282e + c563f2d commit 5c1fd62
Showing 30 changed files with 1,807 additions and 260 deletions.
40 changes: 34 additions & 6 deletions .github/workflows/release.yml
@@ -91,6 +91,7 @@ jobs:
extension: ""
cross: false
build_record_tool: true
build_record_worker: true
- build: linux musl x64
os: ubuntu-latest
rust: stable
@@ -103,6 +104,8 @@
target: aarch64-unknown-linux-gnu
extension: ""
cross: true
build_record_tool: true
build_record_worker: true
- build: linux musl aarch64
os: ubuntu-latest
rust: stable
@@ -158,13 +161,15 @@ jobs:
extension: ""
cross: false
build_record_tool: true
build_record_worker: true
- build: macos aarch64
os: macos-latest
rust: stable
target: aarch64-apple-darwin
extension: ""
cross: true
build_record_tool: true
build_record_worker: true
# - build: windows gnu x64
# os: ubuntu-latest
# rust: stable
@@ -217,7 +222,15 @@ jobs:
with:
use-cross: ${{ matrix.cross }}
command: build
args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_record
args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_record_cli

- name: Build record worker
if: matrix.build_record_worker
uses: actions-rs/cargo@v1
with:
use-cross: ${{ matrix.cross }}
command: build
args: --verbose --release --package media-server-record --target ${{ matrix.target }} --bin convert_record_worker

- name: Rename server
if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
@@ -227,7 +240,12 @@
- name: Rename record
if: ${{ matrix.build_record_tool && matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
run: |
mv ./target/${{ matrix.target }}/release/convert_record${{ matrix.extension }} convert_record-${{ matrix.target }}${{ matrix.extension }}
mv ./target/${{ matrix.target }}/release/convert_record_cli${{ matrix.extension }} convert_record_cli-${{ matrix.target }}${{ matrix.extension }}
- name: Rename record worker
if: ${{ matrix.build_record_worker && matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
run: |
mv ./target/${{ matrix.target }}/release/convert_record_worker${{ matrix.extension }} convert_record_worker-${{ matrix.target }}${{ matrix.extension }}
- name: Upload Artifact to Summary
if: ${{ matrix.build != 'windows gnu x64' && matrix.build != 'windows msvc x64' }}
@@ -237,7 +255,7 @@
path: |
*-${{ matrix.target }}${{ matrix.extension }}
- name: Upload server binarie to release
- name: Upload server binary to release
if: startsWith(github.ref, 'refs/tags/')
uses: svenstaro/upload-release-action@v2
with:
@@ -247,13 +265,23 @@
tag: ${{ github.ref }}
overwrite: true

- name: Upload record_tool binarie to release
- name: Upload convert_record_cli binary to release
if: startsWith(github.ref, 'refs/tags/') && matrix.build_record_tool
uses: svenstaro/upload-release-action@v2
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: convert_record-${{ matrix.target }}${{ matrix.extension }}
asset_name: convert_record-${{ matrix.target }}${{ matrix.extension }}
file: convert_record_cli-${{ matrix.target }}${{ matrix.extension }}
asset_name: convert_record_cli-${{ matrix.target }}${{ matrix.extension }}
tag: ${{ github.ref }}
overwrite: true

- name: Upload convert_record_worker binary to release
if: startsWith(github.ref, 'refs/tags/') && matrix.build_record_worker
uses: svenstaro/upload-release-action@v2
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: convert_record_worker-${{ matrix.target }}${{ matrix.extension }}
asset_name: convert_record_worker-${{ matrix.target }}${{ matrix.extension }}
tag: ${{ github.ref }}
overwrite: true

8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions Cross.toml
@@ -8,6 +8,10 @@ passthrough = [
]

[target.aarch64-unknown-linux-gnu]
pre-build = ["apt-get update && apt-get install -y protobuf-compiler"]
pre-build = [
"apt-get update && apt-get --assume-yes install pkg-config protobuf-compiler libssl-dev"
]
[target.aarch64-unknown-linux-musl]
pre-build = ["apt-get update && apt-get install -y protobuf-compiler"]
pre-build = [
"apt-get update && apt-get --assume-yes install pkg-config protobuf-compiler libssl-dev"
]
9 changes: 7 additions & 2 deletions Dockerfile
@@ -12,7 +12,11 @@ RUN case $TARGETPLATFORM in \
*) exit 1 ;; \
esac; \
mv /tmp/$BUILD/atm0s-media-server-$BUILD /atm0s-media-server; \
chmod +x /atm0s-media-server
mv /tmp/$BUILD/convert_record_cli-$BUILD /convert_record_cli; \
mv /tmp/$BUILD/convert_record_worker-$BUILD /convert_record_worker; \
chmod +x /atm0s-media-server; \
chmod +x /convert_record_cli; \
chmod +x /convert_record_worker

FROM ubuntu:22.04

@@ -21,5 +25,6 @@ RUN apt update && apt install -y wget curl && apt clean && rm -rf /var/lib/apt/l

COPY maxminddb-data /maxminddb-data
COPY --from=base /atm0s-media-server /atm0s-media-server

COPY --from=base /convert_record_cli /convert_record_cli
COPY --from=base /convert_record_worker /convert_record_worker
ENTRYPOINT ["/atm0s-media-server"]
14 changes: 14 additions & 0 deletions docs/getting-started/installation/single-zone.md
@@ -127,6 +127,20 @@ docker run -d --name media2 --net=host ghcr.io/8xff/atm0s-media-gateway:master \
--allow-private-ip
```

## Deploy a record convert worker (optional)

```bash
docker run -d --name record-convert-worker --privileged \
    -e INPUT_S3_URI=http://user:password@host/atm0s-record/raw \
    -e MULTI_TENANCY_SYNC=MULTI_TENANCY_SYNC_ENDPOINT \
    -e SECRET=secr3t \
    -e HTTP_ADDR=0.0.0.0:20000 \
    --entrypoint /convert_record_worker \
    ghcr.io/8xff/atm0s-media-gateway:master
```

API docs: `ENDPOINT:20000/api/docs/`
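As a quick sanity check, you can confirm the worker is reachable by fetching that docs page; the host below is a placeholder, and port 20000 simply mirrors the `HTTP_ADDR` used above:

```bash
# Hypothetical check: fetch the API docs page exposed by the convert worker.
# Replace YOUR_HOST with the address of the machine running the container.
curl -i http://YOUR_HOST:20000/api/docs/
```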

## Testing your cluster

Some samples require access to microphone and camera permissions, so they need to run over HTTPS if you want to test against remote servers. We have 2 options for that:
181 changes: 177 additions & 4 deletions docs/user-guide/features/recording.md
@@ -1,9 +1,182 @@
# Recording (Incomplete)

For maximum performance and flexibility, we don't do any media transcoding when recording. Instead, we record raw media data and store it in a file. This way can also be very flexible; for example, we can record all rooms but only compose some rooms to a single file after that. This way also allow us to replay the recording without any transcoding.
For maximum performance and flexibility, we don't perform any media transcoding during recording. Instead, we record raw media data and store it in a file. This approach provides great flexibility; for example, we can record all rooms but only compose selected rooms into a single file afterwards. This method also allows us to replay recordings without any transcoding.

For simple integration, we use S3 as the storage backend. Each media server will collect raw streams and put them in the S3 upload queue. The S3 destination is configured in the media server config file or fetched dynamically from the connector service.
For simple integration, we use S3 as the storage backend. Each media server collects raw streams and puts them in the S3 upload queue. The S3 destination is configured in the media server config file or fetched dynamically from the connector service.

TODO: Write about some best practices.
## Record flow

# How to use
To start recording, we need to create a join token with the record field set to true. The client will then join the room with this token. After joining, the server starts recording the media stream and creates chunks in memory. If there are too many chunks, they will be written to disk. When a chunk exceeds the size limit or time threshold, the media-server submits it to the upload queue.

## Upload flow

The media server requests an S3 presigned URL from the connector node, then uploads the chunk to S3.

## Compose flow

When a recording starts, a hook event is fired to your registered endpoint so you can save the recording information to your database. Here's an example of the event structure:

```json
{
  "node": 22,
  "ts": 1731132577151,
  "event": {
    "Record": {
      "app": "app_id",
      "room": "room_uuid",
      "event": {
        "Started": {
          "path": "app_id/room_uuid/number"
        }
      }
    }
  }
}
```

You can convert the recording when you receive the room-destroyed event.

We provide two styles of record conversion: composing into a single file, or transmuxing into multiple media files. Depending on your needs, you can choose either one.

Inside the repo, we have a compose worker in the media-record package. We provide two ways to compose:

- Compose by CLI
- Compose by API

### Compose by CLI

```bash
Record file converter for atm0s-media-server. This tool allow convert room raw record to multiple webm files

Usage: convert_record_cli [OPTIONS] --in-s3 <IN_S3>

Options:
--in-s3 <IN_S3> S3 Source [env: IN_S3=]
--transmux Transmux [env: TRANSMUX=]
--transmux-out-s3 <TRANSMUX_OUT_S3> Transmux S3 Dest [env: TRANSMUX_OUT_S3=]
--transmux-out-path <TRANSMUX_OUT_PATH> Transmux Folder Dest [env: TRANSMUX_OUT_PATH=]
--compose-audio Compose audio [env: COMPOSE_AUDIO=]
--compose-video Compose video [env: COMPOSE_VIDEO=]
--compose-out-s3 <COMPOSE_OUT_S3> Compose S3 URL [env: COMPOSE_OUT_S3=]
--compose-out-path <COMPOSE_OUT_PATH> Compose File Path [env: COMPOSE_OUT_PATH=]
-h, --help Print help
-V, --version Print version
```
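For illustration, a minimal invocation might look like the sketch below; the S3 URI and output folder are placeholders (the URI style mirrors the `custom_s3` format shown later), not values defined by this document:

```bash
# Hypothetical example built from the options listed above.
# Transmux a raw room recording from S3 into per-track WebM files in a local folder.
convert_record_cli \
  --in-s3 "http://user:password@s3-host:9000/atm0s-record/raw/app_id/room_uuid/number?path_style=true" \
  --transmux \
  --transmux-out-path ./out/transmux
```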

### Compose by API

The conversion worker provides a REST API endpoint to submit conversion jobs. You can convert recordings either by transmuxing to separate files or composing them into a single file.

To submit a conversion job, send a POST request to `/api/convert/job` with your Bearer token for authentication. Here's the request format:

```json
{
  "record_path": "path/to/recording",
  "transmux": {
    "custom_s3": "http://user:password@host:port/bucket/path?path_style=true" // optional
  },
  "compose": {
    "audio": true,
    "video": true,
    "custom_s3": "presigned_url" // optional
  }
}
```

The API will return a job ID that you can use to track the conversion progress:

```json
{
  "status": true,
  "data": {
    "job_id": "job_12345"
  }
}
```

Key parameters:
- `record_path`: S3 path to the source recording
- `transmux`: (Optional) Settings for converting to separate media files
- `compose`: (Optional) Settings for composing into a single file
- `audio`: Enable audio composition
- `video`: Enable video composition
- `custom_s3`: Optional custom S3 output location

At least one of `transmux` or `compose` must be specified in the request.
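As a sketch, submitting a transmux-only job with curl could look like this; the worker address, token, and record path are placeholders, and whether an empty `transmux` object is accepted (rather than one carrying `custom_s3`) is an assumption to verify against your deployment:

```bash
# Hypothetical request: submit a conversion job to the worker's REST API.
# WORKER_ADDR and TOKEN are placeholders for your deployment's address and bearer token.
curl -X POST "http://WORKER_ADDR:20000/api/convert/job" \
  -H "Authorization: Bearer TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"record_path": "app_id/room_uuid/number", "transmux": {}}'
```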

After you get the job ID, each time the worker updates the job status it fires a hook event to your registered endpoint. You can use these events to update your database.

Here is the list of event types:

- `Started`: `{ job_id: string }`, fired when a job is created
- `Completed`: `{ job_id: string }`, fired when a job completes
- `Failed`: `{ job_id: string }`, fired when a job fails

For more information about the event structure, please refer to the protobuf definition in [media-server](https://github.com/8xFF/atm0s-media-server/blob/master/packages/protocol/proto/cluster/connector.proto).

```proto
message HookEvent {
  uint32 node = 1;
  uint64 ts = 2;
  oneof event {
    RoomEvent room = 3;
    PeerEvent peer = 4;
    RecordEvent record = 5;
    ComposeEvent compose = 6;
  }
}

message ComposeEvent {
  message RecordJobStarted {
  }
  message RecordJobFailed {
    string error = 2;
  }
  message RecordJobCompleted {
    message TrackTimeline {
      string path = 1;
      uint64 start = 2;
      uint64 end = 3; // Optional field, can be omitted
    }
    message TrackSummary {
      shared.Kind kind = 1;
      repeated TrackTimeline timeline = 2;
    }
    message SessionSummary {
      map<string, TrackSummary> track = 1;
    }
    message PeerSummary {
      map<uint64, SessionSummary> sessions = 1;
    }
    message TransmuxSummary {
      string metadata_json = 1;
      map<string, PeerSummary> peers = 2;
    }
    message ComposeSummary {
      string media_uri = 1;
    }
    TransmuxSummary transmux = 1;
    ComposeSummary compose = 2;
  }
  string app = 1;
  string job_id = 2;
  oneof event {
    RecordJobStarted started = 10;
    RecordJobFailed failed = 11;
    RecordJobCompleted completed = 12;
  }
}
```
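For orientation only, a compose hook delivered to your endpoint might be rendered roughly like the JSON below; this shape is extrapolated from the `Record` hook example earlier and from the protobuf above, so treat the exact field names and nesting as assumptions:

```json
{
  "node": 22,
  "ts": 1731132577151,
  "event": {
    "Compose": {
      "app": "app_id",
      "job_id": "job_12345",
      "event": {
        "Started": {}
      }
    }
  }
}
```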
6 changes: 3 additions & 3 deletions packages/media_connector/src/sql_storage.rs
@@ -536,7 +536,7 @@ impl ConnectorSqlStorage {
JoinType::InnerJoin,
entity::peer_session::Relation::Peer
.def()
.on_condition(move |_left, right| Expr::col((right, entity::peer::Column::Room)).is(room).into_condition()),
.on_condition(move |_left, right| Expr::col((right, entity::peer::Column::Room)).eq(room).into_condition()),
)
.count(&self.db)
.await
@@ -550,14 +550,14 @@ impl Storage for ConnectorSqlStorage {
}
async fn on_tick(&mut self, now_ms: u64) {
if let Err(e) = self.close_exited_rooms(now_ms).await {
log::error!("[ConnectorSqlStorage] db error {e:?}");
log::error!("[ConnectorSqlStorage] on_tick db error {e:?}");
}
}
async fn on_event(&mut self, now_ms: u64, from: NodeId, event_ts: u64, event: connector_request::Request) -> Option<connector_response::Response> {
match event {
connector_request::Request::Peer(event) => {
if let Err(e) = self.on_peer_event(now_ms, from, event_ts, &event.app, event.session_id, event.event.clone()?).await {
log::error!("[ConnectorSqlStorage] db error {e:?}");
log::error!("[ConnectorSqlStorage] on_peer_event db error {e:?}");
return None;
}
self.hook_events.push_back((