feat: add allow_stale_entries options to force obsolete wal entries
v0y4g3r committed Jan 4, 2024
1 parent 4b89fd6 commit 8e7b065
Showing 8 changed files with 74 additions and 61 deletions.
2 changes: 2 additions & 0 deletions config/datanode.example.toml
@@ -116,6 +116,8 @@ sst_write_buffer_size = "8MB"
scan_parallelism = 0
# Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

# Log options, see `standalone.example.toml`
# [logging]
2 changes: 2 additions & 0 deletions config/standalone.example.toml
@@ -207,6 +207,8 @@ sst_write_buffer_size = "8MB"
scan_parallelism = 0
# Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

# Log options
# [logging]
3 changes: 0 additions & 3 deletions src/log-store/src/error.rs
@@ -185,9 +185,6 @@ pub enum Error {
last_index: u64,
attempt_index: u64,
},

#[snafu(display("Duplicate log entry, region: {}, attempt index: {}", region_id, index,))]
DuplicateLogIndex { region_id: RegionId, index: u64 },
}

impl ErrorExt for Error {
95 changes: 45 additions & 50 deletions src/log-store/src/raft_engine/log_store.rs
@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

@@ -28,9 +29,9 @@ use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTr
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};

use crate::error::{
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, DuplicateLogIndexSnafu, Error,
FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu,
RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result,
StartGcTaskSnafu, StopGcTaskSnafu,
};
use crate::raft_engine::backend::SYSTEM_NAMESPACE;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace};
@@ -124,63 +125,57 @@ impl RaftEngineLogStore {
entries: Vec<EntryImpl>,
) -> Result<(LogBatch, HashMap<NamespaceId, EntryId>)> {
// Records the last entry id for each region's entries.
let mut entry_ids: HashMap<NamespaceId, BTreeSet<EntryId>> =
HashMap::with_capacity(entries.len());

let mut entry_ids: HashMap<NamespaceId, EntryId> = HashMap::with_capacity(entries.len());
let mut batch = LogBatch::with_capacity(entries.len());

for e in entries {
let ns_id = e.namespace_id;
if !entry_ids.entry(ns_id).or_default().insert(e.id) {
return DuplicateLogIndexSnafu {
region_id: ns_id,
index: e.id,
match entry_ids.entry(ns_id) {
Entry::Occupied(mut o) => {
let prev = *o.get();
ensure!(
e.id == prev + 1,
DiscontinuousLogIndexSnafu {
region_id: ns_id,
last_index: prev,
attempt_index: e.id
}
);
o.insert(e.id);
}
Entry::Vacant(v) => {
// this entry is the first in batch of given region.
if let Some(first_index) = self.engine.first_index(ns_id) {
// ensure the first in batch does not override compacted entry.
ensure!(
e.id >= first_index,
OverrideCompactedEntrySnafu {
namespace: ns_id,
first_index,
attempt_index: e.id,
}
);
}
// ensure the first in batch does not form a hole in raft-engine.
if let Some(last_index) = self.engine.last_index(ns_id) {
ensure!(
e.id == last_index + 1,
DiscontinuousLogIndexSnafu {
region_id: ns_id,
last_index,
attempt_index: e.id
}
);
}
v.insert(e.id);
}
.fail();
}
batch
.add_entries::<MessageType>(ns_id, &[e])
.context(AddEntryLogBatchSnafu)?;
}

let mut last_entry_ids = HashMap::with_capacity(entry_ids.len());
for (region, ids) in entry_ids {
let first_in_batch = *ids.first().unwrap();
let last_in_batch = *ids.last().unwrap();
ensure!(
(last_in_batch - first_in_batch) == ids.len() as u64 - 1,
DiscontinuousLogIndexSnafu {
region_id: region,
last_index: first_in_batch,
attempt_index: last_in_batch
}
);

if let Some(first_index) = self.engine.first_index(region) {
ensure!(
first_in_batch >= first_index,
OverrideCompactedEntrySnafu {
namespace: region,
first_index,
attempt_index: first_in_batch,
}
);
}

if let Some(last_index) = self.engine.last_index(region) {
ensure!(
first_in_batch == last_index + 1,
DiscontinuousLogIndexSnafu {
region_id: region,
last_index,
attempt_index: first_in_batch
}
);
}
last_entry_ids.insert(region, last_in_batch);
}

Ok((batch, last_entry_ids))
Ok((batch, entry_ids))
}
}

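The rewritten batch-preparation code above validates entry ids inline while building the LogBatch, instead of collecting every id into a BTreeSet and checking the whole range afterwards, which is why the DuplicateLogIndex error variant could be dropped. Below is a minimal standalone sketch of that per-namespace continuity check; check_batch is an illustrative helper with simplified u64 ids and a plain error string in place of the crate's Snafu variants and raft-engine index lookups.

use std::collections::hash_map::Entry;
use std::collections::HashMap;

type NamespaceId = u64;
type EntryId = u64;

/// Checks that, within one batch, each namespace's entry ids are strictly
/// consecutive, and returns the last entry id seen per namespace.
fn check_batch(
    entries: &[(NamespaceId, EntryId)],
) -> Result<HashMap<NamespaceId, EntryId>, String> {
    let mut last_ids: HashMap<NamespaceId, EntryId> = HashMap::with_capacity(entries.len());
    for &(ns_id, id) in entries {
        match last_ids.entry(ns_id) {
            Entry::Occupied(mut o) => {
                let prev = *o.get();
                if id != prev + 1 {
                    return Err(format!(
                        "discontinuous log index in namespace {ns_id}: last {prev}, attempt {id}"
                    ));
                }
                o.insert(id);
            }
            // First entry of this namespace in the batch; the real code also
            // checks it against the engine's first/last persisted indexes here.
            Entry::Vacant(v) => {
                v.insert(id);
            }
        }
    }
    Ok(last_ids)
}

fn main() {
    assert!(check_batch(&[(1, 10), (1, 11), (2, 5)]).is_ok());
    assert!(check_batch(&[(1, 10), (1, 12)]).is_err());
}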
3 changes: 3 additions & 0 deletions src/mito2/src/config.rs
@@ -78,6 +78,8 @@ pub struct MitoConfig {
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
}

impl Default for MitoConfig {
@@ -98,6 +100,7 @@ impl Default for MitoConfig {
sst_write_buffer_size: ReadableSize::mb(8),
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
}
}
}
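
As a usage-side illustration: the new flag defaults to false, so it only needs to appear in a TOML config file when a user opts in. The sketch below uses a simplified stand-in struct, not the crate's real MitoConfig, and assumes the serde (with derive) and toml crates; it shows how such a flag round-trips from a config file while every omitted key falls back to its default.

use serde::Deserialize;

/// Hypothetical, trimmed-down stand-in for the engine config; the real
/// MitoConfig has many more fields.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct EngineConfig {
    parallel_scan_channel_size: usize,
    allow_stale_entries: bool,
}

impl Default for EngineConfig {
    fn default() -> Self {
        Self {
            parallel_scan_channel_size: 32,
            allow_stale_entries: false,
        }
    }
}

fn main() {
    // Only the overridden key needs to appear in the file; everything else
    // falls back to the defaults, so allow_stale_entries stays opt-in.
    let cfg: EngineConfig = toml::from_str("allow_stale_entries = true").unwrap();
    assert!(cfg.allow_stale_entries);
    assert_eq!(cfg.parallel_scan_channel_size, 32);

    let default_cfg: EngineConfig = toml::from_str("").unwrap();
    assert!(!default_cfg.allow_stale_entries);
}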
28 changes: 20 additions & 8 deletions src/mito2/src/region/opener.rs
@@ -269,6 +269,7 @@ impl RegionOpener {
region_id,
flushed_entry_id,
&version_control,
config.allow_stale_entries,
)
.await?;
} else {
@@ -377,6 +378,7 @@ pub(crate) async fn replay_memtable<S: LogStore>(
region_id: RegionId,
flushed_entry_id: EntryId,
version_control: &VersionControlRef,
allow_stale_entries: bool,
) -> Result<EntryId> {
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
@@ -388,14 +390,18 @@
let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
ensure!(
entry_id > flushed_entry_id,
StaleLogEntrySnafu {
region_id,
flushed_entry_id,
unexpected_entry_id: entry_id
}
);
if entry_id <= flushed_entry_id {
ensure!(
allow_stale_entries,
StaleLogEntrySnafu {
region_id,
flushed_entry_id,
unexpected_entry_id: entry_id,
}
);
warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
}

last_entry_id = last_entry_id.max(entry_id);
for mutation in entry.mutations {
rows_replayed += mutation
@@ -411,6 +417,12 @@
region_write_ctx.set_next_entry_id(last_entry_id + 1);
region_write_ctx.write_memtable();

if allow_stale_entries {
wal.obsolete(region_id, flushed_entry_id, &wal_options)
.await?;
info!("Force obsolete WAL entries, region id: {}, flushed entry id: {}, last entry id read: {}", region_id, flushed_entry_id, last_entry_id);
}

info!(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
region_id, rows_replayed, last_entry_id
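The replay change above turns a hard error into a tolerated, logged case when allow_stale_entries is set, and afterwards force-obsoletes the WAL range up to the flushed entry id. Below is a minimal sketch of the per-entry decision; check_replayed_entry is an illustrative helper, not a function in the crate, using plain types and eprintln! in place of the crate's Snafu errors and warn! macro.

type EntryId = u64;
type RegionId = u64;

/// Mirrors the per-entry check in replay_memtable: a stale entry is an error
/// unless allow_stale_entries is set, in which case it is only reported.
fn check_replayed_entry(
    region_id: RegionId,
    entry_id: EntryId,
    flushed_entry_id: EntryId,
    allow_stale_entries: bool,
) -> Result<(), String> {
    if entry_id <= flushed_entry_id {
        if !allow_stale_entries {
            return Err(format!(
                "stale log entry in region {region_id}: flushed entry id {flushed_entry_id}, read {entry_id}"
            ));
        }
        // The real code emits a warn! here and still replays the entry; once the
        // whole stream is consumed it calls wal.obsolete(region_id, flushed_entry_id, ..)
        // so the stale range is purged from the log store.
        eprintln!(
            "stale WAL entry read in region {region_id}: flushed {flushed_entry_id}, read {entry_id}"
        );
    }
    Ok(())
}

fn main() {
    assert!(check_replayed_entry(1, 5, 10, false).is_err());
    assert!(check_replayed_entry(1, 5, 10, true).is_ok());
    assert!(check_replayed_entry(1, 11, 10, false).is_ok());
}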
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_catchup.rs
@@ -73,6 +73,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
flushed_entry_id,
&region.version_control,
self.config.allow_stale_entries,
)
.await?;
if let Some(expected_last_entry_id) = request.entry_id {
1 change: 1 addition & 0 deletions tests-integration/tests/http.rs
@@ -809,6 +809,7 @@ vector_cache_size = "512MiB"
page_cache_size = "512MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
[[datanode.region_engine]]