From 6c316d268fb4590d064d4ae6d6972939d690e14d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 1 Apr 2024 20:04:29 +0800 Subject: [PATCH] feat(procedure): auto split large value to multiple values (#3605) * feat: implement MultipleValuesStream * refactor: move KeySet to common-procedure * refactor: move MultipleValuesStream to common-procedure * refactor: refactor String to KeySet * fix: fix dropping `collecting` unexpectedly * fix: fix typo * refactor: add the fast path of put * refactor: remove `single_value_collector` * refactor: use `extend` instead of `push` * test: add more tests for `KvStateStore` * test(etcd_store): add more tests for `KvStateStore` * chore: apply suggestions from CR * chore: apply suggestions from CR * refactor: refactor with async_stream * Update src/common/procedure/src/store/util.rs Co-authored-by: Yingwen --------- Co-authored-by: Yingwen --- src/common/meta/src/state_store.rs | 239 ++++++++++++++++-- src/common/procedure/src/error.rs | 16 +- src/common/procedure/src/store.rs | 15 +- src/common/procedure/src/store/state_store.rs | 76 +++++- src/common/procedure/src/store/util.rs | 214 ++++++++++++++++ 5 files changed, 519 insertions(+), 41 deletions(-) create mode 100644 src/common/procedure/src/store/util.rs diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs index 28e5d88273ad..686e7477cec6 100644 --- a/src/common/meta/src/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -17,8 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu}; -use common_procedure::store::state_store::{KeyValueStream, StateStore}; +use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore}; +use common_procedure::store::util::multiple_value_stream; use common_procedure::Result as ProcedureResult; +use futures::future::try_join_all; use futures::StreamExt; use snafu::ResultExt; @@ -42,16 +44,20 @@ fn strip_prefix(key: &str) -> String { pub struct KvStateStore { kv_backend: KvBackendRef, - // limit is set to 0, it is treated as no limit. - max_size_per_range: usize, + // The max num of keys to be returned in a range scan request + // `None` stands no limit. + max_num_per_range: Option, + // The max bytes of value. + // `None` stands no limit. + max_size_per_value: Option, } impl KvStateStore { - // `max_size_per_range` is set to 0, it is treated as no limit. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend, - max_size_per_range: 0, + max_num_per_range: None, + max_size_per_value: None, } } } @@ -64,20 +70,80 @@ fn decode_kv(kv: KeyValue) -> Result<(String, Vec)> { Ok((key, value)) } +enum SplitValue<'a> { + Single(&'a [u8]), + Multiple(Vec<&'a [u8]>), +} + +fn split_value(value: &[u8], max_size_per_value: Option) -> SplitValue<'_> { + if let Some(max_size_per_value) = max_size_per_value { + if value.len() <= max_size_per_value { + SplitValue::Single(value) + } else { + SplitValue::Multiple(value.chunks(max_size_per_value).collect::>()) + } + } else { + SplitValue::Single(value) + } +} + #[async_trait] impl StateStore for KvStateStore { async fn put(&self, key: &str, value: Vec) -> ProcedureResult<()> { - let _ = self - .kv_backend - .put(PutRequest { - key: with_prefix(key).into_bytes(), - value, - ..Default::default() - }) - .await - .map_err(BoxedError::new) - .context(PutStateSnafu { key })?; - Ok(()) + let split = split_value(&value, self.max_size_per_value); + let key = with_prefix(key); + match split { + SplitValue::Single(_) => { + self.kv_backend + .put( + PutRequest::new() + .with_key(key.to_string().into_bytes()) + .with_value(value), + ) + .await + .map_err(BoxedError::new) + .context(PutStateSnafu { key })?; + Ok(()) + } + SplitValue::Multiple(values) => { + // Note: + // The length of values can be up to usize::MAX. + // The KeySet::with_segment_suffix method uses a 10-digit number to store the segment number, + // which is large enough for the usize type. + + // The first segment key: "0b00001111" + // The 2nd segment key: "0b00001111/0000000001" + // The 3rd segment key: "0b00001111/0000000002" + let operations = values + .into_iter() + .enumerate() + .map(|(idx, value)| { + let key = if idx > 0 { + KeySet::with_segment_suffix(&key, idx) + } else { + key.to_string() + }; + let kv_backend = self.kv_backend.clone(); + async move { + kv_backend + .put( + PutRequest::new() + .with_key(key.into_bytes()) + .with_value(value), + ) + .await + } + }) + .collect::>(); + + try_join_all(operations) + .await + .map_err(BoxedError::new) + .context(PutStateSnafu { key })?; + + Ok(()) + } + } } async fn walk_top_down(&self, path: &str) -> ProcedureResult { @@ -90,7 +156,7 @@ impl StateStore for KvStateStore { let stream = PaginationStream::new( self.kv_backend.clone(), req, - self.max_size_per_range, + self.max_num_per_range.unwrap_or_default(), Arc::new(decode_kv), ); @@ -100,6 +166,8 @@ impl StateStore for KvStateStore { .with_context(|_| ListStateSnafu { path }) }); + let stream = multiple_value_stream(Box::pin(stream)); + Ok(Box::pin(stream)) } @@ -128,19 +196,26 @@ impl StateStore for KvStateStore { #[cfg(test)] mod tests { + use std::env; use std::sync::Arc; use common_procedure::store::state_store::KeyValue; + use common_telemetry::info; use futures::TryStreamExt; + use rand::{Rng, RngCore}; + use uuid::Uuid; use super::*; + use crate::kv_backend::chroot::ChrootKvBackend; + use crate::kv_backend::etcd::EtcdStore; use crate::kv_backend::memory::MemoryKvBackend; #[tokio::test] async fn test_meta_state_store() { let store = &KvStateStore { kv_backend: Arc::new(MemoryKvBackend::new()), - max_size_per_range: 1, // for testing "more" in range + max_num_per_range: Some(1), // for testing "more" in range + max_size_per_value: None, }; let walk_top_down = async move |path: &str| -> Vec { @@ -165,9 +240,9 @@ mod tests { let data = walk_top_down("/").await; assert_eq!( vec![ - ("a/1".to_string(), b"v1".to_vec()), - ("a/2".to_string(), b"v2".to_vec()), - ("b/1".to_string(), b"v3".to_vec()) + ("a/1".into(), b"v1".to_vec()), + ("a/2".into(), b"v2".to_vec()), + ("b/1".into(), b"v3".to_vec()) ], data ); @@ -175,8 +250,8 @@ mod tests { let data = walk_top_down("a/").await; assert_eq!( vec![ - ("a/1".to_string(), b"v1".to_vec()), - ("a/2".to_string(), b"v2".to_vec()), + ("a/1".into(), b"v1".to_vec()), + ("a/2".into(), b"v2".to_vec()), ], data ); @@ -187,6 +262,122 @@ mod tests { .unwrap(); let data = walk_top_down("a/").await; - assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data); + assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data); + } + + struct TestCase { + prefix: String, + key: String, + value: Vec, + } + + async fn test_meta_state_store_split_value_with_size_limit( + kv_backend: KvBackendRef, + size_limit: u32, + num_per_range: u32, + max_bytes: u32, + ) { + let num_cases = rand::thread_rng().gen_range(1..=26); + let mut cases = Vec::with_capacity(num_cases); + for i in 0..num_cases { + let size = rand::thread_rng().gen_range(size_limit..=max_bytes); + let mut large_value = vec![0u8; size as usize]; + rand::thread_rng().fill_bytes(&mut large_value); + + // Starts from `a`. + let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap()); + cases.push(TestCase { + key: format!("{}{}.commit", prefix, Uuid::new_v4()), + prefix, + value: large_value, + }) + } + let store = &KvStateStore { + kv_backend: kv_backend.clone(), + max_num_per_range: Some(num_per_range as usize), // for testing "more" in range + max_size_per_value: Some(size_limit as usize), + }; + let walk_top_down = async move |path: &str| -> Vec { + let mut data = store + .walk_top_down(path) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + data + }; + + // Puts the values + for TestCase { key, value, .. } in &cases { + store.put(key, value.clone()).await.unwrap(); + } + + // Validates the values + for TestCase { prefix, key, value } in &cases { + let data = walk_top_down(prefix).await; + assert_eq!(data.len(), 1); + let (keyset, got) = data.into_iter().next().unwrap(); + let num_expected_keys = value.len().div_ceil(size_limit as usize); + assert_eq!(&got, value); + assert_eq!(keyset.key(), key); + assert_eq!(keyset.keys().len(), num_expected_keys); + } + + // Deletes the values + for TestCase { prefix, .. } in &cases { + let data = walk_top_down(prefix).await; + let (keyset, _) = data.into_iter().next().unwrap(); + // Deletes values + store.batch_delete(keyset.keys().as_slice()).await.unwrap(); + let data = walk_top_down(prefix).await; + assert_eq!(data.len(), 0); + } + } + + #[tokio::test] + async fn test_meta_state_store_split_value() { + let size_limit = rand::thread_rng().gen_range(128..=512); + let page_size = rand::thread_rng().gen_range(1..10); + let kv_backend = Arc::new(MemoryKvBackend::new()); + test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192) + .await; + } + + #[tokio::test] + async fn test_etcd_store_split_value() { + common_telemetry::init_default_ut_logging(); + let prefix = "test_etcd_store_split_value/"; + let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); + let kv_backend: KvBackendRef = if endpoints.is_empty() { + Arc::new(MemoryKvBackend::new()) + } else { + let endpoints = endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(); + let backend = EtcdStore::with_endpoints(endpoints, 128) + .await + .expect("malformed endpoints"); + // Each retry requires a new isolation namespace. + let chroot = format!("{}{}", prefix, Uuid::new_v4()); + info!("chroot length: {}", chroot.len()); + Arc::new(ChrootKvBackend::new(chroot.into(), backend)) + }; + + let key_preserve_size = 1024; + // The etcd default size limit of any requests is 1.5MiB. + // However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`; + // we don't know the exact size of the key. + let size_limit = 1536 * 1024 - key_preserve_size; + let page_size = rand::thread_rng().gen_range(1..10); + test_meta_state_store_split_value_with_size_limit( + kv_backend, + size_limit, + page_size, + size_limit * 10, + ) + .await; } } diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 25a974df86c6..98ecbfef2527 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -134,6 +134,17 @@ pub enum Error { source: Arc, location: Location, }, + + #[snafu(display("Failed to parse segment key: {key}"))] + ParseSegmentKey { + location: Location, + key: String, + #[snafu(source)] + error: std::num::ParseIntError, + }, + + #[snafu(display("Unexpected: {err_msg}"))] + Unexpected { location: Location, err_msg: String }, } pub type Result = std::result::Result; @@ -156,7 +167,10 @@ impl ErrorExt for Error { Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } - Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected, + Error::ProcedurePanic { .. } + | Error::CorruptedData { .. } + | Error::ParseSegmentKey { .. } + | Error::Unexpected { .. } => StatusCode::Unexpected, Error::ProcedureExec { source, .. } => source.status_code(), Error::StartRemoveOutdatedMetaTask { source, .. } | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(), diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 84dd39520d9a..e0a129c52ae7 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -25,6 +25,7 @@ pub(crate) use crate::store::state_store::StateStoreRef; use crate::ProcedureId; pub mod state_store; +pub mod util; /// Key prefix of procedure store. const PROC_PATH: &str = "procedure/"; @@ -143,16 +144,17 @@ impl ProcedureStore { // 8 should be enough for most procedures. let mut step_keys = Vec::with_capacity(8); let mut finish_keys = Vec::new(); - while let Some((key, _)) = key_values.try_next().await? { - let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else { + while let Some((key_set, _)) = key_values.try_next().await? { + let key = key_set.key(); + let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else { logging::warn!("Unknown key while deleting procedures, key: {}", key); continue; }; if curr_key.key_type == KeyType::Step { - step_keys.push(key); + step_keys.extend(key_set.keys()); } else { // .commit or .rollback - finish_keys.push(key); + finish_keys.extend(key_set.keys()); } } @@ -184,8 +186,9 @@ impl ProcedureStore { // Scan all procedures. let mut key_values = self.store.walk_top_down(&self.proc_path).await?; - while let Some((key, value)) = key_values.try_next().await? { - let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else { + while let Some((key_set, value)) = key_values.try_next().await? { + let key = key_set.key(); + let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else { logging::warn!("Unknown key while loading procedures, key: {}", key); continue; }; diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index ed168c4dd712..f02ccc793772 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -25,8 +25,64 @@ use snafu::ResultExt; use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result}; +/// The set of keys. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KeySet { + key: String, + segments: usize, +} + +impl PartialOrd for KeySet { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for KeySet { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.key.cmp(&other.key) + } +} + +impl From<&str> for KeySet { + fn from(value: &str) -> Self { + KeySet { + key: value.to_string(), + segments: 0, + } + } +} + +impl KeySet { + pub fn new(key: String, segments: usize) -> Self { + Self { key, segments } + } + + pub fn with_segment_suffix(key: &str, version: usize) -> String { + format!("{key}/{version:010}") + } + + pub fn with_prefix(key: &str) -> String { + format!("{key}/") + } + + pub fn keys(&self) -> Vec { + let mut keys = Vec::with_capacity(self.segments + 1); + keys.push(self.key.to_string()); + for i in 1..=self.segments { + keys.push(Self::with_segment_suffix(&self.key, i)) + } + + keys + } + + pub fn key(&self) -> &str { + &self.key + } +} + /// Key value from state store. -pub type KeyValue = (String, Vec); +pub type KeyValue = (KeySet, Vec); /// Stream that yields [KeyValue]. pub type KeyValueStream = Pin> + Send>>; @@ -123,7 +179,7 @@ impl StateStore for ObjectStateStore { )) }) .context(ListStateSnafu { path: key })?; - yield (key.to_string(), value); + yield (key.into(), value); } } }); @@ -193,9 +249,9 @@ mod tests { data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); assert_eq!( vec![ - ("a/1".to_string(), b"v1".to_vec()), - ("a/2".to_string(), b"v2".to_vec()), - ("b/1".to_string(), b"v3".to_vec()) + ("a/1".into(), b"v1".to_vec()), + ("a/2".into(), b"v2".to_vec()), + ("b/1".into(), b"v3".to_vec()) ], data ); @@ -210,8 +266,8 @@ mod tests { data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); assert_eq!( vec![ - ("a/1".to_string(), b"v1".to_vec()), - ("a/2".to_string(), b"v2".to_vec()), + ("a/1".into(), b"v1".to_vec()), + ("a/2".into(), b"v2".to_vec()), ], data ); @@ -228,7 +284,7 @@ mod tests { .await .unwrap(); data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data); + assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data); } #[tokio::test] @@ -257,8 +313,8 @@ mod tests { data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); assert_eq!( vec![ - ("a/1".to_string(), b"v1".to_vec()), - ("a/2".to_string(), b"v2".to_vec()), + ("a/1".into(), b"v1".to_vec()), + ("a/2".into(), b"v2".to_vec()), ], data ); diff --git a/src/common/procedure/src/store/util.rs b/src/common/procedure/src/store/util.rs new file mode 100644 index 000000000000..924a3b069bd0 --- /dev/null +++ b/src/common/procedure/src/store/util.rs @@ -0,0 +1,214 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; + +use async_stream::try_stream; +use futures::{Stream, TryStreamExt}; +use snafu::{ensure, ResultExt}; + +use super::state_store::KeySet; +use crate::error; +use crate::error::Result; + +pub struct CollectingState { + pairs: Vec<(String, Vec)>, +} + +fn parse_segments(segments: Vec<(String, Vec)>, prefix: &str) -> Result)>> { + segments + .into_iter() + .map(|(key, value)| { + let suffix = key.trim_start_matches(prefix); + let index = suffix + .parse::() + .context(error::ParseSegmentKeySnafu { key })?; + + Ok((index, value)) + }) + .collect::>>() +} + +/// Collects multiple values into a single key-value pair. +/// Returns an error if: +/// - Part values are lost. +/// - Failed to parse the key of segment. +fn multiple_values_collector( + CollectingState { mut pairs }: CollectingState, +) -> Result<(KeySet, Vec)> { + if pairs.len() == 1 { + // Safety: must exist. + let (key, value) = pairs.into_iter().next().unwrap(); + Ok((KeySet::new(key, 0), value)) + } else { + let segments = pairs.split_off(1); + // Safety: must exist. + let (key, value) = pairs.into_iter().next().unwrap(); + let prefix = KeySet::with_prefix(&key); + let mut parsed_segments = parse_segments(segments, &prefix)?; + parsed_segments.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + + // Safety: `parsed_segments` must larger than 0. + let segment_num = parsed_segments.last().unwrap().0; + ensure!( + // The segment index start from 1. + parsed_segments.len() == segment_num, + error::UnexpectedSnafu { + err_msg: format!( + "Corrupted segment keys, parsed segment indexes: {:?}", + parsed_segments + .into_iter() + .map(|(key, _)| key) + .collect::>() + ) + } + ); + + let segment_values = parsed_segments.into_iter().map(|(_, value)| value); + let mut values = Vec::with_capacity(segment_values.len() + 1); + values.push(value); + values.extend(segment_values); + + Ok((KeySet::new(key, segment_num), values.concat())) + } +} + +impl CollectingState { + fn new(key: String, value: Vec) -> CollectingState { + Self { + pairs: vec![(key, value)], + } + } + + fn push(&mut self, key: String, value: Vec) { + self.pairs.push((key, value)); + } + + fn key(&self) -> &str { + self.pairs[0].0.as_str() + } +} + +pub type Upstream = dyn Stream)>> + Send; + +pub fn multiple_value_stream( + mut upstream: Pin>, +) -> impl Stream)>> { + try_stream! { + let mut collecting: Option = None; + while let Some((key, value)) = upstream.try_next().await? { + match collecting.take() { + Some(mut current) => { + if key.starts_with(current.key()) { + // Pushes the key value pair into `collecting`. + current.push(key, value); + collecting = Some(current); + } else { + // Starts to collect next key value pair. + collecting = Some(CollectingState::new(key, value)); + yield multiple_values_collector(current)?; + } + } + None => collecting = Some(CollectingState::new(key, value)), + } + } + if let Some(current) = collecting.take() { + yield multiple_values_collector(current)? + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use futures::stream::{self}; + use futures::TryStreamExt; + + use super::*; + use crate::error::{self}; + + #[test] + fn test_key_set_keys() { + let key = KeySet::new("baz".to_string(), 3); + let keys = key.keys(); + assert_eq!(keys.len(), 4); + assert_eq!(&keys[0], "baz"); + assert_eq!(&keys[1], &KeySet::with_segment_suffix("baz", 1)); + } + + #[tokio::test] + async fn test_multiple_values_collector() { + let upstream = stream::iter(vec![ + Ok(("foo".to_string(), vec![0, 1, 2, 3])), + Ok(("foo/0002".to_string(), vec![6, 7])), + Ok(("foo/0003".to_string(), vec![8])), + Ok(("foo/0001".to_string(), vec![4, 5])), + Ok(("bar".to_string(), vec![0, 1, 2, 3])), + Ok(("baz".to_string(), vec![0, 1, 2, 3])), + Ok(("baz/0003".to_string(), vec![8])), + Ok(("baz/0001".to_string(), vec![4, 5])), + Ok(("baz/0002".to_string(), vec![6, 7])), + ]); + let mut stream = Box::pin(multiple_value_stream(Box::pin(upstream))); + let (key, value) = stream.try_next().await.unwrap().unwrap(); + let keys = key.keys(); + assert_eq!(keys[0], "foo"); + assert_eq!(keys.len(), 4); + assert_eq!(value, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + let (key, value) = stream.try_next().await.unwrap().unwrap(); + let keys = key.keys(); + assert_eq!(keys[0], "bar"); + assert_eq!(keys.len(), 1); + assert_eq!(value, vec![0, 1, 2, 3]); + let (key, value) = stream.try_next().await.unwrap().unwrap(); + let keys = key.keys(); + assert_eq!(keys[0], "baz"); + assert_eq!(keys.len(), 4); + assert_eq!(value, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + assert!(stream.try_next().await.unwrap().is_none()); + // Call again + assert!(stream.try_next().await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_empty_upstream() { + let upstream = stream::iter(vec![]); + let mut stream = Box::pin(multiple_value_stream(Box::pin(upstream))); + assert!(stream.try_next().await.unwrap().is_none()); + // Call again + assert!(stream.try_next().await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_multiple_values_collector_err() { + let upstream = stream::iter(vec![ + Err(error::UnexpectedSnafu { err_msg: "mock" }.build()), + Ok(("foo".to_string(), vec![0, 1, 2, 3])), + Ok(("foo/0001".to_string(), vec![4, 5])), + ]); + let mut stream = Box::pin(multiple_value_stream(Box::pin(upstream))); + let err = stream.try_next().await.unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + + let upstream = stream::iter(vec![ + Ok(("foo".to_string(), vec![0, 1, 2, 3])), + Ok(("foo/0001".to_string(), vec![4, 5])), + Err(error::UnexpectedSnafu { err_msg: "mock" }.build()), + ]); + let mut stream = Box::pin(multiple_value_stream(Box::pin(upstream))); + let err = stream.try_next().await.unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + } +}