diff --git a/src/lib.rs b/src/lib.rs index e9bfbfe..2a6ea18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ mod utils; mod version; -mod websocket; +pub mod websocket; #[cfg(feature = "enable-tokio-tungstenite")] pub mod tokio_tungstenite; diff --git a/src/tokio_tungstenite.rs b/src/tokio_tungstenite.rs index 40cc1f1..8bc563a 100644 --- a/src/tokio_tungstenite.rs +++ b/src/tokio_tungstenite.rs @@ -1,5 +1,6 @@ use crate::websocket::Stream; -use futures_util::SinkExt; +use futures_util::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_tungstenite::{ @@ -15,7 +16,14 @@ pub struct BinanceWebSocketClient; impl BinanceWebSocketClient { pub async fn connect_async( url: &str, - ) -> Result<(WebSocketState>, Response), Error> { + ) -> Result< + ( + WebSocketState>, + WebSocketReader>, + Response, + ), + Error, + > { let (socket, response) = connect_async(Url::parse(&url).unwrap()).await?; log::info!("Connected to {}", url); @@ -25,22 +33,33 @@ impl BinanceWebSocketClient { log::debug!("* {}", header); } - Ok((WebSocketState::new(socket), response)) + let (sink, stream) = socket.split(); + + Ok((WebSocketState::new(sink), stream, response)) } - pub async fn connect_async_default( - ) -> Result<(WebSocketState>, Response), Error> { + pub async fn connect_async_default() -> Result< + ( + WebSocketState>, + WebSocketReader>, + Response, + ), + Error, + > { BinanceWebSocketClient::connect_async("wss://stream.binance.com:9443/stream").await } } +pub type WebSocketWriter = SplitSink, tokio_tungstenite::tungstenite::Message>; +pub type WebSocketReader = SplitStream>; + pub struct WebSocketState { - socket: WebSocketStream, + socket: WebSocketWriter, id: u64, } impl WebSocketState { - pub fn new(socket: WebSocketStream) -> Self { + pub fn new(socket: WebSocketWriter) -> Self { Self { socket, id: 0 } } @@ -128,18 +147,6 @@ impl WebSocketState { } pub async fn close(mut self) -> Result<(), Error> { - self.socket.close(None).await - } -} - -impl From> for WebSocketStream { - fn from(conn: WebSocketState) -> WebSocketStream { - conn.socket - } -} - -impl AsMut> for WebSocketState { - fn as_mut(&mut self) -> &mut WebSocketStream { - &mut self.socket + self.socket.close().await } }