Skip to content
This repository has been archived by the owner on Aug 7, 2023. It is now read-only.

Commit

Permalink
adding async websocket (#16)
Browse files Browse the repository at this point in the history
- using tokio-tungstenite to achieve the async WebSocket goal
- refactoring blocking and async WebSocket worker interface
- adding a new feature **async-websocket**
  • Loading branch information
tommady authored May 12, 2022
1 parent 4a3b9e7 commit 83e2cd4
Show file tree
Hide file tree
Showing 14 changed files with 678 additions and 261 deletions.
14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,31 @@ edition = "2021"
[features]
default = []
websocket = ["tungstenite", "log"]
async-websocket = ["tokio-tungstenite", "log", "tokio", "futures-util"]

[package.metadata.docs.rs]
features = ["websocket"]
features = ["websocket", "async-websocket"]

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
ureq = { version = "2.4", features = ["json"] }
time = { version = "0.3", features = ["serde", "parsing"] }
log = { version = "0.4", optional = true }
futures-util = { version = "0.3", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread"], optional = true }

[dependencies.tungstenite]
version = "0.17"
default-features = false
features = ["rustls-tls-webpki-roots"]
optional = true

[dependencies.tokio-tungstenite]
version = "0.17"
features = ["rustls-tls-webpki-roots"]
optional = true

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] }
serial_test = "*"
10 changes: 10 additions & 0 deletions examples/async_websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use fugle::websocket::IntradayBuilder;

#[tokio::main]
async fn main() {
let mut ws = IntradayBuilder::new().symbol_id("2884").odd_lot().build();

println!("{:?}", ws.async_chart().await.unwrap().recv().await);
println!("{:?}", ws.async_meta().await.unwrap().recv().await);
println!("{:?}", ws.async_quote().await.unwrap().recv().await);
}
9 changes: 9 additions & 0 deletions examples/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use fugle::websocket::IntradayBuilder;

fn main() {
let mut ws = IntradayBuilder::new().symbol_id("2884").odd_lot().build();

println!("{:?}", ws.chart().unwrap().recv().unwrap());
println!("{:?}", ws.meta().unwrap().recv().unwrap());
println!("{:?}", ws.quote().unwrap().recv().unwrap());
}
9 changes: 0 additions & 9 deletions examples/ws.rs

This file was deleted.

8 changes: 4 additions & 4 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub enum FugleError {
// error from serde_json lib
SerdeJson(serde_json::Error),
// error from tungstenite lib
#[cfg(feature = "websocket")]
#[cfg(any(feature = "websocket", feature = "async-websocket"))]
Tungstenite(tungstenite::Error),
// error from ureq lib
Ureq(Box<ureq::Error>),
Expand All @@ -83,7 +83,7 @@ impl std::fmt::Display for FugleError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
FugleError::SerdeJson(ref e) => write!(f, "Serde_json Lib error: {}", e),
#[cfg(feature = "websocket")]
#[cfg(any(feature = "websocket", feature = "async-websocket"))]
FugleError::Tungstenite(ref e) => write!(f, "Tungstenite Lib error: {}", e),
FugleError::Ureq(ref e) => write!(f, "Ureq Lib error: {}", e),
FugleError::StdIO(ref e) => write!(f, "std io json Deserialize error: {}", e),
Expand All @@ -103,7 +103,7 @@ impl std::error::Error for FugleError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
FugleError::SerdeJson(ref e) => Some(e),
#[cfg(feature = "websocket")]
#[cfg(any(feature = "websocket", feature = "async-websocket"))]
FugleError::Tungstenite(ref e) => Some(e),
FugleError::Ureq(ref e) => Some(e),
FugleError::StdIO(ref e) => Some(e),
Expand Down Expand Up @@ -139,7 +139,7 @@ impl From<ureq::Error> for FugleError {
}
}

