Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKING CHANGE: switching to sans-io-runtime #257

Merged
merged 17 commits into from
Apr 23, 2024

Conversation

giangndm
Copy link
Contributor

@giangndm giangndm commented Apr 15, 2024

Pull Request

Description

This PR changes the architecture of server with sans-io-runtime.

Related Issue

If this pull request is related to any issue, please mention it here.

Checklist

  • I have tested the changes locally.
  • I have reviewed the code changes.
  • I have updated the documentation, if necessary.
  • I have added appropriate tests, if applicable.

Screenshots

If applicable, add screenshots to help explain the changes made.

Additional Notes

Add any additional notes or context about the pull request here.

Summary by CodeRabbit

  • New Features

    • Introduced a Scalable Media Server solution supporting WebRTC, RTMP, and SIP protocols.
    • Added HTTP server functionality with OpenAPI integration and CORS support.
    • Updated project documentation to reflect shift from alpha stage to performance-focused refactoring.
  • Refactor

    • Reorganized workspace structure and updated dependencies for improved performance and stability.
    • Added new modules for managing media servers, connectors, and gateways.
    • Enhanced media server core with new functionalities for endpoint and transport management.
  • Documentation

    • Added a detailed feature status table in the README.
    • Updated README to outline new project goals and refactoring efforts.
  • Bug Fixes

    • Addressed issues in media server deployment and configuration through enhanced command-line argument parsing.

Copy link

@github-advanced-security github-advanced-security bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clippy found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

Actionable comments outside the diff hunks (2)
README.md (2)

Line range hint 3-33: Add alternative text to images to improve accessibility.


Line range hint 98-98: Specify the language for the code block to improve readability and syntax highlighting.

- ```bash
+ ```bash

Our primary focus is currently on Goal 1. For details on the current phase plan, please refer to our [Roadmap](https://github.com/8xFF/atm0s-media-server/issues/69).

Goals 2 and 3 are in the research phase. For more information, you can visit this [Repository](https://github.com/giangndm/decentralized-p2p-streaming).
We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix typo in the refactoring description.

- We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance.
+ We are actively refactoring the entire media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).
We are actively refactoring the entire media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).

pub struct TransportSession(pub u64);
pub struct TrackId(pub u16);

