Skip to content

Commit

Permalink
feat(procedure): auto split large value to multiple values (#3605)
Browse files Browse the repository at this point in the history
* feat: implement MultipleValuesStream

* refactor: move KeySet to common-procedure

* refactor: move MultipleValuesStream to common-procedure

* refactor: refactor String to KeySet

* fix: fix dropping `collecting` unexpectedly

* fix: fix typo

* refactor: add the fast path of put

* refactor: remove `single_value_collector`

* refactor: use `extend` instead of `push`

* test: add more tests for `KvStateStore`

* test(etcd_store): add more tests for `KvStateStore`

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: refactor with async_stream

* Update src/common/procedure/src/store/util.rs

Co-authored-by: Yingwen <[email protected]>

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
WenyXu and evenyag authored Apr 1, 2024
1 parent 5e24448 commit 6c316d2
Show file tree
Hide file tree
Showing 5 changed files with 519 additions and 41 deletions.
239 changes: 215 additions & 24 deletions src/common/meta/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
use common_procedure::store::state_store::{KeyValueStream, StateStore};
use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
use common_procedure::store::util::multiple_value_stream;
use common_procedure::Result as ProcedureResult;
use futures::future::try_join_all;
use futures::StreamExt;
use snafu::ResultExt;

Expand All @@ -42,16 +44,20 @@ fn strip_prefix(key: &str) -> String {

pub struct KvStateStore {
kv_backend: KvBackendRef,
// limit is set to 0, it is treated as no limit.
max_size_per_range: usize,
// The max num of keys to be returned in a range scan request
// `None` stands no limit.
max_num_per_range: Option<usize>,
// The max bytes of value.
// `None` stands no limit.
max_size_per_value: Option<usize>,
}

impl KvStateStore {
// `max_size_per_range` is set to 0, it is treated as no limit.
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
kv_backend,
max_size_per_range: 0,
max_num_per_range: None,
max_size_per_value: None,
}
}
}
Expand All @@ -64,20 +70,80 @@ fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
Ok((key, value))
}

enum SplitValue<'a> {
Single(&'a [u8]),
Multiple(Vec<&'a [u8]>),
}

fn split_value(value: &[u8], max_size_per_value: Option<usize>) -> SplitValue<'_> {
if let Some(max_size_per_value) = max_size_per_value {
if value.len() <= max_size_per_value {
SplitValue::Single(value)
} else {
SplitValue::Multiple(value.chunks(max_size_per_value).collect::<Vec<_>>())
}
} else {
SplitValue::Single(value)
}
}

#[async_trait]
impl StateStore for KvStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
let _ = self
.kv_backend
.put(PutRequest {
key: with_prefix(key).into_bytes(),
value,
..Default::default()
})
.await
.map_err(BoxedError::new)
.context(PutStateSnafu { key })?;
Ok(())
let split = split_value(&value, self.max_size_per_value);
let key = with_prefix(key);
match split {
SplitValue::Single(_) => {
self.kv_backend
.put(
PutRequest::new()
.with_key(key.to_string().into_bytes())
.with_value(value),
)
.await
.map_err(BoxedError::new)
.context(PutStateSnafu { key })?;
Ok(())
}
SplitValue::Multiple(values) => {
// Note:
// The length of values can be up to usize::MAX.
// The KeySet::with_segment_suffix method uses a 10-digit number to store the segment number,
// which is large enough for the usize type.

// The first segment key: "0b00001111"
// The 2nd segment key: "0b00001111/0000000001"
// The 3rd segment key: "0b00001111/0000000002"
let operations = values
.into_iter()
.enumerate()
.map(|(idx, value)| {
let key = if idx > 0 {
KeySet::with_segment_suffix(&key, idx)
} else {
key.to_string()
};
let kv_backend = self.kv_backend.clone();
async move {
kv_backend
.put(
PutRequest::new()
.with_key(key.into_bytes())
.with_value(value),
)
.await
}
})
.collect::<Vec<_>>();

try_join_all(operations)
.await
.map_err(BoxedError::new)
.context(PutStateSnafu { key })?;

Ok(())
}
}
}

async fn walk_top_down(&self, path: &str) -> ProcedureResult<KeyValueStream> {
Expand All @@ -90,7 +156,7 @@ impl StateStore for KvStateStore {
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
self.max_size_per_range,
self.max_num_per_range.unwrap_or_default(),
Arc::new(decode_kv),
);

Expand All @@ -100,6 +166,8 @@ impl StateStore for KvStateStore {
.with_context(|_| ListStateSnafu { path })
});

let stream = multiple_value_stream(Box::pin(stream));

Ok(Box::pin(stream))
}

Expand Down Expand Up @@ -128,19 +196,26 @@ impl StateStore for KvStateStore {

#[cfg(test)]
mod tests {
use std::env;
use std::sync::Arc;

use common_procedure::store::state_store::KeyValue;
use common_telemetry::info;
use futures::TryStreamExt;
use rand::{Rng, RngCore};
use uuid::Uuid;

use super::*;
use crate::kv_backend::chroot::ChrootKvBackend;
use crate::kv_backend::etcd::EtcdStore;
use crate::kv_backend::memory::MemoryKvBackend;

#[tokio::test]
async fn test_meta_state_store() {
let store = &KvStateStore {
kv_backend: Arc::new(MemoryKvBackend::new()),
max_size_per_range: 1, // for testing "more" in range
max_num_per_range: Some(1), // for testing "more" in range
max_size_per_value: None,
};

let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
Expand All @@ -165,18 +240,18 @@ mod tests {
let data = walk_top_down("/").await;
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
("b/1".to_string(), b"v3".to_vec())
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
("b/1".into(), b"v3".to_vec())
],
data
);

let data = walk_top_down("a/").await;
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
],
data
);
Expand All @@ -187,6 +262,122 @@ mod tests {
.unwrap();

let data = walk_top_down("a/").await;
assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data);
assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data);
}