#[cfg(feature = "websocket")]
#[cfg(any(feature = "websocket", feature = "async-websocket"))]
impl From<tungstenite::Error> for FugleError {
#[cfg_attr(coverage, no_coverage)]
fn from(err: tungstenite::Error) -> FugleError {
Expand Down
2 changes: 1 addition & 1 deletion src/intraday/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct IntradayBuilder<'a> {
}

impl<'a> Default for IntradayBuilder<'a> {
fn default() -> IntradayBuilder<'static> {
fn default() -> IntradayBuilder<'a> {
IntradayBuilder::new()
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
//! Websocket
//! ```rust no_run
//! # fn main() -> fugle::schema::Result<()> {
//! # use fugle::ws::Intraday;
//! # use fugle::websocket::IntradayBuilder;
//!
//! let mut lis = Intraday::new("demo");
//! let mut ws = IntradayBuilder::new().symbol_id("2884").odd_lot().build();
//!
//! let rx = lis.chart("2884", true)?;
//! let rx = ws.chart()?;
//! let response = rx.recv()?;
//!
//! # Ok(())
Expand All @@ -63,11 +63,11 @@
//! Websocket
//! ```rust no_run
//! # fn main() -> fugle::schema::Result<()> {
//! # use fugle::ws::Intraday;;
//! # use fugle::websocket::IntradayBuilder;;
//!
//! let mut lis = Intraday::new("demo");
//! let mut ws = IntradayBuilder::new().symbol_id("2884").odd_lot().build();
//!
//! let rx = lis.quote("2884", true)?;
//! let rx = ws.quote()?;
//! let response = rx.recv()?;
//!
//! # Ok(())
Expand All @@ -91,11 +91,11 @@
//! Websocket
//! ```rust no_run
//! # fn main() -> fugle::schema::Result<()> {
//! # use fugle::ws::Intraday;;
//! # use fugle::websocket::IntradayBuilder;;
//!
//! let mut lis = Intraday::new("demo");
//! let mut lis = IntradayBuilder::new().symbol_id("2884").odd_lot().build();
//!
//! let rx = lis.meta("2884", true)?;
//! let rx = lis.meta()?;
//! let response = rx.recv()?;
//!
//! # Ok(())
Expand Down Expand Up @@ -156,5 +156,5 @@ pub mod errors;
pub mod intraday;
pub mod marketdata;
pub mod schema;
#[cfg(feature = "websocket")]
pub mod ws;
#[cfg(any(feature = "websocket", feature = "async-websocket"))]
pub mod websocket;
86 changes: 86 additions & 0 deletions src/websocket/intraday/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

use futures_util::StreamExt;
use log::error;
use tokio::sync::mpsc::UnboundedSender;
use tokio_tungstenite::connect_async;

use crate::schema::Result;

pub(crate) struct Async {
pub(crate) routine: Option<tokio::task::JoinHandle<()>>,
}

impl Async {
pub(crate) async fn new<T>(
uri: &str,
sender: UnboundedSender<T>,
done: Arc<AtomicBool>,
) -> Result<Async>
where
T: for<'de> serde::Deserialize<'de> + Send + 'static,
{
let (mut socket, _) = connect_async(uri).await?;

let routine = tokio::spawn(async move {
while !done.load(Ordering::SeqCst) {
if let Some(Ok(msg)) = socket.next().await {
if let Ok(m) = msg.to_text() {
if let Ok(m) = serde_json::from_str(m) {
if let Err(e) = sender.send(m) {
error!("{}", e);
}
}
}
}
}
let _ = socket.close(None).await;
});

Ok(Async {
routine: Some(routine),
})
}
}

impl super::Worker for Async {
fn stop(&mut self) {
if let Some(routine) = self.routine.take() {
drop(routine)
}
}
}

#[cfg(test)]
mod test {
use tokio::{
sync::mpsc::unbounded_channel,
time::{sleep, Duration},
};

use super::{
super::{QuoteResponse, Worker, INTRADAY_QUOTE},
*,
};

#[tokio::test]
async fn test_async_worker_stop() {
let (tx, _) = unbounded_channel::<QuoteResponse>();
let done = Arc::new(AtomicBool::new(false));
let mut worker = Async::new(
&format!("{}?symbolId=2884&apiToken=demo", INTRADAY_QUOTE),
tx,
done.clone(),
)
.await
.unwrap();

done.store(true, Ordering::SeqCst);
sleep(Duration::from_millis(3)).await;

worker.stop();
}
}
80 changes: 80 additions & 0 deletions src/websocket/intraday/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Sender,
Arc,
},
thread,
};

use log::error;
use tungstenite::connect;

use crate::schema::Result;

pub(crate) struct Block {
pub(crate) thread: Option<thread::JoinHandle<()>>,
}

impl Block {
pub(crate) fn new<T>(uri: &str, sender: Sender<T>, done: Arc<AtomicBool>) -> Result<Block>
where
T: for<'de> serde::Deserialize<'de> + Send + 'static,
{
let (mut socket, _) = connect(uri)?;

let thread = thread::spawn(move || {
while !done.load(Ordering::SeqCst) {
if let Ok(msg) = socket.read_message() {
if let Ok(m) = msg.to_text() {
if let Ok(m) = serde_json::from_str(m) {
if let Err(e) = sender.send(m) {
error!("{}", e);
}
}
}
}
}
let _ = socket.close(None);
});

Ok(Block {
thread: Some(thread),
})
}
}

impl super::Worker for Block {
fn stop(&mut self) {
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
}

#[cfg(test)]
mod test {
use std::{sync::mpsc::channel, thread::sleep, time::Duration};

use super::{
super::{QuoteResponse, Worker, INTRADAY_QUOTE},
*,
};

#[test]
fn test_block_worker_stop() {
let (tx, _) = channel::<QuoteResponse>();
let done = Arc::new(AtomicBool::new(false));
let mut worker = Block::new(
&format!("{}?symbolId=2884&apiToken=demo", INTRADAY_QUOTE),
tx,
done.clone(),
)
.unwrap();

done.store(true, Ordering::SeqCst);
sleep(Duration::from_millis(3));

worker.stop();
}
}
Loading

0 comments on commit 83e2cd4

Please sign in to comment.