From 60087341a4b064bd57f47575efe940990209e59b Mon Sep 17 00:00:00 2001 From: niebayes Date: Sat, 23 Dec 2023 22:53:48 +0800 Subject: [PATCH 1/7] chore: update comments for log store stuff --- src/store-api/src/logstore.rs | 42 +++++++++++-------------- src/store-api/src/logstore/entry.rs | 15 ++++++--- src/store-api/src/logstore/namespace.rs | 3 ++ 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 3fb81d9a624c..efb2824e25c2 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use common_config::wal::WalOptions; use common_error::ext::ErrorExt; -use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset}; +use crate::logstore::entry::{Entry, Id as EntryId}; use crate::logstore::entry_stream::SendableEntryStream; use crate::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -34,21 +34,20 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { type Namespace: Namespace; type Entry: Entry; - /// Stop components of logstore. + /// Stops components of the logstore. async fn stop(&self) -> Result<(), Self::Error>; - /// Append an `Entry` to WAL with given namespace and return append response containing - /// the entry id. + /// Appends an entry to the log store and returns a response containing the id of the append entry. async fn append(&self, entry: Self::Entry) -> Result; - /// Append a batch of entries and return an append batch response containing the start entry ids of - /// log entries written to each region. + /// Appends a batch of entries and returns a response containing a map where the key is a region id + /// while the value is the id of the entry, the last entry of the entries belong to the region, written into the log store. async fn append_batch( &self, entries: Vec, ) -> Result; - /// Create a new `EntryStream` to asynchronously generates `Entry` with ids + /// Creates a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. async fn read( &self, @@ -56,43 +55,40 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { id: EntryId, ) -> Result, Self::Error>; - /// Create a new `Namespace`. + /// Creates a new `Namespace` from the given ref. async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>; - /// Delete an existing `Namespace` with given ref. + /// Deletes an existing `Namespace` specified by the given ref. async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>; - /// List all existing namespaces. + /// Lists all existing namespaces. async fn list_namespaces(&self) -> Result, Self::Error>; - /// Create an entry of the associate Entry type + /// Creates an entry of the associated Entry type fn entry>(&self, data: D, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry; - /// Create a namespace of the associate Namespace type + /// Creates a namespace of the associated Namespace type // TODO(sunng87): confusion with `create_namespace` fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace; - /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete - /// the log files if all entries inside are obsolete. This method may not delete log - /// files immediately. + /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, + /// so that the log store can safely delete those entries. This method does not guarantee + /// that the obsolete entries are deleted immediately. async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>; } /// The response of an `append` operation. #[derive(Debug)] pub struct AppendResponse { - /// The entry id of the appended log entry. - pub entry_id: EntryId, - /// The start entry offset of the appended log entry. - /// Depends on the `LogStore` implementation, the entry offset may be missing. - pub offset: Option, + /// The id of the entry appended to the log store. + pub last_entry_id: EntryId, } /// The response of an `append_batch` operation. #[derive(Debug, Default)] pub struct AppendBatchResponse { - /// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region. - /// Depends on the `LogStore` implementation, the entry offsets may be missing. - pub offsets: HashMap, + /// Key: region id (as u64). + /// Value: the id of the entry, the last entry of the entries belong to the region, appended to the log store. + pub last_entry_ids: HashMap, } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index cb2538086e6d..17d32f3bc1ce 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -16,21 +16,26 @@ use common_error::ext::ErrorExt; use crate::logstore::namespace::Namespace; -/// An entry's logical id, allocated by log store users. +/// An entry's id. +/// Different log store implementations may interpret the id to different meanings. pub type Id = u64; -/// An entry's physical offset in the underlying log store. +/// An entry's offset. +/// Notice: it's currently not used. pub type Offset = usize; -/// Entry is the minimal data storage unit in `LogStore`. +/// Entry is the minimal data storage unit through which users interact with the log store. +/// The log store implementation may have larger or smaller data storage unit than an entry. pub trait Entry: Send + Sync { type Error: ErrorExt + Send + Sync; type Namespace: Namespace; - /// Return contained data of entry. + /// Returns the contained data of the entry. fn data(&self) -> &[u8]; - /// Return entry id that monotonically increments. + /// Returns the id of the entry. + /// Usually the namespace id is identical with the region id. fn id(&self) -> Id; + /// Returns the namespace of the entry. fn namespace(&self) -> Self::Namespace; } diff --git a/src/store-api/src/logstore/namespace.rs b/src/store-api/src/logstore/namespace.rs index 35a136d809ac..ac1b62e31bd4 100644 --- a/src/store-api/src/logstore/namespace.rs +++ b/src/store-api/src/logstore/namespace.rs @@ -14,8 +14,11 @@ use std::hash::Hash; +/// The namespace id. +/// Usually the namespace id is identical with the region id. pub type Id = u64; pub trait Namespace: Send + Sync + Clone + std::fmt::Debug + Hash + PartialEq + Eq { + /// Returns the namespace id. fn id(&self) -> Id; } From aa9a2f2db5e5b5769f8b34a096be145e9b0ed5d4 Mon Sep 17 00:00:00 2001 From: niebayes Date: Sun, 24 Dec 2023 00:24:16 +0800 Subject: [PATCH 2/7] refactor: entry id usage --- src/log-store/src/noop.rs | 5 ++- src/log-store/src/raft_engine/log_store.rs | 16 ++++++--- src/mito2/src/error.rs | 9 ++++- src/mito2/src/region_write_ctx.rs | 1 + src/mito2/src/wal.rs | 6 ++-- src/mito2/src/worker/handle_write.rs | 38 ++++++++++++++++++---- 6 files changed, 55 insertions(+), 20 deletions(-) diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 1929e59a2365..694641156fa4 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -66,14 +66,13 @@ impl LogStore for NoopLogStore { async fn append(&self, mut _e: Self::Entry) -> Result { Ok(AppendResponse { - entry_id: 0, - offset: None, + last_entry_id: Default::default(), }) } async fn append_batch(&self, _e: Vec) -> Result { Ok(AppendBatchResponse { - offsets: HashMap::new(), + last_entry_ids: HashMap::new(), }) } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index eb14bf0cf90a..7588a630962c 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -179,8 +180,7 @@ impl LogStore for RaftEngineLogStore { .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; Ok(AppendResponse { - entry_id, - offset: None, + last_entry_id: entry_id, }) } @@ -192,11 +192,18 @@ impl LogStore for RaftEngineLogStore { return Ok(AppendBatchResponse::default()); } + // Records the last entry id for each region's entries. + let mut last_entry_ids = HashMap::with_capacity(entries.len()); let mut batch = LogBatch::with_capacity(entries.len()); for e in entries { self.check_entry(&e)?; + // For raft-engine log store, the namespace id is the region id. let ns_id = e.namespace_id; + last_entry_ids + .entry(ns_id) + .and_modify(|x: &mut u64| *x = (*x).max(e.id)) + .or_insert(e.id); batch .add_entries::(ns_id, &[e]) .context(AddEntryLogBatchSnafu)?; @@ -207,8 +214,7 @@ impl LogStore for RaftEngineLogStore { .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - // The user of raft-engine log store does not care about the response. - Ok(AppendBatchResponse::default()) + Ok(AppendBatchResponse { last_entry_ids }) } /// Create a stream of entries from logstore in the given namespace. The end of stream is @@ -452,7 +458,7 @@ mod tests { )) .await .unwrap(); - assert_eq!(i, response.entry_id); + assert_eq!(i, response.last_entry_id); } let mut entries = HashSet::with_capacity(1024); let mut s = logstore.read(&Namespace::with_id(1), 0).await.unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b1d48f8c654e..3e223d88e82c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -416,6 +416,12 @@ pub enum Error { error: ArrowError, location: Location, }, + + #[snafu(display("Missing the last entry id for region {}", region_id))] + MissingLastEntryId { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -453,7 +459,8 @@ impl ErrorExt for Error { | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } - | UnexpectedReplay { .. } => StatusCode::Unexpected, + | UnexpectedReplay { .. } + | MissingLastEntryId { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 8a6decefb4ac..82a7117f4923 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -168,6 +168,7 @@ impl RegionWriteCtx { &self.wal_options, )?; // We only call this method one time, but we still bump next entry id for consistency. + // Warning: this update will be ignored since the `set_next_entry_id` is called by the writer explicitly. self.next_entry_id += 1; Ok(()) } diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 3bbfefe96b93..ac17d3df5415 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -27,7 +27,7 @@ use futures::StreamExt; use prost::Message; use snafu::ResultExt; use store_api::logstore::entry::Entry; -use store_api::logstore::LogStore; +use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::storage::RegionId; use crate::error::{ @@ -165,8 +165,7 @@ impl WalWriter { } /// Write all buffered entries to the WAL. - // TODO(niebayes): returns an `AppendBatchResponse` and handle it properly. - pub async fn write_to_wal(&mut self) -> Result<()> { + pub async fn write_to_wal(&mut self) -> Result { // TODO(yingwen): metrics. let entries = mem::take(&mut self.entries); @@ -175,7 +174,6 @@ impl WalWriter { .await .map_err(BoxedError::new) .context(WriteWalSnafu) - .map(|_| ()) } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 97a481d7d4dc..f1ac4af11a29 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -17,11 +17,11 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; -use store_api::logstore::LogStore; +use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; -use crate::error::{RejectWriteSnafu, Result}; +use crate::error::{MissingLastEntryIdSnafu, RejectWriteSnafu, Result}; use crate::metrics::{ WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL, }; @@ -73,12 +73,17 @@ impl RegionWorkerLoop { region_ctx.set_error(e); } } - if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) { - // Failed to write wal. - for mut region_ctx in region_ctxs.into_values() { - region_ctx.set_error(e.clone()); + match wal_writer.write_to_wal().await.map_err(Arc::new) { + Ok(response) => { + update_next_entry_ids(&mut region_ctxs, &response); + } + Err(e) => { + // Failed to write wal. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.set_error(e.clone()); + } + return; } - return; } } @@ -202,3 +207,22 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad Ok(()) } + +/// Updates the next entry id for each region with the returned last entry id contained in the response. +fn update_next_entry_ids( + region_ctxs: &mut HashMap, + response: &AppendBatchResponse, +) { + for (region_id, region_ctx) in region_ctxs.iter_mut() { + // Missing a last entry id is not deemed as a critical error and hence no need to abort writing to memtable. + let Some(last_entry_id) = response.last_entry_ids.get(®ion_id.as_u64()) else { + let e = MissingLastEntryIdSnafu { + region_id: *region_id, + } + .build(); + region_ctx.set_error(Arc::new(e)); + continue; + }; + region_ctx.set_next_entry_id(last_entry_id + 1); + } +} From fcfcda28512bb6ab03b4cbae08b15c6bb07d1b6e Mon Sep 17 00:00:00 2001 From: niebayes Date: Sun, 24 Dec 2023 00:26:15 +0800 Subject: [PATCH 3/7] tmp update --- src/mito2/src/worker/handle_write.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index f1ac4af11a29..7a012b03251a 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -73,6 +73,7 @@ impl RegionWorkerLoop { region_ctx.set_error(e); } } + match wal_writer.write_to_wal().await.map_err(Arc::new) { Ok(response) => { update_next_entry_ids(&mut region_ctxs, &response); From 4c36a3c9ee5c8d491f939137220daa57db69d6a9 Mon Sep 17 00:00:00 2001 From: niebayes Date: Sun, 24 Dec 2023 00:27:37 +0800 Subject: [PATCH 4/7] Revert "tmp update" This reverts commit fcfcda28512bb6ab03b4cbae08b15c6bb07d1b6e. --- src/mito2/src/worker/handle_write.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 7a012b03251a..f1ac4af11a29 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -73,7 +73,6 @@ impl RegionWorkerLoop { region_ctx.set_error(e); } } - match wal_writer.write_to_wal().await.map_err(Arc::new) { Ok(response) => { update_next_entry_ids(&mut region_ctxs, &response); From b4043a82855cb22de63434f888681a17177c4e58 Mon Sep 17 00:00:00 2001 From: niebayes Date: Mon, 25 Dec 2023 14:55:49 +0800 Subject: [PATCH 5/7] fix: resolve review conversations --- src/log-store/src/raft_engine/log_store.rs | 75 ++++++++++++---------- src/mito2/src/error.rs | 9 +-- src/mito2/src/region_write_ctx.rs | 3 - src/mito2/src/worker/handle_write.rs | 31 +++------ src/store-api/src/logstore.rs | 5 +- 5 files changed, 52 insertions(+), 71 deletions(-) diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 7588a630962c..8dea0ca04217 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -387,7 +387,7 @@ mod tests { use common_base::readable_size::ReadableSize; use common_telemetry::debug; - use common_test_util::temp_dir::create_temp_dir; + use common_test_util::temp_dir::{create_temp_dir, TempDir}; use futures_util::StreamExt; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Namespace as NamespaceTrait; @@ -532,10 +532,7 @@ mod tests { size } - #[tokio::test] - async fn test_compaction() { - common_telemetry::init_default_ut_logging(); - let dir = create_temp_dir("raft-engine-logstore-test"); + async fn new_test_log_store(dir: &TempDir) -> RaftEngineLogStore { let path = dir.path().to_str().unwrap().to_string(); let config = RaftEngineConfig { @@ -545,7 +542,15 @@ mod tests { ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); + RaftEngineLogStore::try_new(path, config).await.unwrap() + } + + #[tokio::test] + async fn test_compaction() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("raft-engine-logstore-test"); + let logstore = new_test_log_store(&dir).await; + let namespace = Namespace::with_id(42); for id in 0..4096 { let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); @@ -568,16 +573,8 @@ mod tests { async fn test_obsolete() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("raft-engine-logstore-test"); - let path = dir.path().to_str().unwrap().to_string(); + let logstore = new_test_log_store(&dir).await; - let config = RaftEngineConfig { - file_size: ReadableSize::mb(2), - purge_threshold: ReadableSize::mb(4), - purge_interval: Duration::from_secs(5), - ..Default::default() - }; - - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..1024 { let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); @@ -597,16 +594,7 @@ mod tests { async fn test_append_batch() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); - let path = dir.path().to_str().unwrap().to_string(); - - let config = RaftEngineConfig { - file_size: ReadableSize::mb(2), - purge_threshold: ReadableSize::mb(4), - purge_interval: Duration::from_secs(5), - ..Default::default() - }; - - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); + let logstore = new_test_log_store(&dir).await; let entries = (0..8) .flat_map(|ns_id| { @@ -628,16 +616,7 @@ mod tests { async fn test_append_batch_interleaved() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); - - let path = dir.path().to_str().unwrap().to_string(); - let config = RaftEngineConfig { - file_size: ReadableSize::mb(2), - purge_threshold: ReadableSize::mb(4), - purge_interval: Duration::from_secs(5), - ..Default::default() - }; - - let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); + let logstore = new_test_log_store(&dir).await; let entries = vec![ Entry::create(0, 0, [b'0'; 4096].to_vec()), @@ -652,4 +631,30 @@ mod tests { assert_eq!((Some(0), Some(2)), logstore.span(&Namespace::with_id(0))); assert_eq!((Some(0), Some(1)), logstore.span(&Namespace::with_id(1))); } + + #[tokio::test] + async fn test_append_batch_response() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("logstore-append-batch-test"); + let logstore = new_test_log_store(&dir).await; + + let entries = vec![ + // Entry[0] from region 0. + Entry::create(0, 0, [b'0'; 4096].to_vec()), + // Entry[0] from region 1. + Entry::create(0, 1, [b'1'; 4096].to_vec()), + // Entry[1] from region 1. + Entry::create(1, 0, [b'1'; 4096].to_vec()), + // Entry[1] from region 0. + Entry::create(1, 1, [b'0'; 4096].to_vec()), + // Entry[2] from region 2. + Entry::create(2, 2, [b'2'; 4096].to_vec()), + ]; + + // Ensure the last entry id returned for each region is the expected one. + let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids; + assert_eq!(last_entry_ids[&0], 1); + assert_eq!(last_entry_ids[&1], 1); + assert_eq!(last_entry_ids[&2], 2); + } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 3e223d88e82c..b1d48f8c654e 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -416,12 +416,6 @@ pub enum Error { error: ArrowError, location: Location, }, - - #[snafu(display("Missing the last entry id for region {}", region_id))] - MissingLastEntryId { - region_id: RegionId, - location: Location, - }, } pub type Result = std::result::Result; @@ -459,8 +453,7 @@ impl ErrorExt for Error { | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } - | UnexpectedReplay { .. } - | MissingLastEntryId { .. } => StatusCode::Unexpected, + | UnexpectedReplay { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 82a7117f4923..7d27e49eaf73 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -167,9 +167,6 @@ impl RegionWriteCtx { &self.wal_entry, &self.wal_options, )?; - // We only call this method one time, but we still bump next entry id for consistency. - // Warning: this update will be ignored since the `set_next_entry_id` is called by the writer explicitly. - self.next_entry_id += 1; Ok(()) } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index f1ac4af11a29..e10012d57447 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -17,11 +17,11 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; -use store_api::logstore::{AppendBatchResponse, LogStore}; +use store_api::logstore::LogStore; use store_api::metadata::RegionMetadata; use store_api::storage::RegionId; -use crate::error::{MissingLastEntryIdSnafu, RejectWriteSnafu, Result}; +use crate::error::{RejectWriteSnafu, Result}; use crate::metrics::{ WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL, }; @@ -75,7 +75,13 @@ impl RegionWorkerLoop { } match wal_writer.write_to_wal().await.map_err(Arc::new) { Ok(response) => { - update_next_entry_ids(&mut region_ctxs, &response); + for (region_id, region_ctx) in region_ctxs.iter_mut() { + // Safety: the log store implementation ensures that either the `write_to_wal` fails and no + // response is returned or the last entry ids for each region do exist. + let last_entry_id = + response.last_entry_ids.get(®ion_id.as_u64()).unwrap(); + region_ctx.set_next_entry_id(last_entry_id + 1); + } } Err(e) => { // Failed to write wal. @@ -207,22 +213,3 @@ fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetad Ok(()) } - -/// Updates the next entry id for each region with the returned last entry id contained in the response. -fn update_next_entry_ids( - region_ctxs: &mut HashMap, - response: &AppendBatchResponse, -) { - for (region_id, region_ctx) in region_ctxs.iter_mut() { - // Missing a last entry id is not deemed as a critical error and hence no need to abort writing to memtable. - let Some(last_entry_id) = response.last_entry_ids.get(®ion_id.as_u64()) else { - let e = MissingLastEntryIdSnafu { - region_id: *region_id, - } - .build(); - region_ctx.set_error(Arc::new(e)); - continue; - }; - region_ctx.set_next_entry_id(last_entry_id + 1); - } -} diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index efb2824e25c2..fd08f2d6522b 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -41,7 +41,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { async fn append(&self, entry: Self::Entry) -> Result; /// Appends a batch of entries and returns a response containing a map where the key is a region id - /// while the value is the id of the entry, the last entry of the entries belong to the region, written into the log store. + /// while the value is the id of the last successfully written entry of the region. async fn append_batch( &self, entries: Vec, @@ -88,7 +88,6 @@ pub struct AppendResponse { /// The response of an `append_batch` operation. #[derive(Debug, Default)] pub struct AppendBatchResponse { - /// Key: region id (as u64). - /// Value: the id of the entry, the last entry of the entries belong to the region, appended to the log store. + /// Key: region id (as u64). Value: the id of the last successfully written entry of the region. pub last_entry_ids: HashMap, } From e98601b78ac9366ee0dc6426b381d0b7f1627368 Mon Sep 17 00:00:00 2001 From: niebayes Date: Mon, 25 Dec 2023 15:05:47 +0800 Subject: [PATCH 6/7] fix: resolve review conversations --- src/log-store/src/raft_engine/log_store.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 8dea0ca04217..33d1d6247687 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -193,7 +193,8 @@ impl LogStore for RaftEngineLogStore { } // Records the last entry id for each region's entries. - let mut last_entry_ids = HashMap::with_capacity(entries.len()); + let mut last_entry_ids: HashMap = + HashMap::with_capacity(entries.len()); let mut batch = LogBatch::with_capacity(entries.len()); for e in entries { @@ -202,7 +203,7 @@ impl LogStore for RaftEngineLogStore { let ns_id = e.namespace_id; last_entry_ids .entry(ns_id) - .and_modify(|x: &mut u64| *x = (*x).max(e.id)) + .and_modify(|x| *x = (*x).max(e.id)) .or_insert(e.id); batch .add_entries::(ns_id, &[e]) From 55b9d61cf057a5d05328ceec537888305897f186 Mon Sep 17 00:00:00 2001 From: niebayes Date: Mon, 25 Dec 2023 15:21:16 +0800 Subject: [PATCH 7/7] chore: remove entry offset --- src/store-api/src/logstore/entry.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 17d32f3bc1ce..1748ff5621be 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -19,9 +19,6 @@ use crate::logstore::namespace::Namespace; /// An entry's id. /// Different log store implementations may interpret the id to different meanings. pub type Id = u64; -/// An entry's offset. -/// Notice: it's currently not used. -pub type Offset = usize; /// Entry is the minimal data storage unit through which users interact with the log store. /// The log store implementation may have larger or smaller data storage unit than an entry.