Skip to content

Commit

Permalink
Split client to async and blocking
Browse files Browse the repository at this point in the history
Reqwest uses tokio blocking internally as well.
  • Loading branch information
2e3s authored and ErikBjare committed Sep 12, 2023
1 parent 2dccbf8 commit 36c86ab
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aw-client-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ serde = "1.0"
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
aw-models = { path = "../aw-models" }
tokio = { version = "1.28.2", features = ["rt"] }

[dev-dependencies]
aw-datastore = { path = "../aw-datastore" }
Expand Down
77 changes: 77 additions & 0 deletions aw-client-rust/src/blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::collections::HashMap;
use std::future::Future;
use std::vec::Vec;

use chrono::{DateTime, Utc};

use aw_models::{Bucket, Event};

use super::AwClient as AsyncAwClient;

pub struct AwClient {
client: AsyncAwClient,
pub baseurl: String,
pub name: String,
pub hostname: String,
}

impl std::fmt::Debug for AwClient {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "AwClient(baseurl={:?})", self.client.baseurl)
}
}

fn block_on<F: Future>(f: F) -> F::Output {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build shell runtime")
.block_on(f)
}

macro_rules! proxy_method
{
($name:tt, $ret:ty, $($v:ident: $t:ty),*) => {
pub fn $name(&self, $($v: $t),*) -> Result<$ret, reqwest::Error>
{ block_on(self.client.$name($($v),*)) }
};
}

impl AwClient {
pub fn new(ip: &str, port: &str, name: &str) -> AwClient {
let async_client = AsyncAwClient::new(ip, port, name);

AwClient {
baseurl: async_client.baseurl.clone(),
name: async_client.name.clone(),
hostname: async_client.hostname.clone(),
client: async_client,
}
}

proxy_method!(get_bucket, Bucket, bucketname: &str);
proxy_method!(get_buckets, HashMap<String, Bucket>,);
proxy_method!(create_bucket, (), bucket: &Bucket);
proxy_method!(create_bucket_simple, (), bucketname: &str, buckettype: &str);
proxy_method!(delete_bucket, (), bucketname: &str);
proxy_method!(
get_events,
Vec<Event>,
bucketname: &str,
start: Option<DateTime<Utc>>,
stop: Option<DateTime<Utc>>,
limit: Option<u64>
);
proxy_method!(insert_event, (), bucketname: &str, event: &Event);
proxy_method!(insert_events, (), bucketname: &str, events: Vec<Event>);
proxy_method!(
heartbeat,
(),
bucketname: &str,
event: &Event,
pulsetime: f64
);
proxy_method!(delete_event, (), bucketname: &str, event_id: i64);
proxy_method!(get_event_count, i64, bucketname: &str);
proxy_method!(get_info, aw_models::Info,);
}
77 changes: 51 additions & 26 deletions aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ extern crate chrono;
extern crate gethostname;
extern crate reqwest;
extern crate serde_json;
extern crate tokio;

pub mod blocking;

use std::collections::HashMap;
use std::vec::Vec;
Expand All @@ -13,7 +16,7 @@ use serde_json::Map;
pub use aw_models::{Bucket, BucketMetadata, Event};