struct TestCase {
prefix: String,
key: String,
value: Vec<u8>,
}

async fn test_meta_state_store_split_value_with_size_limit(
kv_backend: KvBackendRef,
size_limit: u32,
num_per_range: u32,
max_bytes: u32,
) {
let num_cases = rand::thread_rng().gen_range(1..=26);
let mut cases = Vec::with_capacity(num_cases);
for i in 0..num_cases {
let size = rand::thread_rng().gen_range(size_limit..=max_bytes);
let mut large_value = vec![0u8; size as usize];
rand::thread_rng().fill_bytes(&mut large_value);

// Starts from `a`.
let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
cases.push(TestCase {
key: format!("{}{}.commit", prefix, Uuid::new_v4()),
prefix,
value: large_value,
})
}
let store = &KvStateStore {
kv_backend: kv_backend.clone(),
max_num_per_range: Some(num_per_range as usize), // for testing "more" in range
max_size_per_value: Some(size_limit as usize),
};
let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
let mut data = store
.walk_top_down(path)
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
data
};

// Puts the values
for TestCase { key, value, .. } in &cases {
store.put(key, value.clone()).await.unwrap();
}

// Validates the values
for TestCase { prefix, key, value } in &cases {
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 1);
let (keyset, got) = data.into_iter().next().unwrap();
let num_expected_keys = value.len().div_ceil(size_limit as usize);
assert_eq!(&got, value);
assert_eq!(keyset.key(), key);
assert_eq!(keyset.keys().len(), num_expected_keys);
}

// Deletes the values
for TestCase { prefix, .. } in &cases {
let data = walk_top_down(prefix).await;
let (keyset, _) = data.into_iter().next().unwrap();
// Deletes values
store.batch_delete(keyset.keys().as_slice()).await.unwrap();
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 0);
}
}

#[tokio::test]
async fn test_meta_state_store_split_value() {
let size_limit = rand::thread_rng().gen_range(128..=512);
let page_size = rand::thread_rng().gen_range(1..10);
let kv_backend = Arc::new(MemoryKvBackend::new());
test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
.await;
}

#[tokio::test]
async fn test_etcd_store_split_value() {
common_telemetry::init_default_ut_logging();
let prefix = "test_etcd_store_split_value/";
let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
let kv_backend: KvBackendRef = if endpoints.is_empty() {
Arc::new(MemoryKvBackend::new())
} else {
let endpoints = endpoints
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let backend = EtcdStore::with_endpoints(endpoints, 128)
.await
.expect("malformed endpoints");
// Each retry requires a new isolation namespace.
let chroot = format!("{}{}", prefix, Uuid::new_v4());
info!("chroot length: {}", chroot.len());
Arc::new(ChrootKvBackend::new(chroot.into(), backend))
};

let key_preserve_size = 1024;
// The etcd default size limit of any requests is 1.5MiB.
// However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`;
// we don't know the exact size of the key.
let size_limit = 1536 * 1024 - key_preserve_size;
let page_size = rand::thread_rng().gen_range(1..10);
test_meta_state_store_split_value_with_size_limit(
kv_backend,
size_limit,
page_size,
size_limit * 10,
)
.await;
}
}
16 changes: 15 additions & 1 deletion src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ pub enum Error {
source: Arc<Error>,
location: Location,
},

#[snafu(display("Failed to parse segment key: {key}"))]
ParseSegmentKey {
location: Location,
key: String,
#[snafu(source)]
error: std::num::ParseIntError,
},

#[snafu(display("Unexpected: {err_msg}"))]
Unexpected { location: Location, err_msg: String },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -156,7 +167,10 @@ impl ErrorExt for Error {
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected,
Error::ProcedurePanic { .. }
| Error::CorruptedData { .. }
| Error::ParseSegmentKey { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
Error::StartRemoveOutdatedMetaTask { source, .. }
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
Expand Down
15 changes: 9 additions & 6 deletions src/common/procedure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) use crate::store::state_store::StateStoreRef;
use crate::ProcedureId;

pub mod state_store;
pub mod util;

/// Key prefix of procedure store.
const PROC_PATH: &str = "procedure/";
Expand Down Expand Up @@ -143,16 +144,17 @@ impl ProcedureStore {
// 8 should be enough for most procedures.
let mut step_keys = Vec::with_capacity(8);
let mut finish_keys = Vec::new();
while let Some((key, _)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else {
while let Some((key_set, _)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
if curr_key.key_type == KeyType::Step {
step_keys.push(key);
step_keys.extend(key_set.keys());
} else {
// .commit or .rollback
finish_keys.push(key);
finish_keys.extend(key_set.keys());
}
}

Expand Down Expand Up @@ -184,8 +186,9 @@ impl ProcedureStore {

// Scan all procedures.
let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
while let Some((key, value)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else {
while let Some((key_set, value)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
continue;
};
Expand Down
Loading

0 comments on commit 6c316d2

Please sign in to comment.