pub enum TransportState {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define possible states in TransportState.

Currently, TransportState is an empty enum, which might be a placeholder. Defining actual states can clarify its usage and ensure it fulfills its intended role.


pub enum TransportState {}

pub struct TransportStats {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define possible statistics in TransportStats.

Currently, TransportStats is an empty struct, which might be a placeholder. Defining actual statistics can clarify its usage and ensure it fulfills its intended role.


pub struct TransportStats {}

pub enum TransportControlIn {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define possible control inputs in TransportControlIn.

Currently, TransportControlIn is an empty enum, which might be a placeholder. Defining actual control inputs can clarify its usage and ensure it fulfills its intended role.


pub enum TransportControlIn {}

pub enum TransportControlOut {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Define possible control outputs in TransportControlOut.

Currently, TransportControlOut is an empty enum, which might be a placeholder. Defining actual control outputs can clarify its usage and ensure it fulfills its intended role.

}

impl<T: Transport> Endpoint<T> {
fn process_transport_output<'a>(&mut self, out: TransportOutput<'a>) -> Option<Output<'a>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused variable out in process_transport_output method.

The variable out is declared but not used in the process_transport_output method. Consider using it or removing it if it's unnecessary.

Comment on lines 6 to 12
pub struct SharedUdpPort<Task> {
backend_addr: Option<SocketAddr>,
task_remotes: HashMap<SocketAddr, Task>,
task_remotes_map: HashMap<Task, Vec<SocketAddr>>,
task_ufrags: HashMap<String, Task>,
task_ufrags_reverse: HashMap<Task, String>,
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused fields in SharedUdpPort.

Several fields in the SharedUdpPort struct (backend_addr, task_remotes, task_remotes_map, task_ufrags, task_ufrags_reverse) are declared but never read. This could indicate dead code or an incomplete implementation. Consider removing them or implementing their functionality.

Comment on lines 26 to 67
impl<Task: Debug + Clone + Copy + Hash + PartialEq + Eq> SharedUdpPort<Task> {
pub fn set_backend_info(&mut self, addr: SocketAddr) {
self.backend_addr = Some(addr);
}

pub fn get_backend_addr(&self) -> Option<SocketAddr> {
self.backend_addr
}

pub fn add_ufrag(&mut self, ufrag: String, task: Task) {
log::info!("Add ufrag {} to task {:?}", ufrag, task);
self.task_ufrags.insert(ufrag.clone(), task);
self.task_ufrags_reverse.insert(task, ufrag);
}

pub fn remove_task(&mut self, task: Task) -> Option<()> {
let ufrag = self.task_ufrags_reverse.remove(&task)?;
log::info!("Remove task {:?} => ufrag {}", task, ufrag);
self.task_ufrags.remove(&ufrag)?;
let remotes = self.task_remotes_map.remove(&task)?;
for remote in remotes {
log::info!(" Remove remote {:?} => task {:?}", remote, task);
self.task_remotes.remove(&remote);
}
Some(())
}

pub fn map_remote(&mut self, remote: SocketAddr, buf: &[u8]) -> Option<Task> {
if let Some(task) = self.task_remotes.get(&remote) {
return Some(*task);
}

let stun_username = Self::get_stun_username(buf)?;
log::warn!("Received a stun packet from an unknown remote: {:?}, username {}", remote, stun_username);
let task = self.task_ufrags.get(stun_username)?;
log::info!("Mapping remote {:?} to task {:?}", remote, task);
self.task_remotes.insert(remote, *task);
self.task_remotes_map.entry(*task).or_default().push(remote);
Some(*task)
}

fn get_stun_username(buf: &[u8]) -> Option<&str> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused methods in SharedUdpPort.

Several methods in the SharedUdpPort struct (set_backend_info, get_backend_addr, add_ufrag, remove_task, map_remote, get_stun_username) are declared but never used. This could indicate dead code or an incomplete implementation. Consider removing them or implementing their functionality.

Comment on lines 28 to 71
impl Transport for TransportWebrtc {
fn on_tick<'a>(&mut self, now: Instant) -> Option<TransportOutput<'a>> {
let next_tick = self.next_tick?;
if next_tick > now {
return None;
}
self.rtc.handle_input(Str0mInput::Timeout(now)).ok()?;
self.pop_output(now)
}

fn on_input<'a>(&mut self, _now: Instant, _input: TransportInput<'a>) -> Option<TransportOutput<'a>> {
todo!()
}

fn pop_output<'a>(&mut self, _now: Instant) -> Option<TransportOutput<'a>> {
loop {
match self.rtc.poll_output().ok()? {
Str0mOutput::Timeout(instance) => {
self.next_tick = Some(instance);
}
Str0mOutput::Transmit(_) => {
//TODO convert transmit to transport output
}
Str0mOutput::Event(event) => match event {
Str0mEvent::Connected => todo!(),
Str0mEvent::IceConnectionStateChange(_) => todo!(),
Str0mEvent::MediaAdded(_) => todo!(),
Str0mEvent::MediaData(_) => todo!(),
Str0mEvent::MediaChanged(_) => todo!(),
Str0mEvent::ChannelOpen(_, _) => todo!(),
Str0mEvent::ChannelData(_) => todo!(),
Str0mEvent::ChannelClose(_) => todo!(),
Str0mEvent::PeerStats(_) => todo!(),
Str0mEvent::MediaIngressStats(_) => todo!(),
Str0mEvent::MediaEgressStats(_) => todo!(),
Str0mEvent::EgressBitrateEstimate(_) => todo!(),
Str0mEvent::KeyframeRequest(_) => todo!(),
Str0mEvent::StreamPaused(_) => todo!(),
Str0mEvent::RtpPacket(_) => todo!(),
_ => {}
},
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement functionality for transport methods.

The methods on_tick, on_input, pop_output use placeholders like todo!() and incomplete implementations. Complete these methods to ensure the transport functions as expected.

@@ -0,0 +1,3 @@
# Atm0s Media Server protocol

This repository contains the protocol definition for inter-communication between the Atm0s Media Server and its clients.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider changing "inter-communication" to "intercommunication" for grammatical correctness.

- inter-communication
+ intercommunication

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
This repository contains the protocol definition for inter-communication between the Atm0s Media Server and its clients.
This repository contains the protocol definition for intercommunication between the Atm0s Media Server and its clients.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 21

Comment on lines +93 to +95
pub fn tasks(&self) -> usize {
self.sdn_worker.tasks() + self.sdn_worker.tasks()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure correct task count calculation.

- self.sdn_worker.tasks() + self.sdn_worker.tasks()
+ self.sdn_worker.tasks() + self.media_cluster.tasks() + self.media_webrtc.tasks()

The current implementation incorrectly sums the tasks of sdn_worker twice. It should include the tasks from media_cluster and media_webrtc to reflect the total number of tasks accurately.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
pub fn tasks(&self) -> usize {
self.sdn_worker.tasks() + self.sdn_worker.tasks()
}
pub fn tasks(&self) -> usize {
self.sdn_worker.tasks() + self.media_cluster.tasks() + self.media_webrtc.tasks()
}

Comment on lines +132 to +140
BackendIncoming::UdpPacket { slot: _, from, data } => {
let out = self.sdn_worker.on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(from, data)))?;
Some(self.output_sdn(now, out))
}
BackendIncoming::UdpListenResult { bind: _, result } => {
let (_addr, slot) = result.ok()?;
self.sdn_slot = slot;
None
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle potential None case explicitly.

- let (_addr, slot) = result.ok()?;
+ if let Some((_addr, slot)) = result {
+     self.sdn_slot = slot;
+ } else {
+     // Handle error or log warning
+ }

The current implementation uses result.ok()? which will return None from the function if result is Err. It's better to handle this case explicitly to ensure that appropriate actions (like logging or error handling) are taken when result is not Ok.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
BackendIncoming::UdpPacket { slot: _, from, data } => {
let out = self.sdn_worker.on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(from, data)))?;
Some(self.output_sdn(now, out))
}
BackendIncoming::UdpListenResult { bind: _, result } => {
let (_addr, slot) = result.ok()?;
self.sdn_slot = slot;
None
}
BackendIncoming::UdpPacket { slot: _, from, data } => {
let out = self.sdn_worker.on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(from, data)))?;
Some(self.output_sdn(now, out))
}
BackendIncoming::UdpListenResult { bind: _, result } => {
if let Some((_addr, slot)) = result {
self.sdn_slot = slot;
} else {
// Handle error or log warning
}
None
}

Comment on lines +97 to +120
pub fn on_tick<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
let s = &mut self.switcher;
while let Some(c) = s.looper_current(now) {
match c.try_into().ok()? {
TaskType::Sdn => {
let now_ms = self.timer.timestamp_ms(now);
if let Some(out) = s.looper_process(self.sdn_worker.on_tick(now_ms)) {
return Some(self.output_sdn(now, out));
}
}
TaskType::MediaCluster => {
if let Some(out) = s.looper_process(self.media_cluster.on_tick(now)) {
return Some(self.output_cluster(now, out));
}
}
TaskType::MediaWebrtc => {
if let Some(out) = s.looper_process(self.media_webrtc.on_tick(now)) {
return Some(self.output_webrtc(now, out));
}
}
}
}
None
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor repeated code into a function.

Consider refactoring the repeated pattern of task processing in the on_tick method into a separate function. This will improve code readability and maintainability.

Comment on lines 151 to 177
pub fn pop_output<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
if let Some(out) = self.queue.pop_front() {
return Some(out);
}
let s = &mut self.switcher;
while let Some(c) = s.queue_current() {
match c.try_into().ok()? {
TaskType::Sdn => {
let now_ms = self.timer.timestamp_ms(now);
if let Some(out) = s.looper_process(self.sdn_worker.pop_output(now_ms)) {
return Some(self.output_sdn(now, out));
}
}
TaskType::MediaCluster => {
if let Some(out) = s.looper_process(self.media_cluster.pop_output(now)) {
return Some(self.output_cluster(now, out));
}
}
TaskType::MediaWebrtc => {
if let Some(out) = s.looper_process(self.media_webrtc.pop_output(now)) {
return Some(self.output_webrtc(now, out));
}
}
}
}
None
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor repeated code into a function.

Similar to the on_tick method, the pop_output method also contains a repeated pattern of task processing. Consider refactoring this into a separate function to reduce code duplication and enhance clarity.

Comment on lines +179 to +202
pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
let s = &mut self.switcher;
while let Some(c) = s.looper_current(now) {
match c.try_into().ok()? {
TaskType::Sdn => {
let now_ms = self.timer.timestamp_ms(now);
if let Some(out) = s.looper_process(self.sdn_worker.on_event(now_ms, SdnWorkerInput::ShutdownRequest)) {
return Some(self.output_sdn(now, out));
}
}
TaskType::MediaCluster => {
if let Some(out) = s.looper_process(self.media_cluster.shutdown(now)) {
return Some(self.output_cluster(now, out));
}
}
TaskType::MediaWebrtc => {
if let Some(out) = s.looper_process(self.media_webrtc.shutdown(now)) {
return Some(self.output_webrtc(now, out));
}
}
}
}
None
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor repeated code into a function.

The shutdown method shares a similar structure with on_tick and pop_output. Refactoring the common task processing pattern into a separate function would greatly improve the maintainability and readability of the code.

Comment on lines +12 to +15
let stream = new MediaStream();
document.querySelector("video").srcObject = stream;
pc.ontrack = (event) => {
stream.addTrack(event.track);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The verification process has confirmed the presence of the ontrack event handler in whep.demo.js as described in the review. However, no specific test files for this script were found in the repository. This suggests that either the testing is handled in a more integrated environment not specific to whep.demo.js, or the testing coverage might be insufficient for this particular functionality.

It is recommended to:

  • Ensure that comprehensive tests are developed and run to verify the new video track handling functionality.
  • Review the overall testing strategy to include unit and integration tests that cover the changes made in whep.demo.js.
Analysis chain

The modification to handle incoming video tracks by adding them to a new MediaStream object is a good practice, as it allows for more flexible management of media streams. Ensure that this change is tested thoroughly to confirm that it integrates well with the rest of the application.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Test the new video track handling functionality
echo "Ensure to run comprehensive tests on the new video track management."

Length of output: 146



Script:

#!/bin/bash
# Search for test files related to whep.demo.js
fd 'whep.demo.test.js' --exec cat {}

# Search for references to the ontrack event handler in the codebase
rg 'ontrack ='

Length of output: 111

Comment on lines 12 to 14
fn from_str(s: &str) -> Result<Self, Self::Err> {
todo!()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the from_str method for ClusterConnId or mark it with a TODO comment for future implementation.

Comment on lines +27 to +31
// sendEncodings: [
// { rid: "0", active: true, scaleResolutionDownBy: 2},
// { rid: "1", active: true, scaleResolutionDownBy: 2},
// { rid: "2", active: true },
// ],
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the reason for commenting out the sendEncodings configuration in the addTransceiver function or mark it as a temporary change if applicable.

Comment on lines 65 to 69
pub trait Transport {
fn on_tick<'a>(&mut self, now: Instant) -> Option<TransportOutput<'a>>;
fn on_input<'a>(&mut self, now: Instant, input: TransportInput<'a>) -> Option<TransportOutput<'a>>;
fn pop_output<'a>(&mut self, now: Instant) -> Option<TransportOutput<'a>>;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of on_tick, on_input, and pop_output methods in the Transport trait or mark them with TODO comments for future implementation.

Comment on lines 35 to 56
pub fn on_tick<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
let out = self.transport.on_tick(now)?;
self.process_transport_output(out)
}

pub fn on_event<'a>(&mut self, now: Instant, input: Input<'a>) -> Option<Output<'a>> {
let input = match input {
Input::Net(net) => TransportInput::Net(net),
_ => todo!(),
};
let out = self.transport.on_input(now, input)?;
self.process_transport_output(out)
}

pub fn pop_output<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
let out = self.transport.pop_output(now)?;
self.process_transport_output(out)
}

pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<'a>> {
todo!()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of on_tick, on_event, pop_output, and shutdown methods in the Endpoint structure or mark them with TODO comments for future implementation.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 16

Comment on lines 12 to 16
impl FromStr for ClusterConnId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
todo!()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of from_str for ClusterConnId or mark it with a TODO comment for future implementation.

@@ -0,0 +1,57 @@
# Media Server Core logic

This module implement core logic of media server. It is responsible for:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the verb form for better readability: "This module implements the core logic of the media server."

- This module implement core logic of media server.
+ This module implements the core logic of the media server.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
This module implement core logic of media server. It is responsible for:
This module implements the core logic of the media server. It is responsible for:


## Modules details

- Transport: interact with sdk or clients
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the spelling mistake: "SDK".

- interact with sdk or clients
+ interact with SDK or clients

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
- Transport: interact with sdk or clients
- Transport: interact with SDK or clients


- Transport: interact with sdk or clients
- Cluster: interact with atm0s-sdn, convert local action into atm0s-sdn APIs.
- Endpoint: interact with transport in/out and cluster in/out, it act as a bridge between transport and cluster.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the correct verb form: "acts".

- it act as a bridge between transport and cluster.
+ it acts as a bridge between transport and cluster.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
- Endpoint: interact with transport in/out and cluster in/out, it act as a bridge between transport and cluster.
- Endpoint: interact with transport in/out and cluster in/out, it acts as a bridge between transport and cluster.

- RemoteTrack: track receiving media from client.
- LocalTrack: track sending data to client.

We design each module is very simple responsebility, and interact to each other with general event and control. We need design for avoiding bias into any protocol, instead we design it for room based interactive media server.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace "very simple responsibility" with "clearly defined responsibilities" for better clarity and professionalism.

- We design each module is very simple responsibility, and interact to each other with general event and control.
+ We design each module with clearly defined responsibilities, interacting with each other through general events and controls.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
We design each module is very simple responsebility, and interact to each other with general event and control. We need design for avoiding bias into any protocol, instead we design it for room based interactive media server.
We design each module with clearly defined responsibilities, interacting with each other through general events and controls. We need design for avoiding bias into any protocol, instead we design it for room based interactive media server.

PeerJoined(PeerId),
PeerLeaved(PeerId),
TrackStarted(PeerId, TrackName, TrackMeta),
TrackStoped(PeerId, TrackName),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the typographical error in the TrackStopped event.

- TrackStoped
+ TrackStopped

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
TrackStoped(PeerId, TrackName),
TrackStopped(PeerId, TrackName),

Comment on lines 80 to 94
pub fn on_tick(&mut self, now: Instant) -> Option<Output<Owner>> {
todo!()
}

pub fn on_input(&mut self, now: Instant, input: Input<Owner>) -> Option<Output<Owner>> {
todo!()
}

pub fn pop_output(&mut self, now: Instant) -> Option<Output<Owner>> {
todo!()
}

pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<Owner>> {
todo!()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of on_tick, on_input, pop_output, and shutdown methods in MediaCluster or mark them with TODO comments for future implementation.

Comment on lines 26 to 36
pub fn on_tick<'a, ExtIn>(&mut self, now: Instant) -> Option<InternalOutput<'a, ExtIn>> {
None
}

pub fn pop_output<'a, ExtIn>(&mut self, now: Instant) -> Option<InternalOutput<'a, ExtIn>> {
None
}

pub fn shutdown<'a, ExtIn>(&mut self, now: Instant) -> Option<InternalOutput<'a, ExtIn>> {
None
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of on_tick, pop_output, and shutdown methods in EndpointInternal or mark them with TODO comments for future implementation.

Comment on lines 41 to 87
pub fn on_transport_event<'a, ExtIn, ExtOut>(&mut self, now: Instant, event: TransportEvent<'a, ExtOut>) -> Option<InternalOutput<'a, ExtIn>> {
match event {
TransportEvent::Net(out) => Some(InternalOutput::Net(out)),
TransportEvent::State(state) => self.on_transport_state_changed(now, state),
TransportEvent::RemoteTrack(track, event) => self.on_transport_remote_track(now, track, event),
TransportEvent::LocalTrack(track, event) => self.on_transport_local_track(now, track, event),
TransportEvent::Stats(stats) => self.on_transport_stats(now, stats),
TransportEvent::Control(control) => self.on_transport_control(now, control),
TransportEvent::Ext(_) => panic!("should not get here"),
}
}

fn on_transport_state_changed<'a, ExtIn>(&mut self, now: Instant, state: TransportState) -> Option<InternalOutput<'a, ExtIn>> {
match state {
TransportState::Connecting => todo!(),
TransportState::ConnectError(_) => todo!(),
TransportState::Connected => todo!(),
TransportState::Reconnecting => todo!(),
TransportState::Disconnected(_) => todo!(),
}
}

fn on_transport_remote_track<'a, ExtIn>(&mut self, now: Instant, track: RemoteTrackId, event: RemoteTrackEvent) -> Option<InternalOutput<'a, ExtIn>> {
match event {
RemoteTrackEvent::Started { name } => todo!(),
RemoteTrackEvent::Paused => todo!(),
RemoteTrackEvent::Media(_) => todo!(),
RemoteTrackEvent::Ended => todo!(),
}
}

fn on_transport_local_track<'a, ExtIn>(&mut self, now: Instant, track: LocalTrackId, event: LocalTrackEvent) -> Option<InternalOutput<'a, ExtIn>> {
match event {
LocalTrackEvent::Started { name } => todo!(),
LocalTrackEvent::Paused => todo!(),
LocalTrackEvent::RequestKeyFrame => todo!(),
LocalTrackEvent::Ended => todo!(),
}
}

fn on_transport_control<'a, ExtIn>(&mut self, now: Instant, control: ClientEndpointControl) -> Option<InternalOutput<'a, ExtIn>> {
todo!()
}

fn on_transport_stats<'a, ExtIn>(&mut self, now: Instant, stats: TransportStats) -> Option<InternalOutput<'a, ExtIn>> {
todo!()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of event handling methods in EndpointInternal or mark them with TODO comments for future implementation.

Comment on lines 92 to 113
pub fn on_cluster_event<'a, ExtIn>(&mut self, now: Instant, event: ClusterEndpointEvent) -> Option<InternalOutput<'a, ExtIn>> {
match event {
ClusterEndpointEvent::PeerJoined(peer) => Some(InternalOutput::Transport(TransportControl::Event(ClientEndpointEvent::PeerJoined(peer)))),
ClusterEndpointEvent::PeerLeaved(peer) => Some(InternalOutput::Transport(TransportControl::Event(ClientEndpointEvent::PeerJoined(peer)))),
ClusterEndpointEvent::TrackStarted(peer, track, meta) => Some(InternalOutput::Transport(TransportControl::Event(ClientEndpointEvent::PeerTrackStarted(peer, track, meta)))),
ClusterEndpointEvent::TrackStoped(peer, track) => Some(InternalOutput::Transport(TransportControl::Event(ClientEndpointEvent::PeerTrackStopped(peer, track)))),
ClusterEndpointEvent::RemoteTrack(track, event) => self.on_cluster_remote_track(now, track, event),
ClusterEndpointEvent::LocalTrack(track, event) => self.on_cluster_local_track(now, track, event),
}
}

fn on_cluster_remote_track<'a, ExtIn>(&mut self, now: Instant, id: RemoteTrackId, event: ClusterRemoteTrackEvent) -> Option<InternalOutput<'a, ExtIn>> {
match event {
_ => todo!(),
}
}

fn on_cluster_local_track<'a, ExtIn>(&mut self, now: Instant, id: LocalTrackId, event: ClusterLocalTrackEvent) -> Option<InternalOutput<'a, ExtIn>> {
match event {
_ => todo!(),
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of cluster event handling methods in EndpointInternal or mark them with TODO comments for future implementation.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

None
}

//TODO try to get other tracks if avaiable
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the typo in the comment.

- //TODO try to get other tracks if avaiable
+ //TODO try to get other tracks if available

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
//TODO try to get other tracks if avaiable
//TODO try to get other tracks if available

pub fn on_cluster_event<'a>(&mut self, now: Instant, event: ClusterEndpointEvent) -> Option<InternalOutput> {
match event {
ClusterEndpointEvent::TrackStarted(peer, track, meta) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStarted(peer, track, meta))),
ClusterEndpointEvent::TrackStoped(peer, track) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStopped(peer, track))),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the typo in the enum variant name.

