diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 6bcbfb2915f2..85409c51c579 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -148,7 +148,7 @@ impl RaftEngineLogStore { if let Some(first_index) = self.engine.first_index(ns_id) { // ensure the first in batch does not override compacted entry. ensure!( - e.id >= first_index, + e.id > first_index, OverrideCompactedEntrySnafu { namespace: ns_id, first_index, @@ -210,7 +210,7 @@ impl LogStore for RaftEngineLogStore { if let Some(first_index) = self.engine.first_index(namespace_id) { ensure!( - entry_id >= first_index, + entry_id > first_index, OverrideCompactedEntrySnafu { namespace: namespace_id, first_index, diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index e3d075e7de3a..80116ea9fdd8 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -387,10 +387,13 @@ pub(crate) async fn replay_memtable( let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); let replay_from_entry_id = flushed_entry_id + 1; + let mut stale_entry_found = false; let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; if entry_id <= flushed_entry_id { + stale_entry_found = true; + warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id); ensure!( allow_stale_entries, StaleLogEntrySnafu { @@ -399,7 +402,6 @@ pub(crate) async fn replay_memtable( unexpected_entry_id: entry_id, } ); - warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id); } last_entry_id = last_entry_id.max(entry_id); @@ -417,7 +419,7 @@ pub(crate) async fn replay_memtable( region_write_ctx.set_next_entry_id(last_entry_id + 1); region_write_ctx.write_memtable(); - if allow_stale_entries { + if allow_stale_entries && stale_entry_found { wal.obsolete(region_id, flushed_entry_id, wal_options) .await?; info!("Force obsolete WAL entries, region id: {}, flushed entry id: {}, last entry id read: {}", region_id, flushed_entry_id, last_entry_id);