pub struct AwClient {
client: reqwest::blocking::Client,
client: reqwest::Client,
pub baseurl: String,
pub name: String,
pub hostname: String,
Expand All @@ -28,7 +31,7 @@ impl std::fmt::Debug for AwClient {
impl AwClient {
pub fn new(ip: &str, port: &str, name: &str) -> AwClient {
let baseurl = format!("http://{ip}:{port}");
let client = reqwest::blocking::Client::builder()
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap();
Expand All @@ -41,24 +44,31 @@ impl AwClient {
}
}

pub fn get_bucket(&self, bucketname: &str) -> Result<Bucket, reqwest::Error> {
pub async fn get_bucket(&self, bucketname: &str) -> Result<Bucket, reqwest::Error> {
let url = format!("{}/api/0/buckets/{}", self.baseurl, bucketname);
let bucket = self.client.get(url).send()?.error_for_status()?.json()?;
let bucket = self
.client
.get(url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(bucket)
}

pub fn get_buckets(&self) -> Result<HashMap<String, Bucket>, reqwest::Error> {
pub async fn get_buckets(&self) -> Result<HashMap<String, Bucket>, reqwest::Error> {
let url = format!("{}/api/0/buckets/", self.baseurl);
self.client.get(url).send()?.json()
self.client.get(url).send().await?.json().await
}

pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), reqwest::Error> {
pub async fn create_bucket(&self, bucket: &Bucket) -> Result<(), reqwest::Error> {
let url = format!("{}/api/0/buckets/{}", self.baseurl, bucket.id);
self.client.post(url).json(bucket).send()?;
self.client.post(url).json(bucket).send().await?;
Ok(())
}

pub fn create_bucket_simple(
pub async fn create_bucket_simple(
&self,
bucketname: &str,
buckettype: &str,
Expand All @@ -75,16 +85,16 @@ impl AwClient {
created: None,
last_updated: None,
};
self.create_bucket(&bucket)
self.create_bucket(&bucket).await
}

pub fn delete_bucket(&self, bucketname: &str) -> Result<(), reqwest::Error> {
pub async fn delete_bucket(&self, bucketname: &str) -> Result<(), reqwest::Error> {
let url = format!("{}/api/0/buckets/{}", self.baseurl, bucketname);
self.client.delete(url).send()?;
self.client.delete(url).send().await?;
Ok(())
}

pub fn get_events(
pub async fn get_events(
&self,
bucketname: &str,
start: Option<DateTime<Utc>>,
Expand All @@ -109,27 +119,31 @@ impl AwClient {
url.query_pairs_mut()
.append_pair("limit", s.to_string().as_str());
};
self.client.get(url).send()?.json()
self.client.get(url).send().await?.json().await
}

pub fn insert_event(&self, bucketname: &str, event: &Event) -> Result<(), reqwest::Error> {
pub async fn insert_event(
&self,
bucketname: &str,
event: &Event,
) -> Result<(), reqwest::Error> {
let url = format!("{}/api/0/buckets/{}/events", self.baseurl, bucketname);
let eventlist = vec![event.clone()];
self.client.post(url).json(&eventlist).send()?;
self.client.post(url).json(&eventlist).send().await?;
Ok(())
}

pub fn insert_events(
pub async fn insert_events(
&self,
bucketname: &str,
events: Vec<Event>,
) -> Result<(), reqwest::Error> {
let url = format!("{}/api/0/buckets/{}/events", self.baseurl, bucketname);
self.client.post(url).json(&events).send()?;
self.client.post(url).json(&events).send().await?;
Ok(())
}

pub fn heartbeat(
pub async fn heartbeat(
&self,
bucketname: &str,
event: &Event,
Expand All @@ -139,31 +153,42 @@ impl AwClient {
"{}/api/0/buckets/{}/heartbeat?pulsetime={}",
self.baseurl, bucketname, pulsetime
);
self.client.post(url).json(&event).send()?;
self.client.post(url).json(&event).send().await?;
Ok(())
}

pub fn delete_event(&self, bucketname: &str, event_id: i64) -> Result<(), reqwest::Error> {
pub async fn delete_event(
&self,
bucketname: &str,
event_id: i64,
) -> Result<(), reqwest::Error> {
let url = format!(
"{}/api/0/buckets/{}/events/{}",
self.baseurl, bucketname, event_id
);
self.client.delete(url).send()?;
self.client.delete(url).send().await?;
Ok(())
}

pub fn get_event_count(&self, bucketname: &str) -> Result<i64, reqwest::Error> {
pub async fn get_event_count(&self, bucketname: &str) -> Result<i64, reqwest::Error> {
let url = format!("{}/api/0/buckets/{}/events/count", self.baseurl, bucketname);
let res = self.client.get(url).send()?.error_for_status()?.text()?;
let res = self
.client
.get(url)
.send()
.await?
.error_for_status()?
.text()
.await?;
let count: i64 = match res.trim().parse() {
Ok(count) => count,
Err(err) => panic!("could not parse get_event_count response: {err:?}"),
};
Ok(count)
}

pub fn get_info(&self) -> Result<aw_models::Info, reqwest::Error> {
pub async fn get_info(&self) -> Result<aw_models::Info, reqwest::Error> {
let url = format!("{}/api/0/info", self.baseurl);
self.client.get(url).send()?.json()
self.client.get(url).send().await?.json().await
}
}
4 changes: 2 additions & 2 deletions aw-client-rust/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ extern crate tokio_test;

#[cfg(test)]
mod test {
use aw_client_rust::AwClient;
use aw_client_rust::blocking::AwClient;
use aw_client_rust::Event;
use chrono::{DateTime, Duration, Utc};
use serde_json::Map;
Expand Down Expand Up @@ -51,7 +51,7 @@ mod test {
let shutdown_handler = server.shutdown();

thread::spawn(move || {
let launch = block_on(server.launch()).unwrap();
let _ = block_on(server.launch()).unwrap();
});

shutdown_handler
Expand Down
2 changes: 1 addition & 1 deletion aw-sync/src/accessmethod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use aw_client_rust::AwClient;
use aw_client_rust::blocking::AwClient;
use chrono::{DateTime, Utc};
use reqwest::StatusCode;

Expand Down
2 changes: 1 addition & 1 deletion aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::path::PathBuf;
use chrono::{DateTime, Datelike, TimeZone, Utc};
use clap::{Parser, Subcommand};

use aw_client_rust::AwClient;
use aw_client_rust::blocking::AwClient;

mod accessmethod;
mod sync;
Expand Down
2 changes: 1 addition & 1 deletion aw-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};

use aw_client_rust::AwClient;
use aw_client_rust::blocking::AwClient;
use chrono::{DateTime, Utc};

use aw_datastore::{Datastore, DatastoreError};
Expand Down

0 comments on commit 36c86ab

Please sign in to comment.