diff --git a/protocol/rtmp/src/session/client_session.rs b/protocol/rtmp/src/session/client_session.rs index cf21258..74ec945 100644 --- a/protocol/rtmp/src/session/client_session.rs +++ b/protocol/rtmp/src/session/client_session.rs @@ -27,6 +27,7 @@ use { }, indexmap::IndexMap, std::sync::Arc, + std::time::Duration, //crate::utils::print::print, streamhub::define::StreamHubEventSender, tokio::{net::TcpStream, sync::Mutex}, @@ -67,6 +68,7 @@ pub enum ClientSessionType { } pub struct ClientSession { io: Arc>>, + timeout: Option, common: Common, handshaker: SimpleHandshakeClient, unpacketizer: ChunkUnpacketizer, @@ -115,6 +117,7 @@ impl ClientSession { Self { io: Arc::clone(&net_io), + timeout: None, common, handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)), unpacketizer: ChunkUnpacketizer::new(), @@ -129,6 +132,10 @@ impl ClientSession { gop_num, } } + + pub fn set_timeout(&mut self, timeout: Duration){ + self.timeout = Some(timeout) + } pub async fn run(&mut self) -> Result<(), SessionError> { loop { @@ -169,7 +176,10 @@ impl ClientSession { ClientSessionState::WaitStateChange => {} } - let data = self.io.lock().await.read().await?; + let data = match self.timeout { + None => self.io.lock().await.read().await?, + Some(t) => self.io.lock().await.read_timeout(t).await?, + }; self.unpacketizer.extend_data(&data[..]); loop {