diff --git a/Cargo.lock b/Cargo.lock index cc94b03a6081..4d783c34d546 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1016,6 +1016,9 @@ name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +dependencies = [ + "serde", +] [[package]] name = "bitpacking" @@ -1064,9 +1067,9 @@ checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" [[package]] name = "bytemuck" -version = "1.20.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" [[package]] name = "byteorder" @@ -1332,6 +1335,7 @@ name = "chroma-log" version = "0.1.0" dependencies = [ "async-trait", + "bytemuck", "chroma-config", "chroma-error", "chroma-segment", @@ -1339,7 +1343,10 @@ dependencies = [ "opentelemetry", "rand", "serde", + "serde_json", + "sqlx", "thiserror 1.0.69", + "tokio", "tonic", "tracing", "tracing-opentelemetry", @@ -1466,6 +1473,7 @@ dependencies = [ "prost 0.13.3", "prost-types", "roaring", + "serde", "thiserror 1.0.69", "tonic", "tonic-build", @@ -1667,6 +1675,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32c" version = "0.6.8" @@ -1751,6 +1774,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -1906,6 +1938,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "der" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.3.11" @@ -1945,6 +1988,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -1981,6 +2025,12 @@ dependencies = [ "syn 2.0.89", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast-rs" version = "1.2.1" @@ -1999,10 +2049,10 @@ version = "0.14.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" dependencies = [ - "der", + "der 0.6.1", "elliptic-curve", "rfc6979", - "signature", + "signature 1.6.4", ] [[package]] @@ -2010,6 +2060,9 @@ name = "either" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -2019,12 +2072,12 @@ checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" dependencies = [ "base16ct", "crypto-bigint 0.4.9", - "der", + "der 0.6.1", "digest", "ff", "generic-array", "group", - "pkcs8", + "pkcs8 0.9.0", "rand_core", "sec1", "subtle", @@ -2062,6 +2115,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "event-listener" version = "5.3.1" @@ -2435,6 +2499,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -2686,6 +2761,15 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "heck" version = "0.4.1" @@ -2716,6 +2800,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -3420,6 +3513,9 @@ name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin", +] [[package]] name = "levenshtein_automata" @@ -3513,6 +3609,17 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -3886,6 +3993,23 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-complex" version = "0.4.6" @@ -4279,6 +4403,15 @@ dependencies = [ "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -4372,14 +4505,35 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der 0.7.9", + "pkcs8 0.10.2", + "spki 0.7.3", +] + [[package]] name = "pkcs8" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" dependencies = [ - "der", - "spki", + "der 0.6.1", + "spki 0.6.0", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der 0.7.9", + "spki 0.7.3", ] [[package]] @@ -5028,6 +5182,26 @@ dependencies = [ "byteorder", ] +[[package]] +name = "rsa" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47c75d7c5c6b673e58bf54d8544a9f432e3a925b0e80f7cd3602ab5c50c55519" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8 0.10.2", + "rand_core", + "signature 2.2.0", + "spki 0.7.3", + "subtle", + "zeroize", +] + [[package]] name = "rtrb" version = "0.3.1" @@ -5299,9 +5473,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" dependencies = [ "base16ct", - "der", + "der 0.6.1", "generic-array", - "pkcs8", + "pkcs8 0.9.0", "subtle", "zeroize", ] @@ -5555,6 +5729,16 @@ dependencies = [ "rand_core", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "siphasher" version = "1.0.1" @@ -5584,6 +5768,9 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +dependencies = [ + "serde", +] [[package]] name = "snafu" @@ -5632,7 +5819,204 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" dependencies = [ "base64ct", - "der", + "der 0.6.1", +] + +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der 0.7.9", +] + +[[package]] +name = "sqlx" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4410e73b3c0d8442c5f99b425d7a435b5ee0ae4167b3196771dd3f7a01be745f" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a007b6936676aa9ab40207cde35daab0a04b823be8ae004368c0793b96a61e0" +dependencies = [ + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.15.2", + "hashlink", + "indexmap 2.6.0", + "log", + "memchr", + "once_cell", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "thiserror 2.0.4", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3112e2ad78643fef903618d78cf0aec1cb3134b019730edb039b69eaf531f310" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.89", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e9f90acc5ab146a99bf5061a7eb4976b573f560bc898ef3bf8435448dd5e7ad" +dependencies = [ + "dotenvy", + "either", + "heck 0.5.0", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 2.0.89", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.6.0", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.4", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.6.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.4", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f85ca71d3a5b24e64e1d08dd8fe36c6c95c339a896cc33068148906784620540" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing", + "url", ] [[package]] @@ -5647,6 +6031,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.10.0" @@ -6473,12 +6868,33 @@ dependencies = [ "version_check", ] +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-width" version = "0.1.14" @@ -6631,6 +7047,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -6748,6 +7170,16 @@ dependencies = [ "rustix", ] +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall", + "wasite", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index f8bf31c1aaa0..f7bc8f2fcd2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ tracing-bunyan-formatter = "0.3" tracing-opentelemetry = "0.28.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] } +sqlx = { version = "0.8.3", features = ["runtime-tokio", "sqlite"] } +bytemuck = "1.21.0" chroma-benchmark = { path = "rust/benchmark" } chroma-blockstore = { path = "rust/blockstore" } diff --git a/rust/log/Cargo.toml b/rust/log/Cargo.toml index 1c428f87054e..55f732293734 100644 --- a/rust/log/Cargo.toml +++ b/rust/log/Cargo.toml @@ -15,9 +15,12 @@ tracing = { workspace = true } # Used by tracing tracing-opentelemetry = { workspace = true } uuid = { workspace = true } +sqlx = { workspace = true } +tokio = { workspace = true } +serde_json = { workspace = true } +bytemuck = { workspace = true } chroma-config = { workspace = true } chroma-error = { workspace = true } chroma-segment = { workspace = true } chroma-types = { workspace = true } - diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs index b87e51aa11b0..b5baddb8c64a 100644 --- a/rust/log/src/grpc_log.rs +++ b/rust/log/src/grpc_log.rs @@ -1,7 +1,7 @@ use super::config::LogConfig; use crate::tracing::client_interceptor; use crate::types::{ - CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError, + CollectionInfo, GetCollectionsWithNewDataError, UpdateCollectionLogOffsetError, }; use crate::PushLogsError; use async_trait::async_trait; @@ -18,6 +18,23 @@ use tonic::transport::Endpoint; use tonic::{Request, Status}; use uuid::Uuid; +#[derive(Error, Debug)] +pub enum GrpcPullLogsError { + #[error("Failed to fetch")] + FailedToPullLogs(#[from] tonic::Status), + #[error("Failed to convert proto embedding record into EmbeddingRecord")] + ConversionError(#[from] RecordConversionError), +} + +impl ChromaError for GrpcPullLogsError { + fn code(&self) -> ErrorCodes { + match self { + GrpcPullLogsError::FailedToPullLogs(_) => ErrorCodes::Internal, + GrpcPullLogsError::ConversionError(_) => ErrorCodes::Internal, + } + } +} + #[derive(Clone, Debug)] pub struct GrpcLog { #[allow(clippy::type_complexity)] @@ -100,7 +117,7 @@ impl GrpcLog { offset: i64, batch_size: i32, end_timestamp: Option, - ) -> Result, PullLogsError> { + ) -> Result, GrpcPullLogsError> { let end_timestamp = match end_timestamp { Some(end_timestamp) => end_timestamp, None => i64::MAX, @@ -124,7 +141,7 @@ impl GrpcLog { result.push(log_record); } Err(err) => { - return Err(PullLogsError::ConversionError(err)); + return Err(GrpcPullLogsError::ConversionError(err)); } } } @@ -132,7 +149,7 @@ impl GrpcLog { } Err(e) => { tracing::error!("Failed to pull logs: {}", e); - Err(PullLogsError::FailedToPullLogs(e)) + Err(GrpcPullLogsError::FailedToPullLogs(e)) } } } diff --git a/rust/log/src/lib.rs b/rust/log/src/lib.rs index 77cf2621c399..a7ea4621beac 100644 --- a/rust/log/src/lib.rs +++ b/rust/log/src/lib.rs @@ -3,6 +3,7 @@ pub mod grpc_log; pub mod in_memory_log; #[allow(clippy::module_inception)] mod log; +pub mod sqlite_log; pub mod test; pub mod tracing; pub mod types; diff --git a/rust/log/src/log.rs b/rust/log/src/log.rs index 6b5cdf04d4fd..2098b433e633 100644 --- a/rust/log/src/log.rs +++ b/rust/log/src/log.rs @@ -1,9 +1,10 @@ use crate::grpc_log::GrpcLog; use crate::in_memory_log::InMemoryLog; +use crate::sqlite_log::SqliteLog; use crate::types::{ - CollectionInfo, GetCollectionsWithNewDataError, PullLogsError, UpdateCollectionLogOffsetError, + CollectionInfo, GetCollectionsWithNewDataError, UpdateCollectionLogOffsetError, }; -use crate::PushLogsError; +use chroma_error::ChromaError; use chroma_types::{CollectionUuid, LogRecord, OperationRecord}; use std::fmt::Debug; @@ -20,6 +21,7 @@ pub struct CollectionRecord { #[derive(Clone, Debug)] pub enum Log { + Sqlite(SqliteLog), Grpc(GrpcLog), #[allow(dead_code)] InMemory(InMemoryLog), @@ -32,16 +34,20 @@ impl Log { offset: i64, batch_size: i32, end_timestamp: Option, - ) -> Result, PullLogsError> { + ) -> Result, Box> { match self { - Log::Grpc(log) => { - log.read(collection_id, offset, batch_size, end_timestamp) - .await - } - Log::InMemory(log) => { - log.read(collection_id, offset, batch_size, end_timestamp) - .await - } + Log::Sqlite(log) => log + .read(collection_id, offset, batch_size, end_timestamp) + .await + .map_err(|e| Box::new(e) as Box), + Log::Grpc(log) => log + .read(collection_id, offset, batch_size, end_timestamp) + .await + .map_err(|e| Box::new(e) as Box), + Log::InMemory(log) => log + .read(collection_id, offset, batch_size, end_timestamp) + .await + .map_err(|e| Box::new(e) as Box), } } @@ -49,9 +55,16 @@ impl Log { &mut self, collection_id: CollectionUuid, records: Vec, - ) -> Result<(), PushLogsError> { + ) -> Result<(), Box> { match self { - Log::Grpc(log) => log.push_logs(collection_id, records).await, + Log::Sqlite(log) => log + .push_logs(collection_id, records) + .await + .map_err(|e| Box::new(e) as Box), + Log::Grpc(log) => log + .push_logs(collection_id, records) + .await + .map_err(|e| Box::new(e) as Box), Log::InMemory(_) => unimplemented!(), } } @@ -61,6 +74,7 @@ impl Log { min_compaction_size: u64, ) -> Result, GetCollectionsWithNewDataError> { match self { + Log::Sqlite(_) => unimplemented!(), Log::Grpc(log) => log.get_collections_with_new_data(min_compaction_size).await, Log::InMemory(log) => log.get_collections_with_new_data(min_compaction_size).await, } @@ -72,6 +86,7 @@ impl Log { new_offset: i64, ) -> Result<(), UpdateCollectionLogOffsetError> { match self { + Log::Sqlite(_) => unimplemented!(), Log::Grpc(log) => { log.update_collection_log_offset(collection_id, new_offset) .await diff --git a/rust/log/src/sqlite_log.rs b/rust/log/src/sqlite_log.rs new file mode 100644 index 000000000000..8241bee7f939 --- /dev/null +++ b/rust/log/src/sqlite_log.rs @@ -0,0 +1,324 @@ +use chroma_error::{ChromaError, ErrorCodes}; +use chroma_types::{ + CollectionUuid, LogRecord, Operation, OperationRecord, ScalarEncoding, + ScalarEncodingConversionError, UpdateMetadata, UpdateMetadataValue, +}; +use sqlx::sqlite::SqlitePool; +use sqlx::{QueryBuilder, Row}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SqlitePullLogsError { + #[error("Query error: {0}")] + QueryError(#[from] sqlx::Error), + #[error("Failed to parse embedding encoding")] + InvalidEncoding(#[from] ScalarEncodingConversionError), + #[error("Failed to parse embedding: {0}")] + InvalidEmbedding(bytemuck::PodCastError), + #[error("Failed to parse metadata: {0}")] + InvalidMetadata(#[from] serde_json::Error), +} + +impl ChromaError for SqlitePullLogsError { + fn code(&self) -> ErrorCodes { + match self { + SqlitePullLogsError::QueryError(_) => ErrorCodes::Internal, + SqlitePullLogsError::InvalidEncoding(_) => ErrorCodes::Internal, + SqlitePullLogsError::InvalidEmbedding(_) => ErrorCodes::Internal, + SqlitePullLogsError::InvalidMetadata(_) => ErrorCodes::Internal, + } + } +} + +#[derive(Error, Debug)] +pub enum SqlitePushLogsError { + #[error("Query error: {0}")] + QueryError(#[from] sqlx::Error), + #[error("Failed to serialize metadata: {0}")] + InvalidMetadata(#[from] serde_json::Error), +} + +impl ChromaError for SqlitePushLogsError { + fn code(&self) -> ErrorCodes { + match self { + SqlitePushLogsError::QueryError(_) => ErrorCodes::Internal, + SqlitePushLogsError::InvalidMetadata(_) => ErrorCodes::Internal, + } + } +} + +#[derive(Clone, Debug)] +pub struct SqliteLog { + pool: SqlitePool, + tenant_id: String, + topic_namespace: String, +} + +impl SqliteLog { + pub(super) async fn read( + &mut self, + collection_id: CollectionUuid, + offset: i64, + batch_size: i32, + end_timestamp_ns: Option, + ) -> Result, SqlitePullLogsError> { + let topic = get_topic_name(&self.tenant_id, &self.topic_namespace, collection_id); + + let end_timestamp_ns = end_timestamp_ns.unwrap_or(i64::MAX); + + let logs = sqlx::query( + r#" + SELECT + seq_id, + id, + operation, + vector, + encoding, + metadata + FROM embeddings_queue + WHERE topic = ? + AND seq_id >= ? + AND CAST(strftime('%s', created_at) AS INTEGER) <= (? / 1000000000) + ORDER BY seq_id ASC + LIMIT ? + "#, + ) + .bind(topic) + .bind(offset) + .bind(end_timestamp_ns) + .bind(batch_size) + .fetch_all(&self.pool) + .await?; + + let records = logs + .into_iter() + .map(|row| { + let log_offset: i64 = row.get("seq_id"); + let id: String = row.get("id"); + let embedding_bytes = row.get::, _>("vector"); + let encoding = row + .get::, _>("encoding") + .map(ScalarEncoding::try_from) + .transpose()?; + let metadata_str = row.get::, _>("metadata"); + + // Parse embedding + let embedding = embedding_bytes + .map( + |embedding_bytes| -> Result, SqlitePullLogsError> { + match encoding { + Some(ScalarEncoding::FLOAT32) => { + let slice: &[f32] = + bytemuck::try_cast_slice(embedding_bytes) + .map_err(SqlitePullLogsError::InvalidEmbedding)?; + Ok(Some(slice.to_vec())) + } + Some(ScalarEncoding::INT32) => { + unimplemented!() + } + None => Ok(None), + } + }, + ) + .transpose()? + .flatten(); + + // Parse metadata + let parsed_metadata_and_document: Option<(UpdateMetadata, Option)> = + metadata_str + .map(|metadata_str| { + let mut parsed: UpdateMetadata = serde_json::from_str(metadata_str)?; + + let document = match parsed.remove("chroma:document") { + Some(UpdateMetadataValue::Str(document)) => Some(document), + None => None, + _ => panic!("Document not found in metadata"), + }; + + Ok::<_, SqlitePullLogsError>((parsed, document)) + }) + .transpose()?; + let document = parsed_metadata_and_document + .as_ref() + .and_then(|(_, document)| document.clone()); + let metadata = parsed_metadata_and_document.map(|(metadata, _)| metadata); + + let operation = operation_from_code(row.get("operation")); + + Ok(LogRecord { + log_offset, + record: OperationRecord { + id, + embedding, + encoding, + metadata, + document, + operation, + }, + }) + }) + .collect::, SqlitePullLogsError>>()?; + + Ok(records) + } + + pub(super) async fn push_logs( + &mut self, + collection_id: CollectionUuid, + records: Vec, + ) -> Result<(), SqlitePushLogsError> { + let topic = get_topic_name(&self.tenant_id, &self.topic_namespace, collection_id); + + let records_and_serialized_metadatas = records + .into_iter() + .map(|mut record| { + let mut empty_metadata = UpdateMetadata::new(); + + let metadata = record.metadata.as_mut().unwrap_or(&mut empty_metadata); + if let Some(ref document) = record.document { + metadata.insert( + "chroma:document".to_string(), + UpdateMetadataValue::Str(document.clone()), + ); + } + + let serialized = serde_json::to_string(&metadata)?; + Ok::<_, SqlitePushLogsError>((record, serialized)) + }) + .collect::, SqlitePushLogsError>>()?; + + // todo: insert into FTS index? + // todo: insert into embedding metadata? + + let mut query_builder = QueryBuilder::new( + "INSERT INTO embeddings_queue (topic, id, operation, vector, encoding, metadata) ", + ); + // todo: detect if over binding limit? + query_builder.push_values( + records_and_serialized_metadatas, + |mut builder, (record, serialized_metadata)| { + builder.push_bind(&topic); + builder.push_bind(record.id); + builder.push_bind(operation_to_code(record.operation)); + builder.push_bind::>>( + record + .embedding + .map(|e| bytemuck::cast_slice(e.as_slice()).to_vec()), + ); + builder.push_bind(record.encoding.map(String::from)); + builder.push_bind::(serialized_metadata); + }, + ); + let query = query_builder.build(); + query.execute(&self.pool).await?; + + Ok(()) + } +} + +fn get_topic_name(tenant: &str, namespace: &str, collection_id: CollectionUuid) -> String { + format!("persistent://{}/{}/{}", tenant, namespace, collection_id) +} + +fn operation_from_code(code: u32) -> Operation { + // chromadb/db/mixins/embeddings_queue.py + match code { + 0 => Operation::Add, + 1 => Operation::Update, + 2 => Operation::Upsert, + 3 => Operation::Delete, + _ => panic!("Invalid operation code"), + } +} + +fn operation_to_code(operation: Operation) -> u32 { + match operation { + Operation::Add => 0, + Operation::Update => 1, + Operation::Upsert => 2, + Operation::Delete => 3, + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use super::*; + use chroma_types::CollectionUuid; + use sqlx::sqlite::SqlitePoolOptions; + + #[tokio::test] + async fn test_pull_logs() { + let pool = SqlitePoolOptions::new() + .connect("sqlite:///Users/maxisom/git/chroma/chroma_data/chroma.sqlite3") + .await + .unwrap(); + + let mut log = SqliteLog { + pool, + tenant_id: "default".to_string(), + topic_namespace: "default".to_string(), + }; + + let collection_id = + CollectionUuid::from_str("f54138fd-4f8a-4fb1-afec-830f0684c3fb").unwrap(); + let offset = 0; + let batch_size = 10; + let end_timestamp_ns = None; + + let logs = log + .read(collection_id, offset, batch_size, end_timestamp_ns) + .await + .unwrap(); + + for log in logs { + println!("{:?}", log); + } + } + + #[tokio::test] + async fn test_push_logs() { + let pool = SqlitePoolOptions::new() + .connect("sqlite:///Users/maxisom/git/chroma/chroma_data/chroma_mut.sqlite3") + .await + .unwrap(); + + let mut log = SqliteLog { + pool, + tenant_id: "default".to_string(), + topic_namespace: "default".to_string(), + }; + + let collection_id = + CollectionUuid::from_str("f54138fd-4f8a-4fb1-afec-830f0684c3fb").unwrap(); + let mut metadata = UpdateMetadata::new(); + metadata.insert( + "foo".to_string(), + UpdateMetadataValue::Str("bar".to_string()), + ); + + let record_to_add = OperationRecord { + id: "foo".to_string(), + embedding: Some(vec![1.0, 2.0, 3.0]), + encoding: Some(ScalarEncoding::FLOAT32), + metadata: Some(metadata), + document: Some("bar".to_string()), + operation: Operation::Add, + }; + + log.push_logs(collection_id, vec![record_to_add.clone()]) + .await + .unwrap(); + + let logs = log.read(collection_id, 0, 100, None).await.unwrap(); + let added_log = logs.iter().find(|log| log.record.id == "foo").unwrap(); + + assert_eq!(added_log.record.id, record_to_add.id); + assert_eq!(added_log.record.embedding, record_to_add.embedding); + assert_eq!(added_log.record.encoding, record_to_add.encoding); + assert_eq!(added_log.record.metadata, record_to_add.metadata); + assert_eq!(added_log.record.document, record_to_add.document); + assert_eq!(added_log.record.operation, record_to_add.operation); + } +} diff --git a/rust/types/Cargo.toml b/rust/types/Cargo.toml index 6cead6343c37..73d78eb0dc14 100644 --- a/rust/types/Cargo.toml +++ b/rust/types/Cargo.toml @@ -13,6 +13,7 @@ roaring = { workspace = true } thiserror = { workspace = true } tonic = { workspace = true } uuid = { workspace = true } +serde = { workspace = true } chroma-error = { workspace = true } diff --git a/rust/types/src/metadata.rs b/rust/types/src/metadata.rs index 545fd559bfa4..6edc451f79fd 100644 --- a/rust/types/src/metadata.rs +++ b/rust/types/src/metadata.rs @@ -1,4 +1,5 @@ use chroma_error::{ChromaError, ErrorCodes}; +use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; use std::{ cmp::Ordering, collections::{HashMap, HashSet}, @@ -91,6 +92,91 @@ impl TryFrom<&UpdateMetadataValue> for MetadataValue { } } +impl Serialize for UpdateMetadataValue { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + UpdateMetadataValue::Bool(b) => serializer.serialize_bool(*b), + UpdateMetadataValue::Int(i) => serializer.serialize_i64(*i), + UpdateMetadataValue::Float(f) => serializer.serialize_f64(*f), + UpdateMetadataValue::Str(s) => serializer.serialize_str(s), + UpdateMetadataValue::None => serializer.serialize_unit(), + } + } +} + +impl<'de> Deserialize<'de> for UpdateMetadataValue { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct UpdateMetadataValueVisitor; + + impl Visitor<'_> for UpdateMetadataValueVisitor { + type Value = UpdateMetadataValue; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a bool, an integer, a float, a string, or null") + } + + fn visit_bool(self, value: bool) -> Result { + Ok(UpdateMetadataValue::Bool(value)) + } + + fn visit_i64(self, value: i64) -> Result { + Ok(UpdateMetadataValue::Int(value)) + } + + fn visit_u64(self, value: u64) -> Result + where + E: serde::de::Error, + { + // Because Serde may parse some integers as u64, + // convert to i64 if it fits, or fail otherwise: + if value <= i64::MAX as u64 { + Ok(UpdateMetadataValue::Int(value as i64)) + } else { + Err(E::invalid_value( + serde::de::Unexpected::Unsigned(value), + &self, + )) + } + } + + fn visit_f64(self, value: f64) -> Result { + Ok(UpdateMetadataValue::Float(value)) + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + Ok(UpdateMetadataValue::Str(value.to_owned())) + } + + fn visit_string(self, value: String) -> Result + where + E: serde::de::Error, + { + Ok(UpdateMetadataValue::Str(value)) + } + + fn visit_none(self) -> Result { + Ok(UpdateMetadataValue::None) + } + + fn visit_unit(self) -> Result { + // null in JSON is represented as unit in Serde + Ok(UpdateMetadataValue::None) + } + } + + deserializer.deserialize_any(UpdateMetadataValueVisitor) + } +} + /* =========================================== MetadataValue diff --git a/rust/types/src/scalar_encoding.rs b/rust/types/src/scalar_encoding.rs index 7c1a45f6eee3..49d1a2fea03e 100644 --- a/rust/types/src/scalar_encoding.rs +++ b/rust/types/src/scalar_encoding.rs @@ -49,6 +49,27 @@ impl TryFrom for ScalarEncoding { } } +impl TryFrom<&str> for ScalarEncoding { + type Error = ScalarEncodingConversionError; + + fn try_from(encoding: &str) -> Result { + match encoding { + "FLOAT32" => Ok(ScalarEncoding::FLOAT32), + "INT32" => Ok(ScalarEncoding::INT32), + _ => Err(ScalarEncodingConversionError::InvalidEncoding), + } + } +} + +impl From for String { + fn from(encoding: ScalarEncoding) -> String { + match encoding { + ScalarEncoding::FLOAT32 => "FLOAT32".to_string(), + ScalarEncoding::INT32 => "INT32".to_string(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/worker/src/execution/operators/fetch_log.rs b/rust/worker/src/execution/operators/fetch_log.rs index 1518bbc6c5c0..d984858cb497 100644 --- a/rust/worker/src/execution/operators/fetch_log.rs +++ b/rust/worker/src/execution/operators/fetch_log.rs @@ -2,7 +2,7 @@ use std::time::{SystemTime, SystemTimeError, UNIX_EPOCH}; use async_trait::async_trait; use chroma_error::{ChromaError, ErrorCodes}; -use chroma_log::{Log, PullLogsError}; +use chroma_log::Log; use chroma_system::{Operator, OperatorType}; use chroma_types::{Chunk, CollectionUuid, LogRecord}; use thiserror::Error; @@ -43,7 +43,7 @@ pub type FetchLogOutput = Chunk; #[derive(Error, Debug)] pub enum FetchLogError { #[error("Error when pulling log: {0}")] - PullLog(#[from] PullLogsError), + PullLog(#[from] Box), #[error("Error when capturing system time: {0}")] SystemTime(#[from] SystemTimeError), }