Skip to content

Commit

Permalink
chore: check entry id during replay
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Jan 4, 2024
1 parent cdd49e1 commit 33c47e4
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand All @@ -22,7 +22,7 @@ use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{error, info};
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::Id as EntryId;
use store_api::logstore::entry_stream::SendableEntryStream;
use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
Expand Down
13 changes: 13 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ pub enum Error {
blob_type: String,
location: Location,
},

#[snafu(display(
"Stale log entry found during replay, region: {}, flushed: {}, replayed: {}",
region_id,
flushed_entry_id,
unexpected_entry_id
))]
StaleLogEntry {
region_id: RegionId,
flushed_entry_id: u64,
unexpected_entry_id: u64,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -555,6 +567,7 @@ impl ErrorExt for Error {
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
StaleLogEntry { .. } => StatusCode::Unexpected,
}
}

Expand Down
13 changes: 11 additions & 2 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result};
use crate::error::{
EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result, StaleLogEntrySnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderRef;
Expand Down Expand Up @@ -386,7 +388,14 @@ pub(crate) async fn replay_memtable<S: LogStore>(
let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
debug_assert!(entry_id > flushed_entry_id);
ensure!(
entry_id > flushed_entry_id,
StaleLogEntrySnafu {
region_id,
flushed_entry_id,
unexpected_entry_id: entry_id
}
);
last_entry_id = last_entry_id.max(entry_id);
for mutation in entry.mutations {
rows_replayed += mutation
Expand Down

0 comments on commit 33c47e4

Please sign in to comment.