Skip to content

Commit

Permalink
add support for verbatim chunks (`@`-prefixed chunks) in keyexpr trees (KeTrees)
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Jan 31, 2024
1 parent 1de9d36 commit e232b68
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 48 deletions.
26 changes: 24 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ description = "Zenoh: Zero Overhead Pub/sub, Store/Query and Compute."
# (https://github.com/rust-lang/cargo/issues/11329)
[workspace.dependencies]
aes = "0.8.2"
ahash = "0.8.7"
anyhow = { version = "1.0.69", default-features = false } # Default features are disabled due to usage in no_std crates
async-executor = "1.5.0"
async-global-executor = "2.3.1"
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-keyexpr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ default = ["std"]
std = ["zenoh-result/std", "dep:schemars"]

[dependencies]
ahash = { workspace = true }
keyed-set = { workspace = true }
rand = { workspace = true, features = ["alloc", "getrandom"] }
schemars = { workspace = true, optional = true }
Expand Down
14 changes: 14 additions & 0 deletions commons/zenoh-keyexpr/benches/keyexpr_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ fn main() {
let mut ketree: KeBoxTree<_> = KeBoxTree::new();
let mut vectree: KeBoxTree<_, bool, VecSetProvider> = KeBoxTree::new();
let mut hashtree: KeBoxTree<_, bool, HashMapProvider> = KeBoxTree::new();
let mut ahashtree: KeBoxTree<_, bool, HashMapProvider<ahash::AHasher>> =
KeBoxTree::new();
let (kearctree, mut token): (KeArcTree<i32>, _) = KeArcTree::new().unwrap();
let mut map = HashMap::new();
for key in keys.iter() {
Expand All @@ -58,13 +60,15 @@ fn main() {
});
b.run_once("vectree_insert", || vectree.insert(key, 0));
b.run_once("hashtree_insert", || hashtree.insert(key, 0));
b.run_once("ahashtree_insert", || ahashtree.insert(key, 0));
b.run_once("hashmap_insert", || map.insert(key.to_owned(), 0));
}
for key in keys.iter() {
b.run_once("ketree_fetch", || ketree.node(key));
b.run_once("kearctree_fetch", || kearctree.node(&token, key));
b.run_once("vectree_fetch", || vectree.node(key));
b.run_once("hashtree_fetch", || hashtree.node(key));
b.run_once("ahashtree_fetch", || ahashtree.node(key));
b.run_once("hashmap_fetch", || map.get(key));
}
for key in keys.iter() {
Expand All @@ -81,6 +85,9 @@ fn main() {
b.run_once("hashtree_intersect", || {
hashtree.intersecting_nodes(key).count()
});
b.run_once("ahashtree_intersect", || {
ahashtree.intersecting_nodes(key).count()
});
b.run_once("hashmap_intersect", || {
map.iter().filter(|(k, _)| key.intersects(k)).count()
});
Expand All @@ -92,6 +99,9 @@ fn main() {
});
b.run_once("vectree_include", || vectree.included_nodes(key).count());
b.run_once("hashtree_include", || hashtree.included_nodes(key).count());
b.run_once("ahashtree_include", || {
ahashtree.included_nodes(key).count()
});
b.run_once("hashmap_include", || {
map.iter().filter(|(k, _)| key.includes(k)).count()
});
Expand All @@ -102,21 +112,25 @@ fn main() {
"kearctree_insert",
"vectree_insert",
"hashtree_insert",
"ahashtree_insert",
"hashmap_insert",
"ketree_fetch",
"kearctree_fetch",
"vectree_fetch",
"hashtree_fetch",
"ahashtree_fetch",
"hashmap_fetch",
"ketree_intersect",
"kearctree_intersect",
"vectree_intersect",
"hashtree_intersect",
"ahashtree_intersect",
"hashmap_intersect",
"ketree_include",
"kearctree_include",
"vectree_include",
"hashtree_include",
"ahashtree_include",
"hashmap_include",
] {
let b = results.benches.get(name).unwrap();
Expand Down
5 changes: 4 additions & 1 deletion commons/zenoh-keyexpr/src/key_expr/fuzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use super::OwnedKeyExpr;

fn random_chunk(rng: &'_ mut impl rand::Rng) -> impl Iterator<Item = u8> + '_ {
let n = rng.gen_range(1..3);
(0..n).map(move |_| rng.sample(rand::distributions::Uniform::from(b'a'..b'c')))
rng.gen_bool(0.05)
.then_some(b'@')
.into_iter()
.chain((0..n).map(move |_| rng.sample(rand::distributions::Uniform::from(b'a'..b'c'))))
}

fn make(ke: &mut Vec<u8>, rng: &mut impl rand::Rng) {
Expand Down
18 changes: 12 additions & 6 deletions commons/zenoh-keyexpr/src/keyexpr_tree/iters/inclusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ where
};
}
let chunk = node.chunk();
let chunk_is_verbatim = chunk.as_bytes()[0] == b'@';
for i in *start..*end {
let kec_start = self.ke_indices[i];
if kec_start == self.key.len() {
Expand All @@ -107,8 +108,10 @@ where
let subkey =
unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) };
if unlikely(subkey == "**") {
push!(kec_start);
push!(kec_start + kec_end + 1);
if !chunk_is_verbatim {
push!(kec_start);
push!(kec_start + kec_end + 1);
}
let post_key = &key[kec_end + 1..];
match post_key.iter().position(|&c| c == b'/') {
Some(sec_end) => {
Expand All @@ -133,7 +136,7 @@ where
}
None => {
let key = unsafe { keyexpr::from_slice_unchecked(key) };
if unlikely(key == "**") {
if unlikely(key == "**") && chunk.as_bytes()[0] != b'@' {
push!(kec_start);
node_matches = true;
} else if key.includes(chunk) {
Expand Down Expand Up @@ -256,6 +259,7 @@ where
};
}
let chunk = node.chunk();
let chunk_is_verbatim = chunk.as_bytes()[0] == b'@';
for i in *start..*end {
let kec_start = self.ke_indices[i];
if kec_start == self.key.len() {
Expand All @@ -267,8 +271,10 @@ where
let subkey =
unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) };
if unlikely(subkey == "**") {
push!(kec_start);
push!(kec_start + kec_end + 1);
if !chunk_is_verbatim {
push!(kec_start);
push!(kec_start + kec_end + 1);
}
let post_key = &key[kec_end + 1..];
match post_key.iter().position(|&c| c == b'/') {
Some(sec_end) => {
Expand All @@ -293,7 +299,7 @@ where
}
None => {
let key = unsafe { keyexpr::from_slice_unchecked(key) };
if unlikely(key == "**") {
if unlikely(key == "**") && chunk.as_bytes()[0] != b'@' {
push!(kec_start);
node_matches = true;
} else if key.includes(chunk) {
Expand Down
82 changes: 59 additions & 23 deletions commons/zenoh-keyexpr/src/keyexpr_tree/iters/intersection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,25 @@ where
};
}
let chunk = node.chunk();
let chunk_is_verbatim = chunk.as_bytes()[0] == b'@';
if unlikely(chunk.as_bytes() == b"**") {
// If the current node is `**`, it is guaranteed to match
// If the current node is `**`, it is guaranteed to match...
node_matches = true;
// and may consume any number of chunks from the KE
push!(self.ke_indices[*start]);
for i in self.ke_indices[*start]..self.key.len() {
if self.key.as_bytes()[i] == b'/' {
push!(i + 1);
if self.key.len() != self.ke_indices[*start] {
if self.key.as_bytes()[self.ke_indices[*start]] != b'@' {
for i in self.ke_indices[*start]..self.key.len() {
if self.key.as_bytes()[i] == b'/' {
push!(i + 1);
if self.key.as_bytes()[i + 1] == b'@' {
node_matches = false; // ...unless the KE contains a verbatim chunk.
break;
}
}
}
} else {
node_matches = false;
}
}
} else {
Expand All @@ -121,9 +132,11 @@ where
let subkey =
unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) };
if unlikely(subkey.as_bytes() == b"**") {
// If the chunk is `**`:
// children will have to process it again
push!(kec_start);
if !chunk_is_verbatim {
// If the query chunk is `**`:
// children will have to process it again
push!(kec_start);
}
// and we need to process this chunk as if the `**` wasn't there,
// but with the knowledge that the next chunk won't be `**`.
let post_key = &key[kec_end + 1..];
Expand All @@ -144,6 +157,7 @@ where
}
.intersects(chunk)
{
push!(self.key.len());
node_matches = true;
}
}
Expand All @@ -155,7 +169,7 @@ where
None => {
// If it's the last chunk of the query, check whether it's `**`
let key = unsafe { keyexpr::from_slice_unchecked(key) };
if unlikely(key.as_bytes() == b"**") {
if unlikely(key.as_bytes() == b"**") && !chunk_is_verbatim {
// If yes, it automatically matches, and must be reused from now on for iteration.
push!(kec_start);
node_matches = true;
Expand Down Expand Up @@ -274,40 +288,57 @@ where
macro_rules! push {
($index: expr) => {
let index = $index;
if new_end == new_start
|| self.ke_indices[new_start..new_end]
.iter()
.rev()
.all(|c| *c < index)
{
if new_end == new_start || self.ke_indices[new_end - 1] < index {
self.ke_indices.push(index);
new_end += 1;
}
};
}
let chunk = node.chunk();
if unlikely(chunk == "**") {
let chunk_is_verbatim = chunk.as_bytes()[0] == b'@';
if unlikely(chunk.as_bytes() == b"**") {
// If the current node is `**`, it is guaranteed to match...
node_matches = true;
// and may consume any number of chunks from the KE
push!(self.ke_indices[*start]);
for i in self.ke_indices[*start]..self.key.len() {
if self.key.as_bytes()[i] == b'/' {
push!(i + 1);
if self.key.len() != self.ke_indices[*start] {
if self.key.as_bytes()[self.ke_indices[*start]] != b'@' {
for i in self.ke_indices[*start]..self.key.len() {
if self.key.as_bytes()[i] == b'/' {
push!(i + 1);
if self.key.as_bytes()[i + 1] == b'@' {
node_matches = false; // ...unless the KE contains a verbatim chunk.
break;
}
}
}
} else {
node_matches = false;
}
}
} else {
// The current node is not `**`
// For all candidate chunks of the KE
for i in *start..*end {
// construct that chunk, while checking whether or not it's the last one
let kec_start = self.ke_indices[i];
if kec_start == self.key.len() {
if unlikely(kec_start == self.key.len()) {
break;
}
let key = &self.key.as_bytes()[kec_start..];
match key.iter().position(|&c| c == b'/') {
Some(kec_end) => {
// If we aren't in the last chunk
let subkey =
unsafe { keyexpr::from_slice_unchecked(&key[..kec_end]) };
if unlikely(subkey == "**") {
push!(kec_start);
push!(kec_start + kec_end + 1);
if unlikely(subkey.as_bytes() == b"**") {
if !chunk_is_verbatim {
// If the query chunk is `**`:
// children will have to process it again
push!(kec_start);
}
// and we need to process this chunk as if the `**` wasn't there,
// but with the knowledge that the next chunk won't be `**`.
let post_key = &key[kec_end + 1..];
match post_key.iter().position(|&c| c == b'/') {
Some(sec_end) => {
Expand All @@ -326,6 +357,7 @@ where
}
.intersects(chunk)
{
push!(self.key.len());
node_matches = true;
}
}
Expand All @@ -335,11 +367,15 @@ where
}
}
None => {
// If it's the last chunk of the query, check whether it's `**`
let key = unsafe { keyexpr::from_slice_unchecked(key) };
if unlikely(key == "**") {
if unlikely(key.as_bytes() == b"**") && !chunk_is_verbatim {
// If yes, it automatically matches, and must be reused from now on for iteration.
push!(kec_start);
node_matches = true;
} else if chunk.intersects(key) {
// else, if it intersects with the chunk, make sure the children of the node
// are searched for `**`
push!(self.key.len());
node_matches = true;
}
Expand Down
Loading

0 comments on commit e232b68

Please sign in to comment.