diff --git a/src/core/services/statistics/mod.rs b/src/core/services/statistics/mod.rs index 0e7735be..82ff359a 100644 --- a/src/core/services/statistics/mod.rs +++ b/src/core/services/statistics/mod.rs @@ -67,19 +67,26 @@ pub async fn get_metrics(tracker: Arc) -> TrackerMetrics { TrackerMetrics { torrents_metrics, protocol_metrics: Metrics { + // TCP tcp4_connections_handled: stats.tcp4_connections_handled, tcp4_announces_handled: stats.tcp4_announces_handled, tcp4_scrapes_handled: stats.tcp4_scrapes_handled, tcp6_connections_handled: stats.tcp6_connections_handled, tcp6_announces_handled: stats.tcp6_announces_handled, tcp6_scrapes_handled: stats.tcp6_scrapes_handled, + // UDP + udp_requests_aborted: stats.udp_requests_aborted, + udp4_requests: stats.udp4_requests, udp4_connections_handled: stats.udp4_connections_handled, udp4_announces_handled: stats.udp4_announces_handled, udp4_scrapes_handled: stats.udp4_scrapes_handled, + udp4_responses: stats.udp4_responses, udp4_errors_handled: stats.udp4_errors_handled, + udp6_requests: stats.udp6_requests, udp6_connections_handled: stats.udp6_connections_handled, udp6_announces_handled: stats.udp6_announces_handled, udp6_scrapes_handled: stats.udp6_scrapes_handled, + udp6_responses: stats.udp6_responses, udp6_errors_handled: stats.udp6_errors_handled, }, } diff --git a/src/core/statistics.rs b/src/core/statistics.rs index b106b269..6df7c496 100644 --- a/src/core/statistics.rs +++ b/src/core/statistics.rs @@ -44,13 +44,18 @@ pub enum Event { Tcp4Scrape, Tcp6Announce, Tcp6Scrape, + Udp4RequestAborted, + Udp4Request, Udp4Connect, Udp4Announce, Udp4Scrape, + Udp4Response, Udp4Error, + Udp6Request, Udp6Connect, Udp6Announce, Udp6Scrape, + Udp6Response, Udp6Error, } @@ -72,26 +77,40 @@ pub struct Metrics { pub tcp4_announces_handled: u64, /// Total number of TCP (HTTP tracker) `scrape` requests from IPv4 peers. pub tcp4_scrapes_handled: u64, + /// Total number of TCP (HTTP tracker) connections from IPv6 peers. pub tcp6_connections_handled: u64, /// Total number of TCP (HTTP tracker) `announce` requests from IPv6 peers. pub tcp6_announces_handled: u64, /// Total number of TCP (HTTP tracker) `scrape` requests from IPv6 peers. pub tcp6_scrapes_handled: u64, + + /// Total number of UDP (UDP tracker) requests aborted. + pub udp_requests_aborted: u64, + + /// Total number of UDP (UDP tracker) requests from IPv4 peers. + pub udp4_requests: u64, /// Total number of UDP (UDP tracker) connections from IPv4 peers. pub udp4_connections_handled: u64, /// Total number of UDP (UDP tracker) `announce` requests from IPv4 peers. pub udp4_announces_handled: u64, /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers. pub udp4_scrapes_handled: u64, + /// Total number of UDP (UDP tracker) responses from IPv4 peers. + pub udp4_responses: u64, /// Total number of UDP (UDP tracker) `error` requests from IPv4 peers. pub udp4_errors_handled: u64, + + /// Total number of UDP (UDP tracker) requests from IPv6 peers. + pub udp6_requests: u64, /// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers. pub udp6_connections_handled: u64, /// Total number of UDP (UDP tracker) `announce` requests from IPv6 peers. pub udp6_announces_handled: u64, /// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers. pub udp6_scrapes_handled: u64, + /// Total number of UDP (UDP tracker) responses from IPv6 peers. + pub udp6_responses: u64, /// Total number of UDP (UDP tracker) `error` requests from IPv6 peers. pub udp6_errors_handled: u64, } @@ -164,7 +183,15 @@ async fn event_handler(event: Event, stats_repository: &Repo) { stats_repository.increase_tcp6_connections().await; } + // UDP + Event::Udp4RequestAborted => { + stats_repository.increase_udp_requests_aborted().await; + } + // UDP4 + Event::Udp4Request => { + stats_repository.increase_udp4_requests().await; + } Event::Udp4Connect => { stats_repository.increase_udp4_connections().await; } @@ -174,11 +201,17 @@ async fn event_handler(event: Event, stats_repository: &Repo) { Event::Udp4Scrape => { stats_repository.increase_udp4_scrapes().await; } + Event::Udp4Response => { + stats_repository.increase_udp4_responses().await; + } Event::Udp4Error => { stats_repository.increase_udp4_errors().await; } // UDP6 + Event::Udp6Request => { + stats_repository.increase_udp6_requests().await; + } Event::Udp6Connect => { stats_repository.increase_udp6_connections().await; } @@ -188,6 +221,9 @@ async fn event_handler(event: Event, stats_repository: &Repo) { Event::Udp6Scrape => { stats_repository.increase_udp6_scrapes().await; } + Event::Udp6Response => { + stats_repository.increase_udp6_responses().await; + } Event::Udp6Error => { stats_repository.increase_udp6_errors().await; } @@ -276,6 +312,18 @@ impl Repo { drop(stats_lock); } + pub async fn increase_udp_requests_aborted(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp_requests_aborted += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_requests(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_requests += 1; + drop(stats_lock); + } + pub async fn increase_udp4_connections(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp4_connections_handled += 1; @@ -294,12 +342,24 @@ impl Repo { drop(stats_lock); } + pub async fn increase_udp4_responses(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_responses += 1; + drop(stats_lock); + } + pub async fn increase_udp4_errors(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp4_errors_handled += 1; drop(stats_lock); } + pub async fn increase_udp6_requests(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_requests += 1; + drop(stats_lock); + } + pub async fn increase_udp6_connections(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp6_connections_handled += 1; @@ -318,6 +378,12 @@ impl Repo { drop(stats_lock); } + pub async fn increase_udp6_responses(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_responses += 1; + drop(stats_lock); + } + pub async fn increase_udp6_errors(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp6_errors_handled += 1; diff --git a/src/servers/apis/v1/context/stats/resources.rs b/src/servers/apis/v1/context/stats/resources.rs index de6f6ca8..e7057f30 100644 --- a/src/servers/apis/v1/context/stats/resources.rs +++ b/src/servers/apis/v1/context/stats/resources.rs @@ -26,26 +26,40 @@ pub struct Stats { pub tcp4_announces_handled: u64, /// Total number of TCP (HTTP tracker) `scrape` requests from IPv4 peers. pub tcp4_scrapes_handled: u64, + /// Total number of TCP (HTTP tracker) connections from IPv6 peers. pub tcp6_connections_handled: u64, /// Total number of TCP (HTTP tracker) `announce` requests from IPv6 peers. pub tcp6_announces_handled: u64, /// Total number of TCP (HTTP tracker) `scrape` requests from IPv6 peers. pub tcp6_scrapes_handled: u64, + + /// Total number of UDP (UDP tracker) requests aborted. + pub udp_requests_aborted: u64, + + /// Total number of UDP (UDP tracker) requests from IPv4 peers. + pub udp4_requests: u64, /// Total number of UDP (UDP tracker) connections from IPv4 peers. pub udp4_connections_handled: u64, /// Total number of UDP (UDP tracker) `announce` requests from IPv4 peers. pub udp4_announces_handled: u64, /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers. pub udp4_scrapes_handled: u64, + /// Total number of UDP (UDP tracker) responses from IPv4 peers. + pub udp4_responses: u64, /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers. pub udp4_errors_handled: u64, + + /// Total number of UDP (UDP tracker) requests from IPv6 peers. + pub udp6_requests: u64, /// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers. pub udp6_connections_handled: u64, /// Total number of UDP (UDP tracker) `announce` requests from IPv6 peers. pub udp6_announces_handled: u64, /// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers. pub udp6_scrapes_handled: u64, + /// Total number of UDP (UDP tracker) responses from IPv6 peers. + pub udp6_responses: u64, /// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers. pub udp6_errors_handled: u64, } @@ -57,19 +71,26 @@ impl From for Stats { seeders: metrics.torrents_metrics.complete, completed: metrics.torrents_metrics.downloaded, leechers: metrics.torrents_metrics.incomplete, + // TCP tcp4_connections_handled: metrics.protocol_metrics.tcp4_connections_handled, tcp4_announces_handled: metrics.protocol_metrics.tcp4_announces_handled, tcp4_scrapes_handled: metrics.protocol_metrics.tcp4_scrapes_handled, tcp6_connections_handled: metrics.protocol_metrics.tcp6_connections_handled, tcp6_announces_handled: metrics.protocol_metrics.tcp6_announces_handled, tcp6_scrapes_handled: metrics.protocol_metrics.tcp6_scrapes_handled, + // UDP + udp_requests_aborted: metrics.protocol_metrics.udp_requests_aborted, + udp4_requests: metrics.protocol_metrics.udp4_requests, udp4_connections_handled: metrics.protocol_metrics.udp4_connections_handled, udp4_announces_handled: metrics.protocol_metrics.udp4_announces_handled, udp4_scrapes_handled: metrics.protocol_metrics.udp4_scrapes_handled, + udp4_responses: metrics.protocol_metrics.udp4_responses, udp4_errors_handled: metrics.protocol_metrics.udp4_errors_handled, + udp6_requests: metrics.protocol_metrics.udp6_requests, udp6_connections_handled: metrics.protocol_metrics.udp6_connections_handled, udp6_announces_handled: metrics.protocol_metrics.udp6_announces_handled, udp6_scrapes_handled: metrics.protocol_metrics.udp6_scrapes_handled, + udp6_responses: metrics.protocol_metrics.udp6_responses, udp6_errors_handled: metrics.protocol_metrics.udp6_errors_handled, } } @@ -94,20 +115,27 @@ mod tests { torrents: 4 }, protocol_metrics: Metrics { + // TCP tcp4_connections_handled: 5, tcp4_announces_handled: 6, tcp4_scrapes_handled: 7, tcp6_connections_handled: 8, tcp6_announces_handled: 9, tcp6_scrapes_handled: 10, - udp4_connections_handled: 11, - udp4_announces_handled: 12, - udp4_scrapes_handled: 13, - udp4_errors_handled: 14, - udp6_connections_handled: 15, - udp6_announces_handled: 16, - udp6_scrapes_handled: 17, - udp6_errors_handled: 18 + // UDP + udp_requests_aborted: 11, + udp4_requests: 12, + udp4_connections_handled: 13, + udp4_announces_handled: 14, + udp4_scrapes_handled: 15, + udp4_responses: 16, + udp4_errors_handled: 17, + udp6_requests: 18, + udp6_connections_handled: 19, + udp6_announces_handled: 20, + udp6_scrapes_handled: 21, + udp6_responses: 22, + udp6_errors_handled: 23 } }), Stats { @@ -115,20 +143,27 @@ mod tests { seeders: 1, completed: 2, leechers: 3, + // TCP tcp4_connections_handled: 5, tcp4_announces_handled: 6, tcp4_scrapes_handled: 7, tcp6_connections_handled: 8, tcp6_announces_handled: 9, tcp6_scrapes_handled: 10, - udp4_connections_handled: 11, - udp4_announces_handled: 12, - udp4_scrapes_handled: 13, - udp4_errors_handled: 14, - udp6_connections_handled: 15, - udp6_announces_handled: 16, - udp6_scrapes_handled: 17, - udp6_errors_handled: 18 + // UDP + udp_requests_aborted: 11, + udp4_requests: 12, + udp4_connections_handled: 13, + udp4_announces_handled: 14, + udp4_scrapes_handled: 15, + udp4_responses: 16, + udp4_errors_handled: 17, + udp6_requests: 18, + udp6_connections_handled: 19, + udp6_announces_handled: 20, + udp6_scrapes_handled: 21, + udp6_responses: 22, + udp6_errors_handled: 23 } ); } diff --git a/src/servers/apis/v1/context/stats/responses.rs b/src/servers/apis/v1/context/stats/responses.rs index 4fd8be94..6b214d0c 100644 --- a/src/servers/apis/v1/context/stats/responses.rs +++ b/src/servers/apis/v1/context/stats/responses.rs @@ -47,6 +47,12 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { tracker_metrics.protocol_metrics.tcp6_scrapes_handled )); + lines.push(format!( + "udp_requests_aborted {}", + tracker_metrics.protocol_metrics.udp_requests_aborted + )); + + lines.push(format!("udp4_requests {}", tracker_metrics.protocol_metrics.udp4_requests)); lines.push(format!( "udp4_connections_handled {}", tracker_metrics.protocol_metrics.udp4_connections_handled @@ -59,11 +65,13 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { "udp4_scrapes_handled {}", tracker_metrics.protocol_metrics.udp4_scrapes_handled )); + lines.push(format!("udp4_responses {}", tracker_metrics.protocol_metrics.udp4_responses)); lines.push(format!( "udp4_errors_handled {}", tracker_metrics.protocol_metrics.udp4_errors_handled )); + lines.push(format!("udp6_requests {}", tracker_metrics.protocol_metrics.udp6_requests)); lines.push(format!( "udp6_connections_handled {}", tracker_metrics.protocol_metrics.udp6_connections_handled @@ -76,6 +84,7 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { "udp6_scrapes_handled {}", tracker_metrics.protocol_metrics.udp6_scrapes_handled )); + lines.push(format!("udp6_responses {}", tracker_metrics.protocol_metrics.udp6_responses)); lines.push(format!( "udp6_errors_handled {}", tracker_metrics.protocol_metrics.udp6_errors_handled diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index c8bac809..d6827346 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Duration; @@ -11,7 +11,7 @@ use tracing::instrument; use super::request_buffer::ActiveRequests; use crate::bootstrap::jobs::Started; -use crate::core::Tracker; +use crate::core::{statistics, Tracker}; use crate::servers::logging::STARTED_ON; use crate::servers::registar::ServiceHealthCheckJob; use crate::servers::signals::{shutdown_signal_with_message, Halted}; @@ -140,6 +140,15 @@ impl Launcher { } }; + match req.from.ip() { + IpAddr::V4(_) => { + tracker.send_stats_event(statistics::Event::Udp4Request).await; + } + IpAddr::V6(_) => { + tracker.send_stats_event(statistics::Event::Udp6Request).await; + } + } + // We spawn the new task even if there active requests buffer is // full. This could seem counterintuitive because we are accepting // more request and consuming more memory even if the server is @@ -157,7 +166,12 @@ impl Launcher { continue; } - active_requests.force_push(abort_handle, &local_addr).await; + let old_request_aborted = active_requests.force_push(abort_handle, &local_addr).await; + + if old_request_aborted { + // Evicted task from active requests buffer was aborted. + tracker.send_stats_event(statistics::Event::Udp4RequestAborted).await; + } } else { tokio::task::yield_now().await; diff --git a/src/servers/udp/server/processor.rs b/src/servers/udp/server/processor.rs index 703367f3..fc39f28b 100644 --- a/src/servers/udp/server/processor.rs +++ b/src/servers/udp/server/processor.rs @@ -1,12 +1,12 @@ use std::io::Cursor; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use aquatic_udp_protocol::Response; use tracing::{instrument, Level}; use super::bound_socket::BoundSocket; -use crate::core::Tracker; +use crate::core::{statistics, Tracker}; use crate::servers::udp::handlers::CookieTimeValues; use crate::servers::udp::{handlers, RawRequest}; @@ -64,6 +64,15 @@ impl Processor { } else { tracing::debug!(%bytes_count, %sent_bytes, "sent {response_type}"); } + + match target.ip() { + IpAddr::V4(_) => { + self.tracker.send_stats_event(statistics::Event::Udp4Response).await; + } + IpAddr::V6(_) => { + self.tracker.send_stats_event(statistics::Event::Udp6Response).await; + } + } } Err(error) => tracing::warn!(%bytes_count, %error, ?payload, "failed to send"), }; diff --git a/src/servers/udp/server/request_buffer.rs b/src/servers/udp/server/request_buffer.rs index ffbd9565..03cb6040 100644 --- a/src/servers/udp/server/request_buffer.rs +++ b/src/servers/udp/server/request_buffer.rs @@ -41,6 +41,8 @@ impl ActiveRequests { /// 1. Removing finished tasks. /// 2. Removing the oldest unfinished task if no finished tasks are found. /// + /// Returns `true` if a task was removed, `false` otherwise. + /// /// # Panics /// /// This method will panic if it cannot make space for adding a new handle. @@ -49,17 +51,19 @@ impl ActiveRequests { /// /// * `abort_handle` - The `AbortHandle` for the UDP request processor task. /// * `local_addr` - A string slice representing the local address for logging. - pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) { + pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) -> bool { // Attempt to add the new handle to the buffer. match self.rb.try_push(new_task) { Ok(()) => { // Successfully added the task, no further action needed. + false } Err(new_task) => { // Buffer is full, attempt to make space. let mut finished: u64 = 0; let mut unfinished_task = None; + let mut old_task_aborted = false; for old_task in self.rb.pop_iter() { // We found a finished tasks ... increase the counter and @@ -96,6 +100,7 @@ impl ActiveRequests { if finished == 0 { // We make place aborting this task. old_task.abort(); + old_task_aborted = true; tracing::warn!( target: UDP_TRACKER_LOG_TARGET, @@ -134,7 +139,9 @@ impl ActiveRequests { if !new_task.is_finished() { self.rb.try_push(new_task).expect("it should have space for this new task."); } + + old_task_aborted } - }; + } } } diff --git a/tests/servers/api/v1/contract/context/stats.rs b/tests/servers/api/v1/contract/context/stats.rs index 463dc563..7853450e 100644 --- a/tests/servers/api/v1/contract/context/stats.rs +++ b/tests/servers/api/v1/contract/context/stats.rs @@ -34,19 +34,26 @@ async fn should_allow_getting_tracker_statistics() { seeders: 1, completed: 0, leechers: 0, + // TCP tcp4_connections_handled: 0, tcp4_announces_handled: 0, tcp4_scrapes_handled: 0, tcp6_connections_handled: 0, tcp6_announces_handled: 0, tcp6_scrapes_handled: 0, + // UDP + udp_requests_aborted: 0, + udp4_requests: 0, udp4_connections_handled: 0, udp4_announces_handled: 0, udp4_scrapes_handled: 0, + udp4_responses: 0, udp4_errors_handled: 0, + udp6_requests: 0, udp6_connections_handled: 0, udp6_announces_handled: 0, udp6_scrapes_handled: 0, + udp6_responses: 0, udp6_errors_handled: 0, }, )