diff --git a/Cargo.lock b/Cargo.lock index a901fad..e5bdd0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,17 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "ahash" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" -dependencies = [ - "getrandom", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.11" @@ -439,6 +428,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "critical-section" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" + [[package]] name = "crossbeam-channel" version = "0.5.11" @@ -525,7 +520,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.3", + "hashbrown", "lock_api", "once_cell", "parking_lot_core", @@ -836,23 +831,13 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash 0.7.8", - "serde", -] - [[package]] name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" dependencies = [ - "ahash 0.8.11", + "ahash", "allocator-api2", ] @@ -971,12 +956,6 @@ dependencies = [ "either", ] -[[package]] -name = "itoa" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" - [[package]] name = "itoa" version = "1.0.10" @@ -1020,7 +999,7 @@ dependencies = [ [[package]] name = "latte-cli" -version = "0.27.0" +version = "0.28.0" dependencies = [ "anyhow", "base64 0.22.1", @@ -1107,12 +1086,6 @@ dependencies = [ "redox_syscall", ] -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - [[package]] name = "lock_api" version = "0.4.11" @@ -1138,6 +1111,15 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matrixmultiply" version = "0.3.8" @@ -1187,6 +1169,45 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "musli" +version = "0.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c21124dd24833900879114414b877f2136f4b7b7a3b49756ecc5c36eca332bb" +dependencies = [ + "musli-macros", +] + +[[package]] +name = "musli-common" +version = "0.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "178446623aa62978aa0f894b2081bc11ea77c2119ccfe35be428ab9ddb495dfc" +dependencies = [ + "musli", +] + +[[package]] +name = "musli-macros" +version = "0.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1ab0e4ac2721bc4fa3528a6a2640c1c30c36c820f8c85159252fbf6c2fac24" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + +[[package]] +name = "musli-storage" +version = "0.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2fc1f80b166f611c462e1344220e9b3a9ad37c885e43039d5d2e6887445937c" +dependencies = [ + "musli", + "musli-common", +] + [[package]] name = "nalgebra" version = "0.32.6" @@ -1389,6 +1410,10 @@ name = "once_cell" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +dependencies = [ + "critical-section", + "portable-atomic", +] [[package]] name = "openssl" @@ -1590,6 +1615,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "portable-atomic" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" + [[package]] name = "powerfmt" version = "0.2.0" @@ -1747,8 +1778,17 @@ checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.5", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1759,9 +1799,15 @@ checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -1792,40 +1838,74 @@ dependencies = [ [[package]] name = "rune" -version = "0.12.4" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51a636a206fc1a58be83dc4da610c74ecdb1a1e9b13eeccf8da31d0f9cac3fb1" +checksum = "d21925ac4f8974395d0d9e43f96a34c778e71ed86fe96d0313b2211102537234" dependencies = [ "anyhow", - "byteorder", "codespan-reporting", "futures-core", "futures-util", - "hashbrown 0.11.2", - "itoa 0.4.8", - "linked-hash-map", + "itoa", + "musli", + "musli-storage", "num 0.4.1", - "num-bigint 0.4.4", + "once_cell", "pin-project", + "rune-alloc", + "rune-core", "rune-macros", "ryu", "serde", - "serde_bytes", - "smallvec", - "thiserror", "tracing", +] + +[[package]] +name = "rune-alloc" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e85c26e19f7efb91c6e19afc68b008f04685fdb2852e96ce8fbd3cf4a0b4e76c" +dependencies = [ + "ahash", + "pin-project", + "rune-alloc-macros", + "serde", +] + +[[package]] +name = "rune-alloc-macros" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "810588952a8710959d35ad17c933804d60f96c3792f216277cda68c1a9887120" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + +[[package]] +name = "rune-core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d30fa78b6cb15d1560bb4cb18f4b99a9097b08ade3a5fc23e5ae7311f97c537b" +dependencies = [ + "byteorder", + "musli", + "rune-alloc", + "serde", "twox-hash", ] [[package]] name = "rune-macros" -version = "0.12.4" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c66f17eaa2c8f110102f3d6bdd66951c1bab248b81991e6dbdcd9361e37768" +checksum = "c1b91e53bae3804e4d72e2b04fa5d5108bd93e880ca597c0cae0fb0a662fe198" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "rune-core", + "syn 2.0.50", ] [[package]] @@ -1926,7 +2006,7 @@ dependencies = [ "chrono", "dashmap", "futures", - "hashbrown 0.14.3", + "hashbrown", "histogram", "itertools 0.11.0", "lazy_static", @@ -1996,15 +2076,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde_bytes" -version = "0.11.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" -dependencies = [ - "serde", -] - [[package]] name = "serde_derive" version = "1.0.204" @@ -2022,7 +2093,7 @@ version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ - "itoa 1.0.10", + "itoa", "ryu", "serde", ] @@ -2089,9 +2160,6 @@ name = "smallvec" version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" -dependencies = [ - "serde", -] [[package]] name = "snap" @@ -2251,7 +2319,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", - "itoa 1.0.10", + "itoa", "num-conv", "powerfmt", "serde", @@ -2389,10 +2457,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index f9cd5a5..a628e66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "latte-cli" description = "A database benchmarking tool for Apache Cassandra" -version = "0.27.0" +version = "0.28.0" authors = ["Piotr Kołaczkowski "] edition = "2021" readme = "README.md" @@ -37,7 +37,7 @@ plotters = "0.3.4" plotters-svg = "0.3.3" rand = "0.8" regex = "1.5" -rune = "0.12" +rune = "0.13" rust-embed = "8" scylla = { version = "0.13", features = ["ssl"] } search_path = "0.1" @@ -53,7 +53,7 @@ tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_l tokio-stream = "0.1" tracing = "0.1" tracing-appender = "0.2" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } try-lock = "0.2.3" uuid = { version = "1.1", features = ["v4"] } walkdir = "2" diff --git a/src/context.rs b/src/context.rs index fdbc637..2e6b047 100644 --- a/src/context.rs +++ b/src/context.rs @@ -6,10 +6,10 @@ use std::hash::{Hash, Hasher}; use std::io; use std::io::{BufRead, BufReader, ErrorKind, Read}; use std::net::IpAddr; +use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; -use anyhow::anyhow; use chrono::Utc; use hdrhistogram::Histogram; use itertools::Itertools; @@ -19,11 +19,11 @@ use openssl::ssl::{SslContext, SslContextBuilder, SslFiletype, SslMethod}; use rand::distributions::Distribution; use rand::rngs::StdRng; use rand::{random, Rng, SeedableRng}; -use rune::ast; -use rune::ast::Kind; +use rune::alloc::fmt::TryWrite; use rune::macros::{quote, MacroContext, TokenStream}; use rune::parse::Parser; -use rune::runtime::{Object, Shared, TypeInfo, VmError}; +use rune::runtime::{Mut, Object, Ref, Shared, TypeInfo, VmError, VmResult}; +use rune::{ast, vm_try, vm_write}; use rune::{Any, Value}; use rust_embed::RustEmbed; use scylla::_macro_internal::ColumnType; @@ -234,6 +234,12 @@ pub enum CassErrorKind { } impl CassError { + #[rune::function(protocol = STRING_DISPLAY)] + pub fn string_display(&self, f: &mut rune::runtime::Formatter) -> VmResult<()> { + vm_write!(f, "{}", self.to_string()); + VmResult::Ok(()) + } + pub fn display(&self, buf: &mut String) -> std::fmt::Result { use std::fmt::Write; match &self.0 { @@ -420,7 +426,7 @@ impl Context { retry_number, retry_interval, load_cycle_count: 0, - data: Value::Object(Shared::new(Object::new())), + data: Value::Object(Shared::new(Object::new()).unwrap()), } } @@ -603,17 +609,6 @@ mod bind { (Value::Float(v), ColumnType::Float) => Ok(CqlValue::Float(*v as f32)), (Value::Float(v), ColumnType::Double) => Ok(CqlValue::Double(*v)), - (Value::StaticString(s), ColumnType::Timeuuid) => { - let timeuuid = CqlTimeuuid::from_str(s); - match timeuuid { - Ok(timeuuid) => Ok(CqlValue::Timeuuid(timeuuid)), - Err(e) => Err(CassError(CassErrorKind::QueryParamConversion( - format!("{:?}", v), - ColumnType::Timeuuid, - Some(format!("{}", e)), - ))), - } - } (Value::String(s), ColumnType::Timeuuid) => { let timeuuid_str = s.borrow_ref().unwrap(); let timeuuid = CqlTimeuuid::from_str(timeuuid_str.as_str()); @@ -626,23 +621,9 @@ mod bind { ))), } } - (Value::StaticString(v), ColumnType::Text | ColumnType::Ascii) => { - Ok(CqlValue::Text(v.as_str().to_string())) - } (Value::String(v), ColumnType::Text | ColumnType::Ascii) => { Ok(CqlValue::Text(v.borrow_ref().unwrap().as_str().to_string())) } - (Value::StaticString(s), ColumnType::Inet) => { - let ipaddr = IpAddr::from_str(s); - match ipaddr { - Ok(ipaddr) => Ok(CqlValue::Inet(ipaddr)), - Err(e) => Err(CassError(CassErrorKind::QueryParamConversion( - format!("{:?}", v), - ColumnType::Inet, - Some(format!("{}", e)), - ))), - } - } (Value::String(s), ColumnType::Inet) => { let ipaddr_str = s.borrow_ref().unwrap(); let ipaddr = IpAddr::from_str(ipaddr_str.as_str()); @@ -655,7 +636,6 @@ mod bind { ))), } } - (Value::Bytes(v), ColumnType::Blob) => { Ok(CqlValue::Blob(v.borrow_ref().unwrap().to_vec())) } @@ -688,7 +668,7 @@ mod bind { match tuple { Value::Tuple(tuple) if tuple.borrow_ref().unwrap().len() == 2 => { let tuple = tuple.borrow_ref().unwrap(); - let key = to_scylla_value(tuple.get(0).unwrap(), key_elt)?; + let key = to_scylla_value(tuple.first().unwrap(), key_elt)?; let value = to_scylla_value(tuple.get(1).unwrap(), value_elt)?; map_vec.push((key, value)); } @@ -710,7 +690,8 @@ mod bind { let obj = obj.borrow_ref().unwrap(); let mut map_vec = Vec::with_capacity(obj.keys().len()); for (k, v) in obj.iter() { - let key = to_scylla_value(&(k.to_owned().to_value().unwrap()), key_elt)?; + let key = String::from(k.as_str()); + let key = to_scylla_value(&(key.to_value().unwrap()), key_elt)?; let value = to_scylla_value(v, value_elt)?; map_vec.push((key, value)); } @@ -829,7 +810,7 @@ mod bind { } fn read_params<'a, 'b>( - get_value: impl Fn(&String) -> Option<&'a Value>, + get_value: impl Fn(&str) -> Option<&'a Value>, params: &[ColumnSpec], ) -> Result, CassError> { let mut values = Vec::with_capacity(params.len()); @@ -844,7 +825,7 @@ mod bind { } fn read_fields<'a, 'b>( - get_value: impl Fn(&String) -> Option<&'a Value>, + get_value: impl Fn(&str) -> Option<&'a Value>, fields: &[(String, ColumnType)], ) -> Result)>, CassError> { let mut values = Vec::with_capacity(fields.len()); @@ -877,9 +858,10 @@ impl Uuid { Uuid(builder.into_uuid()) } - pub fn display(&self, buf: &mut String) -> std::fmt::Result { - use std::fmt::Write; - write!(buf, "{}", self.0) + #[rune::function(protocol = STRING_DISPLAY)] + pub fn string_display(&self, f: &mut rune::runtime::Formatter) -> VmResult<()> { + vm_write!(f, "{}", self.0); + VmResult::Ok(()) } } @@ -901,78 +883,51 @@ pub fn param( ctx: &mut MacroContext, params: &HashMap, ts: &TokenStream, -) -> rune::Result { +) -> rune::compile::Result { let mut parser = Parser::from_token_stream(ts, ctx.macro_span()); let name = parser.parse::()?; let name = ctx.resolve(name)?.to_string(); - let sep = parser.next()?; - if sep.kind != Kind::Comma { - return Err(anyhow!("Expected comma")); - } + let _ = parser.parse::()?; let expr = parser.parse::()?; let rhs = match params.get(&name) { Some(value) => { - let src_id = ctx.insert_source(&name, value); + let src_id = ctx.insert_source(&name, value)?; let value = ctx.parse_source::(src_id)?; quote!(#value) } None => quote!(#expr), }; - Ok(rhs.into_token_stream(ctx)) + Ok(rhs.into_token_stream(ctx)?) } -/// Converts a Rune integer to i8 (Cassandra tinyint) -pub fn int_to_i8(value: i64) -> Option { - Some(Int8(value.try_into().ok()?)) +/// Creates a new UUID for current iteration +#[rune::function] +pub fn uuid(i: i64) -> Uuid { + Uuid::new(i) } +#[rune::function] pub fn float_to_i8(value: f64) -> Option { - int_to_i8(value as i64) -} - -/// Converts a Rune integer to i16 (Cassandra smallint) -pub fn int_to_i16(value: i64) -> Option { - Some(Int16(value.try_into().ok()?)) -} - -pub fn float_to_i16(value: f64) -> Option { - int_to_i16(value as i64) -} - -/// Converts a Rune integer to i32 (Cassandra int) -pub fn int_to_i32(value: i64) -> Option { - Some(Int32(value.try_into().ok()?)) -} - -pub fn float_to_i32(value: f64) -> Option { - int_to_i32(value as i64) -} - -pub fn int_to_f32(value: i64) -> Option { - Some(Float32(value as f32)) -} - -pub fn float_to_f32(value: f64) -> Option { - Some(Float32(value as f32)) -} - -pub fn int_to_string(value: i64) -> Option { - Some(value.to_string()) -} - -pub fn float_to_string(value: f64) -> Option { - Some(value.to_string()) + Some(Int8((value as i64).try_into().ok()?)) } /// Computes a hash of an integer value `i`. /// Returns a value in range `0..i64::MAX`. -pub fn hash(i: i64) -> i64 { +fn hash_inner(i: i64) -> i64 { let mut hash = MetroHash64::new(); i.hash(&mut hash); (hash.finish() & 0x7FFFFFFFFFFFFFFF) as i64 } +/// Computes a hash of an integer value `i`. +/// Returns a value in range `0..i64::MAX`. +#[rune::function] +pub fn hash(i: i64) -> i64 { + hash_inner(i) +} + /// Computes hash of two integer values. +#[rune::function] pub fn hash2(a: i64, b: i64) -> i64 { let mut hash = MetroHash64::new(); a.hash(&mut hash); @@ -982,65 +937,63 @@ pub fn hash2(a: i64, b: i64) -> i64 { /// Computes a hash of an integer value `i`. /// Returns a value in range `0..max`. +#[rune::function] pub fn hash_range(i: i64, max: i64) -> i64 { - hash(i) % max + hash_inner(i) % max } /// Generates a floating point value with normal distribution -pub fn normal(i: i64, mean: f64, std_dev: f64) -> Result { +#[rune::function] +pub fn normal(i: i64, mean: f64, std_dev: f64) -> VmResult { let mut rng = StdRng::seed_from_u64(i as u64); - let distribution = Normal::new(mean, std_dev).map_err(|e| VmError::panic(format!("{e}")))?; - Ok(distribution.sample(&mut rng)) + let distribution = + vm_try!(Normal::new(mean, std_dev).map_err(|e| VmError::panic(format!("{e}")))); + VmResult::Ok(distribution.sample(&mut rng)) } -pub fn uniform(i: i64, min: f64, max: f64) -> Result { +#[rune::function] +pub fn uniform(i: i64, min: f64, max: f64) -> VmResult { let mut rng = StdRng::seed_from_u64(i as u64); - let distribution = Uniform::new(min, max).map_err(|e| VmError::panic(format!("{e}")))?; - Ok(distribution.sample(&mut rng)) -} - -/// Restricts a value to a certain interval unless it is NaN. -pub fn clamp_float(value: f64, min: f64, max: f64) -> f64 { - value.clamp(min, max) -} - -/// Restricts a value to a certain interval. -pub fn clamp_int(value: i64, min: i64, max: i64) -> i64 { - value.clamp(min, max) + let distribution = vm_try!(Uniform::new(min, max).map_err(|e| VmError::panic(format!("{e}")))); + VmResult::Ok(distribution.sample(&mut rng)) } /// Generates random blob of data of given length. /// Parameter `seed` is used to seed the RNG. -pub fn blob(seed: i64, len: usize) -> rune::runtime::Bytes { +#[rune::function] +pub fn blob(seed: i64, len: usize) -> Vec { let mut rng = StdRng::seed_from_u64(seed as u64); - let v = (0..len).map(|_| rng.gen()).collect_vec(); - rune::runtime::Bytes::from_vec(v) + (0..len).map(|_| rng.gen::()).collect() } /// Generates random string of given length. -/// Parameter `seed` is used to seed the RNG. -pub fn text(seed: i64, len: usize) -> rune::runtime::StaticString { +/// Parameter `seed` is used to seed +/// the RNG. +#[rune::function] +pub fn text(seed: i64, len: usize) -> String { let mut rng = StdRng::seed_from_u64(seed as u64); - let s: String = (0..len) + (0..len) .map(|_| { let code_point = rng.gen_range(0x0061u32..=0x007Au32); // Unicode range for 'a-z' std::char::from_u32(code_point).unwrap() }) - .collect(); - rune::runtime::StaticString::new(s) + .collect() } /// Generates 'now' timestamp +#[rune::function] pub fn now_timestamp() -> i64 { Utc::now().timestamp() } /// Selects one item from the collection based on the hash of the given value. -pub fn hash_select(i: i64, collection: &[Value]) -> &Value { - &collection[hash_range(i, collection.len() as i64) as usize] +#[rune::function] +pub fn hash_select(i: i64, collection: &[Value]) -> Value { + collection[(hash_inner(i) % collection.len() as i64) as usize].clone() } /// Reads a file into a string. +#[rune::function] pub fn read_to_string(filename: &str) -> io::Result { let mut file = File::open(filename).expect("no such file"); @@ -1051,6 +1004,7 @@ pub fn read_to_string(filename: &str) -> io::Result { } /// Reads a file into a vector of lines. +#[rune::function] pub fn read_lines(filename: &str) -> io::Result> { let file = File::open(filename).expect("no such file"); let buf = BufReader::new(file); @@ -1062,7 +1016,7 @@ pub fn read_lines(filename: &str) -> io::Result> { } /// Reads a resource file as a string. -pub fn read_resource_to_string(path: &str) -> io::Result { +fn read_resource_to_string_inner(path: &str) -> io::Result { let resource = Resources::get(path).ok_or_else(|| { io::Error::new(ErrorKind::NotFound, format!("Resource not found: {path}")) })?; @@ -1071,9 +1025,114 @@ pub fn read_resource_to_string(path: &str) -> io::Result { Ok(contents.to_string()) } +#[rune::function] +pub fn read_resource_to_string(path: &str) -> io::Result { + read_resource_to_string_inner(path) +} + +#[rune::function] pub fn read_resource_lines(path: &str) -> io::Result> { - Ok(read_resource_to_string(path)? + Ok(read_resource_to_string_inner(path)? .split('\n') .map(|s| s.to_string()) .collect_vec()) } + +#[rune::function(instance)] +pub async fn prepare(mut ctx: Mut, key: Ref, cql: Ref) -> Result<(), CassError> { + ctx.prepare(&key, &cql).await +} + +#[rune::function(instance)] +pub async fn execute(ctx: Ref, cql: Ref) -> Result<(), CassError> { + ctx.execute(cql.deref()).await +} + +#[rune::function(instance)] +pub async fn execute_prepared( + ctx: Ref, + key: Ref, + params: Value, +) -> Result<(), CassError> { + ctx.execute_prepared(&key, params).await +} + +#[rune::function(instance)] +pub fn elapsed_secs(ctx: &Context) -> f64 { + ctx.elapsed_secs() +} + +pub mod i64 { + use crate::context::{Float32, Int16, Int32, Int8}; + + /// Converts a Rune integer to i8 (Cassandra tinyint) + #[rune::function(instance)] + pub fn to_i8(value: i64) -> Option { + Some(Int8(value.try_into().ok()?)) + } + + /// Converts a Rune integer to i16 (Cassandra smallint) + #[rune::function(instance)] + pub fn to_i16(value: i64) -> Option { + Some(Int16(value.try_into().ok()?)) + } + + /// Converts a Rune integer to i32 (Cassandra int) + #[rune::function(instance)] + pub fn to_i32(value: i64) -> Option { + Some(Int32(value.try_into().ok()?)) + } + + /// Converts a Rune integer to f32 (Cassandra float) + #[rune::function(instance)] + pub fn to_f32(value: i64) -> Float32 { + Float32(value as f32) + } + + /// Converts a Rune integer to a String + #[rune::function(instance)] + pub fn to_string(value: i64) -> String { + value.to_string() + } + + /// Restricts a value to a certain interval. + #[rune::function(instance)] + pub fn clamp(value: i64, min: i64, max: i64) -> i64 { + value.clamp(min, max) + } +} + +pub mod f64 { + use crate::context::{Float32, Int16, Int32, Int8}; + + #[rune::function(instance)] + pub fn to_i8(value: f64) -> Int8 { + Int8(value as i8) + } + + #[rune::function(instance)] + pub fn to_i16(value: f64) -> Int16 { + Int16(value as i16) + } + + #[rune::function(instance)] + pub fn to_i32(value: f64) -> Int32 { + Int32(value as i32) + } + + #[rune::function(instance)] + pub fn to_f32(value: f64) -> Float32 { + Float32(value as f32) + } + + #[rune::function(instance)] + pub fn to_string(value: f64) -> String { + value.to_string() + } + + /// Restricts a value to a certain interval unless it is NaN. + #[rune::function(instance)] + pub fn clamp(value: f64, min: f64, max: f64) -> f64 { + value.clamp(min, max) + } +} diff --git a/src/error.rs b/src/error.rs index 48e4e96..0927151 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,6 +2,7 @@ use crate::context::CassError; use err_derive::*; use hdrhistogram::serialization::interval_log::IntervalLogWriterError; use hdrhistogram::serialization::V2DeflateSerializeError; +use rune::alloc; use std::path::PathBuf; #[derive(Debug, Error)] @@ -16,7 +17,7 @@ pub enum LatteError { Cassandra(#[source] CassError), #[error(display = "Failed to read file {:?}: {}", _0, _1)] - ScriptRead(PathBuf, #[source] std::io::Error), + ScriptRead(PathBuf, #[source] rune::source::FromPathError), #[error(display = "Failed to load script: {}", _0)] ScriptBuildError(#[source] rune::BuildError), @@ -44,6 +45,9 @@ pub enum LatteError { #[error(display = "Invalid configuration: {}", _0)] Configuration(String), + + #[error(display = "Memory allocation failure: {}", _0)] + OutOfMemory(#[source] alloc::Error), } impl LatteError {} diff --git a/src/exec.rs b/src/exec.rs index 8b70ace..1c10526 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -40,10 +40,8 @@ fn interval_stream(rate: f64) -> IntervalStream { /// - concurrency: the maximum number of pending workload calls /// - sampling: controls when to output workload statistics /// - progress: progress bar notified about each successful cycle -/// - interrupt: allows for terminating the stream early /// - out: the channel to receive workload statistics /// -#[allow(clippy::too_many_arguments)] // todo: refactor async fn run_stream( stream: impl Stream + std::marker::Unpin, workload: Workload, diff --git a/src/main.rs b/src/main.rs index 34204f2..1b418a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,9 @@ use search_path::SearchPath; use tokio::runtime::{Builder, Runtime}; use tokio::task::spawn_blocking; use tracing::info; +use tracing::level_filters::LevelFilter; use tracing_appender::non_blocking::WorkerGuard; +use tracing_subscriber::EnvFilter; use walkdir::WalkDir; use crate::config::{ @@ -471,7 +473,7 @@ fn edit(config: EditCommand) -> Result<()> { let workload = find_workload(&config.workload) .canonicalize() .unwrap_or_else(|_| config.workload.to_path_buf()); - File::open(&workload).map_err(|err| LatteError::ScriptRead(workload.clone(), err))?; + File::open(&workload).map_err(|err| LatteError::ScriptRead(workload.clone(), err.into()))?; edit_workload(workload) } @@ -508,9 +510,18 @@ fn setup_logging(run_id: &str, config: &AppConfig) -> Result { .map_err(|e| LatteError::LogFileCreate(log_file.clone(), e))?; let log_file = File::create(&log_file).map_err(|e| LatteError::LogFileCreate(log_file, e))?; let (non_blocking, guard) = tracing_appender::non_blocking(log_file); + + let filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .with_env_var("LATTE_LOG") + .from_env() + .map_err(|e| LatteError::Configuration(e.to_string()))? + .add_directive("rune=off".parse().unwrap()); // turn off rune tracing for performance reasons + tracing_subscriber::fmt() .with_ansi(false) .with_writer(non_blocking) + .with_env_filter(filter) .init(); Ok(guard) } diff --git a/src/workload.rs b/src/workload.rs index f5cbd42..d48a69d 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::path::Path; use std::sync::Arc; @@ -6,9 +6,12 @@ use std::time::Duration; use std::time::Instant; use hdrhistogram::Histogram; -use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError}; +use rune::alloc::clone::TryClone; +use rune::compile::meta::Kind; +use rune::compile::{CompileVisitor, MetaError, MetaRef}; +use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError, VmResult}; use rune::termcolor::{ColorChoice, StandardStream}; -use rune::{Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm}; +use rune::{vm_try, Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm}; use try_lock::TryLock; use crate::error::LatteError; @@ -35,9 +38,9 @@ impl SessionRef<'_> { /// implementation and the compiler is not going to catch that. /// The receiver of a `Value` must ensure that it is dropped before `Session`! impl<'a> ToValue for SessionRef<'a> { - fn to_value(self) -> Result { + fn to_value(self) -> VmResult { let obj = unsafe { AnyObj::from_ref(self.context) }; - Ok(Value::from(Shared::new(obj))) + VmResult::Ok(Value::from(vm_try!(Shared::new(obj)))) } } @@ -55,15 +58,15 @@ impl ContextRefMut<'_> { /// Caution! See `impl ToValue for SessionRef`. impl<'a> ToValue for ContextRefMut<'a> { - fn to_value(self) -> Result { + fn to_value(self) -> VmResult { let obj = unsafe { AnyObj::from_mut(self.context) }; - Ok(Value::from(Shared::new(obj))) + VmResult::Ok(Value::from(vm_try!(Shared::new(obj)))) } } /// Stores the name and hash together. /// Name is used for message formatting, hash is used for fast function lookup. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq, Hash)] pub struct FnRef { name: String, hash: rune::Hash, @@ -89,6 +92,7 @@ pub struct Program { sources: Arc, context: Arc, unit: Arc, + meta: ProgramMetadata, } impl Program { @@ -101,99 +105,59 @@ impl Program { pub fn new(source: Source, params: HashMap) -> Result { let mut context_module = Module::default(); context_module.ty::().unwrap(); + context_module.function_meta(context::execute).unwrap(); + context_module.function_meta(context::prepare).unwrap(); context_module - .async_inst_fn("execute", Context::execute) - .unwrap(); - context_module - .async_inst_fn("prepare", Context::prepare) - .unwrap(); - context_module - .async_inst_fn("execute_prepared", Context::execute_prepared) - .unwrap(); - context_module - .inst_fn("elapsed_secs", Context::elapsed_secs) + .function_meta(context::execute_prepared) .unwrap(); + context_module.function_meta(context::elapsed_secs).unwrap(); let mut err_module = Module::default(); err_module.ty::().unwrap(); - err_module - .inst_fn(rune::runtime::Protocol::STRING_DISPLAY, CassError::display) - .unwrap(); + err_module.function_meta(CassError::string_display).unwrap(); let mut uuid_module = Module::default(); uuid_module.ty::().unwrap(); uuid_module - .inst_fn( - rune::runtime::Protocol::STRING_DISPLAY, - context::Uuid::display, - ) - .unwrap(); - - let mut latte_module = Module::with_crate("latte"); - latte_module.function(&["blob"], context::blob).unwrap(); - latte_module.function(&["text"], context::text).unwrap(); - latte_module - .function(&["now_timestamp"], context::now_timestamp) - .unwrap(); - latte_module.function(&["hash"], context::hash).unwrap(); - latte_module.function(&["hash2"], context::hash2).unwrap(); - latte_module - .function(&["hash_range"], context::hash_range) - .unwrap(); - latte_module - .function(&["hash_select"], context::hash_select) - .unwrap(); - latte_module - .function(&["uuid"], context::Uuid::new) - .unwrap(); - latte_module.function(&["normal"], context::normal).unwrap(); - latte_module - .function(&["uniform"], context::uniform) - .unwrap(); - latte_module - .macro_(&["param"], move |ctx, ts| context::param(ctx, ¶ms, ts)) + .function_meta(context::Uuid::string_display) .unwrap(); + let mut latte_module = Module::with_crate("latte").unwrap(); latte_module - .inst_fn("to_string", context::int_to_string) - .unwrap(); - latte_module - .inst_fn("to_string", context::float_to_string) + .macro_("param", move |ctx, ts| context::param(ctx, ¶ms, ts)) .unwrap(); - latte_module.inst_fn("to_i32", context::int_to_i32).unwrap(); - latte_module - .inst_fn("to_i32", context::float_to_i32) - .unwrap(); - latte_module.inst_fn("to_i16", context::int_to_i16).unwrap(); - latte_module - .inst_fn("to_i16", context::float_to_i16) - .unwrap(); - latte_module.inst_fn("to_i8", context::int_to_i8).unwrap(); - latte_module.inst_fn("to_i8", context::float_to_i8).unwrap(); - latte_module.inst_fn("to_f32", context::int_to_f32).unwrap(); - latte_module - .inst_fn("to_f32", context::float_to_f32) - .unwrap(); - - latte_module.inst_fn("clamp", context::clamp_float).unwrap(); - latte_module.inst_fn("clamp", context::clamp_int).unwrap(); - - let mut fs_module = Module::with_crate("fs"); + latte_module.function_meta(context::blob).unwrap(); + latte_module.function_meta(context::text).unwrap(); + latte_module.function_meta(context::now_timestamp).unwrap(); + latte_module.function_meta(context::hash).unwrap(); + latte_module.function_meta(context::hash2).unwrap(); + latte_module.function_meta(context::hash_range).unwrap(); + latte_module.function_meta(context::hash_select).unwrap(); + latte_module.function_meta(context::uuid).unwrap(); + latte_module.function_meta(context::normal).unwrap(); + latte_module.function_meta(context::uniform).unwrap(); + + latte_module.function_meta(context::i64::to_i32).unwrap(); + latte_module.function_meta(context::i64::to_i16).unwrap(); + latte_module.function_meta(context::i64::to_i8).unwrap(); + latte_module.function_meta(context::i64::to_f32).unwrap(); + latte_module.function_meta(context::i64::clamp).unwrap(); + + latte_module.function_meta(context::f64::to_i8).unwrap(); + latte_module.function_meta(context::f64::to_i16).unwrap(); + latte_module.function_meta(context::f64::to_i32).unwrap(); + latte_module.function_meta(context::f64::to_f32).unwrap(); + latte_module.function_meta(context::f64::clamp).unwrap(); + + let mut fs_module = Module::with_crate("fs").unwrap(); + fs_module.function_meta(context::read_to_string).unwrap(); + fs_module.function_meta(context::read_lines).unwrap(); fs_module - .function(&["read_to_string"], context::read_to_string) + .function_meta(context::read_resource_to_string) .unwrap(); fs_module - .function(&["read_lines"], context::read_lines) - .unwrap(); - fs_module - .function( - &["read_resource_to_string"], - context::read_resource_to_string, - ) - .unwrap(); - fs_module - .function(&["read_resource_lines"], context::read_resource_lines) + .function_meta(context::read_resource_lines) .unwrap(); let mut context = rune::Context::with_default_modules().unwrap(); @@ -208,9 +172,11 @@ impl Program { let mut diagnostics = Diagnostics::new(); let mut sources = Self::load_sources(source)?; + let mut meta = ProgramMetadata::new(); let unit = rune::prepare(&mut sources) .with_context(&context) .with_diagnostics(&mut diagnostics) + .with_visitor(&mut meta)? .build(); if !diagnostics.is_empty() { @@ -221,8 +187,9 @@ impl Program { Ok(Program { sources: Arc::new(sources), - context: Arc::new(context.runtime()), + context: Arc::new(context.runtime().unwrap()), unit: Arc::new(unit), + meta, }) } @@ -233,7 +200,7 @@ impl Program { Self::try_insert_lib_source(parent, &mut sources)? } } - sources.insert(source); + sources.insert(source)?; Ok(sources) } @@ -244,7 +211,7 @@ impl Program { sources.insert( Source::from_path(&lib_src) .map_err(|e| LatteError::ScriptRead(lib_src.clone(), e))?, - ); + )?; } Ok(()) } @@ -255,9 +222,10 @@ impl Program { /// sharing of Arc references. fn unshare(&self) -> Program { Program { + meta: self.meta.clone(), sources: self.sources.clone(), - context: Arc::new(self.context.as_ref().clone()), - unit: Arc::new(self.unit.as_ref().clone()), + context: Arc::new(self.context.as_ref().try_clone().unwrap()), + unit: Arc::new(self.unit.as_ref().try_clone().unwrap()), } } @@ -284,14 +252,9 @@ impl Program { let e = e.take_downcast::().unwrap(); return Err(LatteError::Cassandra(e)); } - let mut msg = String::new(); - let mut buf = String::new(); + let e = Value::Any(e); - self.vm().with(|| { - if e.string_display(&mut msg, &mut buf).unwrap().is_err() { - msg = format!("{e:?}") - } - }); + let msg = self.vm().with(|| format!("{e:?}")); Err(LatteError::FunctionResult(function_name.to_string(), msg)) } Err(other) => Err(LatteError::FunctionResult( @@ -318,7 +281,11 @@ impl Program { LatteError::ScriptExecError(fun.name.to_string(), e) }; let execution = self.vm().send_execute(fun.hash, args).map_err(handle_err)?; - let result = execution.async_complete().await.map_err(handle_err)?; + let result = execution + .async_complete() + .await + .into_result() + .map_err(handle_err)?; self.convert_error(fun.name.as_str(), result) } @@ -339,7 +306,7 @@ impl Program { } pub fn has_function(&self, function: &FnRef) -> bool { - self.unit.function(function.hash).is_some() + self.meta.functions.contains(function) } /// Calls the script's `init` function. @@ -368,6 +335,29 @@ impl Program { } } +#[derive(Clone)] +struct ProgramMetadata { + functions: HashSet, +} + +impl ProgramMetadata { + pub fn new() -> Self { + Self { + functions: HashSet::new(), + } + } +} + +impl CompileVisitor for ProgramMetadata { + fn register_meta(&mut self, meta: MetaRef<'_>) -> Result<(), MetaError> { + if let Kind::Function { .. } = meta.kind { + let name = meta.item.last().unwrap().to_string(); + self.functions.insert(FnRef::new(name.as_str())); + } + Ok(()) + } +} + /// Tracks statistics of the Rune function invoked by the workload #[derive(Clone, Debug)] pub struct FnStats {