- ClusterEndpointEvent::TrackStoped(peer, track) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStopped(peer, track))),
+ ClusterEndpointEvent::TrackStopped(peer, track) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStopped(peer, track))),

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
ClusterEndpointEvent::TrackStoped(peer, track) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStopped(peer, track))),
ClusterEndpointEvent::TrackStopped(peer, track) => Some(InternalOutput::Event(EndpointEvent::PeerTrackStopped(peer, track))),

} else {
let (peer, name, _meta) = self.remote_tracks.remove(&track)?;
log::info!("[ClusterRoom {}] cluster: peer ({}) stopped track {}) => fire event to {:?}", self.room, peer, name, peers);
Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStoped(peer, name)))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the typo in the enum variant name.

- Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStoped(peer, name)))
+ Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStopped(peer, name)))

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStoped(peer, name)))
Some(Output::Endpoint(peers, ClusterEndpointEvent::TrackStopped(peer, name)))

@giangndm giangndm marked this pull request as draft April 23, 2024 04:48
@8xFF 8xFF deleted a comment from coderabbitai bot Apr 23, 2024
Copy link

coderabbitai bot commented Apr 23, 2024

Important

Auto Review Skipped

More than 25% of the files skipped due to max files limit. The review is being skipped to prevent a low-quality review.

