From a0eb4ebbe3857e79f2a8af7cd152c5b02259a240 Mon Sep 17 00:00:00 2001 From: Duy Do Date: Thu, 30 Nov 2023 22:17:59 +0700 Subject: [PATCH] Fix slow performace --- Cargo.lock | 58 ++++++++++++++-- crates/cmds-std/Cargo.toml | 8 ++- crates/cmds-std/benches/postgrest.rs | 69 +++++++++++++++++++ crates/cmds-std/src/postgrest/builder_eq.rs | 10 +-- .../cmds-std/src/postgrest/builder_insert.rs | 10 +-- crates/cmds-std/src/postgrest/builder_is.rs | 10 +-- .../cmds-std/src/postgrest/builder_order.rs | 10 +-- .../cmds-std/src/postgrest/builder_select.rs | 18 ++--- .../cmds-std/src/postgrest/builder_update.rs | 10 +-- .../cmds-std/src/postgrest/execute_query.rs | 4 +- crates/cmds-std/src/postgrest/new_query.rs | 4 +- crates/cmds-std/src/postgrest/new_rpc.rs | 6 +- crates/flow-server/Cargo.toml | 2 +- lib/flow-lib/src/context.rs | 10 +++ lib/flow-lib/src/utils/tower_client.rs | 5 +- 15 files changed, 190 insertions(+), 44 deletions(-) create mode 100644 crates/cmds-std/benches/postgrest.rs diff --git a/Cargo.lock b/Cargo.lock index cfa203ab..5acce77c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -707,6 +707,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "anstyle" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" + [[package]] name = "anyhow" version = "1.0.75" @@ -1562,7 +1568,7 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", "bitflags 1.3.2", - "clap_lex", + "clap_lex 0.2.4", "indexmap 1.9.3", "once_cell", "strsim 0.10.0", @@ -1570,6 +1576,25 @@ dependencies = [ "textwrap 0.16.0", ] +[[package]] +name = "clap" +version = "4.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1" +dependencies = [ + "anstyle", + "clap_lex 0.6.0", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -1579,6 +1604,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clap_lex" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" + [[package]] name = "cmds-pdg" version = "0.0.0" @@ -1662,7 +1693,9 @@ dependencies = [ "anyhow", "bs58 0.4.0", "bytes", + "criterion", "flow-lib", + "futures-executor", "futures-util", "mime_guess", "postgrest", @@ -1960,19 +1993,19 @@ dependencies = [ [[package]] name = "criterion" -version = "0.4.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" dependencies = [ "anes", - "atty", "cast", "ciborium", - "clap 3.2.25", + "clap 4.4.10", "criterion-plot", + "is-terminal", "itertools", - "lazy_static", "num-traits", + "once_cell", "oorandom", "plotters", "rayon", @@ -3395,6 +3428,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-terminal" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" +dependencies = [ + "hermit-abi 0.3.3", + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -4515,7 +4559,7 @@ dependencies = [ [[package]] name = "postgrest" version = "1.6.0" -source = "git+https://github.com/space-operator/postgrest-rs?rev=5f8e53bde0fd30f91f84242d51dbb75f0e7f9fa9#5f8e53bde0fd30f91f84242d51dbb75f0e7f9fa9" +source = "git+https://github.com/space-operator/postgrest-rs?rev=8e371bb3d42710353d9fc0b5573a9f4cb7d88255#8e371bb3d42710353d9fc0b5573a9f4cb7d88255" dependencies = [ "bytes", "reqwest", diff --git a/crates/cmds-std/Cargo.toml b/crates/cmds-std/Cargo.toml index 095eeae1..201c044b 100644 --- a/crates/cmds-std/Cargo.toml +++ b/crates/cmds-std/Cargo.toml @@ -3,6 +3,10 @@ name = "cmds-std" version = "0.0.0" edition = "2021" +[[bench]] +name = "postgrest" +harness = false + [dependencies] flow-lib = { workspace = true } @@ -17,7 +21,9 @@ rust_decimal = { version = "1.32.0", features = ["serde-with-float"] } tracing = "0.1.40" bytes = "1.5.0" mime_guess = "2.0.4" -postgrest = { git = "https://github.com/space-operator/postgrest-rs", rev = "5f8e53bde0fd30f91f84242d51dbb75f0e7f9fa9" } +postgrest = { git = "https://github.com/space-operator/postgrest-rs", rev = "8e371bb3d42710353d9fc0b5573a9f4cb7d88255" } [dev-dependencies] tokio = { version = "1", features = ["rt", "macros"] } +criterion = "0.5" +futures-executor = "0.3" diff --git a/crates/cmds-std/benches/postgrest.rs b/crates/cmds-std/benches/postgrest.rs new file mode 100644 index 00000000..83aee033 --- /dev/null +++ b/crates/cmds-std/benches/postgrest.rs @@ -0,0 +1,69 @@ +use cmds_std::postgrest::builder_select; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use flow_lib::{ + value::{self, array, map}, + Context, Value, +}; +use reqwest::header::{HeaderMap, HeaderValue}; + +fn build_header() -> HeaderMap { + let mut map = HeaderMap::new(); + map.insert("accept", HeaderValue::from_str("application/json").unwrap()); + map +} + +pub fn criterion_benchmark(c: &mut Criterion) { + let json = serde_json::from_str::( + r#" +{ + "url": "https://base.spaceoperator.com/rest/v1/table", + "body": null, + "is_rpc": false, + "method": "GET", + "schema": null, + "headers": [ + [ + "accept", + "application/json" + ] + ], + "queries": [] +}"#, + ) + .unwrap(); + let params = map! { + "query" => json, + "columns" => "*", + }; + let cmd = builder_select::build().unwrap(); + let ctx = Context::default(); + c.bench_function("run_command", |b| { + b.iter(|| { + let fut = cmd.run(black_box(ctx.clone()), black_box(params.clone())); + futures_executor::block_on(fut) + }) + }); + c.bench_function("deserialize", |b| { + b.iter(|| value::from_map::(black_box(params.clone())).unwrap()) + }); + let query = value::from_map::(params) + .unwrap() + .query; + c.bench_function("serialize", |b| { + b.iter(|| { + value::to_map(&black_box(builder_select::Output { + query: query.clone(), + })) + .unwrap() + }) + }); + c.bench_function("build_header", |b| b.iter(|| build_header())); + let value = Value::Array(array![array!["accept", "application/json"]]); + c.bench_function("deser_vec_tuple", |b| { + b.iter(|| value::from_value::>(black_box(value.clone())).unwrap()) + }); + c.bench_function("new_reqwest_client", |b| b.iter(|| reqwest::Client::new())); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/crates/cmds-std/src/postgrest/builder_eq.rs b/crates/cmds-std/src/postgrest/builder_eq.rs index 68f730a5..a1f4afbc 100644 --- a/crates/cmds-std/src/postgrest/builder_eq.rs +++ b/crates/cmds-std/src/postgrest/builder_eq.rs @@ -4,19 +4,21 @@ const NAME: &str = "postgrest_builder_eq"; #[derive(Deserialize, Debug)] struct Input { - query: postgrest::Builder, + query: postgrest::Query, column: String, filter: String, } #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } -async fn run(_: Context, input: Input) -> Result { +async fn run(ctx: Context, input: Input) -> Result { Ok(Output { - query: input.query.eq(input.column, input.filter), + query: postgrest::Builder::from_query(input.query, ctx.http) + .eq(input.column, input.filter) + .into(), }) } diff --git a/crates/cmds-std/src/postgrest/builder_insert.rs b/crates/cmds-std/src/postgrest/builder_insert.rs index 41ace0d7..92193eb5 100644 --- a/crates/cmds-std/src/postgrest/builder_insert.rs +++ b/crates/cmds-std/src/postgrest/builder_insert.rs @@ -4,18 +4,20 @@ const NAME: &str = "postgrest_builder_insert"; #[derive(Deserialize, Debug)] struct Input { - query: postgrest::Builder, + query: postgrest::Query, body: JsonValue, } #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } -async fn run(_: Context, input: Input) -> Result { +async fn run(ctx: Context, input: Input) -> Result { Ok(Output { - query: input.query.insert(serde_json::to_string(&input.body)?), + query: postgrest::Builder::from_query(input.query, ctx.http) + .insert(serde_json::to_string(&input.body)?) + .into(), }) } diff --git a/crates/cmds-std/src/postgrest/builder_is.rs b/crates/cmds-std/src/postgrest/builder_is.rs index 98fc1260..f6d807dd 100644 --- a/crates/cmds-std/src/postgrest/builder_is.rs +++ b/crates/cmds-std/src/postgrest/builder_is.rs @@ -4,19 +4,21 @@ const NAME: &str = "postgrest_builder_is"; #[derive(Deserialize, Debug)] struct Input { - query: postgrest::Builder, + query: postgrest::Query, column: String, filter: String, } #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } -async fn run(_: Context, input: Input) -> Result { +async fn run(ctx: Context, input: Input) -> Result { Ok(Output { - query: input.query.is(input.column, input.filter), + query: postgrest::Builder::from_query(input.query, ctx.http) + .is(input.column, input.filter) + .into(), }) } diff --git a/crates/cmds-std/src/postgrest/builder_order.rs b/crates/cmds-std/src/postgrest/builder_order.rs index 09013cb7..3d26d1db 100644 --- a/crates/cmds-std/src/postgrest/builder_order.rs +++ b/crates/cmds-std/src/postgrest/builder_order.rs @@ -4,18 +4,20 @@ const NAME: &str = "postgrest_builder_order"; #[derive(Deserialize, Debug)] struct Input { - query: postgrest::Builder, + query: postgrest::Query, columns: String, } #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } -async fn run(_: Context, input: Input) -> Result { +async fn run(ctx: Context, input: Input) -> Result { Ok(Output { - query: input.query.order(input.columns), + query: postgrest::Builder::from_query(input.query, ctx.http) + .order(input.columns) + .into(), }) } diff --git a/crates/cmds-std/src/postgrest/builder_select.rs b/crates/cmds-std/src/postgrest/builder_select.rs index cb40ce3d..f7035da0 100644 --- a/crates/cmds-std/src/postgrest/builder_select.rs +++ b/crates/cmds-std/src/postgrest/builder_select.rs @@ -3,23 +3,25 @@ use flow_lib::command::prelude::*; const NAME: &str = "postgrest_builder_select"; #[derive(Deserialize, Debug)] -struct Input { - query: postgrest::Builder, - columns: String, +pub struct Input { + pub query: postgrest::Query, + pub columns: String, } #[derive(Serialize, Debug)] -struct Output { - query: postgrest::Builder, +pub struct Output { + pub query: postgrest::Query, } -async fn run(_: Context, input: Input) -> Result { +async fn run(ctx: Context, input: Input) -> Result { Ok(Output { - query: input.query.select(input.columns), + query: postgrest::Builder::from_query(input.query, ctx.http) + .select(input.columns) + .into(), }) } -fn build() -> BuildResult { +pub fn build() -> BuildResult { Ok( CmdBuilder::new(flow_lib::node_definition!("postgrest/builder_select.json"))? .check_name(NAME)? diff --git a/crates/cmds-std/src/postgrest/builder_update.rs b/crates/cmds-std/src/postgrest/builder_update.rs index f692c581..93b201ec 100644 --- a/crates/cmds-std/src/postgrest/builder_update.rs +++ b/crates/cmds-std/src/postgrest/builder_update.rs @@ -4,18 +4,20 @@ const NAME: &str = "postgrest_builder_update"; #[derive(Deserialize, Debug)] struct Input { - query: postgrest::Builder, + query: postgrest::Query, body: serde_json::Map, } #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } -async fn run(_: Context, input: Input) -> Result { +async fn run(ctx: Context, input: Input) -> Result { Ok(Output { - query: input.query.update(serde_json::to_string(&input.body)?), + query: postgrest::Builder::from_query(input.query, ctx.http) + .update(serde_json::to_string(&input.body)?) + .into(), }) } diff --git a/crates/cmds-std/src/postgrest/execute_query.rs b/crates/cmds-std/src/postgrest/execute_query.rs index 237eaf75..e8c6a2ac 100644 --- a/crates/cmds-std/src/postgrest/execute_query.rs +++ b/crates/cmds-std/src/postgrest/execute_query.rs @@ -9,7 +9,7 @@ const NAME: &str = "postgrest_execute_query"; #[derive(Deserialize, Debug)] struct Input { - query: postgrest::Builder, + query: postgrest::Query, #[serde(default)] pub headers: Vec<(String, String)>, } @@ -35,7 +35,7 @@ async fn run(mut ctx: Context, input: Input) -> Result { .url .starts_with(&format!("{}/rest/v1", ctx.endpoints.supabase)); - let mut req = input.query.build(); + let mut req = postgrest::Builder::from_query(input.query, ctx.http.clone()).build(); for (k, v) in input.headers { req = req.header(k, v); } diff --git a/crates/cmds-std/src/postgrest/new_query.rs b/crates/cmds-std/src/postgrest/new_query.rs index 18d3f4f1..f7646dfb 100644 --- a/crates/cmds-std/src/postgrest/new_query.rs +++ b/crates/cmds-std/src/postgrest/new_query.rs @@ -11,14 +11,14 @@ struct Input { #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } async fn run(ctx: Context, input: Input) -> Result { let url = input .url .unwrap_or_else(|| format!("{}/rest/v1/{}", ctx.endpoints.supabase, input.table)); - let query = postgrest::Builder::new(url, input.schema, <_>::default(), ctx.http); + let query = postgrest::Builder::new(url, input.schema, <_>::default(), ctx.http).into(); Ok(Output { query }) } diff --git a/crates/cmds-std/src/postgrest/new_rpc.rs b/crates/cmds-std/src/postgrest/new_rpc.rs index 8f4479a2..8f38bed9 100644 --- a/crates/cmds-std/src/postgrest/new_rpc.rs +++ b/crates/cmds-std/src/postgrest/new_rpc.rs @@ -12,7 +12,7 @@ struct Input { #[derive(Serialize, Debug)] struct Output { - query: postgrest::Builder, + query: postgrest::Query, } async fn run(ctx: Context, input: Input) -> Result { @@ -23,7 +23,9 @@ async fn run(ctx: Context, input: Input) -> Result { if let Some(schema) = input.schema { pg = pg.schema(schema); } - let query = pg.rpc(input.function, serde_json::to_string(&input.params)?); + let query = pg + .rpc(input.function, serde_json::to_string(&input.params)?) + .into(); Ok(Output { query }) } diff --git a/crates/flow-server/Cargo.toml b/crates/flow-server/Cargo.toml index 273bc35b..a8837c7f 100644 --- a/crates/flow-server/Cargo.toml +++ b/crates/flow-server/Cargo.toml @@ -51,7 +51,7 @@ once_cell = "1.17.2" rhai-script.workspace = true [dev-dependencies] -criterion = "0.4" +criterion = "0.5" inventory = "0.3" [[bench]] diff --git a/lib/flow-lib/src/context.rs b/lib/flow-lib/src/context.rs index e085220a..36f3573f 100644 --- a/lib/flow-lib/src/context.rs +++ b/lib/flow-lib/src/context.rs @@ -433,3 +433,13 @@ impl Context { f::(); } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_no_tokio() { + Context::default(); + } +} diff --git a/lib/flow-lib/src/utils/tower_client.rs b/lib/flow-lib/src/utils/tower_client.rs index 07e16bce..61eb7f9b 100644 --- a/lib/flow-lib/src/utils/tower_client.rs +++ b/lib/flow-lib/src/utils/tower_client.rs @@ -38,7 +38,10 @@ where S: tower::Service + Send + 'static, S::Future: Send + 'static, { - let buffer = Buffer::new(BoxService::new(s), size); + let (buffer, worker) = Buffer::pair(BoxService::new(s), size); + if let Ok(rt) = tokio::runtime::Handle::try_current() { + rt.spawn(worker); + } Self::new(buffer, worker_error) }