From 47657ebbc80280b9d32955a90e8d8819addd0c58 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 28 Aug 2024 19:37:18 +0800 Subject: [PATCH] feat: replay WAL entries respect index (#4565) * feat(log_store): use new `Consumer` * feat: add `from_peer_id` * feat: read WAL entries respect index * test: add test for `build_region_wal_index_iterator` * fix: keep the handle * fix: incorrect last index * fix: replay last entry id may be greater than expected * chore: remove unused code * chore: apply suggestions from CR * chore: rename `datanode_id` to `location_id` * chore: rename `from_peer_id` to `location_id` * chore: rename `from_peer_id` to `location_id` * chore: apply suggestions from CR --- Cargo.lock | 1 + src/common/meta/src/instruction.rs | 3 + src/datanode/src/heartbeat/handler.rs | 1 + .../src/heartbeat/handler/upgrade_region.rs | 11 +- src/log-store/Cargo.toml | 1 + src/log-store/src/error.rs | 9 + src/log-store/src/kafka.rs | 5 - src/log-store/src/kafka/consumer.rs | 24 ++- src/log-store/src/kafka/index.rs | 9 +- src/log-store/src/kafka/index/collector.rs | 165 +++++++++++++++++- src/log-store/src/kafka/index/encoder.rs | 8 +- src/log-store/src/kafka/index/iterator.rs | 137 ++++++++++++++- src/log-store/src/kafka/log_store.rs | 68 ++++---- src/log-store/src/raft_engine/log_store.rs | 11 +- .../upgrade_candidate_region.rs | 1 + src/metric-engine/src/engine/catchup.rs | 2 + src/mito2/src/engine/catchup_test.rs | 9 + src/mito2/src/region/opener.rs | 2 +- src/mito2/src/wal.rs | 19 +- src/mito2/src/wal/raw_entry_reader.rs | 21 ++- src/mito2/src/worker/handle_catchup.rs | 7 +- src/store-api/src/logstore.rs | 17 ++ src/store-api/src/region_request.rs | 2 + 23 files changed, 460 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4d4e4d2b75a..18d16b268239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5826,6 +5826,7 @@ dependencies = [ "common-time", "common-wal", "delta-encoding", + "derive_builder 0.12.0", "futures", "futures-util", "itertools 0.10.5", diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index fa7126ed3fb0..d620cc3449ed 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -153,6 +153,9 @@ pub struct UpgradeRegion { /// it's helpful to verify whether the leader region is ready. #[serde(with = "humantime_serde")] pub wait_for_replay_timeout: Option, + /// The hint for replaying memtable. 
+ #[serde(default)] + pub location_id: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 48320f9b1101..573b94cb1185 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -206,6 +206,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout: None, + location_id: None, }); assert!( heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index dd95ee3c2b6f..614373166315 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -27,6 +27,7 @@ impl HandlerContext { region_id, last_entry_id, wait_for_replay_timeout, + location_id, }: UpgradeRegion, ) -> BoxFuture<'static, InstructionReply> { Box::pin(async move { @@ -62,6 +63,7 @@ impl HandlerContext { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: last_entry_id, + location_id, }), ) .await?; @@ -151,6 +153,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout, + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); @@ -191,6 +194,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout, + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); @@ -232,6 +236,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout, + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); @@ -274,8 +279,9 @@ mod tests { .clone() .handle_upgrade_region_instruction(UpgradeRegion { region_id, - last_entry_id: None, wait_for_replay_timeout, + last_entry_id: None, + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); @@ -293,6 +299,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout: Some(Duration::from_millis(500)), + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); @@ -337,6 +344,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout: None, + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); @@ -354,6 +362,7 @@ mod tests { region_id, last_entry_id: None, wait_for_replay_timeout: Some(Duration::from_millis(200)), + location_id: None, }) .await; assert_matches!(reply, InstructionReply::UpgradeRegion(_)); diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index b841bb3505ac..d8cbc9b7ec6a 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -26,6 +26,7 @@ common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true delta-encoding = "0.4" +derive_builder.workspace = true futures.workspace = true futures-util.workspace = true itertools.workspace = true diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 14648a8e9288..26753919b53f 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -304,6 +304,15 @@ pub enum Error { error: object_store::Error, }, + #[snafu(display("Failed to read index, path: {path}"))] + ReadIndex { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: object_store::Error, + path: String, + }, + #[snafu(display( "The length of meta if exceeded the limit: {}, actual: {}", limit, diff --git 
a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index ad73b770398c..bda7a85e15eb 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -14,15 +14,10 @@ pub(crate) mod client_manager; pub(crate) mod consumer; -/// TODO(weny): remove it. -#[allow(dead_code)] -#[allow(unused_imports)] pub(crate) mod index; pub mod log_store; pub(crate) mod producer; pub(crate) mod util; -/// TODO(weny): remove it. -#[allow(dead_code)] pub(crate) mod worker; pub use index::{default_index_file, GlobalIndexCollector}; diff --git a/src/log-store/src/kafka/consumer.rs b/src/log-store/src/kafka/consumer.rs index 70fa5e848274..4afde1a116e5 100644 --- a/src/log-store/src/kafka/consumer.rs +++ b/src/log-store/src/kafka/consumer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use common_telemetry::debug; +use derive_builder::Builder; use futures::future::{BoxFuture, Fuse, FusedFuture}; use futures::{FutureExt, Stream}; use pin_project::pin_project; @@ -60,40 +61,61 @@ struct FetchResult { used_offset: i64, } +const MAX_BATCH_SIZE: usize = 52428800; +const AVG_RECORD_SIZE: usize = 256 * 1024; + /// The [`Consumer`] struct represents a Kafka consumer that fetches messages from /// a Kafka cluster. Yielding records respecting the [`RegionWalIndexIterator`]. #[pin_project] +#[derive(Builder)] +#[builder(pattern = "owned")] pub struct Consumer { + #[builder(default = "-1")] last_high_watermark: i64, /// The client is used to fetch records from kafka topic. client: Arc, /// The max batch size in a single fetch request. + #[builder(default = "MAX_BATCH_SIZE")] max_batch_size: usize, /// The max wait milliseconds. + #[builder(default = "500")] max_wait_ms: u32, /// The avg record size + #[builder(default = "AVG_RECORD_SIZE")] avg_record_size: usize, /// Termination flag + #[builder(default = "false")] terminated: bool, /// The buffer of records. buffer: RecordsBuffer, /// The fetch future. 
+ #[builder(default = "Fuse::terminated()")] fetch_fut: Fuse>>, } -struct RecordsBuffer { +pub(crate) struct RecordsBuffer { buffer: VecDeque, index: Box, } +impl RecordsBuffer { + /// Creates an empty [`RecordsBuffer`] + pub fn new(index: Box) -> Self { + RecordsBuffer { + buffer: VecDeque::new(), + index, + } + } +} + impl RecordsBuffer { fn pop_front(&mut self) -> Option { while let Some(index) = self.index.peek() { diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs index 1bd0f3e621e8..fa98f3fcaf90 100644 --- a/src/log-store/src/kafka/index.rs +++ b/src/log-store/src/kafka/index.rs @@ -20,10 +20,11 @@ pub use collector::GlobalIndexCollector; pub(crate) use collector::{IndexCollector, NoopCollector}; pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder}; pub(crate) use iterator::{ - MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, - RegionWalVecIndex, + build_region_wal_index_iterator, NextBatchHint, RegionWalIndexIterator, MIN_BATCH_WINDOW_SIZE, }; +#[cfg(test)] +pub(crate) use iterator::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex}; -pub fn default_index_file(datanode_id: u64) -> String { - format!("__datanode/{datanode_id}/index.json") +pub fn default_index_file(location_id: u64) -> String { + format!("__wal/{location_id}/index.json") } diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs index 181222304629..22f1da7cf882 100644 --- a/src/log-store/src/kafka/index/collector.rs +++ b/src/log-store/src/kafka/index/collector.rs @@ -19,6 +19,7 @@ use std::time::Duration; use common_telemetry::{error, info}; use futures::future::try_join_all; +use object_store::ErrorKind; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::logstore::provider::KafkaProvider; @@ -28,8 +29,9 @@ use tokio::select; use tokio::sync::mpsc::Sender; use tokio::sync::Mutex as TokioMutex; +use super::default_index_file; use crate::error::{self, Result}; -use crate::kafka::index::encoder::IndexEncoder; +use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder}; use crate::kafka::index::JsonIndexEncoder; use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest}; @@ -52,10 +54,11 @@ pub trait IndexCollector: Send + Sync { /// The [`GlobalIndexCollector`] struct is responsible for managing index entries /// across multiple providers. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct GlobalIndexCollector { providers: Arc, Sender>>>, - task: CollectionTask, + operator: object_store::ObjectStore, + _handle: CollectionTaskHandle, } #[derive(Debug, Clone)] @@ -103,7 +106,7 @@ impl CollectionTask { /// The background task performs two main operations: /// - Persists the WAL index to the specified `path` at every `dump_index_interval`. /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`. - fn run(&self) { + fn run(self) -> CollectionTaskHandle { let mut dump_index_interval = tokio::time::interval(self.dump_index_interval); let running = self.running.clone(); let moved_self = self.clone(); @@ -122,15 +125,23 @@ impl CollectionTask { } } }); + CollectionTaskHandle { + running: self.running.clone(), + } } } -impl Drop for CollectionTask { +impl Drop for CollectionTaskHandle { fn drop(&mut self) { self.running.store(false, Ordering::Relaxed); } } +#[derive(Debug, Default)] +struct CollectionTaskHandle { + running: Arc, +} + impl GlobalIndexCollector { /// Constructs a [`GlobalIndexCollector`]. 
/// @@ -148,16 +159,65 @@ impl GlobalIndexCollector { let task = CollectionTask { providers: providers.clone(), dump_index_interval, - operator, + operator: operator.clone(), path, running: Arc::new(AtomicBool::new(true)), }; - task.run(); - Self { providers, task } + let handle = task.run(); + Self { + providers, + operator, + _handle: handle, + } + } + + #[cfg(test)] + pub fn new_for_test(operator: object_store::ObjectStore) -> Self { + Self { + providers: Default::default(), + operator, + _handle: Default::default(), + } } } impl GlobalIndexCollector { + /// Retrieve [`EntryId`]s for a specified `region_id` in `datanode_id` + /// that are greater than or equal to a given `entry_id`. + pub(crate) async fn read_remote_region_index( + &self, + location_id: u64, + provider: &KafkaProvider, + region_id: RegionId, + entry_id: EntryId, + ) -> Result, EntryId)>> { + let path = default_index_file(location_id); + + let bytes = match self.operator.read(&path).await { + Ok(bytes) => bytes.to_vec(), + Err(err) => { + if err.kind() == ErrorKind::NotFound { + return Ok(None); + } else { + return Err(err).context(error::ReadIndexSnafu { path }); + } + } + }; + + match DatanodeWalIndexes::decode(&bytes)?.provider(provider) { + Some(indexes) => { + let last_index = indexes.last_index(); + let indexes = indexes + .region(region_id) + .unwrap_or_default() + .split_off(&entry_id); + + Ok(Some((indexes, last_index))) + } + None => Ok(None), + } + } + /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider. pub(crate) async fn provider_level_index_collector( &self, @@ -266,3 +326,92 @@ impl IndexCollector for NoopCollector { fn dump(&mut self, _encoder: &dyn IndexEncoder) {} } + +#[cfg(test)] +mod tests { + use std::collections::{BTreeSet, HashMap}; + + use store_api::logstore::provider::KafkaProvider; + use store_api::storage::RegionId; + + use crate::kafka::index::collector::RegionIndexes; + use crate::kafka::index::encoder::IndexEncoder; + use crate::kafka::index::JsonIndexEncoder; + use crate::kafka::{default_index_file, GlobalIndexCollector}; + + #[tokio::test] + async fn test_read_remote_region_index() { + let operator = object_store::ObjectStore::new(object_store::services::Memory::default()) + .unwrap() + .finish(); + + let path = default_index_file(0); + let encoder = JsonIndexEncoder::default(); + encoder.encode( + &KafkaProvider::new("my_topic_0".to_string()), + &RegionIndexes { + regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]), + latest_entry_id: 20, + }, + ); + let bytes = encoder.finish().unwrap(); + let mut writer = operator.writer(&path).await.unwrap(); + writer.write(bytes).await.unwrap(); + writer.close().await.unwrap(); + + let collector = GlobalIndexCollector::new_for_test(operator.clone()); + // Index file doesn't exist + let result = collector + .read_remote_region_index( + 1, + &KafkaProvider::new("my_topic_0".to_string()), + RegionId::new(1, 1), + 1, + ) + .await + .unwrap(); + assert!(result.is_none()); + + // RegionId doesn't exist + let (indexes, last_index) = collector + .read_remote_region_index( + 0, + &KafkaProvider::new("my_topic_0".to_string()), + RegionId::new(1, 2), + 5, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(indexes, BTreeSet::new()); + assert_eq!(last_index, 20); + + // RegionId(1, 1), Start EntryId: 5 + let (indexes, last_index) = collector + .read_remote_region_index( + 0, + &KafkaProvider::new("my_topic_0".to_string()), + RegionId::new(1, 1), + 5, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(indexes, 
BTreeSet::from([5, 15])); + assert_eq!(last_index, 20); + + // RegionId(1, 1), Start EntryId: 20 + let (indexes, last_index) = collector + .read_remote_region_index( + 0, + &KafkaProvider::new("my_topic_0".to_string()), + RegionId::new(1, 1), + 20, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(indexes, BTreeSet::new()); + assert_eq!(last_index, 20); + } +} diff --git a/src/log-store/src/kafka/index/encoder.rs b/src/log-store/src/kafka/index/encoder.rs index bfd11a982c14..bbbf013ebf9c 100644 --- a/src/log-store/src/kafka/index/encoder.rs +++ b/src/log-store/src/kafka/index/encoder.rs @@ -50,7 +50,7 @@ pub struct DeltaEncodedRegionIndexes { impl DeltaEncodedRegionIndexes { /// Retrieves the original (decoded) index values for a given region. - fn region(&self, region_id: RegionId) -> Option> { + pub(crate) fn region(&self, region_id: RegionId) -> Option> { let decoded = self .regions .get(®ion_id) @@ -60,7 +60,7 @@ impl DeltaEncodedRegionIndexes { } /// Retrieves the last index. - fn last_index(&self) -> u64 { + pub(crate) fn last_index(&self) -> u64 { self.last_index } } @@ -86,7 +86,7 @@ impl DatanodeWalIndexes { value } - fn decode(byte: &[u8]) -> Result { + pub(crate) fn decode(byte: &[u8]) -> Result { serde_json::from_slice(byte).context(error::DecodeJsonSnafu) } @@ -118,7 +118,7 @@ impl IndexEncoder for JsonIndexEncoder { #[cfg(test)] mod tests { - use std::collections::{BTreeSet, HashMap, HashSet}; + use std::collections::{BTreeSet, HashMap}; use store_api::logstore::provider::KafkaProvider; use store_api::storage::RegionId; diff --git a/src/log-store/src/kafka/index/iterator.rs b/src/log-store/src/kafka/index/iterator.rs index 7df2518752bd..9ab350036da5 100644 --- a/src/log-store/src/kafka/index/iterator.rs +++ b/src/log-store/src/kafka/index/iterator.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::min; -use std::collections::VecDeque; +use std::cmp::{max, min}; +use std::collections::{BTreeSet, VecDeque}; +use std::fmt::Debug; use std::ops::Range; use store_api::logstore::EntryId; @@ -27,7 +28,7 @@ pub(crate) struct NextBatchHint { } /// An iterator over WAL (Write-Ahead Log) entries index for a region. -pub trait RegionWalIndexIterator: Send + Sync { +pub trait RegionWalIndexIterator: Send + Sync + Debug { /// Returns next batch hint. fn next_batch_hint(&self, avg_size: usize) -> Option; @@ -36,9 +37,13 @@ pub trait RegionWalIndexIterator: Send + Sync { // Advances the iterator and returns the next EntryId. fn next(&mut self) -> Option; + + #[cfg(test)] + fn as_any(&self) -> &dyn std::any::Any; } /// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region. +#[derive(Debug)] pub struct RegionWalRange { current_entry_id: EntryId, end_entry_id: EntryId, @@ -96,10 +101,18 @@ impl RegionWalIndexIterator for RegionWalRange { None } } + + #[cfg(test)] + fn as_any(&self) -> &dyn std::any::Any { + self + } } +pub const MIN_BATCH_WINDOW_SIZE: usize = 4 * 1024 * 1024; + /// Represents an index of Write-Ahead Log entries for a region, /// stored as a vector of [EntryId]s. +#[derive(Debug)] pub struct RegionWalVecIndex { index: VecDeque, min_batch_window_size: usize, @@ -134,11 +147,17 @@ impl RegionWalIndexIterator for RegionWalVecIndex { fn next(&mut self) -> Option { self.index.pop_front() } + + #[cfg(test)] + fn as_any(&self) -> &dyn std::any::Any { + self + } } /// Represents an iterator over multiple region WAL indexes. 
/// /// Allowing iteration through multiple WAL indexes. +#[derive(Debug)] pub struct MultipleRegionWalIndexIterator { iterator: VecDeque>, } @@ -185,6 +204,53 @@ impl RegionWalIndexIterator for MultipleRegionWalIndexIterator { self.iterator.front_mut().and_then(|iter| iter.next()) } + + #[cfg(test)] + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +/// Builds [`RegionWalIndexIterator`]. +/// +/// Returns None means there are no entries to replay. +pub fn build_region_wal_index_iterator( + start_entry_id: EntryId, + end_entry_id: EntryId, + region_indexes: Option<(BTreeSet, EntryId)>, + max_batch_bytes: usize, + min_window_size: usize, +) -> Option> { + if (start_entry_id..end_entry_id).is_empty() { + return None; + } + + match region_indexes { + Some((region_indexes, last_index)) => { + if region_indexes.is_empty() && last_index >= end_entry_id { + return None; + } + + let mut iterator: Vec> = Vec::with_capacity(2); + if !region_indexes.is_empty() { + let index = RegionWalVecIndex::new(region_indexes, min_window_size); + iterator.push(Box::new(index)); + } + let known_last_index = max(last_index, start_entry_id); + if known_last_index < end_entry_id { + let range = known_last_index..end_entry_id; + let index = RegionWalRange::new(range, max_batch_bytes); + iterator.push(Box::new(index)); + } + + Some(Box::new(MultipleRegionWalIndexIterator::new(iterator))) + } + None => { + let range = start_entry_id..end_entry_id; + + Some(Box::new(RegionWalRange::new(range, max_batch_bytes))) + } + } } #[cfg(test)] @@ -353,4 +419,69 @@ mod tests { assert_eq!(iter.peek(), None); assert_eq!(iter.next(), None); } + + #[test] + fn test_build_region_wal_index_iterator() { + let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5); + assert!(iterator.is_none()); + + let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5); + assert!(iterator.is_none()); + + let iterator = + build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5); + assert!(iterator.is_none()); + + let iterator = + build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5); + assert!(iterator.is_none()); + + let iterator = + build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5); + assert!(iterator.is_none()); + + let iterator = build_region_wal_index_iterator( + 1, + 1024, + Some((BTreeSet::from([512, 756]), 1024)), + 5, + 5, + ) + .unwrap(); + let iter = iterator + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(iter.iterator.len(), 1); + let vec_index = iter.iterator[0] + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(vec_index.index, VecDeque::from([512, 756])); + + let iterator = build_region_wal_index_iterator( + 1, + 1024, + Some((BTreeSet::from([512, 756]), 1023)), + 5, + 5, + ) + .unwrap(); + let iter = iterator + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(iter.iterator.len(), 2); + let vec_index = iter.iterator[0] + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(vec_index.index, VecDeque::from([512, 756])); + let wal_range = iter.iterator[1] + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(wal_range.current_entry_id, 1023); + assert_eq!(wal_range.end_entry_id, 1024); + } } diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 69afa2e6b7ee..f084f20f022d 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -20,19 +20,20 @@ use common_telemetry::{debug, warn}; use 
common_wal::config::kafka::DatanodeKafkaConfig; use futures::future::try_join_all; use futures_util::StreamExt; -use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::client::partition::OffsetAt; use snafu::{OptionExt, ResultExt}; use store_api::logstore::entry::{ Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry, }; use store_api::logstore::provider::{KafkaProvider, Provider}; -use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream}; +use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex}; use store_api::storage::RegionId; +use super::index::build_region_wal_index_iterator; use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; -use crate::kafka::index::GlobalIndexCollector; +use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer}; +use crate::kafka::index::{GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE}; use crate::kafka::producer::OrderedBatchProducerRef; use crate::kafka::util::record::{ convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE, @@ -205,6 +206,7 @@ impl LogStore for KafkaLogStore { &self, provider: &Provider, entry_id: EntryId, + index: Option, ) -> Result> { let provider = provider .as_kafka_provider() @@ -232,35 +234,41 @@ impl LogStore for KafkaLogStore { .await .context(GetOffsetSnafu { topic: &provider.topic, - })? - - 1; - // Reads entries with offsets in the range [start_offset, end_offset]. - let start_offset = entry_id as i64; - - debug!( - "Start reading entries in range [{}, {}] for ns {}", - start_offset, end_offset, provider - ); + })?; + + let region_indexes = if let (Some(index), Some(collector)) = + (index, self.client_manager.global_index_collector()) + { + collector + .read_remote_region_index(index.location_id, provider, index.region_id, entry_id) + .await? + } else { + None + }; - // Abort if there're no new entries. - // FIXME(niebayes): how come this case happens? - if start_offset > end_offset { - warn!( - "No new entries for ns {} in range [{}, {}]", - provider, start_offset, end_offset - ); + let Some(iterator) = build_region_wal_index_iterator( + entry_id, + end_offset as u64, + region_indexes, + self.max_batch_bytes, + MIN_BATCH_WINDOW_SIZE, + ) else { + let range = entry_id..end_offset as u64; + warn!("No new entries in range {:?} of ns {}", range, provider); return Ok(futures_util::stream::empty().boxed()); - } + }; - let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset)) - .with_max_batch_size(self.max_batch_bytes as i32) - .with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32) - .build(); + debug!("Reading entries with {:?} of ns {}", iterator, provider); - debug!( - "Built a stream consumer for ns {} to consume entries in range [{}, {}]", - provider, start_offset, end_offset - ); + // Safety: Must be ok. + let mut stream_consumer = ConsumerBuilder::default() + .client(client) + // Safety: checked before. + .buffer(RecordsBuffer::new(iterator)) + .max_batch_size(self.max_batch_bytes) + .max_wait_ms(self.consumer_wait_timeout.as_millis() as u32) + .build() + .unwrap(); // A buffer is used to collect records to construct a complete entry. 
let mut entry_records: HashMap> = HashMap::new(); @@ -511,7 +519,7 @@ mod tests { // 5 region assert_eq!(response.last_entry_ids.len(), 5); let got_entries = logstore - .read(&provider, 0) + .read(&provider, 0, None) .await .unwrap() .try_collect::>() @@ -584,7 +592,7 @@ mod tests { // 5 region assert_eq!(response.last_entry_ids.len(), 5); let got_entries = logstore - .read(&provider, 0) + .read(&provider, 0, None) .await .unwrap() .try_collect::>() diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 08f4019e0c9c..8e9ceec710cd 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -25,7 +25,7 @@ use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMo use snafu::{ensure, OptionExt, ResultExt}; use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry}; use store_api::logstore::provider::{Provider, RaftEngineProvider}; -use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream}; +use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex}; use store_api::storage::RegionId; use crate::error::{ @@ -252,6 +252,7 @@ impl LogStore for RaftEngineLogStore { &self, provider: &Provider, entry_id: EntryId, + _index: Option, ) -> Result> { let ns = provider .as_raft_engine_provider() @@ -545,7 +546,7 @@ mod tests { } let mut entries = HashSet::with_capacity(1024); let mut s = logstore - .read(&Provider::raft_engine_provider(1), 0) + .read(&Provider::raft_engine_provider(1), 0, None) .await .unwrap(); while let Some(r) = s.next().await { @@ -578,7 +579,7 @@ mod tests { .await .is_ok()); let entries = logstore - .read(&Provider::raft_engine_provider(1), 1) + .read(&Provider::raft_engine_provider(1), 1, None) .await .unwrap() .collect::>() @@ -596,7 +597,7 @@ mod tests { let entries = collect_entries( logstore - .read(&Provider::raft_engine_provider(1), 1) + .read(&Provider::raft_engine_provider(1), 1, None) .await .unwrap(), ) @@ -682,7 +683,7 @@ mod tests { logstore.obsolete(&namespace, region_id, 100).await.unwrap(); assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap()); - let res = logstore.read(&namespace, 100).await.unwrap(); + let res = logstore.read(&namespace, 100, None).await.unwrap(); let mut vec = collect_entries(res).await; vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap()); assert_eq!(101, vec.first().unwrap().entry_id()); diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 5a80cebb6534..88caf5f08d3e 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -90,6 +90,7 @@ impl UpgradeCandidateRegion { region_id, last_entry_id, wait_for_replay_timeout: Some(self.replay_timeout), + location_id: Some(ctx.persistent_ctx.from_peer.id), }) } diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index faf9f1d83132..576ca54cfa25 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -41,6 +41,7 @@ impl MetricEngineInner { RegionRequest::Catchup(RegionCatchupRequest { set_writable: req.set_writable, entry_id: None, + location_id: req.location_id, }), ) .await @@ -52,6 +53,7 @@ impl MetricEngineInner { RegionRequest::Catchup(RegionCatchupRequest { set_writable: req.set_writable, 
entry_id: req.entry_id, + location_id: req.location_id, }), ) .await diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index 3d0a04017ef2..de72bb6128b0 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -94,6 +94,7 @@ async fn test_catchup_with_last_entry_id() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: last_entry_id, + location_id: None, }), ) .await; @@ -125,6 +126,7 @@ async fn test_catchup_with_last_entry_id() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: last_entry_id, + location_id: None, }), ) .await; @@ -191,6 +193,7 @@ async fn test_catchup_with_incorrect_last_entry_id() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: incorrect_last_entry_id, + location_id: None, }), ) .await @@ -207,6 +210,7 @@ async fn test_catchup_with_incorrect_last_entry_id() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: incorrect_last_entry_id, + location_id: None, }), ) .await; @@ -255,6 +259,7 @@ async fn test_catchup_without_last_entry_id() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: None, + location_id: None, }), ) .await; @@ -285,6 +290,7 @@ async fn test_catchup_without_last_entry_id() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: None, + location_id: None, }), ) .await; @@ -354,6 +360,7 @@ async fn test_catchup_with_manifest_update() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: false, entry_id: None, + location_id: None, }), ) .await; @@ -390,6 +397,7 @@ async fn test_catchup_with_manifest_update() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: None, + location_id: None, }), ) .await; @@ -411,6 +419,7 @@ async fn test_catchup_not_exist() { RegionRequest::Catchup(RegionCatchupRequest { set_writable: true, entry_id: None, + location_id: None, }), ) .await diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ba72ed7ccbf0..f20da8f3d6c2 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -313,7 +313,7 @@ impl RegionOpener { let wal_entry_reader = self .wal_entry_reader .take() - .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id)); + .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None)); let on_region_opened = wal.on_region_opened(); let object_store = self.object_store(®ion_options.storage)?.clone(); diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 7413f52b2c05..710c0ee3c807 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -30,7 +30,7 @@ use prost::Message; use snafu::ResultExt; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; -use store_api::logstore::{AppendBatchResponse, LogStore}; +use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex}; use store_api::storage::RegionId; use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu}; @@ -102,15 +102,24 @@ impl Wal { &self, provider: &Provider, region_id: RegionId, + location_id: Option, ) -> Box { match provider { Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new( LogStoreRawEntryReader::new(self.store.clone()), )), - Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new( - LogStoreRawEntryReader::new(self.store.clone()), - region_id, - ))), + Provider::Kafka(_) => { + let reader = if let 
Some(location_id) = location_id { + LogStoreRawEntryReader::new(self.store.clone()) + .with_wal_index(WalIndex::new(region_id, location_id)) + } else { + LogStoreRawEntryReader::new(self.store.clone()) + }; + + Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new( + reader, region_id, + ))) + } } } diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index 7436dec06a56..85a0c945b9fd 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -20,7 +20,7 @@ use futures::stream::BoxStream; use snafu::ResultExt; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; -use store_api::logstore::LogStore; +use store_api::logstore::{LogStore, WalIndex}; use store_api::storage::RegionId; use tokio_stream::StreamExt; @@ -38,11 +38,20 @@ pub(crate) trait RawEntryReader: Send + Sync { /// Implement the [RawEntryReader] for the [LogStore]. pub struct LogStoreRawEntryReader { store: Arc, + wal_index: Option, } impl LogStoreRawEntryReader { pub fn new(store: Arc) -> Self { - Self { store } + Self { + store, + wal_index: None, + } + } + + pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self { + self.wal_index = Some(wal_index); + self } } @@ -50,9 +59,10 @@ impl RawEntryReader for LogStoreRawEntryReader { fn read(&self, provider: &Provider, start_id: EntryId) -> Result> { let store = self.store.clone(); let provider = provider.clone(); + let wal_index = self.wal_index; let stream = try_stream!({ let mut stream = store - .read(&provider, start_id) + .read(&provider, start_id, wal_index) .await .map_err(BoxedError::new) .with_context(|_| error::ReadWalSnafu { @@ -119,7 +129,9 @@ mod tests { use futures::{stream, TryStreamExt}; use store_api::logstore::entry::{Entry, NaiveEntry}; - use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream}; + use store_api::logstore::{ + AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex, + }; use store_api::storage::RegionId; use super::*; @@ -149,6 +161,7 @@ mod tests { &self, _provider: &Provider, _id: EntryId, + _index: Option, ) -> Result, Self::Error> { Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())]))) } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index e01680ab17b6..6d16d72c1c0d 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -74,7 +74,9 @@ impl RegionWorkerLoop { let flushed_entry_id = region.version_control.current().last_entry_id; info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}"); let timer = Instant::now(); - let wal_entry_reader = self.wal.wal_entry_reader(®ion.provider, region_id); + let wal_entry_reader = + self.wal + .wal_entry_reader(®ion.provider, region_id, request.location_id); let on_region_opened = self.wal.on_region_opened(); let last_entry_id = replay_memtable( ®ion.provider, @@ -93,7 +95,8 @@ impl RegionWorkerLoop { ); if let Some(expected_last_entry_id) = request.entry_id { ensure!( - expected_last_entry_id == last_entry_id, + // The replayed last entry id may be greater than the `expected_last_entry_id`. 
+ last_entry_id >= expected_last_entry_id, error::UnexpectedReplaySnafu { region_id, expected_last_entry_id, diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 32ab95f5c8d6..86a2263398dc 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -30,6 +30,22 @@ pub use crate::logstore::entry::Id as EntryId; use crate::logstore::provider::Provider; use crate::storage::RegionId; +// The information used to locate WAL index for the specified region. +#[derive(Debug, Clone, Copy)] +pub struct WalIndex { + pub region_id: RegionId, + pub location_id: u64, +} + +impl WalIndex { + pub fn new(region_id: RegionId, location_id: u64) -> Self { + Self { + region_id, + location_id, + } + } +} + /// `LogStore` serves as a Write-Ahead-Log for storage engine. #[async_trait::async_trait] pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { @@ -48,6 +64,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { &self, provider: &Provider, id: EntryId, + index: Option<WalIndex>, ) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error>; /// Creates a new `Namespace` from the given ref. diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 1452fcbe6125..cad251988fe5 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -673,6 +673,8 @@ pub struct RegionCatchupRequest { /// The `entry_id` that was expected to reply to. /// `None` stands replaying to latest. pub entry_id: Option<u64>, + /// The hint for replaying memtable. + pub location_id: Option<u64>, } impl fmt::Display for RegionRequest {
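
How the new `location_id` hint reaches the log store, as a small sketch built only from the `store-api` types added above. The helper name `read_region_wal` is hypothetical (not part of the patch); it just shows that `None` keeps the old full-scan behavior while `Some(id)` lets the Kafka log store consult `__wal/{id}/index.json` before fetching:

use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{EntryId, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;

/// Reads a region's WAL entries starting from `start`, optionally passing the
/// `location_id` hint so the log store can look up the remote WAL index.
async fn read_region_wal<S: LogStore>(
    store: &S,
    provider: &Provider,
    region_id: RegionId,
    start: EntryId,
    location_id: Option<u64>,
) -> Result<SendableEntryStream<'static, Entry, S::Error>, S::Error> {
    // Without a hint, `index` is `None` and the read degrades to the pre-patch scan.
    let index = location_id.map(|id| WalIndex::new(region_id, id));
    store.read(provider, start, index).await
}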
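
And a minimal, standalone sketch of the replay-selection rule itself. The function name, the eager `Vec<EntryId>` return, and the sample values are illustrative only; the real implementation is lazy, combining a `RegionWalVecIndex` over the recorded entry ids with a `RegionWalRange` for entries appended after the last index dump via `build_region_wal_index_iterator`:

use std::collections::BTreeSet;

type EntryId = u64;

/// Entry ids to fetch for one region. `end` is exclusive (the topic's latest
/// offset); `remote_index` is the region's recorded entry ids plus the last
/// entry id covered by the dumped index file.
fn entries_to_replay(
    start: EntryId,
    end: EntryId,
    remote_index: Option<(BTreeSet<EntryId>, EntryId)>,
) -> Vec<EntryId> {
    if start >= end {
        return Vec::new(); // nothing was appended since `start`
    }
    match remote_index {
        // No index file for this location: scan the whole range, as before.
        None => (start..end).collect(),
        Some((region_index, last_index)) => {
            // Fetch only the offsets the index recorded for this region...
            let mut ids: Vec<EntryId> =
                region_index.into_iter().filter(|id| *id >= start).collect();
            // ...plus everything appended after the index was last dumped.
            let tail_start = last_index.max(start);
            ids.extend(tail_start..end);
            ids
        }
    }
}

fn main() {
    // Region wrote entries 512 and 756; the index was dumped at entry 1023;
    // the topic end offset is 1024 (mirrors the test in iterator.rs).
    let ids = entries_to_replay(1, 1024, Some((BTreeSet::from([512, 756]), 1023)));
    assert_eq!(ids, vec![512, 756, 1023]);
    println!("replaying entry ids: {ids:?}");
}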