diff --git a/Cargo.lock b/Cargo.lock index a0ee8949d1..1ea1161826 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1257,6 +1257,27 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.65.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.29", +] + [[package]] name = "bindgen" version = "0.66.1" @@ -2803,8 +2824,10 @@ dependencies = [ name = "dozer-recordstore" version = "0.1.0" dependencies = [ + "dozer-storage", "dozer-types", "slice-dst", + "tempdir", ] [[package]] @@ -2853,6 +2876,7 @@ dependencies = [ "lmdb-rkv-sys", "page_size", "pin-project", + "rocksdb", "tempdir", "tokio", ] @@ -4503,6 +4527,22 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" +[[package]] +name = "librocksdb-sys" +version = "0.11.0+8.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e" +dependencies = [ + "bindgen 0.65.1", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + [[package]] name = "libsqlite3-sys" version = "0.25.2" @@ -5032,7 +5072,7 @@ checksum = "57349d5a326b437989b6ee4dc8f2f34b0cc131202748414712a8e7d98952fc8c" dependencies = [ "base64 0.21.0", "bigdecimal", - "bindgen", + "bindgen 0.66.1", "bitflags 2.4.0", "bitvec 1.0.1", "byteorder", @@ -6836,6 +6876,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cbf4a6aa5f6d6888f39e980649f3ad6b666acdce1d78e95b8a2cb076e687ae30" +[[package]] +name = "rocksdb" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6f170a4041d50a0ce04b0d2e14916d6ca863ea2e422689a5b694395d299ffe" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rstar" version = "0.11.0" diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 05edb37231..73dc4630e4 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -2,10 +2,9 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint; use dozer_cache::dozer_log::camino::Utf8Path; use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir}; use dozer_cache::dozer_log::replication::Log; -use dozer_core::checkpoint::OptionCheckpoint; +use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint}; use dozer_tracing::LabelsAndProgress; use dozer_types::models::api_endpoint::ApiEndpoint; -use dozer_types::models::app_config::DataStorage; use dozer_types::models::flags::Flags; use tokio::runtime::Runtime; use tokio::sync::Mutex; @@ -47,7 +46,7 @@ impl<'a> Executor<'a> { sources: &'a [Source], sql: Option<&'a str>, api_endpoints: &'a [ApiEndpoint], - storage_config: DataStorage, + checkpoint_options: CheckpointOptions, labels: LabelsAndProgress, udfs: &'a [UdfConfig], ) -> Result, OrchestrationError> { @@ -59,7 +58,7 @@ impl<'a> Executor<'a> { // Load pipeline checkpoint. let checkpoint = - OptionCheckpoint::new(build_path.data_dir.to_string(), storage_config).await?; + OptionCheckpoint::new(build_path.data_dir.to_string(), checkpoint_options).await?; let mut endpoint_and_logs = vec![]; for endpoint in api_endpoints { diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 0f51a8b340..9a5596e4dc 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -5,7 +5,10 @@ use crate::pipeline::PipelineBuilder; use crate::shutdown::ShutdownReceiver; use crate::simple::build; use crate::simple::helper::validate_config; -use crate::utils::{get_cache_manager_options, get_default_max_num_records, get_executor_options}; +use crate::utils::{ + get_cache_manager_options, get_checkpoint_options, get_default_max_num_records, + get_executor_options, +}; use crate::{flatten_join_handle, join_handle_map_err}; use dozer_api::auth::{Access, Authorizer}; @@ -226,7 +229,7 @@ impl SimpleOrchestrator { &self.config.sources, self.config.sql.as_deref(), &self.config.endpoints, - self.config.app.data_storage.clone(), + get_checkpoint_options(&self.config), self.labels.clone(), &self.config.udfs, ))?; diff --git a/dozer-cli/src/utils.rs b/dozer-cli/src/utils.rs index 6ec59d1b0a..60f5faef8f 100644 --- a/dozer-cli/src/utils.rs +++ b/dozer-cli/src/utils.rs @@ -1,6 +1,8 @@ use dozer_cache::cache::CacheManagerOptions; use dozer_core::{ - checkpoint::CheckpointFactoryOptions, epoch::EpochManagerOptions, executor::ExecutorOptions, + checkpoint::{CheckpointFactoryOptions, CheckpointOptions}, + epoch::EpochManagerOptions, + executor::ExecutorOptions, }; use dozer_types::{ constants::DEFAULT_DEFAULT_MAX_NUM_RECORDS, @@ -62,6 +64,14 @@ fn get_max_interval_before_persist_in_seconds(config: &Config) -> u64 { .unwrap_or_else(default_max_interval_before_persist_in_seconds) } +pub fn get_checkpoint_options(config: &Config) -> CheckpointOptions { + let app = &config.app; + CheckpointOptions { + data_storage: app.data_storage.clone(), + record_store: app.record_store, + } +} + fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions { CheckpointFactoryOptions { persist_queue_capacity: config diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs index d0631c4147..aff19563fd 100644 --- a/dozer-core/src/checkpoint/mod.rs +++ b/dozer-core/src/checkpoint/mod.rs @@ -10,7 +10,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer, use dozer_types::{ bincode, log::{error, info}, - models::app_config::DataStorage, + models::app_config::{DataStorage, RecordStore}, node::{NodeHandle, OpIdentifier, SourceStates, TableState}, parking_lot::Mutex, serde::{Deserialize, Serialize}, @@ -58,14 +58,21 @@ pub struct OptionCheckpoint { checkpoint: Option, } +#[derive(Debug, Clone, Default)] +pub struct CheckpointOptions { + pub data_storage: DataStorage, + pub record_store: RecordStore, +} + impl OptionCheckpoint { pub async fn new( checkpoint_dir: String, - storage_config: DataStorage, + options: CheckpointOptions, ) -> Result { let (storage, prefix) = - create_data_storage(storage_config, checkpoint_dir.to_string()).await?; - let (record_store, checkpoint) = read_record_store_slices(&*storage, &prefix).await?; + create_data_storage(options.data_storage, checkpoint_dir.to_string()).await?; + let (record_store, checkpoint) = + read_record_store_slices(&*storage, &prefix, options.record_store).await?; if let Some(checkpoint) = &checkpoint { info!( "Restored record store from {}th checkpoint, last epoch id is {}, processor states are stored in {}", @@ -289,8 +296,9 @@ impl Drop for CheckpointWriter { async fn read_record_store_slices( storage: &dyn Storage, factory_prefix: &str, + record_store: RecordStore, ) -> Result<(ProcessorRecordStoreDeserializer, Option), ExecutionError> { - let record_store = ProcessorRecordStoreDeserializer::new()?; + let record_store = ProcessorRecordStoreDeserializer::new(record_store)?; let record_store_prefix = record_store_prefix(factory_prefix); let mut last_checkpoint: Option = None; @@ -358,7 +366,7 @@ async fn read_record_store_slices( pub async fn create_checkpoint_for_test() -> (TempDir, OptionCheckpoint) { let temp_dir = TempDir::new("create_checkpoint_for_test").unwrap(); let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string(); - let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local) + let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default()) .await .unwrap(); (temp_dir, checkpoint) @@ -371,7 +379,7 @@ pub async fn create_checkpoint_factory_for_test( // Create empty checkpoint storage. let temp_dir = TempDir::new("create_checkpoint_factory_for_test").unwrap(); let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string(); - let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local) + let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default()) .await .unwrap(); let (checkpoint_factory, handle) = CheckpointFactory::new(checkpoint, Default::default()) @@ -404,7 +412,7 @@ pub async fn create_checkpoint_factory_for_test( handle.await.unwrap(); // Create a new factory that loads from the checkpoint. - let checkpoint = OptionCheckpoint::new(checkpoint_dir, DataStorage::Local) + let checkpoint = OptionCheckpoint::new(checkpoint_dir, Default::default()) .await .unwrap(); let last_checkpoint = checkpoint.checkpoint.as_ref().unwrap(); diff --git a/dozer-core/src/executor/receiver_loop.rs b/dozer-core/src/executor/receiver_loop.rs index 2ab38b64be..cb4cd7c583 100644 --- a/dozer-core/src/executor/receiver_loop.rs +++ b/dozer-core/src/executor/receiver_loop.rs @@ -202,7 +202,7 @@ mod tests { #[test] fn receiver_loop_forwards_op() { let (mut test_loop, senders) = TestReceiverLoop::new(2); - let record_store = ProcessorRecordStore::new().unwrap(); + let record_store = ProcessorRecordStore::new(Default::default()).unwrap(); let record: ProcessorRecord = record_store .create_record(&Record::new(vec![Field::Int(1)])) .unwrap(); diff --git a/dozer-recordstore/Cargo.toml b/dozer-recordstore/Cargo.toml index 7aec7fa7e2..a66b543d54 100644 --- a/dozer-recordstore/Cargo.toml +++ b/dozer-recordstore/Cargo.toml @@ -10,4 +10,6 @@ fuzz = ["dozer-types/arbitrary"] [dependencies] dozer-types = { path = "../dozer-types" } +dozer-storage = { path = "../dozer-storage" } slice-dst = { version = "1.5.1", default-features = false } +tempdir = "0.3.7" diff --git a/dozer-recordstore/src/in_memory/mod.rs b/dozer-recordstore/src/in_memory/mod.rs new file mode 100644 index 0000000000..a3866294cf --- /dev/null +++ b/dozer-recordstore/src/in_memory/mod.rs @@ -0,0 +1,496 @@ +//! [`RecordRef`] is a compact representation of a collection of [dozer_types::types::Field]s +//! There are two principles that make this representation more compact than `[Field]`: +//! 1. The fields and their types are stored as a Struct of Arrays instead of +//! and Array of Structs. This makes it possible to pack the discriminants +//! for the field types as a byte per field, instead of taking up a full word, +//! which is the case in [Field] (because the variant value must be aligned) +//! 2. The field values are stored packed. In a `[Field]` representation, each +//! field takes as much space as the largest enum variant in [Field] (plus its discriminant, +//! see (1.)). Instead, for the compact representation, we pack the values into +//! align_of::() sized slots. This way, a u64 takes only 8 bytes, whereas +//! a u128 can still use its 16 bytes. +use std::alloc::{dealloc, handle_alloc_error, Layout}; +use std::sync::Arc; +use std::{hash::Hash, ptr::NonNull}; + +use slice_dst::SliceWithHeader; + +use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate}; +use dozer_types::json_types::JsonValue; +use dozer_types::ordered_float::OrderedFloat; +use dozer_types::rust_decimal::Decimal; +use dozer_types::types::{DozerDuration, DozerPoint}; +use dozer_types::{ + serde::{Deserialize, Serialize}, + types::{Field, FieldType}, +}; + +// The alignment of an enum is necessarily the maximum alignment of its variants +// (otherwise it would be unsound to read from it). +// So, by using the alignment of `Field` as the alignment of the values in our +// packed `RecordRef`, we ensure that all accesses are aligned. +// This wastes a little bit of memory for subsequent fields that have +// smaller minimum alignment and size (such as `bool`, which has size=1, align=1), +// but in practice this should be negligible compared to the added effort of +// packing these fields while keeping everything aligned. +const MAX_ALIGN: usize = std::mem::align_of::(); + +#[repr(transparent)] +#[derive(Debug)] +/// `repr(transparent)` inner struct so we can implement drop logic on it +/// This is a `slice_dst` `SliceWithHeader` so we can make a fat Arc, saving a level +/// of indirection and a pointer which would otherwise be needed for the field types +struct RecordRefInner(SliceWithHeader, Option>); + +unsafe impl Send for RecordRefInner {} +unsafe impl Sync for RecordRefInner {} + +#[derive(Debug, Clone)] +pub struct RecordRef(Arc); + +impl PartialEq for RecordRef { + fn eq(&self, other: &Self) -> bool { + self.load() == other.load() + } +} + +impl Eq for RecordRef {} + +impl Hash for RecordRef { + fn hash(&self, state: &mut H) { + self.load().hash(state) + } +} + +impl<'de> Deserialize<'de> for RecordRef { + fn deserialize(deserializer: D) -> Result + where + D: dozer_types::serde::Deserializer<'de>, + { + let fields = Vec::::deserialize(deserializer)?; + let owned_fields: Vec<_> = fields.iter().map(FieldRef::cloned).collect(); + Ok(Self::new(owned_fields)) + } +} +impl Serialize for RecordRef { + fn serialize(&self, serializer: S) -> Result + where + S: dozer_types::serde::Serializer, + { + self.load().serialize(serializer) + } +} + +#[inline(always)] +unsafe fn adjust_alignment(ptr: *mut u8) -> *mut u8 { + ptr.add(ptr.align_offset(std::mem::align_of::())) +} +/// # Safety +/// ptr should be valid for writing a `T`, +/// that is, ptr..ptr + size_of:: should be inside a single live allocation +unsafe fn write(ptr: *mut u8, value: T) -> *mut u8 { + let ptr = adjust_alignment::(ptr) as *mut T; + ptr.write(value); + ptr.add(1) as *mut u8 +} + +/// # Safety +/// ptr should be valid for reading a `T`, +/// that is, ptr..ptr + size_of:: should be inside a single live allocation +/// and the memory read should be initialized. +/// The returned reference is only valid as long as pointed to memory is valid +/// for reading. +unsafe fn read_ref<'a, T>(ptr: *mut u8) -> (*mut u8, &'a T) { + let ptr = adjust_alignment::(ptr) as *mut T; + let result = &*ptr; + (ptr.add(1) as *mut u8, result) +} + +/// # Safety +/// ptr should be valid for reading a `T`, +/// that is, ptr..ptr + size_of:: should be inside a single live allocation +/// and the memory read should be initialized. +/// This takes ownership of the memory returned as `T`, which means dropping `T` +/// may make future reads from `ptr` undefined behavior +unsafe fn read(ptr: *mut u8) -> (*mut u8, T) { + let ptr = adjust_alignment::(ptr) as *mut T; + let result = ptr.read(); + (ptr.add(1) as *mut u8, result) +} + +/// # Safety +/// `ptr` should be valid for reading the contents of a `Field` with the type +/// corresponding to `field_type`. +/// See `read_ref` +unsafe fn read_field_ref<'a>(ptr: *mut u8, field_type: FieldType) -> (*mut u8, FieldRef<'a>) { + match field_type { + FieldType::UInt => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::UInt(*value)) + } + FieldType::U128 => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::U128(*value)) + } + + FieldType::Int => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Int(*value)) + } + + FieldType::I128 => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::I128(*value)) + } + + FieldType::Float => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Float(*value)) + } + + FieldType::Boolean => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Boolean(*value)) + } + + FieldType::String => { + let (ptr, value): (_, &String) = read_ref(ptr); + (ptr, FieldRef::String(value)) + } + FieldType::Text => { + let (ptr, value): (_, &String) = read_ref(ptr); + (ptr, FieldRef::Text(value)) + } + FieldType::Binary => { + let (ptr, value): (_, &Vec) = read_ref(ptr); + (ptr, FieldRef::Binary(value)) + } + FieldType::Decimal => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Decimal(*value)) + } + FieldType::Timestamp => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Timestamp(*value)) + } + FieldType::Date => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Date(*value)) + } + FieldType::Json => { + let (ptr, value) = read_ref::(ptr); + (ptr, FieldRef::Json(value.to_owned())) + } + FieldType::Point => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Point(*value)) + } + FieldType::Duration => { + let (ptr, value) = read_ref(ptr); + (ptr, FieldRef::Duration(*value)) + } + } +} +unsafe fn read_field(ptr: *mut u8, field_type: FieldType) -> (*mut u8, Field) { + match field_type { + FieldType::UInt => { + let (ptr, value) = read(ptr); + (ptr, Field::UInt(value)) + } + FieldType::U128 => { + let (ptr, value) = read(ptr); + (ptr, Field::U128(value)) + } + + FieldType::Int => { + let (ptr, value) = read(ptr); + (ptr, Field::Int(value)) + } + + FieldType::I128 => { + let (ptr, value) = read(ptr); + (ptr, Field::I128(value)) + } + + FieldType::Float => { + let (ptr, value) = read(ptr); + (ptr, Field::Float(value)) + } + + FieldType::Boolean => { + let (ptr, value) = read(ptr); + (ptr, Field::Boolean(value)) + } + + FieldType::String => { + let (ptr, value) = read(ptr); + (ptr, Field::String(value)) + } + FieldType::Text => { + let (ptr, value) = read(ptr); + (ptr, Field::String(value)) + } + FieldType::Binary => { + let (ptr, value) = read(ptr); + (ptr, Field::Binary(value)) + } + FieldType::Decimal => { + let (ptr, value) = read(ptr); + (ptr, Field::Decimal(value)) + } + FieldType::Timestamp => { + let (ptr, value) = read(ptr); + (ptr, Field::Timestamp(value)) + } + FieldType::Date => { + let (ptr, value) = read(ptr); + (ptr, Field::Date(value)) + } + FieldType::Json => { + let (ptr, value) = read::(ptr); + (ptr, Field::Json(value)) + } + FieldType::Point => { + let (ptr, value) = read(ptr); + (ptr, Field::Point(value)) + } + FieldType::Duration => { + let (ptr, value) = read(ptr); + (ptr, Field::Duration(value)) + } + } +} + +#[inline(always)] +fn add_field_size(size: &mut usize) { + let align = std::mem::align_of::(); + // Align the start of the field + *size = (*size + (align - 1)) & !(align - 1); + *size += std::mem::size_of::(); +} +fn size(fields: &[Option]) -> usize { + let mut size = 0; + for field in fields.iter().flatten() { + match field { + FieldType::UInt => add_field_size::(&mut size), + FieldType::U128 => add_field_size::(&mut size), + FieldType::Int => add_field_size::(&mut size), + FieldType::I128 => add_field_size::(&mut size), + FieldType::Float => add_field_size::>(&mut size), + FieldType::Boolean => add_field_size::(&mut size), + FieldType::String => add_field_size::(&mut size), + FieldType::Text => add_field_size::(&mut size), + FieldType::Binary => add_field_size::>(&mut size), + FieldType::Decimal => add_field_size::(&mut size), + FieldType::Timestamp => add_field_size::>(&mut size), + FieldType::Date => add_field_size::(&mut size), + FieldType::Json => add_field_size::(&mut size), + FieldType::Point => add_field_size::(&mut size), + FieldType::Duration => add_field_size::(&mut size), + } + } + size +} + +#[derive(Hash, Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(crate = "dozer_types::serde")] +pub enum FieldRef<'a> { + UInt(u64), + U128(u128), + Int(i64), + I128(i128), + Float(OrderedFloat), + Boolean(bool), + String(&'a str), + Text(&'a str), + Binary(&'a [u8]), + Decimal(Decimal), + Timestamp(DateTime), + Date(NaiveDate), + Json(JsonValue), + Point(DozerPoint), + Duration(DozerDuration), + Null, +} + +impl FieldRef<'_> { + pub fn cloned(&self) -> Field { + match self { + FieldRef::UInt(v) => Field::UInt(*v), + FieldRef::U128(v) => Field::U128(*v), + FieldRef::Int(v) => Field::Int(*v), + FieldRef::I128(v) => Field::I128(*v), + FieldRef::Float(v) => Field::Float(*v), + FieldRef::Boolean(v) => Field::Boolean(*v), + FieldRef::String(v) => Field::String((*v).to_owned()), + FieldRef::Text(v) => Field::Text((*v).to_owned()), + FieldRef::Binary(v) => Field::Binary((*v).to_vec()), + FieldRef::Decimal(v) => Field::Decimal(*v), + FieldRef::Timestamp(v) => Field::Timestamp(*v), + FieldRef::Date(v) => Field::Date(*v), + FieldRef::Json(v) => Field::Json(v.clone()), + FieldRef::Point(v) => Field::Point(*v), + FieldRef::Duration(v) => Field::Duration(*v), + FieldRef::Null => Field::Null, + } + } +} + +impl RecordRef { + pub fn new(fields: Vec) -> Self { + let field_types = fields + .iter() + .map(|field| field.ty()) + .collect::]>>(); + let size = size(&field_types); + + let layout = Layout::from_size_align(size, MAX_ALIGN).unwrap(); + // SAFETY: Everything is `ALIGN` byte aligned + let data = unsafe { + let data = std::alloc::alloc(layout); + if data.is_null() { + handle_alloc_error(layout); + } + data + }; + // SAFETY: We checked for null above + let data = unsafe { NonNull::new_unchecked(data) }; + let mut ptr = data.as_ptr(); + + // SAFETY: + // - ptr is non-null (we got it from a NonNull) + // - ptr is dereferencable (its memory range is large enough and not de-allocated) + // + unsafe { + for field in fields { + match field { + Field::UInt(v) => ptr = write(ptr, v), + Field::U128(v) => ptr = write(ptr, v), + Field::Int(v) => ptr = write(ptr, v), + Field::I128(v) => ptr = write(ptr, v), + Field::Float(v) => ptr = write(ptr, v), + Field::Boolean(v) => ptr = write(ptr, v), + Field::String(v) => ptr = write(ptr, v), + Field::Text(v) => ptr = write(ptr, v), + Field::Binary(v) => ptr = write(ptr, v), + Field::Decimal(v) => ptr = write(ptr, v), + Field::Timestamp(v) => ptr = write(ptr, v), + Field::Date(v) => ptr = write(ptr, v), + Field::Json(v) => ptr = write(ptr, v), + Field::Point(v) => ptr = write(ptr, v), + Field::Duration(v) => ptr = write(ptr, v), + Field::Null => (), + } + } + } + // SAFETY: This is valid, because inner is `repr(transparent)` + let arc = unsafe { + let arc = SliceWithHeader::from_slice::>(data, &field_types); + std::mem::transmute(arc) + }; + Self(arc) + } + + pub fn load(&self) -> Vec> { + self.0 + .field_types() + .iter() + .scan(self.0.data().as_ptr(), |ptr, field_type| { + let Some(field_type) = field_type else { + return Some(FieldRef::Null); + }; + + unsafe { + let (new_ptr, value) = read_field_ref(*ptr, *field_type); + *ptr = new_ptr; + Some(value) + } + }) + .collect() + } + + #[inline(always)] + pub fn id(&self) -> usize { + Arc::as_ptr(&self.0) as *const () as usize + } +} + +impl RecordRefInner { + #[inline(always)] + fn field_types(&self) -> &[Option] { + &self.0.slice + } + + #[inline(always)] + fn data(&self) -> NonNull { + self.0.header + } +} + +impl Drop for RecordRefInner { + fn drop(&mut self) { + let mut ptr = self.data().as_ptr(); + for field in self.field_types().iter().flatten() { + unsafe { + // Read owned so all field destructors run + ptr = read_field(ptr, *field).0; + } + } + // Then deallocate the field storage + unsafe { + dealloc( + self.data().as_ptr(), + Layout::from_size_align(size(self.field_types()), MAX_ALIGN).unwrap(), + ); + } + } +} + +mod store; +pub use store::{ProcessorRecordStore, ProcessorRecordStoreDeserializer, StoreRecord}; + +#[cfg(test)] +mod tests { + use dozer_types::types::Field; + + use super::RecordRef; + + #[test] + fn test_store_load() { + let fields = vec![ + Field::String("asdf".to_owned()), + Field::Int(23), + Field::Null, + Field::U128(234), + ]; + + let record = RecordRef::new(fields.clone()); + let loaded_fields: Vec<_> = record + .load() + .into_iter() + .map(|field| field.cloned()) + .collect(); + assert_eq!(&fields, &loaded_fields); + } + + #[test] + fn test_ser_de() { + let fields = vec![ + Field::String("asdf".to_owned()), + Field::Int(23), + Field::Null, + Field::U128(234), + ]; + + let record = RecordRef::new(fields.clone()); + + let bytes = dozer_types::bincode::serialize(&record).unwrap(); + let deserialized: RecordRef = dozer_types::bincode::deserialize(&bytes).unwrap(); + let loaded_fields: Vec<_> = deserialized + .load() + .into_iter() + .map(|field| field.cloned()) + .collect(); + assert_eq!(&fields, &loaded_fields); + } +} diff --git a/dozer-recordstore/src/store.rs b/dozer-recordstore/src/in_memory/store.rs similarity index 57% rename from dozer-recordstore/src/store.rs rename to dozer-recordstore/src/in_memory/store.rs index 496cdc19c1..0dc334b5c7 100644 --- a/dozer-recordstore/src/store.rs +++ b/dozer-recordstore/src/in_memory/store.rs @@ -3,33 +3,11 @@ use std::{ sync::{Arc, Weak}, }; -use dozer_types::{ - bincode, - errors::internal::BoxedError, - parking_lot::RwLock, - serde::{Deserialize, Serialize}, - thiserror::Error, - types::{Field, Lifetime, Record}, -}; - -use crate::RecordRefInner; +use dozer_types::{bincode, parking_lot::RwLock, types::Field}; -use super::{FieldRef, ProcessorRecord, RecordRef}; +use crate::RecordStoreError; -#[derive(Error, Debug)] -pub enum RecordStoreError { - #[error("Unable to deserialize type: {} - Reason: {}", typ, reason.to_string())] - DeserializationError { - typ: &'static str, - reason: BoxedError, - }, - - #[error("Unable to serialize type: {} - Reason: {}", typ, reason.to_string())] - SerializationError { - typ: &'static str, - reason: BoxedError, - }, -} +use super::{RecordRef, RecordRefInner}; pub trait StoreRecord { fn store_record(&self, record: &RecordRef) -> Result<(), RecordStoreError>; @@ -39,32 +17,6 @@ pub trait StoreRecord { self.store_record(&record)?; Ok(record) } - - fn load_ref<'a>( - &self, - record_ref: &'a RecordRef, - ) -> Result>, RecordStoreError> { - Ok(record_ref.load()) - } - - fn create_record(&self, record: &Record) -> Result { - let record_ref = self.create_ref(&record.values)?; - let mut processor_record = ProcessorRecord::new(Box::new([record_ref])); - processor_record.set_lifetime(record.lifetime.clone()); - Ok(processor_record) - } - - fn load_record(&self, processor_record: &ProcessorRecord) -> Result { - let mut record = Record::default(); - for record_ref in processor_record.values.iter() { - let fields = self.load_ref(record_ref)?; - record - .values - .extend(fields.iter().map(|field| field.cloned())); - } - record.set_lifetime(processor_record.get_lifetime()); - Ok(record) - } } #[derive(Debug)] @@ -111,19 +63,6 @@ impl ProcessorRecordStore { .get(&(record_ref.id())) .expect("RecordRef not found in ProcessorRecordStore") as u64 } - - pub fn serialize_record(&self, record: &ProcessorRecord) -> Result, bincode::Error> { - let ProcessorRecord { values, lifetime } = record; - let values = values - .iter() - .map(|value| self.serialize_ref(value)) - .collect(); - let record = ProcessorRecordForSerialization { - values, - lifetime: lifetime.clone(), - }; - bincode::serialize(&record) - } } impl StoreRecord for ProcessorRecordStore { @@ -184,22 +123,16 @@ impl ProcessorRecordStoreDeserializer { Ok(()) } - pub fn deserialize_ref(&self, index: u64) -> RecordRef { - self.inner.read().records[index as usize] + pub fn deserialize_ref(&self, index: u64) -> Result { + Ok(self + .inner + .read() + .records + .get(index as usize) + .ok_or(RecordStoreError::InMemoryRecordNotFound(index))? .as_ref() - .unwrap_or_else(|| { - panic!("RecordRef {index} not found in ProcessorRecordStoreDeserializer") - }) - .clone() - } - - pub fn deserialize_record(&self, data: &[u8]) -> Result { - let ProcessorRecordForSerialization { values, lifetime } = bincode::deserialize(data)?; - let values = values - .iter() - .map(|index| self.deserialize_ref(*index)) - .collect(); - Ok(ProcessorRecord { values, lifetime }) + .ok_or(RecordStoreError::InMemoryRecordNotFound(index))? + .clone()) } pub fn into_record_store(self) -> ProcessorRecordStore { @@ -243,43 +176,10 @@ fn insert_record_pointer_to_index( debug_assert!(previous_index.is_none()); } -#[derive(Debug, Serialize, Deserialize)] -#[serde(crate = "dozer_types::serde")] -struct ProcessorRecordForSerialization { - values: Vec, - lifetime: Option>, -} - #[cfg(test)] mod tests { - use std::time::Duration; - - use dozer_types::types::Timestamp; - use super::*; - fn test_record() -> Record { - let mut record = Record::new(vec![ - Field::Int(1), - Field::Int(2), - Field::Int(3), - Field::Int(4), - ]); - record.lifetime = Some(Lifetime { - reference: Timestamp::parse_from_rfc3339("2020-01-01T00:13:00Z").unwrap(), - duration: Duration::from_secs(10), - }); - record - } - - #[test] - fn test_record_roundtrip() { - let record = test_record(); - let record_store = ProcessorRecordStore::new().unwrap(); - let processor_record = record_store.create_record(&record).unwrap(); - assert_eq!(record_store.load_record(&processor_record).unwrap(), record); - } - #[test] fn test_serialization_roundtrip() { let record_store = ProcessorRecordStore::new().unwrap(); @@ -304,21 +204,9 @@ mod tests { record_store.deserialize_and_extend(&data).unwrap(); let mut deserialized_record_refs = vec![]; for serialized_record_ref in serialized_record_refs { - deserialized_record_refs.push(record_store.deserialize_ref(serialized_record_ref)); + deserialized_record_refs + .push(record_store.deserialize_ref(serialized_record_ref).unwrap()); } assert_eq!(deserialized_record_refs, record_refs); } - - #[test] - fn test_record_serialization_roundtrip() { - let record_store = ProcessorRecordStore::new().unwrap(); - let record = record_store.create_record(&test_record()).unwrap(); - let serialized_record = record_store.serialize_record(&record).unwrap(); - let data = record_store.serialize_slice(0).unwrap().0; - - let record_store = ProcessorRecordStoreDeserializer::new().unwrap(); - record_store.deserialize_and_extend(&data).unwrap(); - let deserialized_record = record_store.deserialize_record(&serialized_record).unwrap(); - assert_eq!(deserialized_record, record); - } } diff --git a/dozer-recordstore/src/lib.rs b/dozer-recordstore/src/lib.rs index 3a2e203375..ef971b2b23 100644 --- a/dozer-recordstore/src/lib.rs +++ b/dozer-recordstore/src/lib.rs @@ -1,535 +1,307 @@ -//! [`RecordRef`] is a compact representation of a collection of [dozer_types::types::Field]s -//! There are two principles that make this representation more compact than `[Field]`: -//! 1. The fields and their types are stored as a Struct of Arrays instead of -//! and Array of Structs. This makes it possible to pack the discriminants -//! for the field types as a byte per field, instead of taking up a full word, -//! which is the case in [Field] (because the variant value must be aligned) -//! 2. The field values are stored packed. In a `[Field]` representation, each -//! field takes as much space as the largest enum variant in [Field] (plus its discriminant, -//! see (1.)). Instead, for the compact representation, we pack the values into -//! align_of::() sized slots. This way, a u64 takes only 8 bytes, whereas -//! a u128 can still use its 16 bytes. -use std::alloc::{dealloc, handle_alloc_error, Layout}; -use std::sync::Arc; -use std::{hash::Hash, ptr::NonNull}; - -use slice_dst::SliceWithHeader; - -use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate}; -use dozer_types::json_types::JsonValue; -use dozer_types::ordered_float::OrderedFloat; -use dozer_types::rust_decimal::Decimal; -use dozer_types::types::{DozerDuration, DozerPoint}; use dozer_types::{ + bincode, + errors::internal::BoxedError, + models::app_config::RecordStore, serde::{Deserialize, Serialize}, - types::{Field, FieldType, Lifetime}, + thiserror::{self, Error}, + types::{Field, Lifetime, Record}, }; +use in_memory::{FieldRef, StoreRecord as _}; + +#[derive(Error, Debug)] +pub enum RecordStoreError { + #[error("Unable to deserialize type: {} - Reason: {}", typ, reason.to_string())] + DeserializationError { + typ: &'static str, + reason: BoxedError, + }, + + #[error("Unable to serialize type: {} - Reason: {}", typ, reason.to_string())] + SerializationError { + typ: &'static str, + reason: BoxedError, + }, + #[error("Failed to create tempdir: {0}")] + FailedToCreateTempDir(#[source] std::io::Error), + #[error("Storage error: {0}")] + Storage(#[from] dozer_storage::errors::StorageError), + #[error("In memory record not found {0}")] + InMemoryRecordNotFound(u64), + #[error("Rocksdb record not found: {0}")] + RocksdbRecordNotFound(u64), + #[error("Bincode error: {0}")] + Bincode(#[from] bincode::Error), +} -// The alignment of an enum is necessarily the maximum alignment of its variants -// (otherwise it would be unsound to read from it). -// So, by using the alignment of `Field` as the alignment of the values in our -// packed `RecordRef`, we ensure that all accesses are aligned. -// This wastes a little bit of memory for subsequent fields that have -// smaller minimum alignment and size (such as `bool`, which has size=1, align=1), -// but in practice this should be negligible compared to the added effort of -// packing these fields while keeping everything aligned. -const MAX_ALIGN: usize = std::mem::align_of::(); - -#[repr(transparent)] -#[derive(Debug)] -/// `repr(transparent)` inner struct so we can implement drop logic on it -/// This is a `slice_dst` `SliceWithHeader` so we can make a fat Arc, saving a level -/// of indirection and a pointer which would otherwise be needed for the field types -struct RecordRefInner(SliceWithHeader, Option>); - -unsafe impl Send for RecordRefInner {} -unsafe impl Sync for RecordRefInner {} +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum RecordRef { + InMemory(in_memory::RecordRef), + Rocksdb(u64), +} -#[derive(Debug, Clone)] -pub struct RecordRef(Arc); +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct ProcessorRecord { + /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage. + /// This is a Box<[]> instead of a Vec to save space on storing the vec's capacity + values: Box<[RecordRef]>, -impl PartialEq for RecordRef { - fn eq(&self, other: &Self) -> bool { - self.load() == other.load() - } + /// Time To Live for this record. If the value is None, the record will never expire. + lifetime: Option>, } -impl Eq for RecordRef {} - -impl Hash for RecordRef { - fn hash(&self, state: &mut H) { - self.load().hash(state) +impl ProcessorRecord { + pub fn new(values: Box<[RecordRef]>) -> Self { + Self { + values, + ..Default::default() + } } -} -impl<'de> Deserialize<'de> for RecordRef { - fn deserialize(deserializer: D) -> Result - where - D: dozer_types::serde::Deserializer<'de>, - { - let fields = Vec::::deserialize(deserializer)?; - let owned_fields: Vec<_> = fields.iter().map(FieldRef::cloned).collect(); - Ok(Self::new(owned_fields)) + pub fn get_lifetime(&self) -> Option { + self.lifetime.as_ref().map(|lifetime| *lifetime.clone()) } -} -impl Serialize for RecordRef { - fn serialize(&self, serializer: S) -> Result - where - S: dozer_types::serde::Serializer, - { - self.load().serialize(serializer) + pub fn set_lifetime(&mut self, lifetime: Option) { + self.lifetime = lifetime.map(Box::new); } -} - -#[inline(always)] -unsafe fn adjust_alignment(ptr: *mut u8) -> *mut u8 { - ptr.add(ptr.align_offset(std::mem::align_of::())) -} -/// # Safety -/// ptr should be valid for writing a `T`, -/// that is, ptr..ptr + size_of:: should be inside a single live allocation -unsafe fn write(ptr: *mut u8, value: T) -> *mut u8 { - let ptr = adjust_alignment::(ptr) as *mut T; - ptr.write(value); - ptr.add(1) as *mut u8 -} -/// # Safety -/// ptr should be valid for reading a `T`, -/// that is, ptr..ptr + size_of:: should be inside a single live allocation -/// and the memory read should be initialized. -/// The returned reference is only valid as long as pointed to memory is valid -/// for reading. -unsafe fn read_ref<'a, T>(ptr: *mut u8) -> (*mut u8, &'a T) { - let ptr = adjust_alignment::(ptr) as *mut T; - let result = &*ptr; - (ptr.add(1) as *mut u8, result) -} + pub fn values(&self) -> &[RecordRef] { + &self.values + } -/// # Safety -/// ptr should be valid for reading a `T`, -/// that is, ptr..ptr + size_of:: should be inside a single live allocation -/// and the memory read should be initialized. -/// This takes ownership of the memory returned as `T`, which means dropping `T` -/// may make future reads from `ptr` undefined behavior -unsafe fn read(ptr: *mut u8) -> (*mut u8, T) { - let ptr = adjust_alignment::(ptr) as *mut T; - let result = ptr.read(); - (ptr.add(1) as *mut u8, result) + pub fn appended(existing: &ProcessorRecord, additional: RecordRef) -> Self { + let mut values = Vec::with_capacity(existing.values().len() + 1); + values.extend_from_slice(existing.values()); + values.push(additional); + Self::new(values.into_boxed_slice()) + } } -/// # Safety -/// `ptr` should be valid for reading the contents of a `Field` with the type -/// corresponding to `field_type`. -/// See `read_ref` -unsafe fn read_field_ref<'a>(ptr: *mut u8, field_type: FieldType) -> (*mut u8, FieldRef<'a>) { - match field_type { - FieldType::UInt => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::UInt(*value)) - } - FieldType::U128 => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::U128(*value)) - } +pub trait StoreRecord { + fn create_ref(&self, values: &[Field]) -> Result; - FieldType::Int => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Int(*value)) - } + fn load_ref(&self, record_ref: &RecordRef) -> Result, RecordStoreError>; - FieldType::I128 => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::I128(*value)) - } + fn create_record(&self, record: &Record) -> Result { + let record_ref = self.create_ref(&record.values)?; + let mut processor_record = ProcessorRecord::new(Box::new([record_ref])); + processor_record.set_lifetime(record.lifetime.clone()); + Ok(processor_record) + } - FieldType::Float => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Float(*value)) + fn load_record(&self, processor_record: &ProcessorRecord) -> Result { + let mut record = Record::default(); + for record_ref in processor_record.values.iter() { + let fields = self.load_ref(record_ref)?; + record.values.extend(fields); } + record.set_lifetime(processor_record.get_lifetime()); + Ok(record) + } +} - FieldType::Boolean => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Boolean(*value)) - } +#[derive(Debug)] +pub enum ProcessorRecordStore { + InMemory(in_memory::ProcessorRecordStore), + Rocksdb(rocksdb::ProcessorRecordStore), +} - FieldType::String => { - let (ptr, value): (_, &String) = read_ref(ptr); - (ptr, FieldRef::String(value)) - } - FieldType::Text => { - let (ptr, value): (_, &String) = read_ref(ptr); - (ptr, FieldRef::Text(value)) - } - FieldType::Binary => { - let (ptr, value): (_, &Vec) = read_ref(ptr); - (ptr, FieldRef::Binary(value)) - } - FieldType::Decimal => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Decimal(*value)) - } - FieldType::Timestamp => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Timestamp(*value)) - } - FieldType::Date => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Date(*value)) - } - FieldType::Json => { - let (ptr, value) = read_ref::(ptr); - (ptr, FieldRef::Json(value.to_owned())) - } - FieldType::Point => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Point(*value)) - } - FieldType::Duration => { - let (ptr, value) = read_ref(ptr); - (ptr, FieldRef::Duration(*value)) +impl ProcessorRecordStore { + pub fn new(record_store: RecordStore) -> Result { + match record_store { + RecordStore::InMemory => Ok(Self::InMemory(in_memory::ProcessorRecordStore::new()?)), + RecordStore::Rocksdb => Ok(Self::Rocksdb(rocksdb::ProcessorRecordStore::new()?)), } } -} -unsafe fn read_field(ptr: *mut u8, field_type: FieldType) -> (*mut u8, Field) { - match field_type { - FieldType::UInt => { - let (ptr, value) = read(ptr); - (ptr, Field::UInt(value)) - } - FieldType::U128 => { - let (ptr, value) = read(ptr); - (ptr, Field::U128(value)) - } - FieldType::Int => { - let (ptr, value) = read(ptr); - (ptr, Field::Int(value)) + pub fn num_records(&self) -> usize { + match self { + Self::InMemory(store) => store.num_records(), + Self::Rocksdb(store) => store.num_records(), } + } - FieldType::I128 => { - let (ptr, value) = read(ptr); - (ptr, Field::I128(value)) + pub fn serialize_slice(&self, start: usize) -> Result<(Vec, usize), RecordStoreError> { + match self { + Self::InMemory(store) => store.serialize_slice(start), + Self::Rocksdb(store) => store.serialize_slice(start), } + } - FieldType::Float => { - let (ptr, value) = read(ptr); - (ptr, Field::Float(value)) - } + pub fn serialize_record(&self, record: &ProcessorRecord) -> Result, bincode::Error> { + let ProcessorRecord { values, lifetime } = record; + let values = values + .iter() + .map(|value| match (value, self) { + (RecordRef::InMemory(record_ref), ProcessorRecordStore::InMemory(record_store)) => { + record_store.serialize_ref(record_ref) + } + (RecordRef::Rocksdb(record_ref), _) => *record_ref, + _ => panic!("In memory record ref cannot be serialized by rocksdb record store"), + }) + .collect(); + let record = ProcessorRecordForSerialization { + values, + lifetime: lifetime.clone(), + }; + bincode::serialize(&record) + } +} - FieldType::Boolean => { - let (ptr, value) = read(ptr); - (ptr, Field::Boolean(value)) +impl StoreRecord for ProcessorRecordStore { + fn create_ref(&self, values: &[Field]) -> Result { + match self { + Self::InMemory(store) => Ok(RecordRef::InMemory(store.create_ref(values)?)), + Self::Rocksdb(store) => Ok(RecordRef::Rocksdb(store.create_ref(values)?)), } + } - FieldType::String => { - let (ptr, value) = read(ptr); - (ptr, Field::String(value)) - } - FieldType::Text => { - let (ptr, value) = read(ptr); - (ptr, Field::String(value)) - } - FieldType::Binary => { - let (ptr, value) = read(ptr); - (ptr, Field::Binary(value)) - } - FieldType::Decimal => { - let (ptr, value) = read(ptr); - (ptr, Field::Decimal(value)) - } - FieldType::Timestamp => { - let (ptr, value) = read(ptr); - (ptr, Field::Timestamp(value)) - } - FieldType::Date => { - let (ptr, value) = read(ptr); - (ptr, Field::Date(value)) - } - FieldType::Json => { - let (ptr, value) = read::(ptr); - (ptr, Field::Json(value)) - } - FieldType::Point => { - let (ptr, value) = read(ptr); - (ptr, Field::Point(value)) - } - FieldType::Duration => { - let (ptr, value) = read(ptr); - (ptr, Field::Duration(value)) + fn load_ref(&self, record_ref: &RecordRef) -> Result, RecordStoreError> { + match (record_ref, self) { + (RecordRef::InMemory(record_ref), _) => Ok(load_in_memory_record_ref(record_ref)), + (RecordRef::Rocksdb(record_ref), ProcessorRecordStore::Rocksdb(record_store)) => { + Ok(record_store.load_ref(record_ref)?) + } + _ => panic!("Rocksdb record ref cannot be loaded by in memory record store"), } } } -#[inline(always)] -fn add_field_size(size: &mut usize) { - let align = std::mem::align_of::(); - // Align the start of the field - *size = (*size + (align - 1)) & !(align - 1); - *size += std::mem::size_of::(); +#[derive(Debug)] +pub enum ProcessorRecordStoreDeserializer { + InMemory(in_memory::ProcessorRecordStoreDeserializer), + Rocksdb(rocksdb::ProcessorRecordStore), } -fn size(fields: &[Option]) -> usize { - let mut size = 0; - for field in fields.iter().flatten() { - match field { - FieldType::UInt => add_field_size::(&mut size), - FieldType::U128 => add_field_size::(&mut size), - FieldType::Int => add_field_size::(&mut size), - FieldType::I128 => add_field_size::(&mut size), - FieldType::Float => add_field_size::>(&mut size), - FieldType::Boolean => add_field_size::(&mut size), - FieldType::String => add_field_size::(&mut size), - FieldType::Text => add_field_size::(&mut size), - FieldType::Binary => add_field_size::>(&mut size), - FieldType::Decimal => add_field_size::(&mut size), - FieldType::Timestamp => add_field_size::>(&mut size), - FieldType::Date => add_field_size::(&mut size), - FieldType::Json => add_field_size::(&mut size), - FieldType::Point => add_field_size::(&mut size), - FieldType::Duration => add_field_size::(&mut size), + +impl ProcessorRecordStoreDeserializer { + pub fn new(record_store: RecordStore) -> Result { + match record_store { + RecordStore::InMemory => Ok(Self::InMemory( + in_memory::ProcessorRecordStoreDeserializer::new()?, + )), + RecordStore::Rocksdb => Ok(Self::Rocksdb(rocksdb::ProcessorRecordStore::new()?)), } } - size -} - -#[derive(Hash, Serialize, Deserialize, Debug, PartialEq, Eq)] -#[serde(crate = "dozer_types::serde")] -pub enum FieldRef<'a> { - UInt(u64), - U128(u128), - Int(i64), - I128(i128), - Float(OrderedFloat), - Boolean(bool), - String(&'a str), - Text(&'a str), - Binary(&'a [u8]), - Decimal(Decimal), - Timestamp(DateTime), - Date(NaiveDate), - Json(JsonValue), - Point(DozerPoint), - Duration(DozerDuration), - Null, -} -impl FieldRef<'_> { - pub fn cloned(&self) -> Field { + pub fn deserialize_and_extend(&self, data: &[u8]) -> Result<(), RecordStoreError> { match self { - FieldRef::UInt(v) => Field::UInt(*v), - FieldRef::U128(v) => Field::U128(*v), - FieldRef::Int(v) => Field::Int(*v), - FieldRef::I128(v) => Field::I128(*v), - FieldRef::Float(v) => Field::Float(*v), - FieldRef::Boolean(v) => Field::Boolean(*v), - FieldRef::String(v) => Field::String((*v).to_owned()), - FieldRef::Text(v) => Field::Text((*v).to_owned()), - FieldRef::Binary(v) => Field::Binary((*v).to_vec()), - FieldRef::Decimal(v) => Field::Decimal(*v), - FieldRef::Timestamp(v) => Field::Timestamp(*v), - FieldRef::Date(v) => Field::Date(*v), - FieldRef::Json(v) => Field::Json(v.clone()), - FieldRef::Point(v) => Field::Point(*v), - FieldRef::Duration(v) => Field::Duration(*v), - FieldRef::Null => Field::Null, + Self::InMemory(store) => store.deserialize_and_extend(data), + Self::Rocksdb(store) => store.deserialize_and_extend(data), } } -} -impl RecordRef { - pub fn new(fields: Vec) -> Self { - let field_types = fields - .iter() - .map(|field| field.ty()) - .collect::]>>(); - let size = size(&field_types); - - let layout = Layout::from_size_align(size, MAX_ALIGN).unwrap(); - // SAFETY: Everything is `ALIGN` byte aligned - let data = unsafe { - let data = std::alloc::alloc(layout); - if data.is_null() { - handle_alloc_error(layout); - } - data - }; - // SAFETY: We checked for null above - let data = unsafe { NonNull::new_unchecked(data) }; - let mut ptr = data.as_ptr(); - - // SAFETY: - // - ptr is non-null (we got it from a NonNull) - // - ptr is dereferencable (its memory range is large enough and not de-allocated) - // - unsafe { - for field in fields { - match field { - Field::UInt(v) => ptr = write(ptr, v), - Field::U128(v) => ptr = write(ptr, v), - Field::Int(v) => ptr = write(ptr, v), - Field::I128(v) => ptr = write(ptr, v), - Field::Float(v) => ptr = write(ptr, v), - Field::Boolean(v) => ptr = write(ptr, v), - Field::String(v) => ptr = write(ptr, v), - Field::Text(v) => ptr = write(ptr, v), - Field::Binary(v) => ptr = write(ptr, v), - Field::Decimal(v) => ptr = write(ptr, v), - Field::Timestamp(v) => ptr = write(ptr, v), - Field::Date(v) => ptr = write(ptr, v), - Field::Json(v) => ptr = write(ptr, v), - Field::Point(v) => ptr = write(ptr, v), - Field::Duration(v) => ptr = write(ptr, v), - Field::Null => (), + pub fn deserialize_record(&self, data: &[u8]) -> Result { + let ProcessorRecordForSerialization { values, lifetime } = bincode::deserialize(data)?; + let mut deserialized_values = Vec::with_capacity(values.len()); + for value in values { + match self { + Self::InMemory(record_store) => { + let record_ref = record_store.deserialize_ref(value)?; + deserialized_values.push(RecordRef::InMemory(record_ref)); } + Self::Rocksdb(_) => deserialized_values.push(RecordRef::Rocksdb(value)), } } - // SAFETY: This is valid, because inner is `repr(transparent)` - let arc = unsafe { - let arc = SliceWithHeader::from_slice::>(data, &field_types); - std::mem::transmute(arc) - }; - Self(arc) + Ok(ProcessorRecord { + values: deserialized_values.into(), + lifetime, + }) } - pub fn load(&self) -> Vec> { - self.0 - .field_types() - .iter() - .scan(self.0.data().as_ptr(), |ptr, field_type| { - let Some(field_type) = field_type else { - return Some(FieldRef::Null); - }; - - unsafe { - let (new_ptr, value) = read_field_ref(*ptr, *field_type); - *ptr = new_ptr; - Some(value) - } - }) - .collect() - } - - #[inline(always)] - pub fn id(&self) -> usize { - Arc::as_ptr(&self.0) as *const () as usize + pub fn into_record_store(self) -> ProcessorRecordStore { + match self { + Self::InMemory(record_store) => { + ProcessorRecordStore::InMemory(record_store.into_record_store()) + } + Self::Rocksdb(record_store) => ProcessorRecordStore::Rocksdb(record_store), + } } } -impl RecordRefInner { - #[inline(always)] - fn field_types(&self) -> &[Option] { - &self.0.slice +impl StoreRecord for ProcessorRecordStoreDeserializer { + fn create_ref(&self, values: &[Field]) -> Result { + match self { + Self::InMemory(store) => Ok(RecordRef::InMemory(store.create_ref(values)?)), + Self::Rocksdb(store) => Ok(RecordRef::Rocksdb(store.create_ref(values)?)), + } } - #[inline(always)] - fn data(&self) -> NonNull { - self.0.header + fn load_ref(&self, record_ref: &RecordRef) -> Result, RecordStoreError> { + match (record_ref, self) { + (RecordRef::InMemory(record_ref), _) => Ok(load_in_memory_record_ref(record_ref)), + ( + RecordRef::Rocksdb(record_ref), + ProcessorRecordStoreDeserializer::Rocksdb(record_store), + ) => Ok(record_store.load_ref(record_ref)?), + _ => panic!("Rocksdb record ref cannot be loaded by in memory record store"), + } } } -impl Drop for RecordRefInner { - fn drop(&mut self) { - let mut ptr = self.data().as_ptr(); - for field in self.field_types().iter().flatten() { - unsafe { - // Read owned so all field destructors run - ptr = read_field(ptr, *field).0; - } - } - // Then deallocate the field storage - unsafe { - dealloc( - self.data().as_ptr(), - Layout::from_size_align(size(self.field_types()), MAX_ALIGN).unwrap(), - ); - } - } +fn load_in_memory_record_ref(record_ref: &in_memory::RecordRef) -> Vec { + record_ref.load().iter().map(FieldRef::cloned).collect() } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] -pub struct ProcessorRecord { - /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage. - /// This is a Box<[]> instead of a Vec to save space on storing the vec's capacity - values: Box<[RecordRef]>, +mod in_memory; +mod rocksdb; - /// Time To Live for this record. If the value is None, the record will never expire. +#[derive(Debug, Serialize, Deserialize)] +#[serde(crate = "dozer_types::serde")] +struct ProcessorRecordForSerialization { + values: Vec, lifetime: Option>, } -impl ProcessorRecord { - pub fn new(values: Box<[RecordRef]>) -> Self { - Self { - values, - ..Default::default() - } - } - - pub fn get_lifetime(&self) -> Option { - self.lifetime.as_ref().map(|lifetime| *lifetime.clone()) - } - pub fn set_lifetime(&mut self, lifetime: Option) { - self.lifetime = lifetime.map(Box::new); +#[cfg(test)] +mod tests { + use std::time::Duration; + + use dozer_types::types::Timestamp; + + use super::*; + + fn test_record() -> Record { + let mut record = Record::new(vec![ + Field::Int(1), + Field::Int(2), + Field::Int(3), + Field::Int(4), + ]); + record.lifetime = Some(Lifetime { + reference: Timestamp::parse_from_rfc3339("2020-01-01T00:13:00Z").unwrap(), + duration: Duration::from_secs(10), + }); + record } - pub fn values(&self) -> &[RecordRef] { - &self.values + fn test_record_roundtrip_impl(record_store_kind: RecordStore) { + let record = test_record(); + let record_store = ProcessorRecordStore::new(record_store_kind).unwrap(); + let processor_record = record_store.create_record(&record).unwrap(); + assert_eq!(record_store.load_record(&processor_record).unwrap(), record); } - pub fn appended(existing: &ProcessorRecord, additional: RecordRef) -> Self { - let mut values = Vec::with_capacity(existing.values().len() + 1); - values.extend_from_slice(existing.values()); - values.push(additional); - Self::new(values.into_boxed_slice()) + #[test] + fn test_record_roundtrip() { + test_record_roundtrip_impl(RecordStore::InMemory); + test_record_roundtrip_impl(RecordStore::Rocksdb); } -} - -mod store; -pub use store::{ - ProcessorRecordStore, ProcessorRecordStoreDeserializer, RecordStoreError, StoreRecord, -}; -#[cfg(test)] -mod tests { - use dozer_types::types::Field; + fn test_record_serialization_roundtrip_impl(record_store_kind: RecordStore) { + let record_store = ProcessorRecordStore::new(record_store_kind).unwrap(); + let record = record_store.create_record(&test_record()).unwrap(); + let serialized_record = record_store.serialize_record(&record).unwrap(); + let data = record_store.serialize_slice(0).unwrap().0; - use crate::RecordRef; - - #[test] - fn test_store_load() { - let fields = vec![ - Field::String("asdf".to_owned()), - Field::Int(23), - Field::Null, - Field::U128(234), - ]; - - let record = RecordRef::new(fields.clone()); - let loaded_fields: Vec<_> = record - .load() - .into_iter() - .map(|field| field.cloned()) - .collect(); - assert_eq!(&fields, &loaded_fields); + let record_store = ProcessorRecordStoreDeserializer::new(record_store_kind).unwrap(); + record_store.deserialize_and_extend(&data).unwrap(); + let deserialized_record = record_store.deserialize_record(&serialized_record).unwrap(); + assert_eq!(deserialized_record, record); } #[test] - fn test_ser_de() { - let fields = vec![ - Field::String("asdf".to_owned()), - Field::Int(23), - Field::Null, - Field::U128(234), - ]; - - let record = RecordRef::new(fields.clone()); - - let bytes = dozer_types::bincode::serialize(&record).unwrap(); - let deserialized: RecordRef = dozer_types::bincode::deserialize(&bytes).unwrap(); - let loaded_fields: Vec<_> = deserialized - .load() - .into_iter() - .map(|field| field.cloned()) - .collect(); - assert_eq!(&fields, &loaded_fields); + fn test_record_serialization_roundtrip() { + test_record_serialization_roundtrip_impl(RecordStore::InMemory); + // TODO: enable this test when serialization is implemented for rocksdb + // test_record_serialization_roundtrip_impl(RecordStore::Rocksdb); } } diff --git a/dozer-recordstore/src/rocksdb.rs b/dozer-recordstore/src/rocksdb.rs new file mode 100644 index 0000000000..09a82d2d0b --- /dev/null +++ b/dozer-recordstore/src/rocksdb.rs @@ -0,0 +1,54 @@ +use std::sync::atomic::AtomicU64; + +use dozer_storage::RocksdbMap; +use dozer_types::types::Field; +use tempdir::TempDir; + +use crate::RecordStoreError; + +#[derive(Debug)] +pub struct ProcessorRecordStore { + _temp_dir: TempDir, + next_id: AtomicU64, + records: RocksdbMap>, +} + +impl ProcessorRecordStore { + pub fn new() -> Result { + let temp_dir = TempDir::new("rocksdb_processor_record_store") + .map_err(RecordStoreError::FailedToCreateTempDir)?; + let records = RocksdbMap::>::create(temp_dir.path())?; + + Ok(Self { + _temp_dir: temp_dir, + next_id: AtomicU64::new(0), + records, + }) + } + + pub fn num_records(&self) -> usize { + self.next_id.load(std::sync::atomic::Ordering::SeqCst) as usize + } + + pub fn create_ref(&self, values: &[Field]) -> Result { + let id = self + .next_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.records.insert(&id, values)?; + Ok(id) + } + + pub fn load_ref(&self, record_ref: &u64) -> Result, RecordStoreError> { + self.records + .get(record_ref)? + .ok_or(RecordStoreError::RocksdbRecordNotFound(*record_ref)) + } + + pub fn serialize_slice(&self, _start: usize) -> Result<(Vec, usize), RecordStoreError> { + todo!("implement rocksdb record store checkpointing") + } + + pub fn deserialize_and_extend(&self, _data: &[u8]) -> Result<(), RecordStoreError> { + todo!("implement rocksdb record store checkpointing") + } +} diff --git a/dozer-sql/src/expression/tests/test_common.rs b/dozer-sql/src/expression/tests/test_common.rs index 7a1050ea6a..1ed845f599 100644 --- a/dozer-sql/src/expression/tests/test_common.rs +++ b/dozer-sql/src/expression/tests/test_common.rs @@ -19,7 +19,7 @@ impl ProcessorChannelForwarder for TestChannelForwarder { } pub(crate) fn run_fct(sql: &str, schema: Schema, input: Vec) -> Field { - let record_store = ProcessorRecordStoreDeserializer::new().unwrap(); + let record_store = ProcessorRecordStoreDeserializer::new(Default::default()).unwrap(); let select = get_select(sql).unwrap(); let processor_factory = diff --git a/dozer-sql/src/product/join/operator/table.rs b/dozer-sql/src/product/join/operator/table.rs index 661c10c4a7..0d64a71612 100644 --- a/dozer-sql/src/product/join/operator/table.rs +++ b/dozer-sql/src/product/join/operator/table.rs @@ -326,7 +326,7 @@ mod tests { }], primary_index: vec![0], }; - let record_store = ProcessorRecordStoreDeserializer::new().unwrap(); + let record_store = ProcessorRecordStoreDeserializer::new(Default::default()).unwrap(); let mut table = JoinTable::new(&schema, vec![0], &record_store, true, None).unwrap(); let record = Record::new(vec![Field::Int(1)]); diff --git a/dozer-sql/src/product/set/record_map/mod.rs b/dozer-sql/src/product/set/record_map/mod.rs index 8e011342df..c8b4975656 100644 --- a/dozer-sql/src/product/set/record_map/mod.rs +++ b/dozer-sql/src/product/set/record_map/mod.rs @@ -170,7 +170,7 @@ mod tests { }; fn test_map(mut map: CountingRecordMapEnum) { - let record_store = ProcessorRecordStore::new().unwrap(); + let record_store = ProcessorRecordStore::new(Default::default()).unwrap(); let make_record = |fields: Vec| -> ProcessorRecord { record_store.create_record(&Record::new(fields)).unwrap() }; diff --git a/dozer-sql/src/table_operator/tests/operator_test.rs b/dozer-sql/src/table_operator/tests/operator_test.rs index a2771bb489..a73024d6b9 100644 --- a/dozer-sql/src/table_operator/tests/operator_test.rs +++ b/dozer-sql/src/table_operator/tests/operator_test.rs @@ -36,7 +36,7 @@ fn test_lifetime() { ) .to_owned(); - let record_store = ProcessorRecordStore::new().unwrap(); + let record_store = ProcessorRecordStore::new(Default::default()).unwrap(); let record = Record::new(vec![ Field::Int(0), Field::Timestamp(DateTime::parse_from_rfc3339("2020-01-01T00:13:00Z").unwrap()), diff --git a/dozer-sql/src/utils/serialize.rs b/dozer-sql/src/utils/serialize.rs index 8fee0b271d..4e364f2759 100644 --- a/dozer-sql/src/utils/serialize.rs +++ b/dozer-sql/src/utils/serialize.rs @@ -47,6 +47,8 @@ pub enum DeserializationError { NotEnoughData { requested: usize, remaining: usize }, #[error("bincode error: {0}")] Bincode(#[from] bincode::Error), + #[error("record store error: {0}")] + RecordStore(#[from] dozer_recordstore::RecordStoreError), } pub fn serialize_u64(value: u64, object: &mut Object) -> Result<(), SerializationError> { diff --git a/dozer-sql/src/window/tests/operator_test.rs b/dozer-sql/src/window/tests/operator_test.rs index 0b3d7d2e04..426cf808f1 100644 --- a/dozer-sql/src/window/tests/operator_test.rs +++ b/dozer-sql/src/window/tests/operator_test.rs @@ -9,7 +9,7 @@ use crate::window::operator::WindowType; #[test] fn test_hop() { - let record_store = ProcessorRecordStore::new().unwrap(); + let record_store = ProcessorRecordStore::new(Default::default()).unwrap(); let record = record_store .create_record(&Record::new(vec![ Field::Int(0), @@ -61,7 +61,7 @@ fn test_hop() { #[test] fn test_tumble() { - let record_store = ProcessorRecordStore::new().unwrap(); + let record_store = ProcessorRecordStore::new(Default::default()).unwrap(); let record = record_store .create_record(&Record::new(vec![ Field::Int(0), diff --git a/dozer-storage/Cargo.toml b/dozer-storage/Cargo.toml index 56a57ff7eb..92c4a54529 100644 --- a/dozer-storage/Cargo.toml +++ b/dozer-storage/Cargo.toml @@ -13,6 +13,7 @@ lmdb-rkv = "0.14.0" lmdb-rkv-sys = "0.11.2" page_size = "0.5.0" pin-project = "1.1.0" +rocksdb = "0.21.0" tokio = "1.28.2" [dev-dependencies] diff --git a/dozer-storage/src/errors.rs b/dozer-storage/src/errors.rs index 07f846ee69..5a76169c89 100644 --- a/dozer-storage/src/errors.rs +++ b/dozer-storage/src/errors.rs @@ -31,6 +31,9 @@ pub enum StorageError { // Error forwarding #[error("Lmdb error: {0}")] Lmdb(#[from] lmdb::Error), + + #[error("Rocksdb error: {0}")] + Rocksdb(#[from] rocksdb::Error), } #[derive(Debug, Error)] diff --git a/dozer-storage/src/lib.rs b/dozer-storage/src/lib.rs index f8f32e8ce8..16b3962a78 100644 --- a/dozer-storage/src/lib.rs +++ b/dozer-storage/src/lib.rs @@ -17,6 +17,8 @@ mod lmdb_counter; pub use lmdb_counter::LmdbCounter; mod lmdb_option; pub use lmdb_option::LmdbOption; +mod rocksdb_map; +pub use rocksdb_map::RocksdbMap; #[cfg(test)] mod tests; diff --git a/dozer-storage/src/lmdb_database/lmdb_val.rs b/dozer-storage/src/lmdb_database/lmdb_val.rs index 6c50636811..044d5ae871 100644 --- a/dozer-storage/src/lmdb_database/lmdb_val.rs +++ b/dozer-storage/src/lmdb_database/lmdb_val.rs @@ -1,6 +1,6 @@ use dozer_types::{ borrow::{Borrow, Cow}, - types::{IndexDefinition, Record, SchemaWithIndex}, + types::{Field, IndexDefinition, Record, SchemaWithIndex}, }; use crate::errors::{InvalidBool, StorageError}; @@ -223,6 +223,34 @@ unsafe impl LmdbKey for String { const TYPE: LmdbKeyType = LmdbKeyType::VariableSize; } +impl<'a> Encode<'a> for &'a [Field] { + fn encode(self) -> Result, StorageError> { + dozer_types::bincode::serialize(self) + .map(Encoded::Vec) + .map_err(|e| StorageError::SerializationError { + typ: "[Field]", + reason: Box::new(e), + }) + } +} + +impl BorrowEncode for Vec { + type Encode<'a> = &'a [Field]; +} + +impl Decode for Vec { + fn decode(bytes: &[u8]) -> Result, StorageError> { + dozer_types::bincode::deserialize(bytes) + .map(Cow::Owned) + .map_err(|e| StorageError::DeserializationError { + typ: "Vec", + reason: Box::new(e), + }) + } +} + +unsafe impl LmdbVal for Vec {} + impl<'a> Encode<'a> for &'a Record { fn encode(self) -> Result, StorageError> { dozer_types::bincode::serialize(self) diff --git a/dozer-storage/src/rocksdb_map.rs b/dozer-storage/src/rocksdb_map.rs new file mode 100644 index 0000000000..f2b59e73d6 --- /dev/null +++ b/dozer-storage/src/rocksdb_map.rs @@ -0,0 +1,69 @@ +use std::path::Path; + +use rocksdb::DB; + +use dozer_types::borrow::IntoOwned; + +use crate::{errors::StorageError, BorrowEncode, Encode, LmdbVal}; + +#[derive(Debug)] +pub struct RocksdbMap { + db: DB, + _key: std::marker::PhantomData, + _value: std::marker::PhantomData, +} + +impl RocksdbMap +where + for<'a> V::Borrowed<'a>: IntoOwned, +{ + pub fn create(path: &Path) -> Result { + let db = DB::open_default(path)?; + Ok(Self { + db, + _key: std::marker::PhantomData, + _value: std::marker::PhantomData, + }) + } + + pub fn count(&self) -> Result { + Ok(self + .db + .property_int_value("rocksdb.estimate-num-keys")? + .expect("rocksdb.estimate-num-keys") as usize) + } + + pub fn get(&self, key: K::Encode<'_>) -> Result, StorageError> { + let key = key.encode()?; + let value = self.db.get_pinned(key)?; + if let Some(value) = value { + let value = V::decode(&value)?; + Ok(Some(value.into_owned())) + } else { + Ok(None) + } + } + + pub fn contains(&self, key: K::Encode<'_>) -> Result { + let key = key.encode()?; + let value = self.db.get_pinned(key)?; + Ok(value.is_some()) + } + + pub fn insert(&self, key: K::Encode<'_>, value: V::Encode<'_>) -> Result<(), StorageError> { + let key = key.encode()?; + let value = value.encode()?; + self.db.put(key, value)?; + Ok(()) + } + + pub fn remove(&self, key: K::Encode<'_>) -> Result<(), StorageError> { + let key = key.encode()?; + self.db.delete(key)?; + Ok(()) + } + + pub fn flush(&self) -> Result<(), StorageError> { + self.db.flush().map_err(Into::into) + } +} diff --git a/dozer-types/src/models/app_config.rs b/dozer-types/src/models/app_config.rs index dc1308b2f6..13647565d2 100644 --- a/dozer-types/src/models/app_config.rs +++ b/dozer-types/src/models/app_config.rs @@ -37,6 +37,10 @@ pub struct AppConfig { #[serde(skip_serializing_if = "Option::is_none")] /// The maximum time in seconds before a new checkpoint is created. If there're no new records, no checkpoint will be created. pub max_interval_before_persist_in_seconds: Option, + + #[serde(default, skip_serializing_if = "equal_default")] + /// The record store to use for the processors. + pub record_store: RecordStore, } #[derive(Debug, JsonSchema, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] @@ -54,6 +58,14 @@ pub struct S3Storage { pub bucket_name: String, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)] +#[serde(deny_unknown_fields)] +pub enum RecordStore { + #[default] + InMemory, + Rocksdb, +} + pub fn default_persist_queue_capacity() -> u32 { 100 } diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index d28aa9caaa..84a3a24d2d 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -326,6 +326,14 @@ ], "format": "uint32", "minimum": 0.0 + }, + "record_store": { + "description": "The record store to use for the processors.", + "allOf": [ + { + "$ref": "#/definitions/RecordStore" + } + ] } }, "additionalProperties": false @@ -1298,6 +1306,13 @@ }, "additionalProperties": false }, + "RecordStore": { + "type": "string", + "enum": [ + "InMemory", + "Rocksdb" + ] + }, "RefreshConfig": { "type": "string", "enum": [