Skip to content

Commit

Permalink
chore: resolve some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Jan 9, 2024
1 parent 8105ec5 commit b30f20b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl RaftEngineLogStore {
if let Some(first_index) = self.engine.first_index(ns_id) {
// ensure the first in batch does not override compacted entry.
ensure!(
e.id >= first_index,
e.id > first_index,
OverrideCompactedEntrySnafu {
namespace: ns_id,
first_index,
Expand Down Expand Up @@ -210,7 +210,7 @@ impl LogStore for RaftEngineLogStore {

if let Some(first_index) = self.engine.first_index(namespace_id) {
ensure!(
entry_id >= first_index,
entry_id > first_index,
OverrideCompactedEntrySnafu {
namespace: namespace_id,
first_index,
Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,13 @@ pub(crate) async fn replay_memtable<S: LogStore>(
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone());

let replay_from_entry_id = flushed_entry_id + 1;
let mut stale_entry_found = false;
let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
if entry_id <= flushed_entry_id {
stale_entry_found = true;
warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
ensure!(
allow_stale_entries,
StaleLogEntrySnafu {
Expand All @@ -399,7 +402,6 @@ pub(crate) async fn replay_memtable<S: LogStore>(
unexpected_entry_id: entry_id,
}
);
warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
}

last_entry_id = last_entry_id.max(entry_id);
Expand All @@ -417,7 +419,7 @@ pub(crate) async fn replay_memtable<S: LogStore>(
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable();

if allow_stale_entries {
if allow_stale_entries && stale_entry_found {
wal.obsolete(region_id, flushed_entry_id, wal_options)
.await?;
info!("Force obsolete WAL entries, region id: {}, flushed entry id: {}, last entry id read: {}", region_id, flushed_entry_id, last_entry_id);
Expand Down

0 comments on commit b30f20b

Please sign in to comment.