73 files out of 286 files are above the max files limit of 50. Please upgrade to Pro plan to get higher limits.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger a review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub struct F16u(u16);

impl Into<f32> for F16u {

Check warning

Code scanning / clippy

an implementation of From is preferred since it gives you Into<_> for free where the reverse isn't true Warning

an implementation of From is preferred since it gives you Into<_> for free where the reverse isn't true
packages/media_utils/src/f16.rs Fixed Show fixed Hide fixed
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub struct F16i(i16);

impl Into<f32> for F16i {

Check warning

Code scanning / clippy

an implementation of From is preferred since it gives you Into<_> for free where the reverse isn't true Warning

an implementation of From is preferred since it gives you Into<_> for free where the reverse isn't true
packages/media_utils/src/f16.rs Fixed Show fixed Hide fixed
Some(value)
}

pub fn len(&self) -> usize {

Check warning

Code scanning / clippy

struct Small2dMap has a public len method, but no is_empty method Warning

struct Small2dMap has a public len method, but no is\_empty method
Copy link

codecov bot commented Apr 23, 2024

Codecov Report

Attention: Patch coverage is 1.89394% with 2072 lines in your changes are missing coverage. Please review.

Project coverage is 1.81%. Comparing base (a1453a0) to head (3cbd212).

❗ Current head 3cbd212 differs from pull request most recent head 443ce20. Consider uploading reports for the commit 443ce20 to get more accurate results

Files Patch % Lines
packages/media_core/src/endpoint/internal.rs 0.00% 196 Missing ⚠️
packages/media_runner/src/worker.rs 0.00% 187 Missing ⚠️
packages/transport_webrtc/src/transport/whep.rs 0.00% 184 Missing ⚠️
bin/src/http/api_media.rs 0.00% 136 Missing ⚠️
packages/transport_webrtc/src/transport/whip.rs 0.00% 134 Missing ⚠️
packages/media_core/src/cluster/room.rs 0.00% 132 Missing ⚠️
packages/transport_webrtc/src/transport.rs 0.00% 107 Missing ⚠️
bin/src/server/media/runtime_worker.rs 0.00% 80 Missing ⚠️
packages/media_core/src/endpoint.rs 0.00% 78 Missing ⚠️
packages/media_core/src/cluster/room/metadata.rs 0.00% 77 Missing ⚠️
... and 26 more
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #257       +/-   ##
==========================================
- Coverage   50.13%   1.81%   -48.33%     
==========================================
  Files         156      36      -120     
  Lines       13971    2201    -11770     
==========================================
- Hits         7005      40     -6965     
+ Misses       6966    2161     -4805     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@giangndm giangndm marked this pull request as ready for review April 23, 2024 13:12
@giangndm giangndm merged commit 7911144 into 8xFF:master Apr 23, 2024
8 checks passed
@github-actions github-actions bot mentioned this pull request Mar 4, 2024
@giangndm giangndm deleted the refactor-sans-io-runtime branch June 6, 2024 00:44
giangndm added a commit to giangndm/8xFF-decentralized-media-server that referenced this pull request Nov 26, 2024
This PR switched to use sans-io architecture with sans-io-runtime instead of async-runtime. The main reason is for performance and simplicity in data flow, everything will be sync now, make it easier for predicting time consume
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant