diff --git a/benchmarks/src/wal_bench.rs b/benchmarks/src/wal_bench.rs
index 10e88f99f37c..681dacfbb60e 100644
--- a/benchmarks/src/wal_bench.rs
+++ b/benchmarks/src/wal_bench.rs
@@ -28,6 +28,7 @@ use rand::distributions::{Alphanumeric, DistString, Uniform};
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
 use serde::{Deserialize, Serialize};
+use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
@@ -210,7 +211,7 @@ impl From<Args> for Config {
 pub struct Region {
     id: RegionId,
     schema: Vec<ColumnSchema>,
-    wal_options: WalOptions,
+    provider: Provider,
     next_sequence: AtomicU64,
     next_entry_id: AtomicU64,
     next_timestamp: AtomicI64,
@@ -227,10 +228,14 @@ impl Region {
         num_rows: u32,
         rng_seed: u64,
     ) -> Self {
+        let provider = match wal_options {
+            WalOptions::RaftEngine => Provider::raft_engine_provider(id.as_u64()),
+            WalOptions::Kafka(opts) => Provider::kafka_provider(opts.topic),
+        };
         Self {
             id,
             schema,
-            wal_options,
+            provider,
             next_sequence: AtomicU64::new(1),
             next_entry_id: AtomicU64::new(1),
             next_timestamp: AtomicI64::new(1655276557000),
@@ -258,14 +263,14 @@ impl Region {
                 self.id,
                 self.next_entry_id.fetch_add(1, Ordering::Relaxed),
                 &entry,
-                &self.wal_options,
+                &self.provider,
             )
             .unwrap();
     }
 
     /// Replays the region.
     pub async fn replay<S: LogStore>(&self, wal: &Arc<Wal<S>>) {
-        let mut wal_stream = wal.scan(self.id, 0, &self.wal_options).unwrap();
+        let mut wal_stream = wal.scan(self.id, 0, &self.provider).unwrap();
         while let Some(res) = wal_stream.next().await {
             let (_, entry) = res.unwrap();
             metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64);
diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs
index 62fa9a5bf60b..4088c5236ca8 100644
--- a/src/common/telemetry/src/logging.rs
+++ b/src/common/telemetry/src/logging.rs
@@ -94,7 +94,7 @@ pub fn init_default_ut_logging() {
             env::var("UNITTEST_LOG_DIR").unwrap_or_else(|_| "/tmp/__unittest_logs".to_string());
 
         let level = env::var("UNITTEST_LOG_LEVEL").unwrap_or_else(|_|
-            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info".to_string()
+            "debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info,rskafka=info".to_string()
         );
         let opts = LoggingOptions {
             dir: dir.clone(),
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index 45449c9d65e8..280ce6410609 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -21,12 +21,18 @@
 use serde_json::error::Error as JsonError;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionId;
 
-use crate::kafka::NamespaceImpl as KafkaNamespace;
-
 #[derive(Snafu)]
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
+    InvalidProvider {
+        #[snafu(implicit)]
+        location: Location,
+        expected: String,
+        actual: String,
+    },
+
     #[snafu(display("Failed to start log store gc task"))]
     StartGcTask {
         #[snafu(implicit)]
         location: Location,
     },
@@ -170,34 +176,28 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display(
-        "Failed to produce records to Kafka, topic: {}, size: {}, limit: {}",
-        topic,
-        size,
-        limit,
-    ))]
+    #[snafu(display("Failed to produce records to Kafka, topic: {}, size: {}", topic, size))]
     ProduceRecord {
         topic: String,
         size: usize,
-        limit: usize,
         #[snafu(implicit)]
         location: Location,
         #[snafu(source)]
         error: rskafka::client::producer::Error,
     },
 
-    #[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
+    #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
     ConsumeRecord {
-        ns: KafkaNamespace,
+        topic: String,
         #[snafu(implicit)]
         location: Location,
         #[snafu(source)]
         error: rskafka::client::error::Error,
     },
 
-    #[snafu(display("Failed to get the latest offset, ns: {}", ns))]
+    #[snafu(display("Failed to get the latest offset, topic: {}", topic))]
     GetOffset {
-        ns: KafkaNamespace,
+        topic: String,
         #[snafu(implicit)]
         location: Location,
         #[snafu(source)]
diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs
index dc068f3b4b52..415cc53ddbce 100644
--- a/src/log-store/src/kafka.rs
+++ b/src/log-store/src/kafka.rs
@@ -12,17 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::mem::size_of;
 pub(crate) mod client_manager;
 pub mod log_store;
 pub(crate) mod util;
 
-use std::fmt::Display;
-
 use serde::{Deserialize, Serialize};
-use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
-use store_api::logstore::namespace::Namespace;
-use store_api::storage::RegionId;
+use store_api::logstore::entry::Id as EntryId;
 
 /// Kafka Namespace implementation.
 #[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
@@ -31,18 +26,6 @@ pub struct NamespaceImpl {
     pub topic: String,
 }
 
-impl Namespace for NamespaceImpl {
-    fn id(&self) -> u64 {
-        self.region_id
-    }
-}
-
-impl Display for NamespaceImpl {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "[topic: {}, region: {}]", self.topic, self.region_id)
-    }
-}
-
 /// Kafka Entry implementation.
 #[derive(Debug, PartialEq, Clone)]
 pub struct EntryImpl {
@@ -53,65 +36,3 @@ pub struct EntryImpl {
     /// The namespace used to identify and isolate log entries from different regions.
     pub ns: NamespaceImpl,
 }
-
-impl Entry for EntryImpl {
-    fn into_raw_entry(self) -> RawEntry {
-        RawEntry {
-            region_id: self.region_id(),
-            entry_id: self.id(),
-            data: self.data,
-        }
-    }
-
-    fn data(&self) -> &[u8] {
-        &self.data
-    }
-
-    fn id(&self) -> EntryId {
-        self.id
-    }
-
-    fn region_id(&self) -> RegionId {
-        RegionId::from_u64(self.ns.region_id)
-    }
-
-    fn estimated_size(&self) -> usize {
-        size_of::<Self>() + self.data.capacity() * size_of::<u8>() + self.ns.topic.capacity()
-    }
-}
-
-impl Display for EntryImpl {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Entry [ns: {}, id: {}, data_len: {}]",
-            self.ns,
-            self.id,
-            self.data.len()
-        )
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::mem::size_of;
-
-    use store_api::logstore::entry::Entry;
-
-    use crate::kafka::{EntryImpl, NamespaceImpl};
-
-    #[test]
-    fn test_estimated_size() {
-        let entry = EntryImpl {
-            data: Vec::with_capacity(100),
-            id: 0,
-            ns: NamespaceImpl {
-                region_id: 0,
-                topic: String::with_capacity(10),
-            },
-        };
-        let expected = size_of::<EntryImpl>() + 100 * size_of::<u8>() + 10;
-        let got = entry.estimated_size();
-        assert_eq!(expected, got);
-    }
-}
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 1708efed1d09..81feaddb6627 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -27,6 +27,7 @@ use tokio::sync::RwLock;
 use crate::error::{
     BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result,
 };
+use crate::kafka::util::record::MIN_BATCH_SIZE;
 
 // Each topic only has one partition for now.
 // The `DEFAULT_PARTITION` refers to the index of the partition.
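A brief note before the next hunk: record.rs (further down in this patch) subtracts ESTIMATED_META_SIZE from the configured max batch size to get the per-record payload budget, so clamping the aggregator size keeps that subtraction from underflowing for tiny configured sizes. A minimal sketch of the arithmetic, using only the two constants this patch introduces; the `max_data_size` helper is hypothetical, not part of the patch:

// Sketch: why the clamp matters. `entry()` later computes
// `max_data_size = max_batch_size - ESTIMATED_META_SIZE`; without the clamp,
// a configured max_batch_size below 256 bytes would underflow the subtraction.
const ESTIMATED_META_SIZE: usize = 256;
const MIN_BATCH_SIZE: usize = 4 * 1024;

fn max_data_size(configured_max_batch_size: usize) -> usize {
    // Clamp first, exactly as the hunk below does for the RecordAggregator.
    let batch_size = configured_max_batch_size.max(MIN_BATCH_SIZE);
    batch_size - ESTIMATED_META_SIZE // safe: 4096 > 256
}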
@@ -48,7 +49,8 @@ pub(crate) struct Client {
 impl Client {
     /// Creates a Client from the raw client.
     pub(crate) fn new(raw_client: Arc<PartitionClient>, config: &DatanodeKafkaConfig) -> Self {
-        let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize);
+        let record_aggregator =
+            RecordAggregator::new((config.max_batch_size.as_bytes() as usize).max(MIN_BATCH_SIZE));
         let batch_producer = BatchProducerBuilder::new(raw_client.clone())
             .with_compression(config.compression)
             .with_linger(config.linger)
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 1a0f96b6587b..ceca6fc30bd7 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -17,21 +17,23 @@ use std::sync::Arc;
 
 use common_telemetry::{debug, warn};
 use common_wal::config::kafka::DatanodeKafkaConfig;
-use common_wal::options::WalOptions;
 use futures_util::StreamExt;
 use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
 use rskafka::client::partition::OffsetAt;
-use snafu::ResultExt;
-use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId};
-use store_api::logstore::entry_stream::SendableEntryStream;
-use store_api::logstore::namespace::Id as NamespaceId;
-use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
-
-use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, IllegalSequenceSnafu, Result};
+use snafu::{OptionExt, ResultExt};
+use store_api::logstore::entry::{
+    Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
+};
+use store_api::logstore::provider::{KafkaProvider, Provider};
+use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
+use store_api::storage::RegionId;
+
+use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
 use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
 use crate::kafka::util::offset::Offset;
-use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer};
-use crate::kafka::{EntryImpl, NamespaceImpl};
+use crate::kafka::util::record::{
+    maybe_emit_entry, remaining_entries, Record, RecordProducer, ESTIMATED_META_SIZE,
+};
 use crate::metrics;
 
 /// A log store backed by Kafka.
@@ -52,41 +54,81 @@ impl KafkaLogStore {
     }
 }
 
+fn build_entry(
+    data: &mut Vec<u8>,
+    entry_id: EntryId,
+    region_id: RegionId,
+    provider: &Provider,
+    max_data_size: usize,
+) -> Entry {
+    if data.len() <= max_data_size {
+        Entry::Naive(NaiveEntry {
+            provider: provider.clone(),
+            region_id,
+            entry_id,
+            data: std::mem::take(data),
+        })
+    } else {
+        let parts = std::mem::take(data)
+            .chunks(max_data_size)
+            .map(|s| s.into())
+            .collect::<Vec<_>>();
+        let num_parts = parts.len();
+
+        let mut headers = Vec::with_capacity(num_parts);
+        headers.push(MultiplePartHeader::First);
+        headers.extend((1..num_parts - 1).map(MultiplePartHeader::Middle));
+        headers.push(MultiplePartHeader::Last);
+
+        Entry::MultiplePart(MultiplePartEntry {
+            provider: provider.clone(),
+            region_id,
+            entry_id,
+            headers,
+            parts,
+        })
+    }
+}
+
 #[async_trait::async_trait]
 impl LogStore for KafkaLogStore {
     type Error = Error;
-    type Entry = EntryImpl;
-    type Namespace = NamespaceImpl;
-
-    /// Creates an entry of the associated Entry type.
-    fn entry(&self, data: &mut Vec<u8>, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry {
-        EntryImpl {
-            data: std::mem::take(data),
-            id: entry_id,
-            ns,
-        }
-    }
-
-    /// Appends an entry to the log store and returns a response containing the entry id of the appended entry.
-    async fn append(&self, entry: Self::Entry) -> Result<AppendResponse> {
-        let entry_id = RecordProducer::new(entry.ns.clone())
-            .with_entries(vec![entry])
-            .produce(&self.client_manager)
-            .await
-            .map(TryInto::try_into)??;
-        Ok(AppendResponse {
-            last_entry_id: entry_id,
-        })
+    /// Creates an [Entry].
+    fn entry(
+        &self,
+        data: &mut Vec<u8>,
+        entry_id: EntryId,
+        region_id: RegionId,
+        provider: &Provider,
+    ) -> Result<Entry> {
+        provider
+            .as_kafka_provider()
+            .with_context(|| InvalidProviderSnafu {
+                expected: KafkaProvider::type_name(),
+                actual: provider.type_name(),
+            })?;
+
+        let max_data_size =
+            self.client_manager.config.max_batch_size.as_bytes() as usize - ESTIMATED_META_SIZE;
+        Ok(build_entry(
+            data,
+            entry_id,
+            region_id,
+            provider,
+            max_data_size,
+        ))
     }
 
+    // TODO(weny): refactor the writing.
     /// Appends a batch of entries and returns a response containing a map where the key is a region id
     /// while the value is the id of the last successfully written entry of the region.
-    async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
+    async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
         metrics::METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL.inc();
         metrics::METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL.inc_by(
             entries
                 .iter()
-                .map(EntryTrait::estimated_size)
+                .map(|entry| entry.estimated_size())
                 .sum::<usize>() as u64,
         );
         let _timer = metrics::METRIC_KAFKA_APPEND_BATCH_ELAPSED.start_timer();
@@ -98,9 +140,17 @@ impl LogStore for KafkaLogStore {
         // Groups entries by region id and pushes them to an associated record producer.
         let mut producers = HashMap::with_capacity(entries.len());
         for entry in entries {
+            let provider = entry
+                .provider()
+                .as_kafka_provider()
+                .context(error::InvalidProviderSnafu {
+                    expected: KafkaProvider::type_name(),
+                    actual: entry.provider().type_name(),
+                })?
+                .clone();
             producers
-                .entry(entry.ns.region_id)
-                .or_insert_with(|| RecordProducer::new(entry.ns.clone()))
+                .entry(entry.region_id())
+                .or_insert_with(|| RecordProducer::new(provider))
                 .push(entry);
         }
 
@@ -122,20 +172,27 @@ impl LogStore for KafkaLogStore {
         Ok(AppendBatchResponse { last_entry_ids })
     }
 
-    /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids
-    /// starting from `entry_id`. The generated entries will be filtered by the namespace.
+    /// Creates a new `EntryStream` to asynchronously generate `Entry` with entry ids.
+    /// Returns entries belonging to `provider`, starting from `entry_id`.
     async fn read(
         &self,
-        ns: &Self::Namespace,
+        provider: &Provider,
         entry_id: EntryId,
-    ) -> Result<SendableEntryStream<Self::Entry, Self::Error>> {
+    ) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
+        let provider = provider
+            .as_kafka_provider()
+            .with_context(|| InvalidProviderSnafu {
+                expected: KafkaProvider::type_name(),
+                actual: provider.type_name(),
+            })?;
+
         metrics::METRIC_KAFKA_READ_CALLS_TOTAL.inc();
         let _timer = metrics::METRIC_KAFKA_READ_ELAPSED.start_timer();
 
         // Gets the client associated with the topic.
         let client = self
             .client_manager
-            .get_or_insert(&ns.topic)
+            .get_or_insert(&provider.topic)
             .await?
             .raw_client
             .clone();
@@ -147,14 +204,16 @@ impl LogStore for KafkaLogStore {
         let end_offset = client
             .get_offset(OffsetAt::Latest)
             .await
-            .context(GetOffsetSnafu { ns: ns.clone() })?
+            .context(GetOffsetSnafu {
+                topic: &provider.topic,
+            })?
             - 1;
         // Reads entries with offsets in the range [start_offset, end_offset].
         let start_offset = Offset::try_from(entry_id)?.0;
 
         debug!(
             "Start reading entries in range [{}, {}] for ns {}",
-            start_offset, end_offset, ns
+            start_offset, end_offset, provider
         );
 
         // Abort if there're no new entries.
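For context on `build_entry` above: payloads larger than `max_data_size` are split with `slice::chunks` and labeled First, Middle(i), Last. A self-contained sketch of that split, using the same sizes as `test_build_into_multiple_part_entry` further down (plain std, nothing from this crate):

// Sketch of the split performed by build_entry: 100 bytes with max_data_size = 21
// yields parts of 21, 21, 21, 21 and 16 bytes, labeled First, Middle(1..=3), Last.
fn main() {
    let data = vec![1u8; 100];
    let max_data_size = 21;
    let parts: Vec<&[u8]> = data.chunks(max_data_size).collect();
    assert_eq!(parts.len(), 5);
    assert_eq!(parts.last().unwrap().len(), 16);
}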
@@ -162,7 +221,7 @@ impl LogStore for KafkaLogStore {
         if start_offset > end_offset {
             warn!(
                 "No new entries for ns {} in range [{}, {}]",
-                ns, start_offset, end_offset
+                provider, start_offset, end_offset
             );
             return Ok(futures_util::stream::empty().boxed());
         }
@@ -174,20 +233,20 @@ impl LogStore for KafkaLogStore {
 
         debug!(
             "Built a stream consumer for ns {} to consume entries in range [{}, {}]",
-            ns, start_offset, end_offset
+            provider, start_offset, end_offset
         );
 
-        // Key: entry id, Value: the records associated with the entry.
-        let mut entry_records: HashMap<_, Vec<_>> = HashMap::new();
-        let ns_clone = ns.clone();
+        // A buffer is used to collect records to construct a complete entry.
+        let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
+        let provider = provider.clone();
         let stream = async_stream::stream!({
             while let Some(consume_result) = stream_consumer.next().await {
                 // Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset.
                 // The `RecordAndOffset` contains the record data and its start offset.
                 // The high watermark offset is the offset of the last record plus one.
                 let (record_and_offset, high_watermark) =
-                    consume_result.with_context(|_| ConsumeRecordSnafu {
-                        ns: ns_clone.clone(),
+                    consume_result.context(ConsumeRecordSnafu {
+                        topic: &provider.topic,
                     })?;
                 let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);
 
@@ -195,37 +254,35 @@ impl LogStore for KafkaLogStore {
                     .inc_by(kafka_record.approximate_size() as u64);
 
                 debug!(
-                    "Read a record at offset {} for ns {}, high watermark: {}",
-                    offset, ns_clone, high_watermark
+                    "Read a record at offset {} for topic {}, high watermark: {}",
+                    offset, provider.topic, high_watermark
                 );
 
                 // Ignores no-op records.
                 if kafka_record.value.is_none() {
-                    if check_termination(offset, end_offset, &entry_records)? {
+                    if check_termination(offset, end_offset) {
+                        if let Some(entries) = remaining_entries(&provider, &mut entry_records) {
+                            yield Ok(entries);
+                        }
                         break;
                     }
                     continue;
                 }
 
-                // Filters records by namespace.
                 let record = Record::try_from(kafka_record)?;
-                if record.meta.ns != ns_clone {
-                    if check_termination(offset, end_offset, &entry_records)? {
-                        break;
-                    }
-                    continue;
-                }
-
                 // Tries to construct an entry from records consumed so far.
-                if let Some(mut entry) = maybe_emit_entry(record, &mut entry_records)? {
+                if let Some(mut entry) = maybe_emit_entry(&provider, record, &mut entry_records)? {
                     // We don't rely on the EntryId generated by mito2.
                     // Instead, we use the offset return from Kafka as EntryId.
                     // Therefore, we MUST overwrite the EntryId with RecordOffset.
-                    entry.id = offset as u64;
+                    entry.set_entry_id(offset as u64);
                     yield Ok(vec![entry]);
                 }
 
-                if check_termination(offset, end_offset, &entry_records)? {
+                if check_termination(offset, end_offset) {
+                    if let Some(entries) = remaining_entries(&provider, &mut entry_records) {
+                        yield Ok(entries);
+                    }
                     break;
                 }
             }
@@ -233,39 +290,25 @@ impl LogStore for KafkaLogStore {
         Ok(Box::pin(stream))
     }
 
-    /// Creates a namespace of the associated Namespace type.
-    fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
-        // Safety: upon start, the datanode checks the consistency of the wal providers in the wal config of the
-        // datanode and that of the metasrv. Therefore, the wal options passed into the kafka log store
-        // must be of type WalOptions::Kafka.
-        let WalOptions::Kafka(kafka_options) = wal_options else {
-            unreachable!()
-        };
-        NamespaceImpl {
-            region_id: ns_id,
-            topic: kafka_options.topic.clone(),
-        }
-    }
-
     /// Creates a new `Namespace` from the given ref.
-    async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
+    async fn create_namespace(&self, _provider: &Provider) -> Result<()> {
         Ok(())
     }
 
     /// Deletes an existing `Namespace` specified by the given ref.
-    async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
+    async fn delete_namespace(&self, _provider: &Provider) -> Result<()> {
         Ok(())
     }
 
     /// Lists all existing namespaces.
-    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
+    async fn list_namespaces(&self) -> Result<Vec<Provider>> {
         Ok(vec![])
     }
 
     /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
     /// so that the log store can safely delete those entries. This method does not guarantee
     /// that the obsolete entries are deleted immediately.
-    async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> {
+    async fn obsolete(&self, _provider: &Provider, _entry_id: EntryId) -> Result<()> {
         Ok(())
     }
 
@@ -275,227 +318,249 @@ impl LogStore for KafkaLogStore {
     }
 }
 
-fn check_termination(
-    offset: i64,
-    end_offset: i64,
-    entry_records: &HashMap<EntryId, Vec<Record>>,
-) -> Result<bool> {
+fn check_termination(offset: i64, end_offset: i64) -> bool {
     // Terminates the stream if the entry with the end offset was read.
     if offset >= end_offset {
         debug!("Stream consumer terminates at offset {}", offset);
         // There must have no records when the stream terminates.
-        if !entry_records.is_empty() {
-            return IllegalSequenceSnafu {
-                error: "Found records leftover",
-            }
-            .fail();
-        }
-        Ok(true)
+        true
     } else {
-        Ok(false)
+        false
     }
 }
 
 #[cfg(test)]
 mod tests {
+
+    use std::assert_matches::assert_matches;
+    use std::collections::HashMap;
+
     use common_base::readable_size::ReadableSize;
-    use rand::seq::IteratorRandom;
-
-    use super::*;
-    use crate::test_util::kafka::{
-        create_topics, entries_with_random_data, new_namespace, EntryBuilder,
-    };
-
-    // Stores test context for a region.
-    struct RegionContext {
-        ns: NamespaceImpl,
-        entry_builder: EntryBuilder,
-        expected: Vec<EntryImpl>,
-        flushed_entry_id: EntryId,
+    use common_telemetry::info;
+    use common_telemetry::tracing::warn;
+    use common_wal::config::kafka::DatanodeKafkaConfig;
+    use futures::TryStreamExt;
+    use rand::prelude::SliceRandom;
+    use rand::Rng;
+    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
+    use store_api::logstore::provider::Provider;
+    use store_api::logstore::LogStore;
+    use store_api::storage::RegionId;
+
+    use super::build_entry;
+    use crate::kafka::log_store::KafkaLogStore;
+
+    #[test]
+    fn test_build_naive_entry() {
+        let provider = Provider::kafka_provider("my_topic".to_string());
+        let region_id = RegionId::new(1, 1);
+        let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 120);
+
+        assert_eq!(
+            entry.into_naive_entry().unwrap(),
+            NaiveEntry {
+                provider,
+                region_id,
+                entry_id: 1,
+                data: vec![1; 100]
+            }
+        )
     }
 
-    /// Prepares for a test in that a log store is constructed and a collection of topics is created.
-    async fn prepare(
-        test_name: &str,
-        num_topics: usize,
-        broker_endpoints: Vec<String>,
-    ) -> (KafkaLogStore, Vec<String>) {
-        let topics = create_topics(
-            num_topics,
-            |i| format!("{test_name}_{}_{}", i, uuid::Uuid::new_v4()),
-            &broker_endpoints,
+    #[test]
+    fn test_build_into_multiple_part_entry() {
+        let provider = Provider::kafka_provider("my_topic".to_string());
+        let region_id = RegionId::new(1, 1);
+        let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 50);
+
+        assert_eq!(
+            entry.into_multiple_part_entry().unwrap(),
+            MultiplePartEntry {
+                provider: provider.clone(),
+                region_id,
+                entry_id: 1,
+                headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
+                parts: vec![vec![1; 50], vec![1; 50]],
+            }
+        );
+
+        let region_id = RegionId::new(1, 1);
+        let entry = build_entry(&mut vec![1; 100], 1, region_id, &provider, 21);
+
+        assert_eq!(
+            entry.into_multiple_part_entry().unwrap(),
+            MultiplePartEntry {
+                provider,
+                region_id,
+                entry_id: 1,
+                headers: vec![
+                    MultiplePartHeader::First,
+                    MultiplePartHeader::Middle(1),
+                    MultiplePartHeader::Middle(2),
+                    MultiplePartHeader::Middle(3),
+                    MultiplePartHeader::Last
+                ],
+                parts: vec![
+                    vec![1; 21],
+                    vec![1; 21],
+                    vec![1; 21],
+                    vec![1; 21],
+                    vec![1; 16]
+                ],
+            }
         )
-        .await;
+    }
+
+    fn generate_entries(
+        logstore: &KafkaLogStore,
+        provider: &Provider,
+        num_entries: usize,
+        region_id: RegionId,
+        data_len: usize,
+    ) -> Vec<Entry> {
+        (0..num_entries)
+            .map(|_| {
+                let mut data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
+                // Always set `entry_id` to 0, the real entry_id will be set during the read.
+                logstore.entry(&mut data, 0, region_id, provider).unwrap()
+            })
+            .collect()
+    }
 
+    #[tokio::test]
+    async fn test_append_batch_basic() {
+        common_telemetry::init_default_ut_logging();
+        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
+            warn!("The endpoints are empty, skipping the test 'test_append_batch_basic'");
+            return;
+        };
+        let broker_endpoints = broker_endpoints
+            .split(',')
+            .map(|s| s.trim().to_string())
+            .collect::<Vec<_>>();
         let config = DatanodeKafkaConfig {
             broker_endpoints,
             max_batch_size: ReadableSize::kb(32),
             ..Default::default()
         };
         let logstore = KafkaLogStore::try_new(&config).await.unwrap();
+        let topic_name = uuid::Uuid::new_v4().to_string();
+        let provider = Provider::kafka_provider(topic_name);
+        let region_entries = (0..5)
+            .map(|i| {
+                let region_id = RegionId::new(1, i);
+                (
+                    region_id,
+                    generate_entries(&logstore, &provider, 20, region_id, 1024),
+                )
+            })
+            .collect::<HashMap<RegionId, Vec<_>>>();
 
-        // Appends a no-op record to each topic.
-        for topic in topics.iter() {
-            let last_entry_id = logstore
-                .append(EntryImpl {
-                    data: vec![],
-                    id: 0,
-                    ns: new_namespace(topic, 0),
-                })
-                .await
-                .unwrap()
-                .last_entry_id;
-            assert_eq!(last_entry_id, 0);
-        }
-
-        (logstore, topics)
-    }
-
-    /// Creates a vector containing indexes of all regions if the `all` is true.
-    /// Otherwise, creates a subset of the indexes. The cardinality of the subset
-    /// is nearly a quarter of that of the universe set.
-    fn all_or_subset(all: bool, num_regions: usize) -> Vec<u64> {
-        assert!(num_regions > 0);
-        let amount = if all {
-            num_regions
-        } else {
-            (num_regions / 4).max(1)
-        };
-        (0..num_regions as u64).choose_multiple(&mut rand::thread_rng(), amount)
-    }
+        let mut all_entries = region_entries
+            .values()
+            .flatten()
+            .cloned()
+            .collect::<Vec<_>>();
+        all_entries.shuffle(&mut rand::thread_rng());
 
-    /// Builds entries for regions specified by `which`. Builds large entries if `large` is true.
-    /// Returns the aggregated entries.
-    fn build_entries(
-        region_contexts: &mut HashMap<u64, RegionContext>,
-        which: &[u64],
-        large: bool,
-    ) -> Vec<EntryImpl> {
-        let mut aggregated = Vec::with_capacity(which.len());
-        for region_id in which {
-            let ctx = region_contexts.get_mut(region_id).unwrap();
-            // Builds entries for the region.
-            ctx.expected = if !large {
-                entries_with_random_data(3, &ctx.entry_builder)
-            } else {
-                // Builds a large entry of size 256KB which is way greater than the configured `max_batch_size` which is 32KB.
-                let large_entry = ctx.entry_builder.with_data([b'1'; 256 * 1024]);
-                vec![large_entry]
-            };
-            // Aggregates entries of all regions.
-            aggregated.push(ctx.expected.clone());
+        let response = logstore.append_batch(all_entries.clone()).await.unwrap();
+        // 5 regions
+        assert_eq!(response.last_entry_ids.len(), 5);
+        let got_entries = logstore
+            .read(&provider, 0)
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        for (region_id, _) in region_entries {
+            let expected_entries = all_entries
+                .iter()
+                .filter(|entry| entry.region_id() == region_id)
+                .cloned()
+                .collect::<Vec<_>>();
+            let mut actual_entries = got_entries
+                .iter()
+                .filter(|entry| entry.region_id() == region_id)
+                .cloned()
+                .collect::<Vec<_>>();
+            actual_entries
+                .iter_mut()
+                .for_each(|entry| entry.set_entry_id(0));
+            assert_eq!(expected_entries, actual_entries);
         }
-        aggregated.into_iter().flatten().collect()
     }
 
-    /// Starts a test with:
-    /// * `test_name` - The name of the test.
-    /// * `num_topics` - Number of topics to be created in the preparation phase.
-    /// * `num_regions` - Number of regions involved in the test.
-    /// * `num_appends` - Number of append operations to be performed.
-    /// * `all` - All regions will be involved in an append operation if `all` is true. Otherwise,
-    ///   an append operation will only randomly choose a subset of regions.
-    /// * `large` - Builds large entries for each region is `large` is true.
-    async fn test_with(
-        test_name: &str,
-        num_topics: usize,
-        num_regions: usize,
-        num_appends: usize,
-        all: bool,
-        large: bool,
-    ) {
+    #[tokio::test]
+    async fn test_append_batch_basic_large() {
+        common_telemetry::init_default_ut_logging();
         let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
-            warn!("The endpoints is empty, skipping the test {test_name}");
+            warn!("The endpoints are empty, skipping the test 'test_append_batch_basic_large'");
             return;
         };
+        let data_size_kb = rand::thread_rng().gen_range(9..31usize);
+        info!("Entry size: {}Ki", data_size_kb);
         let broker_endpoints = broker_endpoints
             .split(',')
             .map(|s| s.trim().to_string())
             .collect::<Vec<_>>();
-
-        let (logstore, topics) = prepare(test_name, num_topics, broker_endpoints).await;
-        let mut region_contexts = (0..num_regions)
+        let config = DatanodeKafkaConfig {
+            broker_endpoints,
+            max_batch_size: ReadableSize::kb(8),
+            ..Default::default()
+        };
+        let logstore = KafkaLogStore::try_new(&config).await.unwrap();
+        let topic_name = uuid::Uuid::new_v4().to_string();
+        let provider = Provider::kafka_provider(topic_name);
+        let region_entries = (0..5)
             .map(|i| {
-                let topic = &topics[i % topics.len()];
-                let ns = new_namespace(topic, i as u64);
-                let entry_builder = EntryBuilder::new(ns.clone());
+                let region_id = RegionId::new(1, i);
                 (
-                    i as u64,
-                    RegionContext {
-                        ns,
-                        entry_builder,
-                        expected: Vec::new(),
-                        flushed_entry_id: 0,
-                    },
+                    region_id,
+                    generate_entries(&logstore, &provider, 20, region_id, data_size_kb * 1024),
                 )
             })
-            .collect();
-
-        for _ in 0..num_appends {
-            // Appends entries for a subset of regions.
-            let which = all_or_subset(all, num_regions);
-            let entries = build_entries(&mut region_contexts, &which, large);
-            let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
-
-            // Reads entries for regions and checks for each region that the gotten entries are identical with the expected ones.
-            for region_id in which {
-                let ctx = region_contexts.get_mut(&region_id).unwrap();
-                let stream = logstore
-                    .read(&ctx.ns, ctx.flushed_entry_id + 1)
-                    .await
-                    .unwrap();
-                let mut got = stream
-                    .collect::<Vec<_>>()
-                    .await
-                    .into_iter()
-                    .flat_map(|x| x.unwrap())
-                    .collect::<Vec<_>>();
-                //FIXME(weny): https://github.com/GreptimeTeam/greptimedb/issues/3152
-                ctx.expected.iter_mut().for_each(|entry| entry.id = 0);
-                got.iter_mut().for_each(|entry| entry.id = 0);
-                assert_eq!(ctx.expected, got);
-            }
+            .collect::<HashMap<RegionId, Vec<_>>>();
 
-            // Simulates a flush for regions.
-            for (region_id, last_entry_id) in last_entry_ids {
-                let ctx = region_contexts.get_mut(&region_id).unwrap();
-                ctx.flushed_entry_id = last_entry_id;
-            }
+        let mut all_entries = region_entries
+            .values()
+            .flatten()
+            .cloned()
+            .collect::<Vec<_>>();
+        assert_matches!(all_entries[0], Entry::MultiplePart(_));
+        all_entries.shuffle(&mut rand::thread_rng());
+
+        let response = logstore.append_batch(all_entries.clone()).await.unwrap();
+        // 5 regions
+        assert_eq!(response.last_entry_ids.len(), 5);
+        let got_entries = logstore
+            .read(&provider, 0)
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        for (region_id, _) in region_entries {
+            let expected_entries = all_entries
+                .iter()
+                .filter(|entry| entry.region_id() == region_id)
+                .cloned()
+                .collect::<Vec<_>>();
+            let mut actual_entries = got_entries
+                .iter()
+                .filter(|entry| entry.region_id() == region_id)
+                .cloned()
+                .collect::<Vec<_>>();
+            actual_entries
+                .iter_mut()
+                .for_each(|entry| entry.set_entry_id(0));
+            assert_eq!(expected_entries, actual_entries);
         }
     }
-
-    /// Appends entries for one region and checks all entries can be read successfully.
-    #[tokio::test]
-    async fn test_one_region() {
-        test_with("test_one_region", 1, 1, 1, true, false).await;
-    }
-
-    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
-    /// A topic is assigned only a single region.
-    #[tokio::test]
-    async fn test_multi_regions_disjoint() {
-        test_with("test_multi_regions_disjoint", 5, 5, 1, true, false).await;
-    }
-
-    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
-    /// A topic is assigned multiple regions.
-    #[tokio::test]
-    async fn test_multi_regions_overlapped() {
-        test_with("test_multi_regions_overlapped", 5, 20, 1, true, false).await;
-    }
-
-    /// Appends entries for multiple regions and checks entries for each region can be read successfully.
-    /// A topic may be assigned multiple regions. The append operation repeats for a several iterations.
-    /// Each append operation will only append entries for a subset of randomly chosen regions.
-    #[tokio::test]
-    async fn test_multi_appends() {
-        test_with("test_multi_appends", 5, 20, 3, false, false).await;
-    }
-
-    /// Appends large entries for multiple regions and checks entries for each region can be read successfully.
-    /// A topic may be assigned multiple regions.
-    #[tokio::test]
-    async fn test_append_large_entries() {
-        test_with("test_append_large_entries", 5, 20, 3, true, true).await;
-    }
 }
diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs
index e2035318c4c7..fa6f77171645 100644
--- a/src/log-store/src/kafka/util/record.rs
+++ b/src/log-store/src/kafka/util/record.rs
@@ -13,10 +13,14 @@
 // limitations under the License.
 use std::collections::HashMap;
+use std::sync::Arc;
 
 use rskafka::record::Record as KafkaRecord;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
+use store_api::logstore::provider::{KafkaProvider, Provider};
+use store_api::storage::RegionId;
 
 use crate::error::{
     DecodeJsonSnafu, EmptyEntriesSnafu, EncodeJsonSnafu, GetClientSnafu, IllegalSequenceSnafu,
@@ -24,7 +28,7 @@ use crate::error::{
 };
 use crate::kafka::client_manager::ClientManagerRef;
 use crate::kafka::util::offset::Offset;
-use crate::kafka::{EntryId, EntryImpl, NamespaceImpl};
+use crate::kafka::{EntryId, NamespaceImpl};
 use crate::metrics;
 
 /// The current version of Record.
@@ -32,7 +36,10 @@ pub(crate) const VERSION: u32 = 0;
 
 /// The estimated size in bytes of a serialized RecordMeta.
 /// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_size - ESTIMATED_META_SIZE.
-const ESTIMATED_META_SIZE: usize = 256;
+pub(crate) const ESTIMATED_META_SIZE: usize = 256;
+
+/// The minimum batch size.
+pub(crate) const MIN_BATCH_SIZE: usize = 4 * 1024;
 
 /// The type of a record.
 ///
@@ -110,43 +117,25 @@ impl TryFrom<KafkaRecord> for Record {
     }
 }
 
-impl From<Vec<Record>> for EntryImpl {
-    fn from(records: Vec<Record>) -> Self {
-        let entry_id = records[0].meta.entry_id;
-        let ns = records[0].meta.ns.clone();
-        let data = records.into_iter().flat_map(|record| record.data).collect();
-        EntryImpl {
-            data,
-            id: entry_id,
-            ns,
-        }
-    }
-}
-
 /// Produces a record to a kafka topic.
 pub(crate) struct RecordProducer {
-    /// The namespace of the entries.
-    ns: NamespaceImpl,
+    /// The provider of the entries.
+    provider: Arc<KafkaProvider>,
     /// Entries are buffered before being built into a record.
-    entries: Vec<EntryImpl>,
+    entries: Vec<Entry>,
 }
 
 impl RecordProducer {
     /// Creates a new producer for producing entries with the given namespace.
-    pub(crate) fn new(ns: NamespaceImpl) -> Self {
+    pub(crate) fn new(provider: Arc<KafkaProvider>) -> Self {
         Self {
-            ns,
+            provider,
             entries: Vec::new(),
         }
     }
 
-    /// Populates the entry buffer with the given entries.
-    pub(crate) fn with_entries(self, entries: Vec<EntryImpl>) -> Self {
-        Self { entries, ..self }
-    }
-
     /// Pushes an entry into the entry buffer.
-    pub(crate) fn push(&mut self, entry: EntryImpl) {
+    pub(crate) fn push(&mut self, entry: Entry) {
         self.entries.push(entry);
     }
 
@@ -158,11 +147,11 @@ impl RecordProducer {
 
         // Gets the producer in which a record buffer is maintained.
         let producer = client_manager
-            .get_or_insert(&self.ns.topic)
+            .get_or_insert(&self.provider.topic)
             .await
             .map_err(|e| {
                 GetClientSnafu {
-                    topic: &self.ns.topic,
+                    topic: &self.provider.topic,
                     error: e.to_string(),
                 }
                 .build()
             })?;
 
        // Stores the offset of the last successfully produced record.
         let mut last_offset = None;
-        let max_record_size =
-            client_manager.config.max_batch_size.as_bytes() as usize - ESTIMATED_META_SIZE;
         for entry in self.entries {
-            for record in build_records(entry, max_record_size) {
+            for record in convert_to_records(entry) {
                 let kafka_record = KafkaRecord::try_from(record)?;
 
                 metrics::METRIC_KAFKA_PRODUCE_RECORD_COUNTS.inc();
@@ -187,9 +174,8 @@ impl RecordProducer {
                     .await
                     .map(Offset)
                     .with_context(|_| ProduceRecordSnafu {
-                        topic: &self.ns.topic,
+                        topic: &self.provider.topic,
                         size: kafka_record.approximate_size(),
-                        limit: max_record_size,
                     })?;
                 last_offset = Some(offset);
             }
@@ -199,100 +185,188 @@ impl RecordProducer {
     }
 }
 
-fn record_type(seq: usize, num_records: usize) -> RecordType {
-    if seq == 0 {
-        RecordType::First
-    } else if seq == num_records - 1 {
-        RecordType::Last
-    } else {
-        RecordType::Middle(seq)
-    }
-}
-
-fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec<Record> {
-    if entry.data.len() <= max_record_size {
-        let record = Record {
+fn convert_to_records(entry: Entry) -> Vec<Record> {
+    match entry {
+        Entry::Naive(entry) => vec![Record {
             meta: RecordMeta {
                 version: VERSION,
                 tp: RecordType::Full,
-                entry_id: entry.id,
-                ns: entry.ns,
+                // TODO(weny): refactor the record meta.
+                entry_id: 0,
+                ns: NamespaceImpl {
+                    region_id: entry.region_id.as_u64(),
+                    // TODO(weny): refactor the record meta.
+                    topic: String::new(),
+                },
             },
             data: entry.data,
+        }],
+        Entry::MultiplePart(entry) => {
+            let mut entries = Vec::with_capacity(entry.parts.len());
+
+            for (idx, part) in entry.parts.into_iter().enumerate() {
+                let tp = match entry.headers[idx] {
+                    MultiplePartHeader::First => RecordType::First,
+                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
+                    MultiplePartHeader::Last => RecordType::Last,
+                };
+                entries.push(Record {
+                    meta: RecordMeta {
+                        version: VERSION,
+                        tp,
+                        // TODO(weny): refactor the record meta.
+                        entry_id: 0,
+                        ns: NamespaceImpl {
+                            region_id: entry.region_id.as_u64(),
+                            topic: String::new(),
+                        },
+                    },
+                    data: part,
+                })
+            }
+            entries
+        }
+    }
+}
+
+fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
+    let region_id = RegionId::from_u64(record.meta.ns.region_id);
+
+    Entry::Naive(NaiveEntry {
+        provider: Provider::Kafka(provider),
+        region_id,
+        // TODO(weny): should be the offset in the topic
+        entry_id: record.meta.entry_id,
+        data: record.data,
+    })
+}
+
+fn convert_to_multiple_entry(
+    provider: Arc<KafkaProvider>,
+    region_id: RegionId,
+    records: Vec<Record>,
+) -> Entry {
+    let mut headers = Vec::with_capacity(records.len());
+    let mut parts = Vec::with_capacity(records.len());
+
+    for record in records {
+        let header = match record.meta.tp {
+            RecordType::Full => unreachable!(),
+            RecordType::First => MultiplePartHeader::First,
+            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
+            RecordType::Last => MultiplePartHeader::Last,
         };
-        return vec![record];
+        headers.push(header);
+        parts.push(record.data);
     }
 
-    let chunks = entry.data.chunks(max_record_size);
-    let num_chunks = chunks.len();
-    chunks
-        .enumerate()
-        .map(|(i, chunk)| Record {
-            meta: RecordMeta {
-                version: VERSION,
-                tp: record_type(i, num_chunks),
-                entry_id: entry.id,
-                ns: entry.ns.clone(),
-            },
-            data: chunk.to_vec(),
-        })
-        .collect()
+    Entry::MultiplePart(MultiplePartEntry {
+        provider: Provider::Kafka(provider),
+        region_id,
+        // TODO(weny): should be the offset in the topic
+        entry_id: 0,
+        headers,
+        parts,
+    })
 }
 
-pub fn maybe_emit_entry(
+/// Constructs entries from `buffered_records`.
+pub fn remaining_entries(
+    provider: &Arc<KafkaProvider>,
+    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
+) -> Option<Vec<Entry>> {
+    if buffered_records.is_empty() {
+        None
+    } else {
+        let mut entries = Vec::with_capacity(buffered_records.len());
+        for (region_id, records) in buffered_records.drain() {
+            entries.push(convert_to_multiple_entry(
+                provider.clone(),
+                region_id,
+                records,
+            ));
+        }
+        Some(entries)
+    }
+}
+
+/// For an [Entry::Naive] entry:
+/// - Emits a [RecordType::Full] type record immediately.
+///
+/// For an [Entry::MultiplePart] entry:
+/// - Emits a complete or incomplete [Entry] when the next record of the same [RegionId] arrives.
+///
+/// **Incomplete Entry:**
+/// If the records arrive in the following order, it emits **the incomplete [Entry]** when the next record arrives.
+/// - **[RecordType::First], [RecordType::Middle]**, [RecordType::First]
+/// - **[RecordType::Middle]**, [RecordType::First]
+/// - **[RecordType::Last]**
+pub(crate) fn maybe_emit_entry(
+    provider: &Arc<KafkaProvider>,
     record: Record,
-    entry_records: &mut HashMap<EntryId, Vec<Record>>,
-) -> Result<Option<EntryImpl>> {
+    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
+) -> Result<Option<Entry>> {
     let mut entry = None;
     match record.meta.tp {
-        RecordType::Full => {
-            entry = Some(EntryImpl::from(vec![record]));
-        }
+        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
         RecordType::First => {
-            ensure!(
-                !entry_records.contains_key(&record.meta.entry_id),
-                IllegalSequenceSnafu {
-                    error: "First record must be the first"
-                }
-            );
-            entry_records.insert(record.meta.entry_id, vec![record]);
+            let region_id = record.meta.ns.region_id.into();
+            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
+                // Incomplete entry
+                entry = Some(convert_to_multiple_entry(
+                    provider.clone(),
+                    region_id,
+                    records,
+                ))
+            }
         }
         RecordType::Middle(seq) => {
-            let prefix =
-                entry_records
-                    .get_mut(&record.meta.entry_id)
-                    .context(IllegalSequenceSnafu {
-                        error: "Middle record must not be the first",
-                    })?;
-            // Safety: the records are guaranteed not empty if the key exists.
-            let last_record = prefix.last().unwrap();
-            let legal = match last_record.meta.tp {
-                // Legal if this record follows a First record.
-                RecordType::First => seq == 1,
-                // Legal if this record follows a Middle record just prior to this record.
-                RecordType::Middle(last_seq) => last_seq + 1 == seq,
-                // Illegal sequence.
-                _ => false,
-            };
-            ensure!(
-                legal,
-                IllegalSequenceSnafu {
-                    error: "Illegal prefix for a Middle record"
-                }
-            );
+            let region_id = record.meta.ns.region_id.into();
+            let records = buffered_records.entry(region_id).or_default();
+
+            // Only validate complete entries.
+            if !records.is_empty() {
+                // Safety: the records are guaranteed not empty if the key exists.
+                let last_record = records.last().unwrap();
+                let legal = match last_record.meta.tp {
+                    // Legal if this record follows a First record.
+                    RecordType::First => seq == 1,
+                    // Legal if this record follows a Middle record just prior to this record.
+                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
+                    // Illegal sequence.
+                    _ => false,
+                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp,
                            record.meta.tp
                        )
                    }
                );
            }
 
-            prefix.push(record);
+            records.push(record);
         }
         RecordType::Last => {
-            // There must have a sequence prefix before a Last record is read.
-            let mut records =
-                entry_records
-                    .remove(&record.meta.entry_id)
-                    .context(IllegalSequenceSnafu {
-                        error: "Missing prefix for a Last record",
-                    })?;
-            records.push(record);
-            entry = Some(EntryImpl::from(records));
+            let region_id = record.meta.ns.region_id.into();
+            if let Some(mut records) = buffered_records.remove(&region_id) {
+                records.push(record);
+                entry = Some(convert_to_multiple_entry(
+                    provider.clone(),
+                    region_id,
+                    records,
+                ))
+            } else {
+                // Incomplete entry
+                entry = Some(convert_to_multiple_entry(
+                    provider.clone(),
+                    region_id,
+                    vec![record],
+                ))
+            }
         }
     }
     Ok(entry)
@@ -300,278 +374,141 @@ pub fn maybe_emit_entry(
 
 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
     use std::sync::Arc;
 
-    use common_base::readable_size::ReadableSize;
-    use common_wal::config::kafka::DatanodeKafkaConfig;
-    use common_wal::test_util::run_test_with_kafka_wal;
-    use uuid::Uuid;
-
     use super::*;
-    use crate::kafka::client_manager::ClientManager;
-
-    // Implements some utility methods for testing.
-    impl Default for Record {
-        fn default() -> Self {
-            Self {
-                meta: RecordMeta {
-                    version: VERSION,
-                    tp: RecordType::Full,
-                    ns: NamespaceImpl {
-                        region_id: 0,
-                        topic: "greptimedb_wal_topic".to_string(),
-                    },
-                    entry_id: 0,
-                },
-                data: Vec::new(),
-            }
-        }
-    }
-
-    impl Record {
-        /// Overrides tp.
-        fn with_tp(&self, tp: RecordType) -> Self {
-            Self {
-                meta: RecordMeta {
-                    tp,
-                    ..self.meta.clone()
-                },
-                ..self.clone()
-            }
-        }
-
-        /// Overrides data with the given data.
-        fn with_data(&self, data: &[u8]) -> Self {
-            Self {
-                data: data.to_vec(),
-                ..self.clone()
-            }
-        }
-
-        /// Overrides entry id.
-        fn with_entry_id(&self, entry_id: EntryId) -> Self {
-            Self {
-                meta: RecordMeta {
-                    entry_id,
-                    ..self.meta.clone()
-                },
-                ..self.clone()
-            }
-        }
-
-        /// Overrides namespace.
-        fn with_ns(&self, ns: NamespaceImpl) -> Self {
-            Self {
-                meta: RecordMeta { ns, ..self.meta },
-                ..self.clone()
-            }
-        }
-    }
-
-    fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
-        EntryImpl {
-            data: data.as_ref().to_vec(),
-            id: entry_id,
-            ns,
-        }
-    }
-
-    /// Tests that the `build_records` works as expected.
-    #[test]
-    fn test_build_records() {
-        let max_record_size = 128;
-
-        // On a small entry.
-        let ns = NamespaceImpl {
-            region_id: 1,
-            topic: "greptimedb_wal_topic".to_string(),
-        };
-        let entry = new_test_entry([b'1'; 100], 0, ns.clone());
-        let records = build_records(entry.clone(), max_record_size);
-        assert!(records.len() == 1);
-        assert_eq!(entry.data, records[0].data);
-
-        // On a large entry.
-        let entry = new_test_entry([b'1'; 150], 0, ns.clone());
-        let records = build_records(entry.clone(), max_record_size);
-        assert!(records.len() == 2);
-        assert_eq!(&records[0].data, &[b'1'; 128]);
-        assert_eq!(&records[1].data, &[b'1'; 22]);
-
-        // On a way-too large entry.
-        let entry = new_test_entry([b'1'; 5000], 0, ns.clone());
-        let records = build_records(entry.clone(), max_record_size);
-        let matched = entry
-            .data
-            .chunks(max_record_size)
-            .enumerate()
-            .all(|(i, chunk)| records[i].data == chunk);
-        assert!(matched);
-    }
+    use crate::error;
 
-    /// Tests that Record and KafkaRecord are able to be converted back and forth.
-    #[test]
-    fn test_record_conversion() {
-        let record = Record {
+    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
+        Record {
             meta: RecordMeta {
                 version: VERSION,
-                tp: RecordType::Full,
-                entry_id: 1,
+                tp,
                 ns: NamespaceImpl {
-                    region_id: 1,
+                    region_id,
                     topic: "greptimedb_wal_topic".to_string(),
                 },
+                entry_id,
             },
-            data: b"12345".to_vec(),
-        };
-        let kafka_record: KafkaRecord = record.clone().try_into().unwrap();
-        let got = Record::try_from(kafka_record).unwrap();
-        assert_eq!(record, got);
+            data,
+        }
     }
 
-    /// Tests that the reconstruction of an entry works as expected.
     #[test]
-    fn test_reconstruct_entry() {
-        let template = Record::default();
-        let records = vec![
-            template.with_data(b"111").with_tp(RecordType::First),
-            template.with_data(b"222").with_tp(RecordType::Middle(1)),
-            template.with_data(b"333").with_tp(RecordType::Last),
-        ];
-        let entry = EntryImpl::from(records.clone());
-        assert_eq!(records[0].meta.entry_id, entry.id);
-        assert_eq!(records[0].meta.ns, entry.ns);
+    fn test_maybe_emit_entry_emit_naive_entry() {
+        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
+        let region_id = RegionId::new(1, 1);
+        let mut buffer = HashMap::new();
+        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
+        let entry = maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .unwrap();
         assert_eq!(
-            entry.data,
-            records
-                .into_iter()
-                .flat_map(|record| record.data)
-                .collect::<Vec<_>>()
+            entry,
+            Entry::Naive(NaiveEntry {
+                provider: Provider::Kafka(provider),
+                region_id,
+                entry_id: 1,
+                data: vec![1; 100]
+            })
         );
     }
 
-    /// Tests that `maybe_emit_entry` works as expected.
-    /// This test does not check for illegal record sequences since they're already tested in the `test_check_records` test.
     #[test]
-    fn test_maybe_emit_entry() {
-        let ns = NamespaceImpl {
-            region_id: 1,
-            topic: "greptimedb_wal_topic".to_string(),
-        };
-        let template = Record::default().with_ns(ns);
-        let mut entry_records = HashMap::from([
-            (
-                1,
-                vec![template.with_entry_id(1).with_tp(RecordType::First)],
-            ),
-            (
-                2,
-                vec![template.with_entry_id(2).with_tp(RecordType::First)],
-            ),
-            (
-                3,
-                vec![
-                    template.with_entry_id(3).with_tp(RecordType::First),
-                    template.with_entry_id(3).with_tp(RecordType::Middle(1)),
-                ],
-            ),
-        ]);
-
-        // A Full record arrives.
-        let got = maybe_emit_entry(
-            template.with_entry_id(0).with_tp(RecordType::Full),
-            &mut entry_records,
-        )
-        .unwrap();
-        assert!(got.is_some());
-
-        // A First record arrives with no prefix.
-        let got = maybe_emit_entry(
-            template.with_entry_id(0).with_tp(RecordType::First),
-            &mut entry_records,
-        )
-        .unwrap();
-        assert!(got.is_none());
-
-        // A First record arrives with some prefix.
-        let got = maybe_emit_entry(
-            template.with_entry_id(1).with_tp(RecordType::First),
-            &mut entry_records,
-        );
-        assert!(got.is_err());
-
-        // A Middle record arrives with legal prefix (First).
-        let got = maybe_emit_entry(
-            template.with_entry_id(2).with_tp(RecordType::Middle(1)),
-            &mut entry_records,
-        )
-        .unwrap();
-        assert!(got.is_none());
-
-        // A Middle record arrives with legal prefix (Middle).
-        let got = maybe_emit_entry(
-            template.with_entry_id(2).with_tp(RecordType::Middle(2)),
-            &mut entry_records,
-        )
-        .unwrap();
-        assert!(got.is_none());
-
-        // A Middle record arrives with illegal prefix.
-        let got = maybe_emit_entry(
-            template.with_entry_id(2).with_tp(RecordType::Middle(1)),
-            &mut entry_records,
+    fn test_maybe_emit_entry_emit_incomplete_entry() {
+        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
+        let region_id = RegionId::new(1, 1);
+        // `First` overwrite `First`
+        let mut buffer = HashMap::new();
+        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
+        assert!(maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .is_none());
+        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
+        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(
+            incomplete_entry,
+            Entry::MultiplePart(MultiplePartEntry {
+                provider: Provider::Kafka(provider.clone()),
+                region_id,
+                // TODO(weny): always be 0.
+                entry_id: 0,
+                headers: vec![MultiplePartHeader::First],
+                parts: vec![vec![1; 100]],
+            })
         );
-        assert!(got.is_err());
 
-        // A Middle record arrives with no prefix.
-        let got = maybe_emit_entry(
-            template.with_entry_id(22).with_tp(RecordType::Middle(1)),
-            &mut entry_records,
+        // `Last` overwrite `None`
+        let mut buffer = HashMap::new();
+        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
+        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(
+            incomplete_entry,
+            Entry::MultiplePart(MultiplePartEntry {
+                provider: Provider::Kafka(provider.clone()),
+                region_id,
+                // TODO(weny): always be 0.
+                entry_id: 0,
+                headers: vec![MultiplePartHeader::Last],
+                parts: vec![vec![1; 100]],
+            })
         );
-        assert!(got.is_err());
 
-        // A Last record arrives with no prefix.
-        let got = maybe_emit_entry(
-            template.with_entry_id(33).with_tp(RecordType::Last),
-            &mut entry_records,
+        // `First` overwrite `Middle(0)`
+        let mut buffer = HashMap::new();
+        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
+        assert!(maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .is_none());
+        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
+        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(
+            incomplete_entry,
+            Entry::MultiplePart(MultiplePartEntry {
+                provider: Provider::Kafka(provider),
+                region_id,
+                // TODO(weny): always be 0.
+                entry_id: 0,
+                headers: vec![MultiplePartHeader::Middle(0)],
+                parts: vec![vec![1; 100]],
+            })
         );
-        assert!(got.is_err());
-
-        // A Last record arrives with legal prefix.
-        let got = maybe_emit_entry(
-            template.with_entry_id(3).with_tp(RecordType::Last),
-            &mut entry_records,
-        )
-        .unwrap();
-        assert!(got.is_some());
-
-        // Check state.
-        assert_eq!(entry_records.len(), 3);
-        assert_eq!(entry_records[&0].len(), 1);
-        assert_eq!(entry_records[&1].len(), 1);
-        assert_eq!(entry_records[&2].len(), 3);
     }
 
-    #[tokio::test]
-    async fn test_produce_large_entry() {
-        run_test_with_kafka_wal(|broker_endpoints| {
-            Box::pin(async {
-                let topic = format!("greptimedb_wal_topic_{}", Uuid::new_v4());
-                let ns = NamespaceImpl {
-                    region_id: 1,
-                    topic,
-                };
-                let entry = new_test_entry([b'1'; 2000000], 0, ns.clone());
-                let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]);
-                let config = DatanodeKafkaConfig {
-                    broker_endpoints,
-                    max_batch_size: ReadableSize::mb(1),
-                    ..Default::default()
-                };
-                let manager = Arc::new(ClientManager::try_new(&config).await.unwrap());
-                producer.produce(&manager).await.unwrap();
-            })
-        })
-        .await
+    #[test]
+    fn test_maybe_emit_entry_illegal_seq() {
+        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
+        let region_id = RegionId::new(1, 1);
+        let mut buffer = HashMap::new();
+        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
+        assert!(maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .is_none());
+        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
+        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
+        assert_matches!(err, error::Error::IllegalSequence { .. });
+
+        let mut buffer = HashMap::new();
+        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
+        assert!(maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .is_none());
+        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
+        assert!(maybe_emit_entry(&provider, record, &mut buffer)
+            .unwrap()
+            .is_none());
+        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
+        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
+        assert_matches!(err, error::Error::IllegalSequence { .. });
     }
 }
diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs
index c035e5fcff80..a119aac390c2 100644
--- a/src/log-store/src/lib.rs
+++ b/src/log-store/src/lib.rs
@@ -14,12 +14,10 @@
 
 #![feature(let_chains)]
 #![feature(io_error_more)]
+#![feature(assert_matches)]
 
 pub mod error;
 pub mod kafka;
 pub mod metrics;
-mod noop;
 pub mod raft_engine;
 pub mod test_util;
-
-pub use noop::NoopLogStore;
diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs
deleted file mode 100644
index e5ed7fd66bd2..000000000000
--- a/src/log-store/src/noop.rs
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use common_wal::options::WalOptions;
-use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
-use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
-use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
-use store_api::storage::RegionId;
-
-use crate::error::{Error, Result};
-
-/// A noop log store which only for test
-#[derive(Debug, Default)]
-pub struct NoopLogStore;
-
-#[derive(Debug, Default, Clone, PartialEq)]
-pub struct EntryImpl;
-
-#[derive(Debug, Clone, Default, Eq, PartialEq, Hash)]
-pub struct NamespaceImpl;
-
-impl Namespace for NamespaceImpl {
-    fn id(&self) -> NamespaceId {
-        0
-    }
-}
-
-impl Entry for EntryImpl {
-    fn into_raw_entry(self) -> RawEntry {
-        RawEntry {
-            region_id: self.region_id(),
-            entry_id: self.id(),
-            data: vec![],
-        }
-    }
-
-    fn data(&self) -> &[u8] {
-        &[]
-    }
-
-    fn id(&self) -> EntryId {
-        0
-    }
-
-    fn region_id(&self) -> RegionId {
-        RegionId::from_u64(0)
-    }
-
-    fn estimated_size(&self) -> usize {
-        0
-    }
-}
-
-#[async_trait::async_trait]
-impl LogStore for NoopLogStore {
-    type Error = Error;
-    type Namespace = NamespaceImpl;
-    type Entry = EntryImpl;
-
-    async fn stop(&self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn append(&self, mut _e: Self::Entry) -> Result<AppendResponse> {
-        Ok(AppendResponse::default())
-    }
-
-    async fn append_batch(&self, _e: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
-        Ok(AppendBatchResponse::default())
-    }
-
-    async fn read(
-        &self,
-        _ns: &Self::Namespace,
-        _entry_id: EntryId,
-    ) -> Result<store_api::logstore::entry_stream::SendableEntryStream<'static, Self::Entry, Self::Error>>
-    {
-        Ok(Box::pin(futures::stream::once(futures::future::ready(Ok(
-            vec![],
-        )))))
-    }
-
-    async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
-        Ok(())
-    }
-
-    async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> {
-        Ok(())
-    }
-
-    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>> {
-        Ok(vec![])
-    }
-
-    fn entry(&self, data: &mut Vec<u8>, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry {
-        let _ = data;
-        let _ = entry_id;
-        let _ = ns;
-        EntryImpl
-    }
-
-    fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace {
-        let _ = ns_id;
-        let _ = wal_options;
-        NamespaceImpl
-    }
-
-    async fn obsolete(
-        &self,
-        ns: Self::Namespace,
-        entry_id: EntryId,
-    ) -> std::result::Result<(), Self::Error> {
-        let _ = ns;
-        let _ = entry_id;
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_mock_entry() {
-        let e = EntryImpl;
-        assert_eq!(0, e.data().len());
-        assert_eq!(0, e.id());
-    }
-
-    #[tokio::test]
-    async fn test_noop_logstore() {
-        let store = NoopLogStore;
-        let e = store.entry(&mut vec![], 1, NamespaceImpl);
-        let _ = store.append(e.clone()).await.unwrap();
-        assert!(store.append_batch(vec![e]).await.is_ok());
-        store.create_namespace(&NamespaceImpl).await.unwrap();
-        assert_eq!(0, store.list_namespaces().await.unwrap().len());
-        store.delete_namespace(&NamespaceImpl).await.unwrap();
-        assert_eq!(NamespaceImpl, store.namespace(0, &WalOptions::default()));
-        store.obsolete(NamespaceImpl, 1).await.unwrap();
-    }
-}
diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs
index cdb600249caa..86a46bb1a02f 100644
--- a/src/log-store/src/raft_engine.rs
+++ b/src/log-store/src/raft_engine.rs
@@ -12,20 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::hash::{Hash, Hasher}; -use std::mem::size_of; - -use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry}; -use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; -use store_api::storage::RegionId; - -use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; +use crate::raft_engine::protos::logstore::EntryImpl; mod backend; pub mod log_store; pub use backend::RaftEngineBackend; pub use raft_engine::Config; +use store_api::logstore::entry::{Entry, NaiveEntry}; +use store_api::logstore::provider::Provider; +use store_api::storage::RegionId; pub mod protos { include!(concat!(env!("OUT_DIR"), concat!("/", "protos/", "mod.rs"))); @@ -42,65 +38,20 @@ impl EntryImpl { } } -impl NamespaceImpl { - pub fn with_id(id: NamespaceId) -> Self { - Self { +impl From for Entry { + fn from( + EntryImpl { id, - ..Default::default() - } - } -} - -#[allow(clippy::derived_hash_with_manual_eq)] -impl Hash for NamespaceImpl { - fn hash(&self, state: &mut H) { - self.id.hash(state); - } -} - -impl Eq for NamespaceImpl {} - -impl Namespace for NamespaceImpl { - fn id(&self) -> NamespaceId { - self.id - } -} - -impl Entry for EntryImpl { - fn into_raw_entry(self) -> RawEntry { - RawEntry { - region_id: self.region_id(), - entry_id: self.id(), - data: self.data, - } - } - - fn data(&self) -> &[u8] { - self.data.as_slice() - } - - fn id(&self) -> EntryId { - self.id - } - - fn region_id(&self) -> RegionId { - RegionId::from_u64(self.id) - } - - fn estimated_size(&self) -> usize { - self.data.len() + size_of::() + size_of::() - } -} - -#[cfg(test)] -mod tests { - use store_api::logstore::entry::Entry; - - use crate::raft_engine::protos::logstore::EntryImpl; - - #[test] - fn test_estimated_size() { - let entry = EntryImpl::create(1, 1, b"hello, world".to_vec()); - assert_eq!(28, entry.estimated_size()); + namespace_id, + data, + .. + }: EntryImpl, + ) -> Self { + Entry::Naive(NaiveEntry { + provider: Provider::raft_engine_provider(namespace_id), + region_id: RegionId::from_u64(namespace_id), + entry_id: id, + data, + }) } } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index b2070abbf3ec..c9632e6ea341 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
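Note on the hunk above: the `From<EntryImpl> for Entry` conversion is the whole bridge between the raft-engine wire type and the new unified `Entry`. Because a raft-engine namespace is per region, one `u64` fills both the `Provider` and the `region_id` field. A compilable sketch with stand-in types follows; the real `Entry`, `NaiveEntry`, and `Provider` live in `store_api::logstore`, and `EntryImpl` is the generated proto.

```rust
// Stand-in shapes for a sketch, not the crate's actual definitions.
#[derive(Debug, PartialEq)]
enum Provider {
    RaftEngine { id: u64 },
}

#[derive(Debug, PartialEq)]
struct NaiveEntry {
    provider: Provider,
    region_id: u64,
    entry_id: u64,
    data: Vec<u8>,
}

struct EntryImpl {
    id: u64,
    namespace_id: u64,
    data: Vec<u8>,
}

impl From<EntryImpl> for NaiveEntry {
    fn from(e: EntryImpl) -> Self {
        // One u64 drives both fields: a raft-engine namespace is a region.
        NaiveEntry {
            provider: Provider::RaftEngine { id: e.namespace_id },
            region_id: e.namespace_id,
            entry_id: e.id,
            data: e.data,
        }
    }
}

fn main() {
    let raw = EntryImpl { id: 7, namespace_id: 42, data: b"hello".to_vec() };
    let entry: NaiveEntry = raw.into();
    assert_eq!(entry.region_id, 42);
    assert_eq!(entry.provider, Provider::RaftEngine { id: 42 });
}
```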
-use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{hash_map, HashMap}; use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; @@ -22,22 +21,21 @@ use async_stream::stream; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{error, info}; use common_wal::config::raft_engine::RaftEngineConfig; -use common_wal::options::WalOptions; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; -use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId}; -use store_api::logstore::entry_stream::SendableEntryStream; -use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait}; -use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry}; +use store_api::logstore::provider::{Provider, RaftEngineProvider}; +use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream}; +use store_api::storage::RegionId; use crate::error::{ AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu, - IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, - StartGcTaskSnafu, StopGcTaskSnafu, + IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu, + RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, }; use crate::metrics; use crate::raft_engine::backend::SYSTEM_NAMESPACE; -use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; +use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; const NAMESPACE_PREFIX: &str = "$sys/"; @@ -117,10 +115,10 @@ impl RaftEngineLogStore { .context(StartGcTaskSnafu) } - fn span(&self, namespace: &::Namespace) -> (Option, Option) { + fn span(&self, provider: &RaftEngineProvider) -> (Option, Option) { ( - self.engine.first_index(namespace.id()), - self.engine.last_index(namespace.id()), + self.engine.first_index(provider.id), + self.engine.last_index(provider.id), ) } @@ -129,56 +127,65 @@ impl RaftEngineLogStore { /// to append in each namespace(region). fn entries_to_batch( &self, - entries: Vec, - ) -> Result<(LogBatch, HashMap)> { + entries: Vec, + ) -> Result<(LogBatch, HashMap)> { // Records the last entry id for each region's entries. - let mut entry_ids: HashMap = HashMap::with_capacity(entries.len()); + let mut entry_ids: HashMap = HashMap::with_capacity(entries.len()); let mut batch = LogBatch::with_capacity(entries.len()); for e in entries { - let ns_id = e.namespace_id; - match entry_ids.entry(ns_id) { - Entry::Occupied(mut o) => { + let region_id = e.region_id(); + let entry_id = e.entry_id(); + match entry_ids.entry(region_id) { + hash_map::Entry::Occupied(mut o) => { let prev = *o.get(); ensure!( - e.id == prev + 1, + entry_id == prev + 1, DiscontinuousLogIndexSnafu { - region_id: ns_id, + region_id, last_index: prev, - attempt_index: e.id + attempt_index: entry_id } ); - o.insert(e.id); + o.insert(entry_id); } - Entry::Vacant(v) => { + hash_map::Entry::Vacant(v) => { // this entry is the first in batch of given region. - if let Some(first_index) = self.engine.first_index(ns_id) { + if let Some(first_index) = self.engine.first_index(region_id.as_u64()) { // ensure the first in batch does not override compacted entry. 
ensure!( - e.id > first_index, + entry_id > first_index, OverrideCompactedEntrySnafu { - namespace: ns_id, + namespace: region_id, first_index, - attempt_index: e.id, + attempt_index: entry_id, } ); } // ensure the first in batch does not form a hole in raft-engine. - if let Some(last_index) = self.engine.last_index(ns_id) { + if let Some(last_index) = self.engine.last_index(region_id.as_u64()) { ensure!( - e.id == last_index + 1, + entry_id == last_index + 1, DiscontinuousLogIndexSnafu { - region_id: ns_id, + region_id, last_index, - attempt_index: e.id + attempt_index: entry_id } ); } - v.insert(e.id); + v.insert(entry_id); } } batch - .add_entries::(ns_id, &[e]) + .add_entries::( + region_id.as_u64(), + &[EntryImpl { + id: entry_id, + namespace_id: region_id.as_u64(), + data: e.into_bytes(), + ..Default::default() + }], + ) .context(AddEntryLogBatchSnafu)?; } @@ -198,62 +205,19 @@ impl Debug for RaftEngineLogStore { #[async_trait::async_trait] impl LogStore for RaftEngineLogStore { type Error = Error; - type Namespace = Namespace; - type Entry = EntryImpl; async fn stop(&self) -> Result<()> { self.gc_task.stop().await.context(StopGcTaskSnafu) } - /// Appends an entry to logstore. Currently the existence of the entry's namespace is not checked. - async fn append(&self, e: Self::Entry) -> Result { - ensure!(self.started(), IllegalStateSnafu); - let entry_id = e.id; - let namespace_id = e.namespace_id; - let mut batch = LogBatch::with_capacity(1); - batch - .add_entries::(namespace_id, &[e]) - .context(AddEntryLogBatchSnafu)?; - - if let Some(first_index) = self.engine.first_index(namespace_id) { - ensure!( - entry_id > first_index, - OverrideCompactedEntrySnafu { - namespace: namespace_id, - first_index, - attempt_index: entry_id, - } - ); - } - - if let Some(last_index) = self.engine.last_index(namespace_id) { - ensure!( - entry_id == last_index + 1, - DiscontinuousLogIndexSnafu { - region_id: namespace_id, - last_index, - attempt_index: entry_id - } - ); - } - - let _ = self - .engine - .write(&mut batch, self.config.sync_write) - .context(RaftEngineSnafu)?; - Ok(AppendResponse { - last_entry_id: entry_id, - }) - } - /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of /// batch append. - async fn append_batch(&self, entries: Vec) -> Result { + async fn append_batch(&self, entries: Vec) -> Result { metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL.inc(); metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL.inc_by( entries .iter() - .map(EntryTrait::estimated_size) + .map(|entry| entry.estimated_size()) .sum::() as u64, ); let _timer = metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED.start_timer(); @@ -287,40 +251,47 @@ impl LogStore for RaftEngineLogStore { /// determined by the current "last index" of the namespace. 
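Note on `entries_to_batch` above: it folds three invariants into one pass. Within a batch, a region's entry ids must be strictly consecutive; the first id a batch writes for a region must not touch an index the engine already compacted; and it must be exactly `last_index + 1` so no hole forms in raft-engine. The standalone sketch below isolates just the validation as a hypothetical free function with plain error strings; the real method threads snafu errors and also serializes entries into a raft-engine `LogBatch`.

```rust
use std::collections::HashMap;

/// `spans` maps region id -> (first_index, last_index) already in the engine.
fn validate_batch(
    spans: &HashMap<u64, (u64, u64)>,
    entries: &[(u64, u64)], // (region_id, entry_id) in submission order
) -> Result<(), String> {
    let mut last_in_batch: HashMap<u64, u64> = HashMap::new();
    for &(region, id) in entries {
        match last_in_batch.get(&region) {
            // A region's later entries must continue the batch without gaps.
            Some(&prev) if id != prev + 1 => {
                return Err(format!("discontinuous index {id} after {prev}"));
            }
            Some(_) => {}
            // First entry of this region in the batch: check it against the engine.
            None => {
                if let Some(&(first, last)) = spans.get(&region) {
                    if id <= first {
                        return Err(format!("would override compacted entry {id}"));
                    }
                    if id != last + 1 {
                        return Err(format!("hole: {id} does not follow last index {last}"));
                    }
                }
            }
        }
        last_in_batch.insert(region, id);
    }
    Ok(())
}

fn main() {
    let spans = HashMap::from([(1u64, (0u64, 9u64))]);
    assert!(validate_batch(&spans, &[(1, 10), (1, 11)]).is_ok());
    assert!(validate_batch(&spans, &[(1, 12)]).is_err()); // hole after index 9
    assert!(validate_batch(&spans, &[(1, 0)]).is_err()); // already compacted
}
```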
async fn read( &self, - ns: &Self::Namespace, + provider: &Provider, entry_id: EntryId, - ) -> Result> { + ) -> Result> { + let ns = provider + .as_raft_engine_provider() + .with_context(|| InvalidProviderSnafu { + expected: RaftEngineProvider::type_name(), + actual: provider.type_name(), + })?; + let namespace_id = ns.id; metrics::METRIC_RAFT_ENGINE_READ_CALLS_TOTAL.inc(); let _timer = metrics::METRIC_RAFT_ENGINE_READ_ELAPSED.start_timer(); ensure!(self.started(), IllegalStateSnafu); let engine = self.engine.clone(); - let last_index = engine.last_index(ns.id()).unwrap_or(0); - let mut start_index = entry_id.max(engine.first_index(ns.id()).unwrap_or(last_index + 1)); + let last_index = engine.last_index(namespace_id).unwrap_or(0); + let mut start_index = + entry_id.max(engine.first_index(namespace_id).unwrap_or(last_index + 1)); info!( "Read logstore, namespace: {}, start: {}, span: {:?}", - ns.id(), + namespace_id, entry_id, self.span(ns) ); let max_batch_size = self.config.read_batch_size; let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size); - let ns = ns.clone(); let _handle = common_runtime::spawn_read(async move { while start_index <= last_index { let mut vec = Vec::with_capacity(max_batch_size); match engine .fetch_entries_to::( - ns.id, + namespace_id, start_index, last_index + 1, Some(max_batch_size), &mut vec, ) .context(FetchEntrySnafu { - ns: ns.id, + ns: namespace_id, start: start_index, end: last_index, max_size: max_batch_size, @@ -344,22 +315,40 @@ impl LogStore for RaftEngineLogStore { let s = stream!({ while let Some(res) = rx.recv().await { - yield res; + let res = res?; + + yield Ok(res.into_iter().map(Entry::from).collect::>()); } }); Ok(Box::pin(s)) } - async fn create_namespace(&self, ns: &Self::Namespace) -> Result<()> { + async fn create_namespace(&self, ns: &Provider) -> Result<()> { + let ns = ns + .as_raft_engine_provider() + .with_context(|| InvalidProviderSnafu { + expected: RaftEngineProvider::type_name(), + actual: ns.type_name(), + })?; + let namespace_id = ns.id; ensure!( - ns.id != SYSTEM_NAMESPACE, - IllegalNamespaceSnafu { ns: ns.id } + namespace_id != SYSTEM_NAMESPACE, + IllegalNamespaceSnafu { ns: namespace_id } ); ensure!(self.started(), IllegalStateSnafu); - let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec(); + let key = format!("{}{}", NAMESPACE_PREFIX, namespace_id) + .as_bytes() + .to_vec(); let mut batch = LogBatch::with_capacity(1); batch - .put_message::(SYSTEM_NAMESPACE, key, ns) + .put_message::( + SYSTEM_NAMESPACE, + key, + &NamespaceImpl { + id: namespace_id, + ..Default::default() + }, + ) .context(RaftEngineSnafu)?; let _ = self .engine @@ -368,13 +357,22 @@ impl LogStore for RaftEngineLogStore { Ok(()) } - async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<()> { + async fn delete_namespace(&self, ns: &Provider) -> Result<()> { + let ns = ns + .as_raft_engine_provider() + .with_context(|| InvalidProviderSnafu { + expected: RaftEngineProvider::type_name(), + actual: ns.type_name(), + })?; + let namespace_id = ns.id; ensure!( - ns.id != SYSTEM_NAMESPACE, - IllegalNamespaceSnafu { ns: ns.id } + namespace_id != SYSTEM_NAMESPACE, + IllegalNamespaceSnafu { ns: namespace_id } ); ensure!(self.started(), IllegalStateSnafu); - let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec(); + let key = format!("{}{}", NAMESPACE_PREFIX, namespace_id) + .as_bytes() + .to_vec(); let mut batch = LogBatch::with_capacity(1); batch.delete(SYSTEM_NAMESPACE, key); let _ = self @@ -384,17 +382,17 @@ impl 
LogStore for RaftEngineLogStore { Ok(()) } - async fn list_namespaces(&self) -> Result> { + async fn list_namespaces(&self) -> Result> { ensure!(self.started(), IllegalStateSnafu); - let mut namespaces: Vec = vec![]; + let mut namespaces: Vec = vec![]; self.engine - .scan_messages::( + .scan_messages::( SYSTEM_NAMESPACE, Some(NAMESPACE_PREFIX.as_bytes()), None, false, |_, v| { - namespaces.push(v); + namespaces.push(Provider::RaftEngine(RaftEngineProvider { id: v.id })); true }, ) @@ -402,32 +400,41 @@ impl LogStore for RaftEngineLogStore { Ok(namespaces) } - fn entry(&self, data: &mut Vec, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry { - EntryImpl { - id: entry_id, + fn entry( + &self, + data: &mut Vec, + entry_id: EntryId, + region_id: RegionId, + provider: &Provider, + ) -> Result { + debug_assert_eq!( + provider.as_raft_engine_provider().unwrap().id, + region_id.as_u64() + ); + Ok(Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id, + entry_id, data: std::mem::take(data), - namespace_id: ns.id(), - ..Default::default() - } + })) } - fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { - let _ = wal_options; - Namespace { - id: ns_id, - ..Default::default() - } - } - - async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<()> { + async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<()> { + let ns = provider + .as_raft_engine_provider() + .with_context(|| InvalidProviderSnafu { + expected: RaftEngineProvider::type_name(), + actual: provider.type_name(), + })?; + let namespace_id = ns.id; ensure!(self.started(), IllegalStateSnafu); - let obsoleted = self.engine.compact_to(ns.id(), entry_id + 1); + let obsoleted = self.engine.compact_to(namespace_id, entry_id + 1); info!( "Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}", - ns.id(), + namespace_id, obsoleted, entry_id, - self.span(&ns) + self.span(ns) ); Ok(()) } @@ -444,6 +451,19 @@ impl MessageExt for MessageType { } } +#[cfg(test)] +impl RaftEngineLogStore { + /// Appends a batch of entries and returns a response containing a map where the key is a region id + /// while the value is the id of the last successfully written entry of the region. 
+ async fn append(&self, entry: Entry) -> Result { + let response = self.append_batch(vec![entry]).await?; + if let Some((_, last_entry_id)) = response.last_entry_ids.into_iter().next() { + return Ok(store_api::logstore::AppendResponse { last_entry_id }); + } + unreachable!() + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; @@ -453,14 +473,12 @@ mod tests { use common_telemetry::debug; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use futures_util::StreamExt; - use store_api::logstore::entry_stream::SendableEntryStream; - use store_api::logstore::namespace::Namespace as NamespaceTrait; - use store_api::logstore::LogStore; + use store_api::logstore::{LogStore, SendableEntryStream}; use super::*; use crate::error::Error; use crate::raft_engine::log_store::RaftEngineLogStore; - use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace}; + use crate::raft_engine::protos::logstore::EntryImpl; #[tokio::test] async fn test_open_logstore() { @@ -487,15 +505,15 @@ mod tests { assert!(logstore.list_namespaces().await.unwrap().is_empty()); logstore - .create_namespace(&Namespace::with_id(42)) + .create_namespace(&Provider::raft_engine_provider(42)) .await .unwrap(); let namespaces = logstore.list_namespaces().await.unwrap(); assert_eq!(1, namespaces.len()); - assert_eq!(Namespace::with_id(42), namespaces[0]); + assert_eq!(Provider::raft_engine_provider(42), namespaces[0]); logstore - .delete_namespace(&Namespace::with_id(42)) + .delete_namespace(&Provider::raft_engine_provider(42)) .await .unwrap(); assert!(logstore.list_namespaces().await.unwrap().is_empty()); @@ -511,24 +529,25 @@ mod tests { .await .unwrap(); - let namespace = Namespace::with_id(1); + let namespace_id = 1; let cnt = 1024; for i in 0..cnt { let response = logstore - .append(Entry::create( - i, - namespace.id, - i.to_string().as_bytes().to_vec(), - )) + .append( + EntryImpl::create(i, namespace_id, i.to_string().as_bytes().to_vec()).into(), + ) .await .unwrap(); assert_eq!(i, response.last_entry_id); } let mut entries = HashSet::with_capacity(1024); - let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap(); + let mut s = logstore + .read(&Provider::raft_engine_provider(1), 0) + .await + .unwrap(); while let Some(r) = s.next().await { let vec = r.unwrap(); - entries.extend(vec.into_iter().map(|e| e.id)); + entries.extend(vec.into_iter().map(|e| e.entry_id())); } assert_eq!((0..cnt).collect::>(), entries); } @@ -552,11 +571,11 @@ mod tests { .await .unwrap(); assert!(logstore - .append(Entry::create(1, 1, "1".as_bytes().to_vec())) + .append(EntryImpl::create(1, 1, "1".as_bytes().to_vec()).into()) .await .is_ok()); let entries = logstore - .read(&Namespace::with_id(1), 1) + .read(&Provider::raft_engine_provider(1), 1) .await .unwrap() .collect::>() @@ -572,11 +591,16 @@ mod tests { .await .unwrap(); - let entries = - collect_entries(logstore.read(&Namespace::with_id(1), 1).await.unwrap()).await; + let entries = collect_entries( + logstore + .read(&Provider::raft_engine_provider(1), 1) + .await + .unwrap(), + ) + .await; assert_eq!(1, entries.len()); - assert_eq!(1, entries[0].id); - assert_eq!(1, entries[0].namespace_id); + assert_eq!(1, entries[0].entry_id()); + assert_eq!(1, entries[0].region_id().as_u64()); } async fn wal_dir_usage(path: impl AsRef) -> usize { @@ -615,14 +639,15 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = new_test_log_store(&dir).await; - let namespace = Namespace::with_id(42); + let namespace_id = 42; 
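Note on the helper above: `append` survives only as this `#[cfg(test)]` convenience. It wraps `append_batch` with a single entry and extracts that region's last entry id from the response map. A sketch of the equivalence with a hypothetical simplified response type (the real map is keyed by `RegionId`):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct AppendBatchResponse {
    // Key: region id. Value: last successfully written entry id of the region.
    last_entry_ids: HashMap<u64, u64>,
}

/// A one-entry batch produces exactly one (region id, last entry id) pair,
/// which becomes the single-entry append response.
fn append_one(response: AppendBatchResponse) -> u64 {
    response
        .last_entry_ids
        .into_iter()
        .next()
        .map(|(_, last_entry_id)| last_entry_id)
        .expect("single-entry batches always report one region")
}

fn main() {
    let mut response = AppendBatchResponse::default();
    response.last_entry_ids.insert(42, 7);
    assert_eq!(append_one(response), 7);
}
```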
+ let namespace = Provider::raft_engine_provider(namespace_id); for id in 0..4096 { - let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); + let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into(); let _ = logstore.append(entry).await.unwrap(); } let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await; - logstore.obsolete(namespace, 4000).await.unwrap(); + logstore.obsolete(&namespace, 4000).await.unwrap(); tokio::time::sleep(Duration::from_secs(6)).await; let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await; @@ -639,19 +664,20 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = new_test_log_store(&dir).await; - let namespace = Namespace::with_id(42); + let namespace_id = 42; + let namespace = Provider::raft_engine_provider(namespace_id); for id in 0..1024 { - let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); + let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into(); let _ = logstore.append(entry).await.unwrap(); } - logstore.obsolete(namespace.clone(), 100).await.unwrap(); - assert_eq!(101, logstore.engine.first_index(namespace.id).unwrap()); + logstore.obsolete(&namespace, 100).await.unwrap(); + assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap()); let res = logstore.read(&namespace, 100).await.unwrap(); let mut vec = collect_entries(res).await; - vec.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap()); - assert_eq!(101, vec.first().unwrap().id); + vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap()); + assert_eq!(101, vec.first().unwrap().entry_id()); } #[tokio::test] @@ -663,14 +689,14 @@ mod tests { let entries = (0..8) .flat_map(|ns_id| { let data = [ns_id as u8].repeat(4096); - (0..16).map(move |idx| Entry::create(idx, ns_id, data.clone())) + (0..16).map(move |idx| EntryImpl::create(idx, ns_id, data.clone()).into()) }) .collect(); logstore.append_batch(entries).await.unwrap(); for ns_id in 0..8 { - let namespace = Namespace::with_id(ns_id); - let (first, last) = logstore.span(&namespace); + let namespace = &RaftEngineProvider::new(ns_id); + let (first, last) = logstore.span(namespace); assert_eq!(0, first.unwrap()); assert_eq!(15, last.unwrap()); } @@ -681,19 +707,24 @@ mod tests { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); let logstore = new_test_log_store(&dir).await; - let entries = vec![ - Entry::create(0, 0, [b'0'; 4096].to_vec()), - Entry::create(1, 0, [b'0'; 4096].to_vec()), - Entry::create(0, 1, [b'1'; 4096].to_vec()), - Entry::create(2, 0, [b'0'; 4096].to_vec()), - Entry::create(1, 1, [b'1'; 4096].to_vec()), + EntryImpl::create(0, 0, [b'0'; 4096].to_vec()).into(), + EntryImpl::create(1, 0, [b'0'; 4096].to_vec()).into(), + EntryImpl::create(0, 1, [b'1'; 4096].to_vec()).into(), + EntryImpl::create(2, 0, [b'0'; 4096].to_vec()).into(), + EntryImpl::create(1, 1, [b'1'; 4096].to_vec()).into(), ]; logstore.append_batch(entries).await.unwrap(); - assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0))); - assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1))); + assert_eq!( + (Some(0), Some(2)), + logstore.span(&RaftEngineProvider::new(0)) + ); + assert_eq!( + (Some(0), Some(1)), + logstore.span(&RaftEngineProvider::new(1)) + ); } #[tokio::test] @@ -704,21 +735,21 @@ mod tests { let entries = vec![ // Entry[0] from region 0. 
- Entry::create(0, 0, [b'0'; 4096].to_vec()), + EntryImpl::create(0, 0, [b'0'; 4096].to_vec()).into(), // Entry[0] from region 1. - Entry::create(0, 1, [b'1'; 4096].to_vec()), + EntryImpl::create(0, 1, [b'1'; 4096].to_vec()).into(), // Entry[1] from region 1. - Entry::create(1, 0, [b'1'; 4096].to_vec()), + EntryImpl::create(1, 0, [b'1'; 4096].to_vec()).into(), // Entry[1] from region 0. - Entry::create(1, 1, [b'0'; 4096].to_vec()), + EntryImpl::create(1, 1, [b'0'; 4096].to_vec()).into(), // Entry[2] from region 2. - Entry::create(2, 2, [b'2'; 4096].to_vec()), + EntryImpl::create(2, 2, [b'2'; 4096].to_vec()).into(), ]; // Ensure the last entry id returned for each region is the expected one. let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids; - assert_eq!(last_entry_ids[&0], 1); - assert_eq!(last_entry_ids[&1], 1); - assert_eq!(last_entry_ids[&2], 2); + assert_eq!(last_entry_ids[&(0.into())], 1); + assert_eq!(last_entry_ids[&(1.into())], 1); + assert_eq!(last_entry_ids[&(2.into())], 2); } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 400284fdf124..7d523c4168fe 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -27,6 +27,7 @@ use datatypes::prelude::ConcreteDataType; use object_store::ErrorKind; use prost::{DecodeError, EncodeError}; use snafu::{Location, Snafu}; +use store_api::logstore::provider::Provider; use store_api::manifest::ManifestVersion; use store_api::storage::RegionId; @@ -226,6 +227,14 @@ pub enum Error { source: datatypes::Error, }, + #[snafu(display("Failed to build entry, region_id: {}", region_id))] + BuildEntry { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to encode WAL entry, region_id: {}", region_id))] EncodeWal { region_id: RegionId, @@ -242,17 +251,9 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to read WAL, region_id: {}", region_id))] + #[snafu(display("Failed to read WAL, provider: {}", provider))] ReadWal { - region_id: RegionId, - #[snafu(implicit)] - location: Location, - source: BoxedError, - }, - - #[snafu(display("Failed to read WAL, topic: {}", topic))] - ReadKafkaWal { - topic: String, + provider: Provider, #[snafu(implicit)] location: Location, source: BoxedError, @@ -636,6 +637,13 @@ pub enum Error { unexpected_entry_id: u64, }, + #[snafu(display("Read the corrupted log entry, region_id: {}", region_id))] + CorruptedEntry { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}", region_id, @@ -757,7 +765,6 @@ impl ErrorExt for Error { | ReadParquet { .. } | WriteWal { .. } | ReadWal { .. } - | ReadKafkaWal { .. } | DeleteWal { .. } => StatusCode::StorageUnavailable, CompressObject { .. } | DecompressObject { .. } @@ -789,8 +796,10 @@ impl ErrorExt for Error { | WorkerStopped { .. } | Recv { .. } | EncodeWal { .. } - | DecodeWal { .. } => StatusCode::Internal, + | DecodeWal { .. } + | BuildEntry { .. } => StatusCode::Internal, OpenRegion { source, .. } => source.status_code(), + WriteBuffer { source, .. } => source.status_code(), WriteGroup { source, .. } => source.status_code(), FieldTypeMismatch { source, .. } => source.status_code(), @@ -837,7 +846,9 @@ impl ErrorExt for Error { Upload { .. } => StatusCode::StorageUnavailable, BiError { .. } => StatusCode::Internal, - EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, + EncodeMemtable { .. } | ReadDataPart { .. 
} | CorruptedEntry { .. } => { + StatusCode::Internal + } ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c9930d2d04a7..8d776bd36b9f 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -24,9 +24,9 @@ use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; use common_telemetry::{error, info, warn}; -use common_wal::options::WalOptions; use crossbeam_utils::atomic::AtomicCell; use snafu::{ensure, OptionExt}; +use store_api::logstore::provider::Provider; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -98,8 +98,8 @@ pub(crate) struct MitoRegion { pub(crate) manifest_ctx: ManifestContextRef, /// SST file purger. pub(crate) file_purger: FilePurgerRef, - /// Wal options of this region. - pub(crate) wal_options: WalOptions, + /// The provider of log store. + pub(crate) provider: Provider, /// Last flush time in millis. last_flush_millis: AtomicI64, /// Provider to get current time. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 64e333c9c476..8d05063cc3be 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -24,6 +24,7 @@ use futures::StreamExt; use object_store::manager::ObjectStoreManagerRef; use object_store::util::{join_dir, normalize_dir}; use snafu::{ensure, OptionExt}; +use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{ColumnId, RegionId}; @@ -162,7 +163,7 @@ impl RegionOpener { } } let options = self.options.take().unwrap(); - let wal_options = options.wal_options.clone(); + let provider = self.provider(&options.wal_options); let object_store = self.object_store(&options.storage)?.clone(); // Create a manifest manager for this region and writes regions to the manifest file. @@ -212,7 +213,7 @@ impl RegionOpener { access_layer, self.cache_manager, )), - wal_options, + provider, last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), time_provider, memtable_builder, @@ -250,6 +251,13 @@ impl RegionOpener { Ok(region) } + fn provider(&self, wal_options: &WalOptions) -> Provider { + match wal_options { + WalOptions::RaftEngine => Provider::raft_engine_provider(self.region_id.as_u64()), + WalOptions::Kafka(options) => Provider::kafka_provider(options.topic.to_string()), + } + } + /// Tries to open the region and returns `None` if the region directory is empty. async fn maybe_open( &self, @@ -257,7 +265,6 @@ impl RegionOpener { wal: &Wal, ) -> Result> { let region_options = self.options.as_ref().unwrap().clone(); - let wal_options = region_options.wal_options.clone(); let region_manifest_options = self.manifest_options(config, ®ion_options)?; let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await? 
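Note on `RegionOpener::provider` below in this file: it is the single place where region options pick a WAL backend. Raft-engine namespaces are per region, while a Kafka topic is shared by many regions, which is why the Kafka read path later needs region filtering. The mapping, sketched with stand-in shapes (the diff's actual constructors are `Provider::raft_engine_provider(u64)` and `Provider::kafka_provider(String)`):

```rust
// Stand-in shapes for a sketch, not the crate's actual definitions.
enum WalOptions {
    RaftEngine,
    Kafka { topic: String },
}

#[derive(Debug, PartialEq)]
enum Provider {
    RaftEngine { id: u64 },
    Kafka { topic: String },
}

fn provider_for(region_id: u64, wal_options: &WalOptions) -> Provider {
    match wal_options {
        // A raft-engine WAL is per region: the region id is the namespace id.
        WalOptions::RaftEngine => Provider::RaftEngine { id: region_id },
        // A Kafka WAL is shared: many regions multiplex one topic.
        WalOptions::Kafka { topic } => Provider::Kafka { topic: topic.clone() },
    }
}

fn main() {
    let kafka = WalOptions::Kafka { topic: "greptimedb_wal".to_string() };
    assert_eq!(
        provider_for(1024, &kafka),
        Provider::Kafka { topic: "greptimedb_wal".to_string() }
    );
    assert_eq!(provider_for(7, &WalOptions::RaftEngine), Provider::RaftEngine { id: 7 });
}
```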
@@ -269,6 +276,7 @@ impl RegionOpener { let metadata = manifest.metadata.clone(); let region_id = self.region_id; + let provider = self.provider(®ion_options.wal_options); let object_store = self.object_store(®ion_options.storage)?.clone(); debug!("Open region {} with options: {:?}", region_id, self.options); @@ -313,7 +321,7 @@ impl RegionOpener { ); replay_memtable( wal, - &wal_options, + &provider, region_id, flushed_entry_id, &version_control, @@ -338,7 +346,7 @@ impl RegionOpener { RegionState::ReadOnly, )), file_purger, - wal_options, + provider, last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), time_provider, memtable_builder, @@ -430,7 +438,7 @@ pub(crate) fn check_recovered_region( /// Replays the mutations from WAL and inserts mutations to memtable of given region. pub(crate) async fn replay_memtable( wal: &Wal, - wal_options: &WalOptions, + provider: &Provider, region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, @@ -442,7 +450,7 @@ pub(crate) async fn replay_memtable( let mut last_entry_id = flushed_entry_id; let replay_from_entry_id = flushed_entry_id + 1; - let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; + let mut wal_stream = wal.scan(region_id, replay_from_entry_id, provider)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; if entry_id <= flushed_entry_id { @@ -459,7 +467,7 @@ pub(crate) async fn replay_memtable( last_entry_id = last_entry_id.max(entry_id); let mut region_write_ctx = - RegionWriteCtx::new(region_id, version_control, wal_options.clone()); + RegionWriteCtx::new(region_id, version_control, provider.clone()); for mutation in entry.mutations { rows_replayed += mutation .rows @@ -474,8 +482,9 @@ pub(crate) async fn replay_memtable( region_write_ctx.write_memtable(); } - wal.obsolete(region_id, flushed_entry_id, wal_options) - .await?; + // TODO(weny): We need to update `flushed_entry_id` in the region manifest + // to avoid reading potentially incomplete entries in the future. + wal.obsolete(region_id, flushed_entry_id, provider).await?; info!( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 36b1a0fac67b..e86ff77ca2f1 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -16,8 +16,8 @@ use std::mem; use std::sync::Arc; use api::v1::{Mutation, OpType, Rows, WalEntry}; -use common_wal::options::WalOptions; use snafu::ResultExt; +use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; @@ -86,7 +86,7 @@ pub(crate) struct RegionWriteCtx { /// out of the context to construct the wal entry when we write to the wal. wal_entry: WalEntry, /// Wal options of the region being written to. - wal_options: WalOptions, + provider: Provider, /// Notifiers to send write results to waiters. /// /// The i-th notify is for i-th mutation. 
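Note on `replay_memtable` above: it scans from `flushed_entry_id + 1` and drops anything at or below the flushed id, since those mutations already live in SST files; the hunk also adds a TODO noting that the trailing `wal.obsolete` call should eventually be backed by a manifest update of `flushed_entry_id`. A condensed, synchronous sketch of the loop with illustrative names (the real function is async, decodes `WalEntry` protos, and builds a `RegionWriteCtx` per WAL entry):

```rust
fn replay(wal: &[(u64, &str)], flushed_entry_id: u64) -> (u64, Vec<String>) {
    let mut last_entry_id = flushed_entry_id;
    let mut memtable = Vec::new();
    for &(entry_id, mutation) in wal {
        // Entries at or below the flushed id are already in SST files.
        if entry_id <= flushed_entry_id {
            continue;
        }
        last_entry_id = last_entry_id.max(entry_id);
        memtable.push(mutation.to_string());
    }
    (last_entry_id, memtable)
}

fn main() {
    // Entry 1 was flushed before the restart; only 2 and 3 are replayed.
    let wal = [(1, "put a"), (2, "put b"), (3, "delete a")];
    let (last_entry_id, memtable) = replay(&wal, 1);
    assert_eq!(last_entry_id, 3);
    assert_eq!(memtable, vec!["put b", "delete a"]);
}
```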
@@ -106,7 +106,7 @@ impl RegionWriteCtx { pub(crate) fn new( region_id: RegionId, version_control: &VersionControlRef, - wal_options: WalOptions, + provider: Provider, ) -> RegionWriteCtx { let VersionControlData { version, @@ -122,7 +122,7 @@ impl RegionWriteCtx { next_sequence: committed_sequence + 1, next_entry_id: last_entry_id + 1, wal_entry: WalEntry::default(), - wal_options, + provider, notifiers: Vec::new(), failed: false, put_num: 0, @@ -163,7 +163,7 @@ impl RegionWriteCtx { self.region_id, self.next_entry_id, &self.wal_entry, - &self.wal_options, + &self.provider, )?; self.next_entry_id += 1; Ok(()) diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 0b3b8282833c..18feb9620473 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -26,20 +26,18 @@ use std::mem; use std::sync::Arc; use api::v1::WalEntry; -use async_stream::try_stream; use common_error::ext::BoxedError; -use common_wal::options::WalOptions; use futures::stream::BoxStream; -use futures::StreamExt; use prost::Message; use snafu::ResultExt; use store_api::logstore::entry::Entry; +use store_api::logstore::provider::Provider; use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::storage::RegionId; -use crate::error::{ - DecodeWalSnafu, DeleteWalSnafu, EncodeWalSnafu, ReadWalSnafu, Result, WriteWalSnafu, -}; +use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu}; +use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader}; +use crate::wal::wal_entry_reader::{LogStoreEntryReader, WalEntryReader}; /// WAL entry id. pub type EntryId = store_api::logstore::entry::Id; @@ -60,6 +58,10 @@ impl Wal { pub fn new(store: Arc) -> Self { Self { store } } + + pub fn store(&self) -> &Arc { + &self.store + } } impl Clone for Wal { @@ -77,7 +79,7 @@ impl Wal { store: self.store.clone(), entries: Vec::new(), entry_encode_buf: Vec::new(), - namespaces: HashMap::new(), + providers: HashMap::new(), } } @@ -86,29 +88,19 @@ impl Wal { &'a self, region_id: RegionId, start_id: EntryId, - wal_options: &'a WalOptions, - ) -> Result { - let stream = try_stream!({ - let namespace = self.store.namespace(region_id.into(), wal_options); - let mut stream = self - .store - .read(&namespace, start_id) - .await - .map_err(BoxedError::new) - .context(ReadWalSnafu { region_id })?; - - while let Some(entries) = stream.next().await { - let entries = entries - .map_err(BoxedError::new) - .context(ReadWalSnafu { region_id })?; - - for entry in entries { - yield decode_entry(region_id, entry)?; - } + namespace: &'a Provider, + ) -> Result> { + match namespace { + Provider::RaftEngine(_) => { + LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone())) + .read(namespace, start_id) } - }); - - Ok(Box::pin(stream)) + Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new( + LogStoreRawEntryReader::new(self.store.clone()), + region_id, + )) + .read(namespace, start_id), + } } /// Mark entries whose ids `<= last_id` as deleted. @@ -116,37 +108,26 @@ impl Wal { &self, region_id: RegionId, last_id: EntryId, - wal_options: &WalOptions, + provider: &Provider, ) -> Result<()> { - let namespace = self.store.namespace(region_id.into(), wal_options); self.store - .obsolete(namespace, last_id) + .obsolete(provider, last_id) .await .map_err(BoxedError::new) .context(DeleteWalSnafu { region_id }) } } -/// Decode Wal entry from log store. 
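Note on `Wal::scan` above: it dispatches on the provider variant. A raft-engine stream is already scoped to one region, while a Kafka topic interleaves entries from every region that shares it, so the Kafka branch wraps the reader in `RegionRawEntryReader`. The effect, sketched over plain vectors with illustrative types only (the real dispatch returns streaming readers):

```rust
enum Provider {
    RaftEngine(u64),
    Kafka(String),
}

/// Keep every entry for a per-region raft-engine namespace, but filter a
/// shared Kafka topic down to the region being replayed.
fn entries_for_region(
    provider: &Provider,
    region_id: u64,
    raw: Vec<(u64, Vec<u8>)>, // (region_id, payload) as pulled from the store
) -> Vec<Vec<u8>> {
    match provider {
        Provider::RaftEngine(_) => raw.into_iter().map(|(_, data)| data).collect(),
        Provider::Kafka(_) => raw
            .into_iter()
            .filter(|(id, _)| *id == region_id)
            .map(|(_, data)| data)
            .collect(),
    }
}

fn main() {
    let raw = vec![(1, vec![1u8]), (2, vec![2]), (1, vec![3])];
    let kafka = Provider::Kafka("greptimedb_wal".to_string());
    assert_eq!(entries_for_region(&kafka, 1, raw.clone()), [vec![1u8], vec![3]]);
    let raft = Provider::RaftEngine(1);
    assert_eq!(entries_for_region(&raft, 1, raw).len(), 3);
}
```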
-fn decode_entry(region_id: RegionId, entry: E) -> Result<(EntryId, WalEntry)> { - let entry_id = entry.id(); - let data = entry.data(); - - let wal_entry = WalEntry::decode(data).context(DecodeWalSnafu { region_id })?; - - Ok((entry_id, wal_entry)) -} - /// WAL batch writer. pub struct WalWriter { /// Log store of the WAL. store: Arc, /// Entries to write. - entries: Vec, + entries: Vec, /// Buffer to encode WAL entry. entry_encode_buf: Vec, - /// Namespaces of regions being written into. - namespaces: HashMap, + /// Providers of regions being written into. + providers: HashMap, } impl WalWriter { @@ -156,14 +137,13 @@ impl WalWriter { region_id: RegionId, entry_id: EntryId, wal_entry: &WalEntry, - wal_options: &WalOptions, + provider: &Provider, ) -> Result<()> { - // Gets or inserts with a newly built namespace. - let namespace = self - .namespaces + // Gets or inserts with a newly built provider. + let provider = self + .providers .entry(region_id) - .or_insert_with(|| self.store.namespace(region_id.into(), wal_options)) - .clone(); + .or_insert_with(|| provider.clone()); // Encode wal entry to log store entry. self.entry_encode_buf.clear(); @@ -172,7 +152,9 @@ impl WalWriter { .context(EncodeWalSnafu { region_id })?; let entry = self .store - .entry(&mut self.entry_encode_buf, entry_id, namespace); + .entry(&mut self.entry_encode_buf, entry_id, region_id, provider) + .map_err(BoxedError::new) + .context(BuildEntrySnafu { region_id })?; self.entries.push(entry); @@ -272,7 +254,6 @@ mod tests { async fn test_write_wal() { let env = WalEnv::new().await; let wal = env.new_wal(); - let wal_options = WalOptions::default(); let entry = WalEntry { mutations: vec![ @@ -282,16 +263,34 @@ mod tests { }; let mut writer = wal.writer(); // Region 1 entry 1. + let region_id = RegionId::new(1, 1); writer - .add_entry(RegionId::new(1, 1), 1, &entry, &wal_options) + .add_entry( + region_id, + 1, + &entry, + &Provider::raft_engine_provider(region_id.as_u64()), + ) .unwrap(); // Region 2 entry 1. + let region_id = RegionId::new(1, 2); writer - .add_entry(RegionId::new(1, 2), 1, &entry, &wal_options) + .add_entry( + region_id, + 1, + &entry, + &Provider::raft_engine_provider(region_id.as_u64()), + ) .unwrap(); // Region 1 entry 2. + let region_id = RegionId::new(1, 2); writer - .add_entry(RegionId::new(1, 1), 2, &entry, &wal_options) + .add_entry( + region_id, + 2, + &entry, + &Provider::raft_engine_provider(region_id.as_u64()), + ) .unwrap(); // Test writing multiple region to wal. @@ -339,32 +338,33 @@ mod tests { async fn test_scan_wal() { let env = WalEnv::new().await; let wal = env.new_wal(); - let wal_options = WalOptions::default(); let entries = sample_entries(); let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2)); + let ns1 = Provider::raft_engine_provider(id1.as_u64()); + let ns2 = Provider::raft_engine_provider(id2.as_u64()); let mut writer = wal.writer(); - writer.add_entry(id1, 1, &entries[0], &wal_options).unwrap(); + writer.add_entry(id1, 1, &entries[0], &ns1).unwrap(); // Insert one entry into region2. Scan should not return this entry. 
- writer.add_entry(id2, 1, &entries[0], &wal_options).unwrap(); - writer.add_entry(id1, 2, &entries[1], &wal_options).unwrap(); - writer.add_entry(id1, 3, &entries[2], &wal_options).unwrap(); - writer.add_entry(id1, 4, &entries[3], &wal_options).unwrap(); + writer.add_entry(id2, 1, &entries[0], &ns2).unwrap(); + writer.add_entry(id1, 2, &entries[1], &ns1).unwrap(); + writer.add_entry(id1, 3, &entries[2], &ns1).unwrap(); + writer.add_entry(id1, 4, &entries[3], &ns1).unwrap(); writer.write_to_wal().await.unwrap(); // Scan all contents region1 - let stream = wal.scan(id1, 1, &wal_options).unwrap(); + let stream = wal.scan(id1, 1, &ns1).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); check_entries(&entries, 1, &actual); // Scan parts of contents - let stream = wal.scan(id1, 2, &wal_options).unwrap(); + let stream = wal.scan(id1, 2, &ns1).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); check_entries(&entries[1..], 2, &actual); // Scan out of range - let stream = wal.scan(id1, 5, &wal_options).unwrap(); + let stream = wal.scan(id1, 5, &ns1).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); assert!(actual.is_empty()); } @@ -373,35 +373,27 @@ mod tests { async fn test_obsolete_wal() { let env = WalEnv::new().await; let wal = env.new_wal(); - let wal_options = WalOptions::default(); let entries = sample_entries(); let mut writer = wal.writer(); let region_id = RegionId::new(1, 1); - writer - .add_entry(region_id, 1, &entries[0], &wal_options) - .unwrap(); - writer - .add_entry(region_id, 2, &entries[1], &wal_options) - .unwrap(); - writer - .add_entry(region_id, 3, &entries[2], &wal_options) - .unwrap(); + let ns = Provider::raft_engine_provider(region_id.as_u64()); + writer.add_entry(region_id, 1, &entries[0], &ns).unwrap(); + writer.add_entry(region_id, 2, &entries[1], &ns).unwrap(); + writer.add_entry(region_id, 3, &entries[2], &ns).unwrap(); writer.write_to_wal().await.unwrap(); // Delete 1, 2. - wal.obsolete(region_id, 2, &wal_options).await.unwrap(); + wal.obsolete(region_id, 2, &ns).await.unwrap(); // Put 4. let mut writer = wal.writer(); - writer - .add_entry(region_id, 4, &entries[3], &wal_options) - .unwrap(); + writer.add_entry(region_id, 4, &entries[3], &ns).unwrap(); writer.write_to_wal().await.unwrap(); // Scan all - let stream = wal.scan(region_id, 1, &wal_options).unwrap(); + let stream = wal.scan(region_id, 1, &ns).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); check_entries(&entries[2..], 3, &actual); } diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index 57cee5845e50..d8afc7915119 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -20,7 +20,8 @@ use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::stream::BoxStream; use futures::TryStreamExt; use snafu::ResultExt; -use store_api::logstore::entry::{Entry, RawEntry}; +use store_api::logstore::entry::Entry; +use store_api::logstore::provider::{KafkaProvider, Provider, RaftEngineProvider}; use store_api::logstore::LogStore; use store_api::storage::RegionId; use tokio_stream::StreamExt; @@ -28,38 +29,12 @@ use tokio_stream::StreamExt; use crate::error::{self, Result}; use crate::wal::EntryId; -/// A stream that yields [RawEntry]. -pub type RawEntryStream<'a> = BoxStream<'a, Result>; +/// A stream that yields [Entry]. 
+pub type EntryStream<'a> = BoxStream<'a, Result>; -// The namespace of kafka log store -pub struct KafkaNamespace<'a> { - topic: &'a str, -} - -// The namespace of raft engine log store -pub struct RaftEngineNamespace { - region_id: RegionId, -} - -impl RaftEngineNamespace { - pub fn new(region_id: RegionId) -> Self { - Self { region_id } - } -} - -/// The namespace of [RawEntryReader]. -pub(crate) enum LogStoreNamespace<'a> { - RaftEngine(RaftEngineNamespace), - Kafka(KafkaNamespace<'a>), -} - -/// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore]. +/// [RawEntryReader] provides the ability to read [Entry] from the underlying [LogStore]. pub(crate) trait RawEntryReader: Send + Sync { - fn read<'a>( - &'a self, - ctx: LogStoreNamespace<'a>, - start_id: EntryId, - ) -> Result>; + fn read(&self, provider: &Provider, start_id: EntryId) -> Result>; } /// Implement the [RawEntryReader] for the [LogStore]. @@ -67,66 +42,35 @@ pub struct LogStoreRawEntryReader { store: Arc, } -impl LogStoreRawEntryReader { +impl LogStoreRawEntryReader { pub fn new(store: Arc) -> Self { Self { store } } +} - fn read_region(&self, ns: RaftEngineNamespace, start_id: EntryId) -> Result { - let region_id = ns.region_id; - let stream = try_stream!({ - // TODO(weny): refactor the `namespace` method. - let namespace = self.store.namespace(region_id.into(), &Default::default()); - let mut stream = self - .store - .read(&namespace, start_id) - .await - .map_err(BoxedError::new) - .context(error::ReadWalSnafu { region_id })?; - - while let Some(entries) = stream.next().await { - let entries = entries - .map_err(BoxedError::new) - .context(error::ReadWalSnafu { region_id })?; - - for entry in entries { - yield entry.into_raw_entry() - } - } - }); - - Ok(Box::pin(stream)) - } - - fn read_topic<'a>( - &'a self, - ns: KafkaNamespace<'a>, - start_id: EntryId, - ) -> Result { - let topic = ns.topic; +impl RawEntryReader for LogStoreRawEntryReader { + fn read(&self, provider: &Provider, start_id: EntryId) -> Result> { + let store = self.store.clone(); + let provider = provider.clone(); let stream = try_stream!({ - // TODO(weny): refactor the `namespace` method. - let namespace = self.store.namespace( - RegionId::from_u64(0).into(), - &WalOptions::Kafka(KafkaWalOptions { - topic: topic.to_string(), - }), - ); - - let mut stream = self - .store - .read(&namespace, start_id) + let mut stream = store + .read(&provider, start_id) .await .map_err(BoxedError::new) - .context(error::ReadKafkaWalSnafu { topic })?; + .with_context(|_| error::ReadWalSnafu { + provider: provider.clone(), + })?; while let Some(entries) = stream.next().await { - let entries = entries - .map_err(BoxedError::new) - .context(error::ReadKafkaWalSnafu { topic })?; + let entries = + entries + .map_err(BoxedError::new) + .with_context(|_| error::ReadWalSnafu { + provider: provider.clone(), + })?; for entry in entries { - yield entry.into_raw_entry() + yield entry } } }); @@ -135,53 +79,33 @@ impl LogStoreRawEntryReader { } } -impl RawEntryReader for LogStoreRawEntryReader { - fn read<'a>( - &'a self, - ctx: LogStoreNamespace<'a>, - start_id: EntryId, - ) -> Result> { - let stream = match ctx { - LogStoreNamespace::RaftEngine(ns) => self.read_region(ns, start_id)?, - LogStoreNamespace::Kafka(ns) => self.read_topic(ns, start_id)?, - }; - - Ok(Box::pin(stream)) - } -} - -/// A filter implement the [RawEntryReader] -pub struct RawEntryReaderFilter { +/// A [RawEntryReader] reads [RawEntry] belongs to a specific region. 
+pub struct RegionRawEntryReader { reader: R, - filter: F, + region_id: RegionId, } -impl RawEntryReaderFilter +impl RegionRawEntryReader where R: RawEntryReader, - F: Fn(&RawEntry) -> bool + Sync + Send, { - pub fn new(reader: R, filter: F) -> Self { - Self { reader, filter } + pub fn new(reader: R, region_id: RegionId) -> Self { + Self { reader, region_id } } } -impl RawEntryReader for RawEntryReaderFilter +impl RawEntryReader for RegionRawEntryReader where R: RawEntryReader, - F: Fn(&RawEntry) -> bool + Sync + Send, { - fn read<'a>( - &'a self, - ctx: LogStoreNamespace<'a>, - start_id: EntryId, - ) -> Result> { + fn read(&self, ctx: &Provider, start_id: EntryId) -> Result> { let mut stream = self.reader.read(ctx, start_id)?; - let filter = &(self.filter); + let region_id = self.region_id; + let stream = try_stream!({ while let Some(entry) = stream.next().await { let entry = entry?; - if filter(&entry) { + if entry.region_id() == region_id { yield entry } } @@ -197,11 +121,9 @@ mod tests { use common_wal::options::WalOptions; use futures::stream; - use store_api::logstore::entry::{Entry, RawEntry}; - use store_api::logstore::entry_stream::SendableEntryStream; - use store_api::logstore::namespace::Namespace; + use store_api::logstore::entry::{Entry, NaiveEntry}; use store_api::logstore::{ - AppendBatchResponse, AppendResponse, EntryId, LogStore, NamespaceId, + AppendBatchResponse, AppendResponse, EntryId, LogStore, SendableEntryStream, }; use store_api::storage::RegionId; @@ -210,93 +132,79 @@ mod tests { #[derive(Debug)] struct MockLogStore { - entries: Vec, - } - - #[derive(Debug, Eq, PartialEq, Clone, Copy, Default, Hash)] - struct MockNamespace; - - impl Namespace for MockNamespace { - fn id(&self) -> NamespaceId { - 0 - } + entries: Vec, } #[async_trait::async_trait] impl LogStore for MockLogStore { - type Entry = RawEntry; type Error = error::Error; - type Namespace = MockNamespace; async fn stop(&self) -> Result<(), Self::Error> { unreachable!() } - async fn append(&self, entry: Self::Entry) -> Result { - unreachable!() - } - async fn append_batch( &self, - entries: Vec, + entries: Vec, ) -> Result { unreachable!() } async fn read( &self, - ns: &Self::Namespace, + provider: &Provider, id: EntryId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())]))) } - async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> { + async fn create_namespace(&self, ns: &Provider) -> Result<(), Self::Error> { unreachable!() } - async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> { + async fn delete_namespace(&self, ns: &Provider) -> Result<(), Self::Error> { unreachable!() } - async fn list_namespaces(&self) -> Result, Self::Error> { + async fn list_namespaces(&self) -> Result, Self::Error> { unreachable!() } async fn obsolete( &self, - ns: Self::Namespace, + provider: &Provider, entry_id: EntryId, ) -> Result<(), Self::Error> { unreachable!() } - fn entry(&self, data: &mut Vec, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry { + fn entry( + &self, + data: &mut Vec, + entry_id: EntryId, + region_id: RegionId, + provider: &Provider, + ) -> Result { unreachable!() } - - fn namespace(&self, _ns_id: NamespaceId, _wal_options: &WalOptions) -> Self::Namespace { - MockNamespace - } } #[tokio::test] async fn test_raw_entry_reader() { - let expected_entries = vec![RawEntry { + let provider = Provider::raft_engine_provider(RegionId::new(1024, 1).as_u64()); + let expected_entries = 
vec![Entry::Naive(NaiveEntry { + provider: provider.clone(), region_id: RegionId::new(1024, 1), entry_id: 1, - data: vec![], - }]; + data: vec![1], + })]; let store = MockLogStore { entries: expected_entries.clone(), }; let reader = LogStoreRawEntryReader::new(Arc::new(store)); let entries = reader - .read( - LogStoreNamespace::RaftEngine(RaftEngineNamespace::new(RegionId::new(1024, 1))), - 0, - ) + .read(&provider, 0) .unwrap() .try_collect::>() .await @@ -306,37 +214,38 @@ mod tests { #[tokio::test] async fn test_raw_entry_reader_filter() { + let provider = Provider::raft_engine_provider(RegionId::new(1024, 1).as_u64()); let all_entries = vec![ - RawEntry { + Entry::Naive(NaiveEntry { + provider: provider.clone(), region_id: RegionId::new(1024, 1), entry_id: 1, data: vec![1], - }, - RawEntry { + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), region_id: RegionId::new(1024, 2), entry_id: 2, data: vec![2], - }, - RawEntry { + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), region_id: RegionId::new(1024, 3), entry_id: 3, data: vec![3], - }, + }), ]; let store = MockLogStore { entries: all_entries.clone(), }; let expected_region_id = RegionId::new(1024, 3); - let reader = - RawEntryReaderFilter::new(LogStoreRawEntryReader::new(Arc::new(store)), |entry| { - entry.region_id == expected_region_id - }); + let reader = RegionRawEntryReader::new( + LogStoreRawEntryReader::new(Arc::new(store)), + expected_region_id, + ); let entries = reader - .read( - LogStoreNamespace::RaftEngine(RaftEngineNamespace::new(RegionId::new(1024, 1))), - 0, - ) + .read(&provider, 0) .unwrap() .try_collect::>() .await @@ -344,7 +253,7 @@ mod tests { assert_eq!( all_entries .into_iter() - .filter(|entry| entry.region_id == expected_region_id) + .filter(|entry| entry.region_id() == expected_region_id) .collect::>(), entries ); diff --git a/src/mito2/src/wal/wal_entry_reader.rs b/src/mito2/src/wal/wal_entry_reader.rs index 8c3e16122254..82db59540059 100644 --- a/src/mito2/src/wal/wal_entry_reader.rs +++ b/src/mito2/src/wal/wal_entry_reader.rs @@ -12,13 +12,183 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::WalEntry; +use common_telemetry::info; +use futures::StreamExt; +use prost::Message; +use snafu::{ensure, ResultExt}; +use store_api::logstore::entry::Entry; +use store_api::logstore::provider::Provider; use store_api::storage::RegionId; -use crate::error::Result; -use crate::wal::raw_entry_reader::LogStoreNamespace; +use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result}; +use crate::wal::raw_entry_reader::RawEntryReader; use crate::wal::{EntryId, WalEntryStream}; -/// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store. -pub(crate) trait OneshotWalEntryReader: Send + Sync { - fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result; +pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> { + let entry_id = raw_entry.entry_id(); + let region_id = raw_entry.region_id(); + ensure!(raw_entry.is_complete(), CorruptedEntrySnafu { region_id }); + // TODO(weny): implement the [Buf] for return value, avoid extra memory allocation. + let bytes = raw_entry.into_bytes(); + let wal_entry = WalEntry::decode(bytes.as_slice()).context(DecodeWalSnafu { region_id })?; + + Ok((entry_id, wal_entry)) +} + +/// [WalEntryReader] provides the ability to read and decode entries from the underlying store. 
+pub(crate) trait WalEntryReader: Send + Sync { + fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result>; +} + +/// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry]. +pub struct LogStoreEntryReader { + reader: R, +} + +impl LogStoreEntryReader { + pub fn new(reader: R) -> Self { + Self { reader } + } +} + +impl WalEntryReader for LogStoreEntryReader { + fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result> { + let LogStoreEntryReader { reader } = self; + let mut stream = reader.read(ns, start_id)?; + + let stream = async_stream::stream! { + let mut buffered_entry = None; + while let Some(next_entry) = stream.next().await { + match buffered_entry.take() { + Some(entry) => { + yield decode_raw_entry(entry); + buffered_entry = Some(next_entry?); + }, + None => { + buffered_entry = Some(next_entry?); + } + }; + } + if let Some(entry) = buffered_entry { + // Ignores tail corrupted data. + if entry.is_complete() { + yield decode_raw_entry(entry); + } + } + }; + + Ok(Box::pin(stream)) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::{Mutation, OpType, WalEntry}; + use futures::{stream, TryStreamExt}; + use prost::Message; + use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader}; + use store_api::logstore::provider::Provider; + use store_api::storage::RegionId; + + use crate::error::{self, Result}; + use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; + use crate::wal::wal_entry_reader::{LogStoreEntryReader, WalEntryReader}; + use crate::wal::EntryId; + + struct MockRawEntryStream { + entries: Vec, + } + + impl RawEntryReader for MockRawEntryStream { + fn read(&self, ns: &Provider, start_id: EntryId) -> Result> { + let entries = self.entries.clone().into_iter().map(Ok); + + Ok(Box::pin(stream::iter(entries))) + } + } + + #[tokio::test] + async fn test_tail_corrupted_stream() { + common_telemetry::init_default_ut_logging(); + let provider = Provider::kafka_provider("my_topic".to_string()); + let wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + }; + let encoded_entry = wal_entry.encode_to_vec(); + let parts = encoded_entry + .chunks(encoded_entry.len() / 2) + .map(Into::into) + .collect::>(); + let raw_entry_stream = MockRawEntryStream { + entries: vec![ + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: RegionId::new(1, 1), + entry_id: 2, + headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last], + parts, + }), + // The tail corrupted data. 
+ Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: RegionId::new(1, 1), + entry_id: 1, + headers: vec![MultiplePartHeader::Last], + parts: vec![vec![1; 100]], + }), + ], + }; + + let reader = LogStoreEntryReader::new(raw_entry_stream); + let entries = reader + .read(&provider, 0) + .unwrap() + .try_collect::>() + .await + .unwrap() + .into_iter() + .map(|(_, entry)| entry) + .collect::>(); + + assert_eq!(entries, vec![wal_entry]); + } + + #[tokio::test] + async fn test_corrupted_stream() { + let provider = Provider::kafka_provider("my_topic".to_string()); + let raw_entry_stream = MockRawEntryStream { + entries: vec![ + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: RegionId::new(1, 1), + entry_id: 1, + headers: vec![MultiplePartHeader::Last], + parts: vec![vec![1; 100]], + }), + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: RegionId::new(1, 1), + entry_id: 2, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + ], + }; + + let reader = LogStoreEntryReader::new(raw_entry_stream); + let err = reader + .read(&provider, 0) + .unwrap() + .try_collect::>() + .await + .unwrap_err(); + assert_matches!(err, error::Error::CorruptedEntry { .. }); + } } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index f6d890dc8ff8..595b6ee56635 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -75,7 +75,7 @@ impl RegionWorkerLoop { let timer = Instant::now(); let last_entry_id = replay_memtable( &self.wal, - ®ion.wal_options, + ®ion.provider, region_id, flushed_entry_id, ®ion.version_control, diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index b776f98aaa56..2d1c4b96ca39 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -212,7 +212,7 @@ impl RegionWorkerLoop { ); if let Err(e) = self .wal - .obsolete(region_id, request.flushed_entry_id, ®ion.wal_options) + .obsolete(region_id, request.flushed_entry_id, ®ion.provider) .await { error!(e; "Failed to write wal, region: {}", region_id); diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index f5598286a563..70aca9f6ace4 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -82,7 +82,7 @@ impl RegionWorkerLoop { .obsolete( region_id, truncate_result.truncated_entry_id, - ®ion.wal_options, + ®ion.provider, ) .await { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 3614d1be5de2..85ce49f3150f 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -84,8 +84,7 @@ impl RegionWorkerLoop { for (region_id, region_ctx) in region_ctxs.iter_mut() { // Safety: the log store implementation ensures that either the `write_to_wal` fails and no // response is returned or the last entry ids for each region do exist. 
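Note on the two tests above: they pin down the hold-one-back strategy in `LogStoreEntryReader`. The reader always buffers the newest entry and only decodes an entry once a successor arrives, so an incomplete entry at the very tail (a torn write) is silently dropped, while an incomplete entry followed by more data surfaces as `CorruptedEntry`. A synchronous sketch of that policy with simplified types (`complete` plays the role of `Entry::is_complete`):

```rust
#[derive(Clone)]
struct PendingEntry {
    complete: bool,
    payload: Vec<u8>,
}

/// Decode entries while always holding the newest one back. Only the held
/// final entry may be incomplete (a torn tail write); an incomplete entry
/// that has a successor is genuine corruption.
fn drain_entries(entries: Vec<PendingEntry>) -> Result<Vec<Vec<u8>>, String> {
    let mut decoded = Vec::new();
    let mut buffered: Option<PendingEntry> = None;
    for next in entries {
        if let Some(entry) = buffered.take() {
            // `entry` has a successor, so it must decode cleanly.
            if !entry.complete {
                return Err("corrupted entry".to_string());
            }
            decoded.push(entry.payload);
        }
        buffered = Some(next);
    }
    if let Some(entry) = buffered {
        // Tail entry: ignored when incomplete instead of failing the replay.
        if entry.complete {
            decoded.push(entry.payload);
        }
    }
    Ok(decoded)
}

fn main() {
    let ok = PendingEntry { complete: true, payload: vec![1] };
    let torn = PendingEntry { complete: false, payload: vec![2] };
    // Torn tail: tolerated, only the complete entry is returned.
    assert_eq!(drain_entries(vec![ok.clone(), torn.clone()]).unwrap(), [vec![1u8]]);
    // Incomplete entry in the middle of the stream: hard error.
    assert!(drain_entries(vec![torn, ok]).is_err());
}
```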
-                let last_entry_id =
-                    response.last_entry_ids.get(&region_id.as_u64()).unwrap();
+                let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                 region_ctx.set_next_entry_id(last_entry_id + 1);
             }
         }
@@ -162,7 +161,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 let region_ctx = RegionWriteCtx::new(
                     region.region_id,
                     &region.version_control,
-                    region.wal_options.clone(),
+                    region.provider.clone(),
                 );

                 e.insert(region_ctx);
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 33739ac85fb4..347643982716 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -14,68 +14,64 @@

 //! LogStore APIs.

+pub mod entry;
+pub mod provider;
+
 use std::collections::HashMap;
+use std::pin::Pin;

 use common_error::ext::ErrorExt;
-use common_wal::options::WalOptions;
+use entry::Entry;
+use futures::Stream;

-use crate::logstore::entry::Entry;
-pub use crate::logstore::entry::Id as EntryId;
-use crate::logstore::entry_stream::SendableEntryStream;
-pub use crate::logstore::namespace::Id as NamespaceId;
-use crate::logstore::namespace::Namespace;
+pub type SendableEntryStream<'a, I, E> = Pin<Box<dyn Stream<Item = Result<Vec<I>, E>> + Send + 'a>>;

-pub mod entry;
-pub mod entry_stream;
-pub mod namespace;
+pub use crate::logstore::entry::Id as EntryId;
+use crate::logstore::provider::Provider;
+use crate::storage::RegionId;

 /// `LogStore` serves as a Write-Ahead-Log for storage engine.
 #[async_trait::async_trait]
 pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
     type Error: ErrorExt + Send + Sync + 'static;
-    type Namespace: Namespace;
-    type Entry: Entry;

     /// Stops components of the logstore.
     async fn stop(&self) -> Result<(), Self::Error>;

-    /// Appends an entry to the log store and returns a response containing the id of the appended entry.
-    async fn append(&self, entry: Self::Entry) -> Result<AppendResponse, Self::Error>;
-
     /// Appends a batch of entries and returns a response containing a map where the key is a region id
     /// while the value is the id of the last successfully written entry of the region.
-    async fn append_batch(
-        &self,
-        entries: Vec<Self::Entry>,
-    ) -> Result<AppendBatchResponse, Self::Error>;
+    async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse, Self::Error>;

     /// Creates a new `EntryStream` to asynchronously generate `Entry` items with ids
     /// starting from `id`.
     async fn read(
         &self,
-        ns: &Self::Namespace,
+        provider: &Provider,
         id: EntryId,
-    ) -> Result<SendableEntryStream<'static, Self::Entry, Self::Error>, Self::Error>;
+    ) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error>;

     /// Creates a new `Namespace` from the given ref.
-    async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;
+    async fn create_namespace(&self, ns: &Provider) -> Result<(), Self::Error>;

     /// Deletes an existing `Namespace` specified by the given ref.
-    async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>;
+    async fn delete_namespace(&self, ns: &Provider) -> Result<(), Self::Error>;

     /// Lists all existing namespaces.
-    async fn list_namespaces(&self) -> Result<Vec<Self::Namespace>, Self::Error>;
+    async fn list_namespaces(&self) -> Result<Vec<Provider>, Self::Error>;

     /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
     /// so that the log store can safely delete those entries. This method does not guarantee
     /// that the obsolete entries are deleted immediately.
-    async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>;
+    async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<(), Self::Error>;

-    /// Makes an entry instance of the associated Entry type
-    fn entry(&self, data: &mut Vec<u8>, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry;
-
-    /// Makes a namespace instance of the associated Namespace type
-    fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace;
+    /// Makes an [Entry] instance for the given region and provider.
+    fn entry(
+        &self,
+        data: &mut Vec<u8>,
+        entry_id: EntryId,
+        region_id: RegionId,
+        provider: &Provider,
+    ) -> Result<Entry, Self::Error>;
 }

 /// The response of an `append` operation.
@@ -89,5 +85,5 @@ pub struct AppendResponse {
 #[derive(Debug, Default)]
 pub struct AppendBatchResponse {
-    /// Key: region id (as u64). Value: the id of the last successfully written entry of the region.
-    pub last_entry_ids: HashMap<u64, EntryId>,
+    /// Key: region id. Value: the id of the last successfully written entry of the region.
+    pub last_entry_ids: HashMap<RegionId, EntryId>,
 }
diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs
index 09daa2e1abb9..8b7f838be17a 100644
--- a/src/store-api/src/logstore/entry.rs
+++ b/src/store-api/src/logstore/entry.rs
@@ -12,58 +12,141 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::mem::size_of;
+
+use crate::logstore::provider::Provider;
 use crate::storage::RegionId;

 /// An entry's id.
 /// Different log store implementations may interpret the id to different meanings.
 pub type Id = u64;

-/// The raw Wal entry.
+/// [Entry::Naive] is used in both the RaftEngineLogStore and the KafkaLogStore.
+///
+/// [Entry::MultiplePart] holds multiple parts of data split from a large entry; it is only used in the KafkaLogStore.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct RawEntry {
+pub enum Entry {
+    Naive(NaiveEntry),
+    MultiplePart(MultiplePartEntry),
+}
+
+impl Entry {
+    /// Converts into a [NaiveEntry] if it is an [Entry::Naive].
+    pub fn into_naive_entry(self) -> Option<NaiveEntry> {
+        match self {
+            Entry::Naive(entry) => Some(entry),
+            Entry::MultiplePart(_) => None,
+        }
+    }
+
+    /// Converts into a [MultiplePartEntry] if it is an [Entry::MultiplePart].
+    pub fn into_multiple_part_entry(self) -> Option<MultiplePartEntry> {
+        match self {
+            Entry::Naive(_) => None,
+            Entry::MultiplePart(entry) => Some(entry),
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct NaiveEntry {
+    pub provider: Provider,
     pub region_id: RegionId,
     pub entry_id: Id,
     pub data: Vec<u8>,
 }

-impl Entry for RawEntry {
-    fn into_raw_entry(self) -> RawEntry {
-        self
+impl NaiveEntry {
+    fn estimated_size(&self) -> usize {
+        size_of::<Self>() + self.data.capacity() * size_of::<u8>()
     }
+}

-    fn data(&self) -> &[u8] {
-        &self.data
-    }
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum MultiplePartHeader {
+    First,
+    Middle(usize),
+    Last,
+}

-    fn id(&self) -> Id {
-        self.entry_id
-    }
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MultiplePartEntry {
+    pub provider: Provider,
+    pub region_id: RegionId,
+    pub entry_id: Id,
+    pub headers: Vec<MultiplePartHeader>,
+    pub parts: Vec<Vec<u8>>,
+}

-    fn region_id(&self) -> RegionId {
-        self.region_id
+impl MultiplePartEntry {
+    fn is_complete(&self) -> bool {
+        self.headers.contains(&MultiplePartHeader::First)
+            && self.headers.contains(&MultiplePartHeader::Last)
     }

     fn estimated_size(&self) -> usize {
-        std::mem::size_of_val(self)
+        size_of::<Self>()
+            + self
+                .parts
+                .iter()
+                .map(|data| data.capacity() * size_of::<u8>())
+                .sum::<usize>()
+            + self.headers.capacity() * size_of::<MultiplePartHeader>()
     }
 }

-/// Entry is the minimal data storage unit through which users interact with the log store.
-/// The log store implementation may have larger or smaller data storage unit than an entry.
-pub trait Entry: Send + Sync {
-    /// Consumes [Entry] and converts to [RawEntry].
-    fn into_raw_entry(self) -> RawEntry;
+impl Entry {
+    /// Returns the [Provider].
+    pub fn provider(&self) -> &Provider {
+        match self {
+            Entry::Naive(entry) => &entry.provider,
+            Entry::MultiplePart(entry) => &entry.provider,
+        }
+    }

-    /// Returns the contained data of the entry.
-    fn data(&self) -> &[u8];
+    /// Returns the [RegionId].
+    pub fn region_id(&self) -> RegionId {
+        match self {
+            Entry::Naive(entry) => entry.region_id,
+            Entry::MultiplePart(entry) => entry.region_id,
+        }
+    }

-    /// Returns the id of the entry.
-    /// Usually the namespace id is identical with the region id.
-    fn id(&self) -> Id;
+    /// Returns the [Id].
+    pub fn entry_id(&self) -> Id {
+        match self {
+            Entry::Naive(entry) => entry.entry_id,
+            Entry::MultiplePart(entry) => entry.entry_id,
+        }
+    }

-    /// Returns the [RegionId]
-    fn region_id(&self) -> RegionId;
+    /// Sets the [Id].
+    pub fn set_entry_id(&mut self, id: Id) {
+        match self {
+            Entry::Naive(entry) => entry.entry_id = id,
+            Entry::MultiplePart(entry) => entry.entry_id = id,
+        }
+    }
+
+    /// Returns true if it's a complete entry.
+    pub fn is_complete(&self) -> bool {
+        match self {
+            Entry::Naive(_) => true,
+            Entry::MultiplePart(entry) => entry.is_complete(),
+        }
+    }

-    /// Computes the estimated encoded size.
-    fn estimated_size(&self) -> usize;
+    /// Consumes the entry and returns its raw bytes, concatenating the parts of a multiple-part entry.
+    pub fn into_bytes(self) -> Vec<u8> {
+        match self {
+            Entry::Naive(entry) => entry.data,
+            Entry::MultiplePart(entry) => entry.parts.concat(),
+        }
+    }
+
+    /// Computes the estimated encoded size.
+    pub fn estimated_size(&self) -> usize {
+        match self {
+            Entry::Naive(entry) => entry.estimated_size(),
+            Entry::MultiplePart(entry) => entry.estimated_size(),
+        }
+    }
 }
diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs
deleted file mode 100644
index 6a5886b0b53f..000000000000
--- a/src/store-api/src/logstore/entry_stream.rs
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::pin::Pin;
-
-use common_error::ext::ErrorExt;
-use futures::Stream;
-
-use crate::logstore::entry::Entry;
-
-pub trait EntryStream: Stream<Item = Result<Vec<Self::Entry>, Self::Error>> {
-    type Error: ErrorExt;
-    type Entry: Entry;
-
-    fn start_id(&self) -> u64;
-}
-
-pub type SendableEntryStream<'a, I, E> = Pin<Box<dyn Stream<Item = Result<Vec<I>, E>> + Send + 'a>>;
-
-#[cfg(test)]
-mod tests {
-    use std::any::Any;
-    use std::task::{Context, Poll};
-
-    use common_error::ext::StackError;
-    use futures::StreamExt;
-    use snafu::Snafu;
-
-    use super::*;
-    pub use crate::logstore::entry::Id;
-    use crate::logstore::entry::RawEntry;
-    use crate::storage::RegionId;
-
-    pub struct SimpleEntry {
-        /// Binary data of current entry
-        data: Vec<u8>,
-    }
-
-    #[derive(Debug, Snafu)]
-    #[snafu(visibility(pub))]
-    pub struct Error {}
-
-    impl ErrorExt for Error {
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
-    }
-
-    impl StackError for Error {
-        fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
-
-        fn next(&self) -> Option<&dyn StackError> {
-            None
-        }
-    }
-
-    impl Entry for SimpleEntry {
-        fn into_raw_entry(self) -> RawEntry {
-            RawEntry {
-                region_id: RegionId::from_u64(0),
-                entry_id: 0,
-                data: vec![],
-            }
-        }
-
-        fn data(&self) -> &[u8] {
-            &self.data
-        }
-
-        fn id(&self) -> Id {
-            0u64
-        }
-
-        fn region_id(&self) -> RegionId {
-            RegionId::from_u64(0)
-        }
-
-        fn estimated_size(&self) -> usize {
-            self.data.len()
-        }
-    }
-
-    impl SimpleEntry {
-        pub fn new(data: impl AsRef<[u8]>) -> Self {
-            let data = data.as_ref().to_vec();
-            Self { data }
-        }
-    }
-
-    pub struct EntryStreamImpl<'a> {
-        inner: SendableEntryStream<'a, SimpleEntry, Error>,
-        start_id: u64,
-    }
-
-    impl<'a> EntryStream for EntryStreamImpl<'a> {
-        type Error = Error;
-        type Entry = SimpleEntry;
-
-        fn start_id(&self) -> u64 {
-            self.start_id
-        }
-    }
-
-    impl Stream for EntryStreamImpl<'_> {
-        type Item = Result<Vec<SimpleEntry>, Error>;
-
-        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-            match Pin::new(&mut self.inner).poll_next(cx) {
-                Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
-                Poll::Ready(None) => Poll::Ready(None),
-                Poll::Pending => Poll::Pending,
-            }
-        }
-    }
-
-    #[tokio::test]
-    pub async fn test_entry_stream() {
-        let stream =
-            async_stream::stream!(yield Ok(vec![SimpleEntry::new("test_entry".as_bytes())]));
-
-        let mut stream_impl = EntryStreamImpl {
-            inner: Box::pin(stream),
-            start_id: 1234,
-        };
-
-        if let Some(v) = stream_impl.next().await {
-            let vec = v.unwrap();
-            assert_eq!(1, vec.len());
-            assert_eq!(b"test_entry", vec[0].data());
-        }
-    }
-}
diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs
deleted file mode 100644
index ac1b62e31bd4..000000000000
--- a/src/store-api/src/logstore/namespace.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::hash::Hash;
-
-/// The namespace id.
-/// Usually the namespace id is identical with the region id.
-pub type Id = u64;
-
-pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq {
-    /// Returns the namespace id.
-    fn id(&self) -> Id;
-}
diff --git a/src/store-api/src/logstore/provider.rs b/src/store-api/src/logstore/provider.rs
new file mode 100644
index 000000000000..f893a47df54f
--- /dev/null
+++ b/src/store-api/src/logstore/provider.rs
@@ -0,0 +1,110 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Display;
+use std::sync::Arc;
+
+use crate::storage::RegionId;
+
+/// The provider of the Kafka log store.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct KafkaProvider {
+    pub topic: String,
+}
+
+impl KafkaProvider {
+    pub fn new(topic: String) -> Self {
+        Self { topic }
+    }
+
+    /// Returns the type name.
+    pub fn type_name() -> &'static str {
+        "KafkaProvider"
+    }
+}
+
+impl Display for KafkaProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.topic)
+    }
+}
+
+/// The provider of the raft engine log store.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RaftEngineProvider {
+    pub id: u64,
+}
+
+impl RaftEngineProvider {
+    pub fn new(id: u64) -> Self {
+        Self { id }
+    }
+
+    /// Returns the type name.
+    pub fn type_name() -> &'static str {
+        "RaftEngineProvider"
+    }
+}
+
+/// The provider of a log store.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Provider {
+    RaftEngine(RaftEngineProvider),
+    Kafka(Arc<KafkaProvider>),
+}
+
+impl Display for Provider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match &self {
+            Provider::RaftEngine(provider) => {
+                write!(f, "region: {}", RegionId::from_u64(provider.id))
+            }
+            Provider::Kafka(provider) => write!(f, "topic: {}", provider.topic),
+        }
+    }
+}
+
+impl Provider {
+    pub fn raft_engine_provider(id: u64) -> Provider {
+        Provider::RaftEngine(RaftEngineProvider { id })
+    }
+
+    pub fn kafka_provider(topic: String) -> Provider {
+        Provider::Kafka(Arc::new(KafkaProvider { topic }))
+    }
+
+    /// Returns the type name.
+    pub fn type_name(&self) -> &'static str {
+        match self {
+            Provider::RaftEngine(_) => RaftEngineProvider::type_name(),
+            Provider::Kafka(_) => KafkaProvider::type_name(),
+        }
+    }
+
+    /// Returns a reference to the [`RaftEngineProvider`] if it's a [`Provider::RaftEngine`].
+    pub fn as_raft_engine_provider(&self) -> Option<&RaftEngineProvider> {
+        if let Provider::RaftEngine(ns) = self {
+            return Some(ns);
+        }
+        None
+    }
+
+    /// Returns a reference to the [`KafkaProvider`] if it's a [`Provider::Kafka`].
+    pub fn as_kafka_provider(&self) -> Option<&Arc<KafkaProvider>> {
+        if let Provider::Kafka(ns) = self {
+            return Some(ns);
+        }
+        None
+    }
+}
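
The decode loop in `LogStoreEntryReader::read` above relies on a one-entry look-ahead: an entry is only yielded once its successor has arrived, so the final entry of the stream can be checked for completeness and a corrupted tail silently skipped, which is exactly what `test_tail_corrupted_stream` exercises. A self-contained sketch of the same pattern over a plain `Vec` (the helper name and types here are illustrative, not from the patch):

```rust
// Sketch of the look-ahead used by `LogStoreEntryReader::read`: hold one
// element back so only the *last* element is subject to the completeness check.
fn drop_corrupted_tail<T>(entries: Vec<T>, is_complete: impl Fn(&T) -> bool) -> Vec<T> {
    let mut decoded = Vec::new();
    let mut buffered = None;
    for next in entries {
        if let Some(prev) = buffered.take() {
            // Any entry with a successor is decoded unconditionally; corruption
            // in the middle of the stream still surfaces as an error upstream.
            decoded.push(prev);
        }
        buffered = Some(next);
    }
    if let Some(last) = buffered {
        // Ignores tail corrupted data, mirroring the stream above.
        if is_complete(&last) {
            decoded.push(last);
        }
    }
    decoded
}

fn main() {
    // Entries are (id, complete?) pairs; only the incomplete tail is dropped.
    let entries = vec![(1, true), (2, true), (3, false)];
    let kept = drop_corrupted_tail(entries, |(_, complete)| *complete);
    assert_eq!(kept, vec![(1, true), (2, true)]);
}
```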
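
The new `Provider` enum replaces both the `Namespace` trait and the per-call `WalOptions` threading: a region constructs its provider once and passes it by reference to `read`, `obsolete`, and `entry`. A minimal usage sketch, assuming the `store-api` crate from this patch is on the path:

```rust
use store_api::logstore::provider::Provider;

fn main() {
    // A raft-engine-backed region is keyed by its region id.
    let raft = Provider::raft_engine_provider(42);
    assert_eq!(raft.type_name(), "RaftEngineProvider");
    assert!(raft.as_raft_engine_provider().is_some());

    // A Kafka-backed region is keyed by topic; `Provider` stays cheap to clone
    // across workers because the topic lives behind an `Arc<KafkaProvider>`.
    let kafka = Provider::kafka_provider("my_topic".to_string());
    assert_eq!(kafka.to_string(), "topic: my_topic");
    assert!(kafka.as_kafka_provider().is_some());
}
```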