From 31c1a8eade2ef09446d715c358c1071d133d96ac Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Wed, 27 Nov 2024 00:18:48 +0700 Subject: [PATCH] refactor: remove ngcontrol protocol and update sysinfo lib (#465) --- Cargo.lock | 34 +--- Cargo.toml | 5 +- bin/Cargo.toml | 5 +- bin/src/lib.rs | 1 - bin/src/ng_controller.rs | 126 ------------ bin/src/server/gateway.rs | 17 +- bin/src/server/media.rs | 19 -- bin/src/server/standalone.rs | 1 - packages/rtpengine_ngcontrol/Cargo.toml | 10 - packages/rtpengine_ngcontrol/src/commands.rs | 183 ------------------ packages/rtpengine_ngcontrol/src/lib.rs | 5 - packages/rtpengine_ngcontrol/src/transport.rs | 12 -- .../rtpengine_ngcontrol/src/transport/udp.rs | 54 ------ 13 files changed, 7 insertions(+), 465 deletions(-) delete mode 100644 bin/src/ng_controller.rs delete mode 100644 packages/rtpengine_ngcontrol/Cargo.toml delete mode 100644 packages/rtpengine_ngcontrol/src/commands.rs delete mode 100644 packages/rtpengine_ngcontrol/src/lib.rs delete mode 100644 packages/rtpengine_ngcontrol/src/transport.rs delete mode 100644 packages/rtpengine_ngcontrol/src/transport/udp.rs diff --git a/Cargo.lock b/Cargo.lock index 3c421827..17750a18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -496,7 +496,6 @@ dependencies = [ "rand 0.8.5", "rcgen", "reqwest", - "rtpengine-ngcontrol", "rust-embed", "rustls", "sans-io-runtime", @@ -5201,16 +5200,6 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4ed274a5b3d36c4434cff6a4de1b42f43e64ae326b1cfa72d13d9037a314355" -[[package]] -name = "rtpengine-ngcontrol" -version = "0.1.0" -dependencies = [ - "log", - "serde", - "serde_bencode", - "tokio", -] - [[package]] name = "rust-embed" version = "8.5.0" @@ -5857,25 +5846,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_bencode" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a70dfc7b7438b99896e7f8992363ab8e2c4ba26aa5ec675d32d1c3c2c33d413e" -dependencies = [ - "serde", - "serde_bytes", -] - -[[package]] -name = "serde_bytes" -version = "0.11.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" -dependencies = [ - "serde", -] - [[package]] name = "serde_derive" version = "1.0.215" @@ -6579,9 +6549,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.31.4" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "355dbe4f8799b304b05e1b0f05fc59b2a18d36645cf169607da45bde2f69a1be" +checksum = "4c33cd241af0f2e9e3b5c32163b873b29956890b5342e6745b917ce9d490f4af" dependencies = [ "core-foundation-sys", "libc", diff --git a/Cargo.toml b/Cargo.toml index 1a71fcf8..61d68b5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,13 +15,12 @@ members = [ "packages/media_record", "packages/media_codecs", "packages/multi_tenancy", - "packages/rtpengine_ngcontrol", ] [workspace.dependencies] sans-io-runtime = { version = "0.3", default-features = false } -atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false } -atm0s-sdn-network = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", branch = "feat-router-dump", default-features = false } +atm0s-sdn = { version = "0.2", default-features = false } +atm0s-sdn-network = { version = "0.6", default-features = false } tokio = "1.37" tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] } convert-enum = "0.1" diff --git a/bin/Cargo.toml b/bin/Cargo.toml index f25a5872..afe5d5fe 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -25,7 +25,6 @@ media-server-connector = { path = "../packages/media_connector", optional = true media-server-record = { path = "../packages/media_record", default-features=false, optional = true } media-server-utils = { path = "../packages/media_utils", optional = true } media-server-multi-tenancy = { path = "../packages/multi_tenancy", optional = true } -rtpengine-ngcontrol = { path = "../packages/rtpengine_ngcontrol", optional = true } local-ip-address = "0.6" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } @@ -36,7 +35,7 @@ num_enum = { workspace = true } derive_more = { workspace = true, features = ["full"] } rcgen = { version = "0.13", optional = true } maxminddb = { version = "0.24", optional = true } -sysinfo = { version = "0.31", optional = true } +sysinfo = { version = "0.32", optional = true } hex = { version = "0.4", optional = true } mime_guess = { version = "2.0", optional = true } reqwest = { version = "0.12", features = ["json"]} @@ -46,7 +45,7 @@ sentry = "0.34" default = ["console", "gateway", "media", "connector", "standalone", "cert_utils"] standalone = ["console", "gateway", "media", "connector"] gateway = ["media-server-gateway", "media-server-connector", "quinn_vnet", "node_metrics", "maxminddb", "rust-embed", "media-server-multi-tenancy"] -media = ["media-server-runner", "media-server-record", "quinn_vnet", "node_metrics", "rtpengine-ngcontrol"] +media = ["media-server-runner", "media-server-record", "quinn_vnet", "node_metrics"] console = [] connector = ["quinn_vnet", "media-server-connector", "media-server-utils", "media-server-multi-tenancy"] cert_utils = ["rcgen", "rustls"] diff --git a/bin/src/lib.rs b/bin/src/lib.rs index bb18ee35..416f8520 100644 --- a/bin/src/lib.rs +++ b/bin/src/lib.rs @@ -9,7 +9,6 @@ use media_server_protocol::cluster::ZoneId; mod errors; mod http; -mod ng_controller; #[cfg(feature = "node_metrics")] mod node_metrics; #[cfg(feature = "quinn_vnet")] diff --git a/bin/src/ng_controller.rs b/bin/src/ng_controller.rs deleted file mode 100644 index c97a70fd..00000000 --- a/bin/src/ng_controller.rs +++ /dev/null @@ -1,126 +0,0 @@ -use media_server_protocol::cluster::gen_cluster_session_id; -use media_server_protocol::endpoint::ClusterConnId; -use media_server_protocol::tokens::RtpEngineToken; -use media_server_protocol::transport::{rtpengine, RpcReq, RpcRes}; -use media_server_secure::MediaEdgeSecure; -use media_server_utils::select2; -use rtpengine_ngcontrol::{NgCmdResult, NgCommand, NgRequest, NgResponse, NgTransport}; -use std::net::SocketAddr; -use std::str::FromStr; -use std::sync::Arc; -use tokio::sync::mpsc::{channel, Receiver, Sender}; - -use crate::rpc::Rpc; - -pub struct NgControllerServer { - transport: T, - secure: Arc, - rpc_sender: Sender, RpcRes>>, - answer_tx: Sender<(String, RpcRes, SocketAddr)>, - answer_rx: Receiver<(String, RpcRes, SocketAddr)>, -} - -impl NgControllerServer { - pub fn new(transport: T, secure: Arc, rpc_sender: Sender, RpcRes>>) -> Self { - let (answer_tx, answer_rx) = channel(10); - Self { - transport, - secure, - rpc_sender, - answer_tx, - answer_rx, - } - } - - pub async fn recv(&mut self) -> Option<()> { - match select2::or(self.transport.recv(), self.answer_rx.recv()).await { - select2::OrOutput::Left(Some((req, remote))) => self.process_req(req, remote).await, - select2::OrOutput::Right(Some((id, res, dest))) => self.process_res(id, res, dest).await, - select2::OrOutput::Left(None) => None, - select2::OrOutput::Right(None) => None, - } - } - - async fn process_req(&mut self, req: NgRequest, remote: SocketAddr) -> Option<()> { - let rpc_req = match req.command { - NgCommand::Ping => { - self.transport.send(req.answer(NgCmdResult::Pong { result: "ok".to_string() }), remote).await; - return Some(()); - } - NgCommand::Offer { ref sdp, ref atm0s_token, .. } | NgCommand::Answer { ref sdp, ref atm0s_token, .. } => { - if let Some((app_ctx, token)) = self.secure.decode_token::(atm0s_token) { - let session_id = gen_cluster_session_id(); - rtpengine::RpcReq::CreateAnswer(rtpengine::RtpCreateAnswerRequest { - app: app_ctx, - session_id, - room: token.room.into(), - peer: token.peer.into(), - sdp: sdp.clone(), - record: token.record, - extra_data: token.extra_data, - }) - } else { - self.send_err(&req, "TOKEN_FAILED", "Token parse error", remote).await; - return Some(()); - } - } - NgCommand::Delete { ref conn_id, .. } => { - if let Ok(conn) = ClusterConnId::from_str(conn_id) { - rtpengine::RpcReq::Delete(conn) - } else { - self.send_err(&req, "NOT_FOUND", "Connection parse error", remote).await; - return Some(()); - } - } - }; - - let (rpc_req, rx) = Rpc::new(RpcReq::RtpEngine(rpc_req)); - let answer_tx = self.answer_tx.clone(); - let req_id = req.id.clone(); - tokio::spawn(async move { - match rx.await { - Ok(res) => { - if let Err(e) = answer_tx.send((req_id, res, remote)).await { - log::error!("[NgControllerServer] send answer to main task error {e:?}"); - } - } - Err(_err) => { - //TODO send error - } - } - }); - self.rpc_sender.send(rpc_req).await.ok() - } - - async fn process_res(&mut self, id: String, res: RpcRes, dest: SocketAddr) -> Option<()> { - let result = match res { - RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Ok((conn, sdp)))) => NgCmdResult::Answer { - result: "ok".to_string(), - conn: Some(conn.to_string()), - sdp: Some(sdp), - }, - RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Ok(_conn))) => NgCmdResult::Delete { result: "ok".to_string() }, - RpcRes::RtpEngine(rtpengine::RpcRes::CreateAnswer(Err(res))) | RpcRes::RtpEngine(rtpengine::RpcRes::Delete(Err(res))) => NgCmdResult::Error { - result: res.code.to_string(), - error_reason: res.message, - }, - _ => { - return Some(()); - } - }; - self.transport.send(NgResponse { id, result }, dest).await; - Some(()) - } - - async fn send_err(&self, req: &NgRequest, result: &str, err: &str, remote: SocketAddr) { - self.transport - .send( - req.answer(NgCmdResult::Error { - error_reason: err.to_string(), - result: result.to_string(), - }), - remote, - ) - .await; - } -} diff --git a/bin/src/server/gateway.rs b/bin/src/server/gateway.rs index 55738ae5..0126c11c 100644 --- a/bin/src/server/gateway.rs +++ b/bin/src/server/gateway.rs @@ -17,14 +17,12 @@ use media_server_protocol::{ rpc::quinn::{QuinnClient, QuinnServer}, }; use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; -use rtpengine_ngcontrol::NgUdpTransport; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; use tokio::sync::mpsc::channel; use crate::{ http::{run_gateway_http_server, NodeApiCtx}, - ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_client, make_quinn_server, VirtualNetwork}, NodeConfig, @@ -119,20 +117,6 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let gateway_secure = MediaGatewaySecureJwt::new(node.secret.as_bytes(), app_storage.clone()); let gateway_secure = Arc::new(gateway_secure); - let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); - // Running ng controller for Voip - if let Some(ngproto_addr) = args.rtpengine_cmd_addr { - let req_tx = req_tx.clone(); - let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await; - let secure2 = edge_secure.clone(); - tokio::spawn(async move { - log::info!("[MediaGateway] start ng_controller task"); - let mut server = NgControllerServer::new(rtpengine_udp, secure2, req_tx); - while server.recv().await.is_some() {} - log::info!("[MediaGateway] stop ng_controller task"); - }); - } - // Setup Sdn let node_id = node.node_id; let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt); @@ -165,6 +149,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option, node: Nod let (selector, mut requester) = build_dest_selector(); // Setup HTTP server + let (req_tx, mut req_rx) = tokio::sync::mpsc::channel(1024); let (dump_tx, mut dump_rx) = channel(10); if let Some(http_port) = http_port { let req_tx = req_tx.clone(); diff --git a/bin/src/server/media.rs b/bin/src/server/media.rs index 5b5a15be..80b3c1d5 100644 --- a/bin/src/server/media.rs +++ b/bin/src/server/media.rs @@ -25,14 +25,12 @@ use media_server_runner::{MediaConfig, UserData, SE}; use media_server_secure::jwt::{MediaEdgeSecureJwt, MediaGatewaySecureJwt}; use media_server_utils::now_ms; use rand::random; -use rtpengine_ngcontrol::NgUdpTransport; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use sans_io_runtime::{backend::PollingBackend, Controller}; use tokio::sync::mpsc::channel; use crate::{ http::{run_media_http_server, NodeApiCtx}, - ng_controller::NgControllerServer, node_metrics::NodeMetricsCollector, quinn::{make_quinn_server, VirtualNetwork}, server::media::runtime_worker::MediaRuntimeWorker, @@ -60,10 +58,6 @@ pub struct Args { #[arg(env, long, default_value_t = 0)] pub webrtc_port_seed: u16, - /// The port for binding the RTPengine command UDP socket. - #[arg(env, long)] - pub rtpengine_cmd_addr: Option, - /// The IP address for RTPengine RTP listening. /// Default: 127.0.0.1 #[arg(env, long, default_value = "127.0.0.1")] @@ -119,19 +113,6 @@ pub async fn run_media_server(workers: usize, http_port: Option, node: Node }); } - //Running ng controller for Voip - if let Some(ngproto_addr) = args.rtpengine_cmd_addr { - let req_tx = req_tx.clone(); - let rtpengine_udp = NgUdpTransport::new(ngproto_addr).await; - let secure = secure.clone(); - tokio::spawn(async move { - log::info!("[MediaServer] start ng_controller task"); - let mut server = NgControllerServer::new(rtpengine_udp, secure, req_tx); - while server.recv().await.is_some() {} - log::info!("[MediaServer] stop ng_controller task"); - }); - } - let node_id = node.node_id; let node_session = random(); diff --git a/bin/src/server/standalone.rs b/bin/src/server/standalone.rs index 0d04bf26..81c66b83 100644 --- a/bin/src/server/standalone.rs +++ b/bin/src/server/standalone.rs @@ -226,7 +226,6 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) { enable_token_api: false, ice_lite: false, webrtc_port_seed: 0, - rtpengine_cmd_addr: None, rtpengine_listen_ip, ccu_per_core: 200, record_cache, diff --git a/packages/rtpengine_ngcontrol/Cargo.toml b/packages/rtpengine_ngcontrol/Cargo.toml deleted file mode 100644 index 826b3277..00000000 --- a/packages/rtpengine_ngcontrol/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "rtpengine-ngcontrol" -version = "0.1.0" -edition = "2021" - -[dependencies] -tokio = { version = "1.37", features = ["sync", "net"] } -serde = { version = "1.0", features = ["derive"] } -serde_bencode = { version = "0.2.4" } -log = { workspace = true } diff --git a/packages/rtpengine_ngcontrol/src/commands.rs b/packages/rtpengine_ngcontrol/src/commands.rs deleted file mode 100644 index 293818c6..00000000 --- a/packages/rtpengine_ngcontrol/src/commands.rs +++ /dev/null @@ -1,183 +0,0 @@ -use std::str::FromStr; - -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] -#[serde(tag = "command")] -pub enum NgCommand { - #[serde(rename = "ping")] - Ping, - - #[serde(rename = "offer")] - Offer { - sdp: String, - #[serde(rename = "call-id")] - call_id: String, - #[serde(rename = "from-tag")] - from_tag: String, - #[serde(rename = "ICE")] - ice: Option, - #[serde(rename = "atm0s-token")] - atm0s_token: String, - }, - - #[serde(rename = "answer")] - Answer { - sdp: String, - #[serde(rename = "call-id")] - call_id: String, - #[serde(rename = "from-tag")] - from_tag: String, - #[serde(rename = "to-tag")] - to_tag: String, - #[serde(rename = "ICE")] - ice: Option, - #[serde(rename = "atm0s-token")] - atm0s_token: String, - }, - - #[serde(rename = "delete")] - Delete { - #[serde(rename = "call-id")] - call_id: String, - #[serde(rename = "from-tag")] - from_tag: String, - #[serde(rename = "to-tag")] - to_tag: Option, - #[serde(rename = "conn-id")] - conn_id: String, - }, -} - -impl FromStr for NgCommand { - type Err = serde_bencode::Error; - fn from_str(msg: &str) -> Result { - serde_bencode::de::from_str(msg) - } -} - -#[allow(clippy::to_string_trait_impl)] -impl ToString for NgCommand { - fn to_string(&self) -> String { - serde_bencode::ser::to_string(self).expect("Should convert NgCommand to string") - } -} - -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] -#[serde(untagged)] -pub enum NgCmdResult { - Pong { - result: String, - }, - Answer { - result: String, - conn: Option, - sdp: Option, - }, - Delete { - result: String, - }, - Error { - result: String, - #[serde(rename = "error-reason")] - error_reason: String, - }, -} - -impl FromStr for NgCmdResult { - type Err = serde_bencode::Error; - fn from_str(msg: &str) -> Result { - serde_bencode::de::from_str(msg) - } -} - -#[allow(clippy::to_string_trait_impl)] -impl ToString for NgCmdResult { - fn to_string(&self) -> String { - serde_bencode::ser::to_string(self).expect("Should convert NgCmdResult to string") - } -} - -#[derive(Debug, Clone)] -pub struct NgRequest { - pub id: String, - pub command: NgCommand, -} - -impl NgRequest { - pub fn answer(&self, result: NgCmdResult) -> NgResponse { - NgResponse { id: self.id.clone(), result } - } -} - -impl FromStr for NgRequest { - type Err = serde_bencode::Error; - fn from_str(packet: &str) -> Result { - let idx = packet.find(' '); - match idx { - Some(idx) => { - let id = packet[..idx].to_string(); - let body = &packet[idx + 1..]; - Ok(NgRequest { - id, - command: NgCommand::from_str(body)?, - }) - } - None => Err(serde_bencode::Error::MissingField("idx".to_string())), - } - } -} - -#[derive(Debug, Clone)] -pub struct NgResponse { - pub id: String, - pub result: NgCmdResult, -} - -impl NgResponse { - pub fn new(id: String, result: NgCmdResult) -> Self { - Self { id, result } - } - - pub fn to_str(&self) -> String { - let body = serde_bencode::to_string(&self.result).unwrap(); - format!("{} {}", self.id, body) - } -} - -#[cfg(test)] -mod test { - - use std::str::FromStr; - - use super::{NgCmdResult, NgCommand}; - - #[test] - fn ping_command() { - let actual = NgCommand::Ping {}; - let expect: NgCommand = NgCommand::from_str("d7:command4:pinge").unwrap(); - - assert_eq!(expect, actual); - } - - #[test] - fn pong_result() { - assert_eq!(NgCmdResult::Pong { result: "pong".to_string() }, NgCmdResult::from_str("d6:result4:ponge").unwrap()); - - assert_eq!(NgCmdResult::Pong { result: "pong".to_string() }.to_string(), "d6:result4:ponge".to_string()); - } - - #[test] - fn offer_command() { - let input = "d7:call-id24:bvmWdxbe4hkHHHvCl_d-nQ..7:command5:offer8:from-tag8:460d801e3:sdp3:v=011:atm0s-token5:TOKENe"; - let actual = NgCommand::Offer { - sdp: "v=0".to_string(), - call_id: "bvmWdxbe4hkHHHvCl_d-nQ..".to_string(), - from_tag: "460d801e".to_string(), - ice: None, - atm0s_token: "TOKEN".to_string(), - }; - let expect: NgCommand = NgCommand::from_str(input).unwrap(); - assert_eq!(expect, actual); - } -} diff --git a/packages/rtpengine_ngcontrol/src/lib.rs b/packages/rtpengine_ngcontrol/src/lib.rs deleted file mode 100644 index 8ea0e409..00000000 --- a/packages/rtpengine_ngcontrol/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod commands; -mod transport; - -pub use commands::{NgCmdResult, NgCommand, NgRequest, NgResponse}; -pub use transport::{NgTransport, NgUdpTransport}; diff --git a/packages/rtpengine_ngcontrol/src/transport.rs b/packages/rtpengine_ngcontrol/src/transport.rs deleted file mode 100644 index ca45e259..00000000 --- a/packages/rtpengine_ngcontrol/src/transport.rs +++ /dev/null @@ -1,12 +0,0 @@ -mod udp; - -use std::net::SocketAddr; - -use super::commands::{NgRequest, NgResponse}; -pub use udp::NgUdpTransport; - -#[allow(async_fn_in_trait)] -pub trait NgTransport { - async fn send(&self, res: NgResponse, addr: SocketAddr); - async fn recv(&self) -> Option<(NgRequest, SocketAddr)>; -} diff --git a/packages/rtpengine_ngcontrol/src/transport/udp.rs b/packages/rtpengine_ngcontrol/src/transport/udp.rs deleted file mode 100644 index e398b0d8..00000000 --- a/packages/rtpengine_ngcontrol/src/transport/udp.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::{net::SocketAddr, str::FromStr}; -use tokio::net::UdpSocket; - -use crate::commands::{NgRequest, NgResponse}; - -use super::NgTransport; - -pub struct NgUdpTransport { - socket: UdpSocket, -} - -impl NgUdpTransport { - pub async fn new(addr: SocketAddr) -> Self { - let socket = UdpSocket::bind(addr).await.expect("Should listen on {port}"); - log::info!("[NgUdpTransport] listen on addr {addr}"); - Self { socket } - } -} - -impl NgTransport for NgUdpTransport { - async fn send(&self, res: NgResponse, addr: SocketAddr) { - let data = res.to_str(); - log::info!("[NgUdpTransport] send\n========\n{data}\n=========="); - if let Err(e) = self.socket.send_to(data.as_bytes(), addr).await { - log::error!("[NgUdpTransport] send response to {addr} error {e:?}"); - } - } - - async fn recv(&self) -> Option<(NgRequest, SocketAddr)> { - loop { - let mut buf = vec![0; 1024]; - match self.socket.recv_from(&mut buf).await { - Ok((size, addr)) => { - log::info!("[NgUdpTransport] recv {size} from {addr}"); - match std::str::from_utf8(&buf[..size]) { - Ok(str) => { - log::info!("[NgUdpTransport] recv\n========\n{str}\n=========="); - if let Ok(req) = NgRequest::from_str(str) { - log::info!("[NgUdpTransport] recv req: {req:?}"); - break Some((req, addr)); - } - } - Err(err) => { - log::error!("[NgUdpTransport] received invalid utf8 message from {addr}, err {err}"); - } - } - } - Err(err) => { - log::error!("[NgUdpTransport] udp port error {err}"); - } - } - } - } -}