Skip to content

Commit

Permalink
Adds delete range batch op (#20)
Browse files Browse the repository at this point in the history
* adds delete range batch op

* adds range iter
  • Loading branch information
ross-weir authored Mar 5, 2025
1 parent 16a8d99 commit c2b15a8
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/cache/delta_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ where
Operation::Put { value } => {
return Some((key.to_vec(), value.to_vec()))
}
Operation::Delete => continue,
Operation::Delete | Operation::DeleteRange { .. } => continue,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ where

impl<'a, S> FusedIterator for SchemaIterator<'a, S> where S: Schema {}

/// Iterates over given column in [`rocksdb::DB`].
/// Iterates over given column in [`rocksdb::DB`] using raw types.
pub(crate) struct RawDbIter<'a> {
db_iter: rocksdb::DBRawIterator<'a>,
direction: ScanDirection,
Expand Down
50 changes: 47 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ use iterator::ScanDirection;
pub use iterator::{SchemaIterator, SeekKeyEncoder};
use metrics::{
SCHEMADB_BATCH_COMMIT_BYTES, SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS, SCHEMADB_DELETES,
SCHEMADB_GET_BYTES, SCHEMADB_GET_LATENCY_SECONDS, SCHEMADB_PUT_BYTES,
SCHEMADB_DELETE_RANGE, SCHEMADB_GET_BYTES, SCHEMADB_GET_LATENCY_SECONDS, SCHEMADB_PUT_BYTES,
};
pub use rocksdb;
use rocksdb::ReadOptions;
pub use rocksdb::DEFAULT_COLUMN_FAMILY_NAME;
use thiserror::Error;
use tracing::info;
use tracing::{info, warn};

use crate::iterator::RawDbIter;
pub use crate::schema::Schema;
Expand Down Expand Up @@ -271,6 +271,18 @@ impl DB {
self.iter_with_direction::<S>(Default::default(), ScanDirection::Forward)
}

/// Creates a forward [`SchemaIterator`] over schema `S` restricted to a key range.
///
/// The iterator uses default [`ReadOptions`] with the encoded `from` key
/// installed as the iteration lower bound and the encoded `to` key as the
/// iteration upper bound.
///
/// NOTE(review): RocksDB is expected to treat the lower bound as inclusive and
/// the upper bound as exclusive — confirm against the `rocksdb` crate docs.
///
/// # Errors
///
/// Returns an error if either seek key fails to encode, or if the underlying
/// iterator cannot be created.
pub fn iter_range<S: Schema>(
    &self,
    from: &impl SeekKeyEncoder<S>,
    to: &impl SeekKeyEncoder<S>,
) -> anyhow::Result<SchemaIterator<S>> {
    // Encode in the same order as before: `from` first, then `to`.
    let lower_bound = from.encode_seek_key()?;
    let upper_bound = to.encode_seek_key()?;

    let mut bounded_opts = ReadOptions::default();
    bounded_opts.set_iterate_lower_bound(lower_bound);
    bounded_opts.set_iterate_upper_bound(upper_bound);

    self.iter_with_direction::<S>(bounded_opts, ScanDirection::Forward)
}

/// Returns a [`RawDbIter`] which allows to iterate over raw values in specified [`ScanDirection`].
pub(crate) fn raw_iter<S: Schema>(
&self,
Expand Down Expand Up @@ -313,6 +325,23 @@ impl DB {
match operation {
Operation::Put { value } => db_batch.put_cf(cf_handle, key, value),
Operation::Delete => db_batch.delete_cf(cf_handle, key),
Operation::DeleteRange { .. } => {
warn!("Unexpected range operation found: {:?}", operation)
}
}
}
}
for (cf_name, operations) in batch.range_ops.iter() {
let cf_handle = self.get_cf_handle(cf_name)?;
for operation in operations {
match operation {
Operation::DeleteRange { from, to } => {
db_batch.delete_range_cf(cf_handle, from, to)
}
_ => warn!(
"Unexpected non range based operation found: {:?}",
operation
),
}
}
}
Expand All @@ -332,6 +361,14 @@ impl DB {
Operation::Delete => {
SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
}
Operation::DeleteRange { .. } => (),
}
}
}
for (cf_name, operations) in batch.range_ops.iter() {
for operation in operations {
if let Operation::DeleteRange { .. } = operation {
SCHEMADB_DELETE_RANGE.with_label_values(&[cf_name]).inc();
}
}
}
Expand Down Expand Up @@ -404,6 +441,13 @@ pub enum Operation {
},
/// Deleting a value
Delete,
/// Deleting a range of values
DeleteRange {
/// Start of the range to delete
from: SchemaKey,
/// End of the range to delete
to: SchemaKey,
},
}

