From 3052cf36759a2f47ee10ef218ac5b5d28ff1d9de Mon Sep 17 00:00:00 2001 From: hongcha Date: Sat, 23 Dec 2023 13:20:11 +0800 Subject: [PATCH] fix: metrics --- src/forward/forward_internal.rs | 13 +++++++++---- src/forward/mod.rs | 10 ++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/forward/forward_internal.rs b/src/forward/forward_internal.rs index 14a5ddce..573ab991 100644 --- a/src/forward/forward_internal.rs +++ b/src/forward/forward_internal.rs @@ -29,8 +29,8 @@ use webrtc::track::track_local::{TrackLocal, TrackLocalWriter}; use webrtc::track::track_remote::TrackRemote; use crate::forward::info::Layer; -use crate::media; use crate::AppError; +use crate::{media, metrics}; use super::rtcp::RtcpMessage; use super::track_match; @@ -160,6 +160,7 @@ impl PeerForwardInternal { } info!("[{}] [anchor] set {}", self.id, peer.get_stats_id()); *anchor = Some(peer); + metrics::PUBLISH.inc(); Ok(()) } @@ -180,14 +181,15 @@ impl PeerForwardInternal { subscribe_group.clear(); *anchor = None; info!("[{}] [anchor] set none", self.id); + metrics::PUBLISH.dec(); Ok(()) } pub async fn add_subscribe(&self, peer: Arc) -> Result<()> { let mut subscribe_peers = self.subscribe_group.write().await; subscribe_peers.push(PeerWrap(peer.clone())); - drop(subscribe_peers); info!("[{}] [subscribe] [{}] up", self.id, peer.get_stats_id()); + metrics::SUBSCRIBE.inc(); Ok(()) } @@ -298,9 +300,12 @@ impl PeerForwardInternal { subscription_group.remove(&peer_wrap); } let mut subscribe_peers = self.subscribe_group.write().await; + let size = subscribe_peers.len(); subscribe_peers.retain(|x| x != &peer_wrap); - drop(subscribe_peers); - info!("[{}] [subscribe] [{}] down", self.id, peer.get_stats_id()); + if size != subscribe_peers.len() { + info!("[{}] [subscribe] [{}] down", self.id, peer.get_stats_id()); + metrics::SUBSCRIBE.dec(); + } Ok(()) } diff --git a/src/forward/mod.rs b/src/forward/mod.rs index ca72a5a6..1d3a8d9f 100644 --- a/src/forward/mod.rs +++ b/src/forward/mod.rs @@ -14,8 +14,8 @@ use webrtc::sdp::{MediaDescription, SessionDescription}; use crate::forward::forward_internal::{get_peer_key, PeerForwardInternal}; use crate::forward::info::Layer; +use crate::media; use crate::AppError; -use crate::{media, metrics}; mod forward_internal; pub mod info; @@ -72,11 +72,7 @@ impl PeerForward { RTCPeerConnectionState::Failed | RTCPeerConnectionState::Disconnected => { let _ = pc.close().await; } - RTCPeerConnectionState::Connected => { - metrics::PUBLISH.inc(); - } RTCPeerConnectionState::Closed => { - metrics::PUBLISH.dec(); let _ = internal.remove_anchor(pc).await; } _ => {} @@ -127,7 +123,6 @@ impl PeerForward { let _ = pc.close().await; } RTCPeerConnectionState::Closed => { - metrics::SUBSCRIBE.dec(); let _ = internal.remove_subscribe(pc).await; } _ => {} @@ -140,8 +135,7 @@ impl PeerForward { peer_complete(offer, peer.clone()).await?, get_peer_key(peer.clone()), ); - metrics::SUBSCRIBE.inc(); - let _ = self.internal.add_subscribe(peer).await?; + self.internal.add_subscribe(peer).await?; Ok((sdp, key)) }