Skip to content

Commit

Permalink
fix: crash assert on destroy (#449)
Browse files Browse the repository at this point in the history
* new resource clean strategy with new sans-io-runtime lib

* avoid shutdown loop when rtp timeout with rtpengine
  • Loading branch information
giangndm authored Nov 13, 2024
1 parent c563f2d commit 436d7ea
Show file tree
Hide file tree
Showing 50 changed files with 1,203 additions and 940 deletions.
133 changes: 66 additions & 67 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ members = [
]

[workspace.dependencies]
sans-io-runtime = { version = "0.2", default-features = false }
atm0s-sdn = { version = "0.2.2", default-features = false }
sans-io-runtime = { version = "0.3", default-features = false }
atm0s-sdn = { version = "0.2", default-features = false }
atm0s-sdn-network = { version = "0.6", default-features = false }
tokio = "1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
Expand All @@ -37,3 +38,4 @@ prost = "0.13"
indexmap = "2.2"
spin = "0.9"
httpmock = "0.7"
test-log = "0.2"
18 changes: 16 additions & 2 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct MediaRuntimeWorker<ES: 'static + MediaEdgeSecure> {
index: u16,
worker: MediaServerWorker<ES>,
queue: VecDeque<Output>,
shutdown: bool,
}

impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, Event, ICfg<ES>, SCfg> for MediaRuntimeWorker<ES> {
Expand All @@ -72,7 +73,12 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
cfg.media,
);
log::info!("created worker");
MediaRuntimeWorker { index, worker, queue }
MediaRuntimeWorker {
index,
worker,
queue,
shutdown: false,
}
}

fn worker_index(&self) -> u16 {
Expand All @@ -83,6 +89,10 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
self.worker.tasks()
}

fn is_empty(&self) -> bool {
self.shutdown && self.queue.is_empty() && self.worker.is_empty()
}

fn spawn(&mut self, _now: Instant, _cfg: SCfg) {
panic!("Not supported")
}
Expand All @@ -104,7 +114,11 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
}

fn on_shutdown(&mut self, now: Instant) {
self.worker.shutdown(now);
if self.shutdown {
return;
}
self.shutdown = true;
self.worker.on_shutdown(now);
}
}

Expand Down
18 changes: 18 additions & 0 deletions docs/contributor-guide/resource_clear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Resource Clear

This server is built on top of [sans-io-runtime](https://github.com/atm0s-org/sans-io-runtime), which is a runtime for building server applications. With this reason we don't use mechanism of `Drop` to release resource. Instead, we use a queue to manage the resource release.

We have 2 types of tasks:

- Self-managed tasks: for example, the endpoint task
- Dependent tasks: for example, the cluster room task

For self-managed tasks, the task itself determines when to kill itself. For dependent tasks, the task will be killed when all of its dependent tasks are un-linked. Each task type has a different way to handle task termination.

## Self-managed task

Example: The endpoint task can automatically release resources when the client disconnects.

## Dependent task

Example: The cluster room task needs to wait for all dependent tasks (endpoint track, mixer, or data channel) to be un-linked before destroying itself.
2 changes: 2 additions & 0 deletions packages/audio_mixer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const SWITCH_AUDIO_THRESHOLD: i16 = 30;
/// if no audio pkt received in AUDIO_SLOT_TIMEOUT, set audio level to SILENT_LEVEL
const AUDIO_SLOT_TIMEOUT: Duration = Duration::from_millis(1000);

#[derive(Debug)]
struct SourceState {
last_changed_at: Instant,
slot: Option<usize>,
Expand All @@ -24,6 +25,7 @@ struct OutputSlotState<Src> {

/// Implement lightweight audio mixer with mix-minus feature
/// We will select n highest audio-level tracks
#[derive(Debug)]
pub struct AudioMixer<Src> {
len: usize,
sources: HashMap<Src, SourceState>,
Expand Down
Loading

0 comments on commit 436d7ea

Please sign in to comment.