impl Operation {
Expand All @@ -414,7 +458,7 @@ impl Operation {
let value = S::Value::decode_value(value)?;
Ok(Some(value))
}
Operation::Delete => Ok(None),
Operation::Delete | Operation::DeleteRange { .. } => Ok(None),
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ pub static SCHEMADB_DELETES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!("storage_deletes", "Storage delete calls", &["cf_name"]).unwrap()
});

/// Counter of delete-range operations, registered under the metric name
/// `storage_delete_range` and labelled by column family name (`cf_name`).
///
/// The counter is created lazily on first access; the `unwrap` panics at that
/// point if registering the metric fails.
pub static SCHEMADB_DELETE_RANGE: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"storage_delete_range",
"Storage delete range calls",
&["cf_name"]
)
.unwrap()
});

pub static SCHEMADB_BATCH_PUT_LATENCY_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
// metric name
Expand Down
107 changes: 98 additions & 9 deletions src/schema_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::collections::{btree_map, BTreeMap, HashMap};

use crate::metrics::SCHEMADB_BATCH_PUT_LATENCY_SECONDS;
use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec};
use crate::{Operation, Schema, SchemaKey};
use crate::{Operation, Schema, SchemaKey, SeekKeyEncoder};

/// [`SchemaBatch`] holds a collection of updates that can be applied to a DB
/// ([`Schema`]) atomically. The updates will be applied in the order in which
/// they are added to the [`SchemaBatch`].
///
/// NOTE(review): point operations are stored per key in a `BTreeMap` and range
/// operations in a separate list, so insertion order between the two kinds is
/// not recorded here — confirm the ordering guarantee above against the writer.
#[derive(Debug, Default, Clone)]
pub struct SchemaBatch {
/// Point operations (put / delete) per column family, keyed by encoded key.
/// Each key holds a single operation; inserting again for the same key
/// replaces the previous one.
pub(crate) last_writes: HashMap<ColumnFamilyName, BTreeMap<SchemaKey, Operation>>,
/// Range operations per column family, kept in the order they were added.
/// Only `Operation::DeleteRange` is pushed here by `delete_range`.
pub(crate) range_ops: HashMap<ColumnFamilyName, Vec<Operation>>,
}

