diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 8f81829eab40..7783e5a1d703 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -116,6 +116,8 @@ sst_write_buffer_size = "8MB" scan_parallelism = 0 # Capacity of the channel to send data from parallel scan tasks to the main task (default 32). parallel_scan_channel_size = 32 +# Whether to allow stale WAL entries read during replay. +allow_stale_entries = false # Log options, see `standalone.example.toml` # [logging] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index cb2c0bd61ab9..1f7821dfc807 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -207,6 +207,8 @@ sst_write_buffer_size = "8MB" scan_parallelism = 0 # Capacity of the channel to send data from parallel scan tasks to the main task (default 32). parallel_scan_channel_size = 32 +# Whether to allow stale WAL entries read during replay. +allow_stale_entries = false # Log options # [logging] diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 51d8290ba34d..a0ca2d0d3326 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -185,9 +185,6 @@ pub enum Error { last_index: u64, attempt_index: u64, }, - - #[snafu(display("Duplicate log entry, region: {}, attempt index: {}", region_id, index,))] - DuplicateLogIndex { region_id: RegionId, index: u64 }, } impl ErrorExt for Error { diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 2c11f8c78f18..6bcbfb2915f2 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -28,9 +29,9 @@ use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTr use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; use crate::error::{ - AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, DuplicateLogIndexSnafu, Error, - FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, - RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, + AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu, + IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, + StartGcTaskSnafu, StopGcTaskSnafu, }; use crate::raft_engine::backend::SYSTEM_NAMESPACE; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; @@ -124,63 +125,57 @@ impl RaftEngineLogStore { entries: Vec, ) -> Result<(LogBatch, HashMap)> { // Records the last entry id for each region's entries. - let mut entry_ids: HashMap> = - HashMap::with_capacity(entries.len()); - + let mut entry_ids: HashMap = HashMap::with_capacity(entries.len()); let mut batch = LogBatch::with_capacity(entries.len()); for e in entries { let ns_id = e.namespace_id; - if !entry_ids.entry(ns_id).or_default().insert(e.id) { - return DuplicateLogIndexSnafu { - region_id: ns_id, - index: e.id, + match entry_ids.entry(ns_id) { + Entry::Occupied(mut o) => { + let prev = *o.get(); + ensure!( + e.id == prev + 1, + DiscontinuousLogIndexSnafu { + region_id: ns_id, + last_index: prev, + attempt_index: e.id + } + ); + o.insert(e.id); + } + Entry::Vacant(v) => { + // this entry is the first in batch of given region. + if let Some(first_index) = self.engine.first_index(ns_id) { + // ensure the first in batch does not override compacted entry. + ensure!( + e.id >= first_index, + OverrideCompactedEntrySnafu { + namespace: ns_id, + first_index, + attempt_index: e.id, + } + ); + } + // ensure the first in batch does not form a hole in raft-engine. + if let Some(last_index) = self.engine.last_index(ns_id) { + ensure!( + e.id == last_index + 1, + DiscontinuousLogIndexSnafu { + region_id: ns_id, + last_index, + attempt_index: e.id + } + ); + } + v.insert(e.id); } - .fail(); } batch .add_entries::(ns_id, &[e]) .context(AddEntryLogBatchSnafu)?; } - let mut last_entry_ids = HashMap::with_capacity(entry_ids.len()); - for (region, ids) in entry_ids { - let first_in_batch = *ids.first().unwrap(); - let last_in_batch = *ids.last().unwrap(); - ensure!( - (last_in_batch - first_in_batch) == ids.len() as u64 - 1, - DiscontinuousLogIndexSnafu { - region_id: region, - last_index: first_in_batch, - attempt_index: last_in_batch - } - ); - - if let Some(first_index) = self.engine.first_index(region) { - ensure!( - first_in_batch >= first_index, - OverrideCompactedEntrySnafu { - namespace: region, - first_index, - attempt_index: first_in_batch, - } - ); - } - - if let Some(last_index) = self.engine.last_index(region) { - ensure!( - first_in_batch == last_index + 1, - DiscontinuousLogIndexSnafu { - region_id: region, - last_index, - attempt_index: first_in_batch - } - ); - } - last_entry_ids.insert(region, last_in_batch); - } - - Ok((batch, last_entry_ids)) + Ok((batch, entry_ids)) } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 2e779b760260..8a7a30b933f8 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -78,6 +78,8 @@ pub struct MitoConfig { pub scan_parallelism: usize, /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32). pub parallel_scan_channel_size: usize, + /// Whether to allow stale entries read during replay. + pub allow_stale_entries: bool, } impl Default for MitoConfig { @@ -98,6 +100,7 @@ impl Default for MitoConfig { sst_write_buffer_size: ReadableSize::mb(8), scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, + allow_stale_entries: false, } } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index b5a4c74c461e..d16fe1b8f739 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -269,6 +269,7 @@ impl RegionOpener { region_id, flushed_entry_id, &version_control, + config.allow_stale_entries, ) .await?; } else { @@ -377,6 +378,7 @@ pub(crate) async fn replay_memtable( region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, + allow_stale_entries: bool, ) -> Result { let mut rows_replayed = 0; // Last entry id should start from flushed entry id since there might be no @@ -388,14 +390,18 @@ pub(crate) async fn replay_memtable( let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; - ensure!( - entry_id > flushed_entry_id, - StaleLogEntrySnafu { - region_id, - flushed_entry_id, - unexpected_entry_id: entry_id - } - ); + if entry_id <= flushed_entry_id { + ensure!( + allow_stale_entries, + StaleLogEntrySnafu { + region_id, + flushed_entry_id, + unexpected_entry_id: entry_id, + } + ); + warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id); + } + last_entry_id = last_entry_id.max(entry_id); for mutation in entry.mutations { rows_replayed += mutation @@ -411,6 +417,12 @@ pub(crate) async fn replay_memtable( region_write_ctx.set_next_entry_id(last_entry_id + 1); region_write_ctx.write_memtable(); + if allow_stale_entries { + wal.obsolete(region_id, flushed_entry_id, &wal_options) + .await?; + info!("Force obsolete WAL entries, region id: {}, flushed entry id: {}, last entry id read: {}", region_id, flushed_entry_id, last_entry_id); + } + info!( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", region_id, rows_replayed, last_entry_id diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index e8f3655cf71a..f99cc88f159d 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -73,6 +73,7 @@ impl RegionWorkerLoop { region_id, flushed_entry_id, ®ion.version_control, + self.config.allow_stale_entries, ) .await?; if let Some(expected_last_entry_id) = request.entry_id { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 9341ba5f09ce..e5e33f38511c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -809,6 +809,7 @@ vector_cache_size = "512MiB" page_cache_size = "512MiB" sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 +allow_stale_entries = false [[datanode.region_engine]]