Skip to content

Commit

Permalink
add delete_range to schema batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-weir committed Mar 5, 2025
1 parent abfb2b5 commit 6862887
Showing 1 changed file with 97 additions and 8 deletions.
105 changes: 97 additions & 8 deletions src/schema_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::collections::{btree_map, BTreeMap, HashMap};

use crate::metrics::SCHEMADB_BATCH_PUT_LATENCY_SECONDS;
use crate::schema::{ColumnFamilyName, KeyCodec, ValueCodec};
use crate::{Operation, Schema, SchemaKey};
use crate::{Operation, Schema, SchemaKey, SchemaValue};

// [`SchemaBatch`] holds a collection of updates that can be applied to a DB
/// ([`Schema`]) atomically. The updates will be applied in the order in which
/// they are added to the [`SchemaBatch`].
#[derive(Debug, Default, Clone)]
pub struct SchemaBatch {
pub(crate) last_writes: HashMap<ColumnFamilyName, BTreeMap<SchemaKey, Operation>>,
range_ops: HashMap<ColumnFamilyName, Vec<Operation>>,
}

impl SchemaBatch {
Expand Down Expand Up @@ -45,6 +46,23 @@ impl SchemaBatch {
Ok(())
}

/// Adds a delete range operation to the batch.
///
/// Note: Range based operations aren't supported by iterators or merging,
/// which is consistent with rocksdb `WriteBatch`.
pub fn delete_range<S: Schema>(
&mut self,
from: &impl KeyCodec<S>,
to: &impl KeyCodec<S>,
) -> anyhow::Result<()> {
let ops = self.range_ops.entry(S::COLUMN_FAMILY_NAME).or_default();
ops.push(Operation::DeleteRange {
from: from.encode_key()?,
to: to.encode_key()?,
});
Ok(())
}

fn insert_operation<S: Schema>(&mut self, key: SchemaKey, operation: Operation) {
let column_writes = self.last_writes.entry(S::COLUMN_FAMILY_NAME).or_default();
column_writes.insert(key, operation);
Expand Down Expand Up @@ -108,16 +126,51 @@ impl SchemaBatch {
impl proptest::arbitrary::Arbitrary for SchemaBatch {
type Parameters = &'static [ColumnFamilyName];
fn arbitrary_with(columns: Self::Parameters) -> Self::Strategy {
use proptest::prelude::any;
use proptest::prelude::*;
use proptest::strategy::Strategy;

proptest::collection::vec(any::<BTreeMap<SchemaKey, Operation>>(), columns.len())
.prop_map::<SchemaBatch, _>(|vec_vec_write_ops| {
let mut rows = HashMap::new();
for (col, write_op) in columns.iter().zip(vec_vec_write_ops.into_iter()) {
rows.insert(*col, write_op);
let point_ops_strategy = proptest::collection::vec(
proptest::collection::btree_map(
any::<SchemaKey>(),
prop_oneof![
any::<SchemaValue>().prop_map(|v| Operation::Put { value: v }),
Just(Operation::Delete)
],
0..10,
),
columns.len(),
);

let range_ops_strategy = proptest::collection::vec(
proptest::collection::vec(
(any::<SchemaKey>(), any::<SchemaKey>()).prop_map(|(start, end)| {
Operation::DeleteRange {
from: start,
to: end,
}
}),
0..5,
),
columns.len(),
);

(point_ops_strategy, range_ops_strategy)
.prop_map(|(point_ops, range_ops)| {
let mut last_writes = HashMap::new();
let mut range_operations = HashMap::new();

for (col, point_op) in columns.iter().zip(point_ops.into_iter()) {
last_writes.insert(*col, point_op);
}

for (col, range_op) in columns.iter().zip(range_ops.into_iter()) {
range_operations.insert(*col, range_op);
}

SchemaBatch {
last_writes,
range_ops: range_operations,
}
SchemaBatch { last_writes: rows }
})
.boxed()
}
Expand All @@ -134,6 +187,42 @@ mod tests {

define_schema!(TestSchema1, TestField, TestField, "TestCF1");

mod range {
use super::*;
use crate::schema::KeyDecoder;

#[test]
fn test_delete_range_inserts_expected_operation() {
let mut batch = SchemaBatch::new();
let field_1 = TestField(1);
let field_2 = TestField(5);
batch
.delete_range::<TestSchema1>(&field_1, &field_2)
.unwrap();

let op = &batch
.range_ops
.get(TestSchema1::COLUMN_FAMILY_NAME)
.unwrap()[0];

match op {
Operation::DeleteRange { from, to } => {
let from_key =
<<TestSchema1 as Schema>::Key as KeyDecoder<TestSchema1>>::decode_key(from)
.unwrap()
.0;
assert_eq!(from_key, 1);
let to_key =
<<TestSchema1 as Schema>::Key as KeyDecoder<TestSchema1>>::decode_key(to)
.unwrap()
.0;
assert_eq!(to_key, 5);
}
_ => panic!("Incorrect Operation disctriminate"),
};
}
}

mod iter {
use super::*;
use crate::schema::KeyDecoder;
Expand Down

0 comments on commit 6862887

Please sign in to comment.