impl SchemaBatch {
Expand Down Expand Up @@ -45,6 +46,23 @@ impl SchemaBatch {
Ok(())
}

/// Adds a delete range operation to the batch.
///
/// The encoded `from` and `to` seek keys are stored as an
/// [`Operation::DeleteRange`] in the per-column-family list of range
/// operations for `S`; point writes already in the batch are not touched.
///
/// Note: Range based operations aren't supported by iterators or merging,
/// which is consistent with rocksdb `WriteBatch`.
///
/// NOTE(review): RocksDB range deletion is expected to cover `[from, to)`,
/// i.e. `to` itself is not deleted — confirm against the `rocksdb` crate docs.
///
/// # Errors
///
/// Returns an error if either key fails to encode. In that case the batch is
/// left unchanged.
pub fn delete_range<S: Schema>(
    &mut self,
    from: &impl SeekKeyEncoder<S>,
    to: &impl SeekKeyEncoder<S>,
) -> anyhow::Result<()> {
    // Encode both bounds before touching `range_ops`, so a failed encoding
    // does not leave an empty entry behind for this column family.
    let from = from.encode_seek_key()?;
    let to = to.encode_seek_key()?;

    self.range_ops
        .entry(S::COLUMN_FAMILY_NAME)
        .or_default()
        .push(Operation::DeleteRange { from, to });
    Ok(())
}

fn insert_operation<S: Schema>(&mut self, key: SchemaKey, operation: Operation) {
let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default();
column_writes.insert(key, operation);
Expand Down Expand Up @@ -108,16 +126,51 @@ impl SchemaBatch {
impl proptest::arbitrary::Arbitrary for SchemaBatch {
type Parameters = &'static [ColumnFamilyName];
fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy {
use proptest::prelude::any;
use proptest::prelude::*;
use proptest::strategy::Strategy;

proptest::collection::vec(any::<BTreeMap<SchemaKey, Operation>>(), columns.len())
.prop_map::<SchemaBatch, _>(|vec_vec_write_ops| {
let mut rows = HashMap::new();
for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
rows.insert(*col, write_op);
let point_ops_strategy = proptest::collection::vec(
proptest::collection::btree_map(
any::<SchemaKey>(),
prop_oneof![
any::<crate::SchemaValue>().prop_map(|v| Operation::Put { value: v }),
Just(Operation::Delete)
],
0..10,
),
columns.len(),
);

let range_ops_strategy = proptest::collection::vec(
proptest::collection::vec(
(any::<SchemaKey>(), any::<SchemaKey>()).prop_map(|(start, end)| {
Operation::DeleteRange {
from: start,
to: end,
}
}),
0..5,
),
columns.len(),
);

(point_ops_strategy, range_ops_strategy)
.prop_map(|(point_ops, range_ops)| {
let mut last_writes = HashMap::new();
let mut range_operations = HashMap::new();

for (col, point_op) in columns.iter().zip(point_ops.into_iter()) {
last_writes.insert(*col, point_op);
}

for (col, range_op) in columns.iter().zip(range_ops.into_iter()) {
range_operations.insert(*col, range_op);
}

SchemaBatch {
last_writes,
range_ops: range_operations,
}
SchemaBatch { last_writes: rows }
})
.boxed()
}
Expand All @@ -134,6 +187,42 @@ mod tests {

define_schema!(TestSchema1, TestField, TestField, "TestCF1");

mod range {
    use super::*;
    use crate::schema::KeyDecoder;

    /// `delete_range` must record exactly one `DeleteRange` operation for the
    /// schema's column family, carrying the encoded `from` and `to` keys.
    #[test]
    fn test_delete_range_inserts_expected_operation() {
        let mut batch = SchemaBatch::new();
        let field_1 = TestField(1);
        let field_2 = TestField(5);
        batch
            .delete_range::<TestSchema1>(&field_1, &field_2)
            .unwrap();

        let ops = batch
            .range_ops
            .get(TestSchema1::COLUMN_FAMILY_NAME)
            .expect("delete_range should create an entry for the column family");
        // Check the count first so a missing or duplicated op fails with a
        // clear message instead of an index panic or a silent pass.
        assert_eq!(ops.len(), 1);

        match &ops[0] {
            Operation::DeleteRange { from, to } => {
                let from_key =
                    <<TestSchema1 as Schema>::Key as KeyDecoder<TestSchema1>>::decode_key(from)
                        .unwrap()
                        .0;
                assert_eq!(from_key, 1);
                let to_key =
                    <<TestSchema1 as Schema>::Key as KeyDecoder<TestSchema1>>::decode_key(to)
                        .unwrap()
                        .0;
                assert_eq!(to_key, 5);
            }
            _ => panic!("Incorrect Operation discriminant"),
        };
    }
}

mod iter {
use super::*;
use crate::schema::KeyDecoder;
Expand Down Expand Up @@ -163,7 +252,7 @@ mod tests {
.unwrap()
.0,
),
Operation::Delete => None,
Operation::Delete | Operation::DeleteRange { .. } => None,
};
(key, value)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/iterator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ fn test_schema_batch_iteration_order() {
decode_key(key),
<TestField as ValueCodec<S>>::decode_value(value).unwrap(),
)),
Operation::Delete => None,
Operation::Delete | Operation::DeleteRange { .. } => None,
})
.collect();
assert_eq!(operations, collected);
Expand Down

0 comments on commit c2b15a8

Please sign in to comment.