From 154546a3a45e5d084628c720f6cff9d9b388578d Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Fri, 28 Apr 2023 18:13:13 +0800 Subject: [PATCH] feat: assemble runtime crdt state (#408) * feat: calc clock in update refs * feat: add refs * feat: impl store * feat: add utils for doc store * feat: add state vector & self check * feat: doc integrate skeleton * feat: new id * test: `get_state` & `get_state_vector` * test: `add_item` & `get_item` * feat: init doc struct * feat: integrate part2 * feat: crdt trait * chore: move update_missing_sv * feat: add split item * feat: doc integrate part3 * chore: ignore dead code temporarily * chore: fix clippy lint --- Cargo.lock | 3 + libs/jwst-codec/Cargo.toml | 6 + libs/jwst-codec/src/doc/{ => codec}/any.rs | 2 +- .../jwst-codec/src/doc/{ => codec}/content.rs | 25 +- libs/jwst-codec/src/doc/{ => codec}/id.rs | 10 +- libs/jwst-codec/src/doc/{ => codec}/item.rs | 6 +- libs/jwst-codec/src/doc/codec/mod.rs | 20 + libs/jwst-codec/src/doc/codec/refs.rs | 177 ++++++++ libs/jwst-codec/src/doc/codec/update.rs | 54 +++ libs/jwst-codec/src/doc/document.rs | 122 ++++++ libs/jwst-codec/src/doc/mod.rs | 18 +- libs/jwst-codec/src/doc/store.rs | 388 ++++++++++++++++++ libs/jwst-codec/src/doc/traits.rs | 29 ++ libs/jwst-codec/src/doc/update.rs | 109 ----- libs/jwst-codec/src/lib.rs | 38 +- 15 files changed, 868 insertions(+), 139 deletions(-) rename libs/jwst-codec/src/doc/{ => codec}/any.rs (98%) rename libs/jwst-codec/src/doc/{ => codec}/content.rs (81%) rename libs/jwst-codec/src/doc/{ => codec}/id.rs (56%) rename libs/jwst-codec/src/doc/{ => codec}/item.rs (97%) create mode 100644 libs/jwst-codec/src/doc/codec/mod.rs create mode 100644 libs/jwst-codec/src/doc/codec/refs.rs create mode 100644 libs/jwst-codec/src/doc/codec/update.rs create mode 100644 libs/jwst-codec/src/doc/document.rs create mode 100644 libs/jwst-codec/src/doc/store.rs create mode 100644 libs/jwst-codec/src/doc/traits.rs delete mode 100644 libs/jwst-codec/src/doc/update.rs diff --git a/Cargo.lock b/Cargo.lock index 9c772ea79..d58ef2b91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2114,11 +2114,14 @@ dependencies = [ "bitvec", "byteorder", "criterion", + "jwst-logger", "lib0", + "nanoid", "nom", "rand 0.8.5", "serde", "serde_json", + "thiserror", ] [[package]] diff --git a/libs/jwst-codec/Cargo.toml b/libs/jwst-codec/Cargo.toml index 04fb6ceb4..37f6081fc 100644 --- a/libs/jwst-codec/Cargo.toml +++ b/libs/jwst-codec/Cargo.toml @@ -10,8 +10,14 @@ license = "AGPL-3.0-only" [dependencies] bitvec = "1.0.1" byteorder = "1.4.3" +nanoid = "0.4.0" nom = "7.1.3" +rand = "0.8.5" serde_json = "1.0.94" +thiserror = "1.0.40" + +# ======= workspace dependencies ======= +jwst-logger = { path = "../jwst-logger" } [dev-dependencies] criterion = { version = "0.4.0", features = ["html_reports"] } diff --git a/libs/jwst-codec/src/doc/any.rs b/libs/jwst-codec/src/doc/codec/any.rs similarity index 98% rename from libs/jwst-codec/src/doc/any.rs rename to libs/jwst-codec/src/doc/codec/any.rs index 391e8ee26..1ea1bd41a 100644 --- a/libs/jwst-codec/src/doc/any.rs +++ b/libs/jwst-codec/src/doc/codec/any.rs @@ -6,7 +6,7 @@ use nom::{ use super::*; use std::collections::HashMap; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum Any { Undefined, Null, diff --git a/libs/jwst-codec/src/doc/content.rs b/libs/jwst-codec/src/doc/codec/content.rs similarity index 81% rename from libs/jwst-codec/src/doc/content.rs rename to libs/jwst-codec/src/doc/codec/content.rs index ac0330d54..8c77af3a2 100644 --- a/libs/jwst-codec/src/doc/content.rs +++ b/libs/jwst-codec/src/doc/codec/content.rs @@ -6,7 +6,7 @@ use nom::{ }; use serde_json::Value as JsonValue; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum YType { Array, Map, @@ -17,7 +17,7 @@ pub enum YType { XmlHook(String), } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum Content { Deleted(u64), JSON(Vec>), @@ -30,6 +30,27 @@ pub enum Content { Doc { guid: String, opts: Vec }, } +impl Content { + pub fn clock_len(&self) -> u64 { + match self { + Content::Deleted(len) => *len, + Content::JSON(strings) => strings.len() as u64, + Content::String(string) => string.len() as u64, + Content::Any(any) => any.len() as u64, + Content::Binary(_) + | Content::Embed(_) + | Content::Format { .. } + | Content::Type(_) + | Content::Doc { .. } => 1, + } + } + + pub fn split(&self, diff: u64) -> JwstCodecResult<(Content, Content)> { + // TODO: implement split for other types + Err(JwstCodecError::ContentSplitNotSupport(diff)) + } +} + pub fn read_content(input: &[u8], tag_type: u8) -> IResult<&[u8], Content> { match tag_type { 1 => { diff --git a/libs/jwst-codec/src/doc/id.rs b/libs/jwst-codec/src/doc/codec/id.rs similarity index 56% rename from libs/jwst-codec/src/doc/id.rs rename to libs/jwst-codec/src/doc/codec/id.rs index 8d181d615..14801386a 100644 --- a/libs/jwst-codec/src/doc/id.rs +++ b/libs/jwst-codec/src/doc/codec/id.rs @@ -1,13 +1,19 @@ use super::*; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct Id { pub client: u64, pub clock: u64, } +impl Id { + pub fn new(client: u64, clock: u64) -> Self { + Self { client, clock } + } +} + pub fn read_item_id(input: &[u8]) -> IResult<&[u8], Id> { let (tail, client) = read_var_u64(input)?; let (tail, clock) = read_var_u64(tail)?; - Ok((tail, Id { client, clock })) + Ok((tail, Id::new(client, clock))) } diff --git a/libs/jwst-codec/src/doc/item.rs b/libs/jwst-codec/src/doc/codec/item.rs similarity index 97% rename from libs/jwst-codec/src/doc/item.rs rename to libs/jwst-codec/src/doc/codec/item.rs index 03acf2422..8b278b1b2 100644 --- a/libs/jwst-codec/src/doc/item.rs +++ b/libs/jwst-codec/src/doc/codec/item.rs @@ -1,14 +1,13 @@ use super::*; -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum Parent { String(String), Id(Id), } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct Item { - pub info: u8, pub left_id: Option, pub right_id: Option, pub parent: Option, @@ -32,7 +31,6 @@ pub fn read_item(input: &[u8], info: u8, first_5_bit: u8) -> IResult<&[u8], Item // NOTE: read order must keep the same as the order in yjs // TODO: this data structure design will break the cpu OOE, need to be optimized let item = Item { - info, left_id: if has_left_id { let (tail, id) = read_item_id(input)?; input = tail; diff --git a/libs/jwst-codec/src/doc/codec/mod.rs b/libs/jwst-codec/src/doc/codec/mod.rs new file mode 100644 index 000000000..0ae02f252 --- /dev/null +++ b/libs/jwst-codec/src/doc/codec/mod.rs @@ -0,0 +1,20 @@ +mod any; +mod content; +mod id; +mod item; +mod refs; +mod update; + +pub use any::Any; +pub use content::Content; +pub use id::Id; +pub use item::Item; +pub use refs::StructInfo; +pub use update::{read_update, Update}; + +use super::*; +use any::read_any; +use content::read_content; +use id::read_item_id; +use item::read_item; +use refs::read_client_struct_refs; diff --git a/libs/jwst-codec/src/doc/codec/refs.rs b/libs/jwst-codec/src/doc/codec/refs.rs new file mode 100644 index 000000000..7ae16fac2 --- /dev/null +++ b/libs/jwst-codec/src/doc/codec/refs.rs @@ -0,0 +1,177 @@ +use super::*; +use nom::{multi::count, number::complete::be_u8}; +use std::collections::HashMap; + +enum RawStructInfo { + GC(u64), + Skip(u64), + Item(Item), +} + +struct RawRefs { + client: u64, + refs: Vec, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum StructInfo { + GC { id: Id, len: u64 }, + Skip { id: Id, len: u64 }, + Item { id: Id, item: Item }, +} + +impl StructInfo { + pub fn id(&self) -> &Id { + match self { + StructInfo::GC { id, .. } => id, + StructInfo::Skip { id, .. } => id, + StructInfo::Item { id, .. } => id, + } + } + + pub fn client_id(&self) -> u64 { + self.id().client + } + + pub fn clock(&self) -> u64 { + self.id().clock + } + + pub fn len(&self) -> u64 { + match self { + StructInfo::GC { len, .. } => *len, + StructInfo::Skip { len, .. } => *len, + StructInfo::Item { item, .. } => item.content.clock_len(), + } + } + + pub fn is_gc(&self) -> bool { + matches!(self, StructInfo::GC { .. }) + } + + pub fn is_skip(&self) -> bool { + matches!(self, StructInfo::Skip { .. }) + } + + pub fn is_item(&self) -> bool { + matches!(self, StructInfo::Item { .. }) + } + + pub fn split_item(&mut self, diff: u64) -> JwstCodecResult<(Self, Self)> { + if let Self::Item { id, item } = self { + let right_id = Id::new(id.client, id.clock + diff); + let (left_content, right_content) = item.content.split(diff)?; + + let left_item = StructInfo::Item { + id: id.clone(), + item: Item { + right_id: Some(right_id.clone()), + content: left_content, + ..item.clone() + }, + }; + + let right_item = StructInfo::Item { + id: right_id, + item: Item { + left_id: Some(Id::new(id.client, id.clock + diff - 1)), + right_id: item.right_id.clone(), + parent: item.parent.clone(), + parent_sub: item.parent_sub.clone(), + content: right_content, + }, + }; + + Ok((left_item, right_item)) + } else { + Err(JwstCodecError::ItemSplitNotSupport) + } + } +} + +fn read_struct(input: &[u8]) -> IResult<&[u8], RawStructInfo> { + let (input, info) = be_u8(input)?; + let first_5_bit = info & 0b11111; + + match first_5_bit { + 0 => { + let (input, len) = read_var_u64(input)?; + Ok((input, RawStructInfo::GC(len))) + } + 10 => { + let (input, len) = read_var_u64(input)?; + Ok((input, RawStructInfo::Skip(len))) + } + _ => { + let (input, item) = read_item(input, info, first_5_bit)?; + Ok((input, RawStructInfo::Item(item))) + } + } +} + +fn read_refs(input: &[u8]) -> IResult<&[u8], RawRefs> { + let (input, num_of_structs) = read_var_u64(input)?; + let (input, client) = read_var_u64(input)?; + let (input, clock) = read_var_u64(input)?; + let (input, structs) = count(read_struct, num_of_structs as usize)(input)?; + let (refs, _) = structs + .into_iter() + .fold((vec![], clock), |(mut vec, clock), s| { + let id = Id::new(client, clock); + match s { + RawStructInfo::GC(len) => { + vec.push(StructInfo::GC { id, len }); + (vec, clock + len) + } + RawStructInfo::Skip(len) => { + vec.push(StructInfo::Skip { id, len }); + (vec, clock + len) + } + RawStructInfo::Item(item) => { + let len = item.content.clock_len(); + vec.push(StructInfo::Item { id, item }); + (vec, clock + len) + } + } + }); + + Ok((input, RawRefs { client, refs })) +} + +pub fn read_client_struct_refs(input: &[u8]) -> IResult<&[u8], HashMap>> { + let (input, num_of_updates) = read_var_u64(input)?; + let (tail, updates) = count(read_refs, num_of_updates as usize)(input)?; + + Ok(( + tail, + updates.into_iter().map(|u| (u.client, u.refs)).collect(), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_struct_info() { + { + let struct_info = StructInfo::GC { + id: Id::new(1, 0), + len: 10, + }; + assert_eq!(struct_info.len(), 10); + assert_eq!(struct_info.client_id(), 1); + assert_eq!(struct_info.clock(), 0); + } + + { + let struct_info = StructInfo::Skip { + id: Id::new(2, 0), + len: 20, + }; + assert_eq!(struct_info.len(), 20); + assert_eq!(struct_info.client_id(), 2); + assert_eq!(struct_info.clock(), 0); + } + } +} diff --git a/libs/jwst-codec/src/doc/codec/update.rs b/libs/jwst-codec/src/doc/codec/update.rs new file mode 100644 index 000000000..f908158c6 --- /dev/null +++ b/libs/jwst-codec/src/doc/codec/update.rs @@ -0,0 +1,54 @@ +use super::*; +use nom::multi::count; +use std::collections::HashMap; + +#[derive(Debug)] +pub struct Delete { + pub clock: u64, + pub clock_len: u64, +} + +#[derive(Debug)] +pub struct DeleteSets { + pub client: u64, + pub deletes: Vec, +} + +#[derive(Debug)] +pub struct Update { + pub delete_sets: Vec, + pub structs: HashMap>, +} + +fn read_delete(input: &[u8]) -> IResult<&[u8], Delete> { + let (tail, clock) = read_var_u64(input)?; + let (tail, clock_len) = read_var_u64(tail)?; + Ok((tail, Delete { clock, clock_len })) +} + +fn parse_delete_set(input: &[u8]) -> IResult<&[u8], DeleteSets> { + let (input, client) = read_var_u64(input)?; + let (input, num_of_deletes) = read_var_u64(input)?; + let (tail, deletes) = count(read_delete, num_of_deletes as usize)(input)?; + + Ok((tail, DeleteSets { client, deletes })) +} + +fn read_delete_set(input: &[u8]) -> IResult<&[u8], Vec> { + let (input, num_of_clients) = read_var_u64(input)?; + let (tail, deletes) = count(parse_delete_set, num_of_clients as usize)(input)?; + + Ok((tail, deletes)) +} + +pub fn read_update(input: &[u8]) -> IResult<&[u8], Update> { + let (tail, structs) = read_client_struct_refs(input)?; + let (tail, delete_sets) = read_delete_set(tail)?; + Ok(( + tail, + Update { + structs, + delete_sets, + }, + )) +} diff --git a/libs/jwst-codec/src/doc/document.rs b/libs/jwst-codec/src/doc/document.rs new file mode 100644 index 000000000..c9e1b0c06 --- /dev/null +++ b/libs/jwst-codec/src/doc/document.rs @@ -0,0 +1,122 @@ +use std::collections::{HashMap, HashSet}; + +use super::*; + +struct RestItems { + missing_state_vector: HashMap, + // TODO: use function in code + #[allow(dead_code)] + items: HashMap>, + stage_items: Vec, +} + +impl RestItems { + fn new() -> Self { + Self { + missing_state_vector: HashMap::new(), + items: HashMap::new(), + stage_items: Vec::new(), + } + } + + fn update_missing_sv(&mut self, client: u64, clock: u64) { + self.missing_state_vector + .entry(client) + .and_modify(|mclock| { + if *mclock > clock { + *mclock = clock; + } + }) + .or_insert(clock); + } + + fn add_item(&mut self, item: StructInfo) { + self.stage_items.push(item); + } + + // TODO: use function in code + #[allow(dead_code)] + fn collect_items(&mut self) -> HashSet { + let mut client_ids = HashSet::new(); + for item in self.stage_items.drain(..) { + let client_id = item.client_id(); + // TODO: move all items of the current iterator to rest_items + client_ids.insert(client_id); + } + client_ids + } +} + +pub struct Doc { + // TODO: use function in code + #[allow(dead_code)] + // random client id for each doc + client_id: u64, + // TODO: use function in code + #[allow(dead_code)] + // random id for each doc, use in sub doc + guid: String, + // root_type: HashMap, + store: DocStore, +} + +impl Doc { + pub fn new() -> Self { + Self { + client_id: rand::random(), + guid: nanoid!(), + // share: HashMap::new(), + store: DocStore::new(), + } + } + + fn integrate_update( + &self, + items: &HashMap>, + ) -> JwstCodecResult> { + let mut client_ids = items.keys().copied().collect::>(); + if client_ids.is_empty() { + return Ok(None); + } + client_ids.sort(); + client_ids.reverse(); + + let mut rest_items = RestItems::new(); + let mut state_cache = HashMap::new(); + + for client_id in client_ids { + let refs = items.get(&client_id).unwrap(); + let mut iterator = refs.iter(); + while let Some(item) = iterator.next() { + debug_assert_eq!(item.client_id(), client_id); + if !item.is_skip() { + let local_clock = *state_cache + .entry(client_id) + .or_insert_with(|| self.store.get_state(client_id)); + let offset = local_clock as i64 - item.clock() as i64; + + if offset < 0 { + // applying update depends on another update that not exists on local + rest_items.add_item(item.clone()); + rest_items.update_missing_sv(client_id, item.clock() - 1); + // TODO: move all items of the current iterator to rest_items + // rest_items.collect_items(); + } else { + } + } + } + } + + Ok(None) + } + + pub fn apply_update(&self, update: &[u8]) -> JwstCodecResult { + let (rest, update) = read_update(update).map_err(|e| e.map_input(|u| u.len()))?; + if rest.is_empty() { + return Err(JwstCodecError::UpdateNotFullyConsumed(rest.len())); + } + self.integrate_update(&update.structs)?; + + Ok(()) + } +} diff --git a/libs/jwst-codec/src/doc/mod.rs b/libs/jwst-codec/src/doc/mod.rs index ddf90ba23..7143a916f 100644 --- a/libs/jwst-codec/src/doc/mod.rs +++ b/libs/jwst-codec/src/doc/mod.rs @@ -1,13 +1,11 @@ -mod any; -mod content; -mod id; -mod item; -mod update; +mod codec; +mod document; +mod store; +mod traits; use super::*; -pub use any::{read_any, Any}; -pub use content::{read_content, Content}; -pub use id::{read_item_id, Id}; -pub use item::{read_item, Item}; -pub use update::{read_update, Update}; +pub use codec::{read_update, Any, Content, Id, Item, StructInfo, Update}; +pub use document::Doc; +pub use store::DocStore; +pub use traits::{CrdtList, CrdtMap}; diff --git a/libs/jwst-codec/src/doc/store.rs b/libs/jwst-codec/src/doc/store.rs new file mode 100644 index 000000000..f8d37829a --- /dev/null +++ b/libs/jwst-codec/src/doc/store.rs @@ -0,0 +1,388 @@ +use super::*; +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::{Arc, RwLock}, +}; + +#[derive(Clone)] +pub struct DocStore { + items: Arc>>>, +} + +impl DocStore { + pub fn new() -> Self { + Self { + items: Arc::default(), + } + } + + pub fn get_state(&self, client: u64) -> u64 { + if let Some(structs) = self.items.read().unwrap().get(&client) { + if let Some(last_struct) = structs.last() { + last_struct.clock() + last_struct.len() + } else { + warn!("client {} has no struct info", client); + 0 + } + } else { + 0 + } + } + + // TODO: use function in code + #[allow(dead_code)] + pub fn get_state_vector(&self) -> HashMap { + let mut sm = HashMap::new(); + for (client, structs) in self.items.read().unwrap().iter() { + if let Some(last_struct) = structs.last() { + sm.insert(*client, last_struct.clock() + last_struct.len()); + } else { + warn!("client {} has no struct info", client); + } + } + sm + } + + // TODO: use function in code + #[allow(dead_code)] + pub fn add_item(&self, item: StructInfo) -> JwstCodecResult { + let client_id = item.client_id(); + + match self.items.write().unwrap().entry(client_id) { + Entry::Occupied(mut entry) => { + let structs = entry.get_mut(); + if let Some(last_struct) = structs.last() { + let expect = last_struct.clock() + last_struct.len(); + let actually = item.clock(); + if expect != actually { + return Err(JwstCodecError::StructClockInvalid { expect, actually }); + } + } else { + warn!("client {} has no struct info", client_id); + } + structs.push(item); + } + Entry::Vacant(entry) => { + entry.insert(vec![item]); + } + } + + Ok(()) + } + + // TODO: use function in code + #[allow(dead_code)] + // binary search struct info on a sorted array + pub fn get_item_index( + items: &Vec, + client_id: u64, + clock: u64, + ) -> JwstCodecResult { + let mut left = 0; + let mut right = items.len() - 1; + let middle = &items[right]; + let middle_clock = middle.clock(); + if middle_clock == clock { + return Ok(right); + } + let mut middle_index = (clock / (middle_clock + middle.len() - 1)) as usize * right; + while left <= right { + let middle = &items[middle_index]; + let middle_clock = middle.clock(); + if middle_clock <= clock { + if clock < middle_clock + middle.len() { + return Ok(middle_index); + } + left = middle_index + 1; + } else { + right = middle_index - 1; + } + middle_index = (left + right) / 2; + } + Err(JwstCodecError::StructSequenceInvalid { client_id, clock }) + } + + // TODO: use function in code + #[allow(dead_code)] + pub fn get_item(&self, client_id: u64, clock: u64) -> JwstCodecResult { + if let Some(items) = self.items.read().unwrap().get(&client_id) { + let index = Self::get_item_index(items, client_id, clock)?; + // TODO: item need to be a reference + Ok(items[index].clone()) + } else { + Err(JwstCodecError::StructSequenceNotExists(client_id)) + } + } + + // TODO: use function in code + #[allow(dead_code)] + pub fn get_item_clean_end(&self, id: Id) -> JwstCodecResult { + if let Some(items) = self.items.read().unwrap().get(&id.client) { + let index = Self::get_item_index(items, id.client, id.client)?; + let mut item = items[index].clone(); + if id.clock != item.clock() + item.len() - 1 && !item.is_gc() { + let (left_item, right_item) = item.split_item(id.clock - item.clock() + 1)?; + if let Some(items) = self.items.write().unwrap().get_mut(&id.client) { + if let Some(index) = items.iter().position(|i| i.clock() == id.clock) { + items[index] = left_item.clone(); + items.insert(index + 1, right_item); + // TODO: item need to be a reference + return Ok(left_item); + } + } + Err(JwstCodecError::ItemSplitNotSupport) + } else { + Ok(item) + } + } else { + Err(JwstCodecError::StructSequenceNotExists(id.client)) + } + } + + // TODO: use function in code + #[allow(dead_code)] + pub fn replace_item(&self, item: StructInfo) -> JwstCodecResult { + let client_id = item.client_id(); + let clock = item.clock(); + + if let Some(structs) = self.items.write().unwrap().get_mut(&client_id) { + let mut left = 0; + let mut right = structs.len() - 1; + let middle = &structs[right]; + let middle_clock = middle.clock(); + if middle_clock == clock { + structs[middle_clock as usize] = item; + return Ok(()); + } + let mut middle_index = (clock / (middle_clock + middle.len() - 1)) as usize * right; + while left <= right { + let middle = &structs[middle_index]; + let middle_clock = middle.clock(); + if middle_clock <= clock { + if clock < middle_clock + middle.len() { + structs[middle_index] = item; + return Ok(()); + } + left = middle_index + 1; + } else { + right = middle_index - 1; + } + middle_index = (left + right) / 2; + } + Err(JwstCodecError::StructSequenceInvalid { client_id, clock }) + } else { + Err(JwstCodecError::StructSequenceNotExists(client_id)) + } + } + + // TODO: use function in code + #[allow(dead_code)] + pub fn self_check(&self) -> JwstCodecResult { + for structs in self.items.read().unwrap().values() { + for i in 1..structs.len() { + let l = &structs[i - 1]; + let r = &structs[i]; + if l.clock() + l.len() != r.clock() { + return Err(JwstCodecError::StructSequenceInvalid { + client_id: l.client_id(), + clock: l.clock(), + }); + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_state() { + { + let doc_store = DocStore::new(); + let state = doc_store.get_state(1); + assert_eq!(state, 0); + } + + { + let doc_store = DocStore::new(); + + let client_id = 1; + + let struct_info1 = StructInfo::GC { + id: Id::new(1, 1), + len: 5, + }; + let struct_info2 = StructInfo::Skip { + id: Id::new(1, 6), + len: 7, + }; + + doc_store + .items + .write() + .unwrap() + .insert(client_id, vec![struct_info1.clone(), struct_info2.clone()]); + + let state = doc_store.get_state(client_id); + + assert_eq!(state, struct_info2.clock() + struct_info2.len()); + + assert!(doc_store.self_check().is_ok()); + } + } + + #[test] + fn test_get_state_vector() { + { + let doc_store = DocStore::new(); + let state_map = doc_store.get_state_vector(); + assert!(state_map.is_empty()); + } + + { + let doc_store = DocStore::new(); + + let client1 = 1; + let struct_info1 = StructInfo::GC { + id: Id::new(1, 0), + len: 5, + }; + + let client2 = 2; + let struct_info2 = StructInfo::GC { + id: Id::new(2, 0), + len: 6, + }; + let struct_info3 = StructInfo::Skip { + id: Id::new(2, 6), + len: 1, + }; + + doc_store + .items + .write() + .unwrap() + .insert(client1, vec![struct_info1.clone()]); + doc_store + .items + .write() + .unwrap() + .insert(client2, vec![struct_info2.clone(), struct_info3.clone()]); + + let state_map = doc_store.get_state_vector(); + + assert_eq!( + state_map.get(&client1), + Some(&(struct_info1.clock() + struct_info1.len())) + ); + assert_eq!( + state_map.get(&client2), + Some(&(struct_info3.clock() + struct_info3.len())) + ); + + assert!(doc_store.self_check().is_ok()); + } + } + + #[test] + fn test_add_item() { + let doc_store = DocStore::new(); + + let struct_info1 = StructInfo::GC { + id: Id::new(1, 0), + len: 5, + }; + let struct_info2 = StructInfo::Skip { + id: Id::new(1, 5), + len: 1, + }; + let struct_info3_err = StructInfo::Skip { + id: Id::new(1, 5), + len: 1, + }; + let struct_info3 = StructInfo::Skip { + id: Id::new(1, 6), + len: 1, + }; + + assert!(doc_store.add_item(struct_info1.clone()).is_ok()); + assert!(doc_store.add_item(struct_info2.clone()).is_ok()); + assert_eq!( + doc_store.add_item(struct_info3_err), + Err(JwstCodecError::StructClockInvalid { + expect: 6, + actually: 5 + }) + ); + assert!(doc_store.add_item(struct_info3.clone()).is_ok()); + assert_eq!( + doc_store.get_state(struct_info1.client_id()), + struct_info3.clock() + struct_info3.len() + ); + } + + #[test] + fn test_get_item() { + { + let doc_store = DocStore::new(); + let struct_info = StructInfo::GC { + id: Id::new(1, 0), + len: 10, + }; + doc_store.add_item(struct_info.clone()).unwrap(); + + assert_eq!(doc_store.get_item(1, 9), Ok(struct_info)); + } + + { + let doc_store = DocStore::new(); + let struct_info1 = StructInfo::GC { + id: Id::new(1, 0), + len: 10, + }; + let struct_info2 = StructInfo::GC { + id: Id::new(1, 10), + len: 20, + }; + doc_store.add_item(struct_info1.clone()).unwrap(); + doc_store.add_item(struct_info2.clone()).unwrap(); + + assert_eq!(doc_store.get_item(1, 25), Ok(struct_info2)); + } + + { + let doc_store = DocStore::new(); + + assert_eq!( + doc_store.get_item(1, 0), + Err(JwstCodecError::StructSequenceNotExists(1)) + ); + } + + { + let doc_store = DocStore::new(); + let struct_info1 = StructInfo::GC { + id: Id::new(1, 0), + len: 10, + }; + let struct_info2 = StructInfo::GC { + id: Id::new(1, 10), + len: 20, + }; + doc_store.add_item(struct_info1.clone()).unwrap(); + doc_store.add_item(struct_info2.clone()).unwrap(); + + assert_eq!( + doc_store.get_item(1, 35), + Err(JwstCodecError::StructSequenceInvalid { + client_id: 1, + clock: 35 + }) + ); + } + } +} diff --git a/libs/jwst-codec/src/doc/traits.rs b/libs/jwst-codec/src/doc/traits.rs new file mode 100644 index 000000000..75cec0df5 --- /dev/null +++ b/libs/jwst-codec/src/doc/traits.rs @@ -0,0 +1,29 @@ +use super::*; + +pub trait CrdtList { + type Item; + + fn len(&self) -> usize; + fn get(&self, index: usize) -> Option<&Self::Item>; + fn insert(&mut self, index: usize, item: Self::Item); + fn remove(&mut self, index: usize) -> Option; + fn push(&mut self, item: Self::Item); + fn pop(&mut self) -> Option; + fn truncate(&mut self, len: usize); + fn clear(&mut self); + + fn get_store(&self) -> DocStore; +} + +pub trait CrdtMap { + type Key; + type Value; + + fn len(&self) -> usize; + fn get(&self, key: &Self::Key) -> Option<&Self::Value>; + fn insert(&mut self, key: Self::Key, value: Self::Value); + fn remove(&mut self, key: &Self::Key) -> Option; + fn clear(&mut self); + + fn get_store(&self) -> DocStore; +} diff --git a/libs/jwst-codec/src/doc/update.rs b/libs/jwst-codec/src/doc/update.rs deleted file mode 100644 index 6ccf4ffc7..000000000 --- a/libs/jwst-codec/src/doc/update.rs +++ /dev/null @@ -1,109 +0,0 @@ -use super::*; -use nom::{multi::count, number::complete::be_u8}; - -#[derive(Debug)] -pub enum StructInfo { - GC(u64), - Skip(u64), - Item(Item), -} - -#[derive(Debug)] -pub struct Delete { - pub clock: u64, - pub clock_len: u64, -} - -#[derive(Debug)] -pub struct DeleteSets { - pub client: u64, - pub deletes: Vec, -} - -#[derive(Debug)] -pub struct Structs { - pub client: u64, - pub clock: u64, - pub structs: Vec, -} - -#[derive(Debug)] -pub struct Update { - pub delete_sets: Vec, - pub structs: Vec, -} - -fn parse_struct(input: &[u8]) -> IResult<&[u8], StructInfo> { - let (input, info) = be_u8(input)?; - let first_5_bit = info & 0b11111; - - match first_5_bit { - 0 => { - let (input, len) = read_var_u64(input)?; - Ok((input, StructInfo::GC(len))) - } - 10 => { - let (input, len) = read_var_u64(input)?; - Ok((input, StructInfo::Skip(len))) - } - _ => { - let (input, item) = read_item(input, info, first_5_bit)?; - Ok((input, StructInfo::Item(item))) - } - } -} - -fn parse_structs(input: &[u8]) -> IResult<&[u8], Structs> { - let (input, num_of_structs) = read_var_u64(input)?; - let (input, client) = read_var_u64(input)?; - let (input, clock) = read_var_u64(input)?; - let (input, structs) = count(parse_struct, num_of_structs as usize)(input)?; - Ok(( - input, - Structs { - client, - clock, - structs, - }, - )) -} - -fn read_client_struct_refs(input: &[u8]) -> IResult<&[u8], Vec> { - let (input, num_of_updates) = read_var_u64(input)?; - let (tail, updates) = count(parse_structs, num_of_updates as usize)(input)?; - - Ok((tail, updates)) -} - -fn read_delete(input: &[u8]) -> IResult<&[u8], Delete> { - let (tail, clock) = read_var_u64(input)?; - let (tail, clock_len) = read_var_u64(tail)?; - Ok((tail, Delete { clock, clock_len })) -} - -fn parse_delete_set(input: &[u8]) -> IResult<&[u8], DeleteSets> { - let (input, client) = read_var_u64(input)?; - let (input, num_of_deletes) = read_var_u64(input)?; - let (tail, deletes) = count(read_delete, num_of_deletes as usize)(input)?; - - Ok((tail, DeleteSets { client, deletes })) -} - -fn read_delete_set(input: &[u8]) -> IResult<&[u8], Vec> { - let (input, num_of_clients) = read_var_u64(input)?; - let (tail, deletes) = count(parse_delete_set, num_of_clients as usize)(input)?; - - Ok((tail, deletes)) -} - -pub fn read_update(input: &[u8]) -> IResult<&[u8], Update> { - let (tail, structs) = read_client_struct_refs(input)?; - let (tail, delete_sets) = read_delete_set(tail)?; - Ok(( - tail, - Update { - structs, - delete_sets, - }, - )) -} diff --git a/libs/jwst-codec/src/lib.rs b/libs/jwst-codec/src/lib.rs index 0896ac255..d75119b52 100644 --- a/libs/jwst-codec/src/lib.rs +++ b/libs/jwst-codec/src/lib.rs @@ -4,9 +4,32 @@ mod doc; pub use codec::{ read_var_buffer, read_var_i64, read_var_string, read_var_u64, write_var_i64, write_var_u64, }; -pub use doc::{read_content, read_item, read_item_id, read_update, Content, Id, Item, Update}; +pub use doc::{read_update, Content, Doc, Id, Item, Update}; +use jwst_logger::warn; +use nanoid::nanoid; use nom::IResult; +use thiserror::Error; + +#[derive(Debug, Error, PartialEq)] +pub enum JwstCodecError { + #[error("Content does not support splitting in {0}")] + ContentSplitNotSupport(u64), + #[error("GC or Skip does not support splitting")] + ItemSplitNotSupport, + #[error("invalid update")] + UpdateInvalid(#[from] nom::Err>), + #[error("update not fully consumed: {0}")] + UpdateNotFullyConsumed(usize), + #[error("invalid struct clock, expect {expect}, actually {actually}")] + StructClockInvalid { expect: u64, actually: u64 }, + #[error("cannot find struct {clock} in {client_id}")] + StructSequenceInvalid { client_id: u64, clock: u64 }, + #[error("struct {0} not exists")] + StructSequenceNotExists(u64), +} + +pub type JwstCodecResult = Result; pub fn parse_doc_update(input: &[u8]) -> IResult<&[u8], Update> { let (input, update) = read_update(input)?; @@ -34,13 +57,10 @@ mod tests { assert_eq!(tail.len(), 0); assert_eq!(update.structs.len(), clients); assert_eq!( - update - .structs - .iter() - .map(|s| s.structs.len()) - .sum::(), + update.structs.iter().map(|s| s.1.len()).sum::(), structs ); + println!("{:?}", update); } } @@ -76,11 +96,7 @@ mod tests { "workspace: {}, global structs: {}, total structs: {}", ws.workspace, update.structs.len(), - update - .structs - .iter() - .map(|s| s.structs.len()) - .sum::() + update.structs.iter().map(|s| s.1.len()).sum::() ); } Err(_e) => {