From 660bff043bee41dfddebbe189de6c953fbcf1364 Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Tue, 13 Aug 2024 08:25:33 +0000
Subject: [PATCH] feat: truncate indexes

---
 src/log-store/src/kafka/client_manager.rs  |  4 ++++
 src/log-store/src/kafka/index/collector.rs | 26 +++++++++++++++++++++-
 src/log-store/src/kafka/log_store.rs       | 16 ++++++++++++-
 src/log-store/src/kafka/worker.rs          |  9 ++++++++
 src/log-store/src/raft_engine/log_store.rs | 20 ++++++++++++-----
 src/mito2/src/wal.rs                       |  4 ++--
 src/mito2/src/wal/raw_entry_reader.rs      |  1 +
 src/store-api/src/logstore.rs              |  7 +++++-
 8 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index a2feb2201134..6337683c9392 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -168,6 +168,10 @@ impl ClientManager {
 
         Ok(Client { client, producer })
     }
+
+    pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
+        self.global_index_collector.as_ref()
+    }
 }
 
 #[cfg(test)]
diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs
index 228362e7934b..8901f5b4ec2b 100644
--- a/src/log-store/src/kafka/index/collector.rs
+++ b/src/log-store/src/kafka/index/collector.rs
@@ -31,7 +31,7 @@ use tokio::sync::Mutex as TokioMutex;
 use crate::error::{self, Result};
 use crate::kafka::index::encoder::IndexEncoder;
 use crate::kafka::index::JsonIndexEncoder;
-use crate::kafka::worker::{DumpIndexRequest, WorkerRequest};
+use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
 
 /// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
 pub trait IndexCollector: Send + Sync {
@@ -197,6 +197,30 @@ impl GlobalIndexCollector {
             provider,
         })
     }
+
+    /// Truncates the index for a specific region up to a given [`EntryId`].
+    ///
+    /// It removes all [`EntryId`]s smaller than `entry_id`.
+    pub(crate) async fn truncate(
+        &self,
+        provider: &Arc<KafkaProvider>,
+        region_id: RegionId,
+        entry_id: EntryId,
+    ) -> Result<()> {
+        if let Some(sender) = self.providers.lock().await.get(provider).cloned() {
+            if sender
+                .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
+                    region_id, entry_id,
+                )))
+                .await
+                .is_err()
+            {
+                return error::OrderedBatchProducerStoppedSnafu {}.fail();
+            }
+        }
+
+        Ok(())
+    }
 }
 
 /// The [`RegionIndexes`] struct maintains indexes for a collection of regions.
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 283378cbb511..909d2d9e333d 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -336,7 +336,21 @@ impl LogStore for KafkaLogStore {
     /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
     /// so that the log store can safely delete those entries. This method does not guarantee
     /// that the obsolete entries are deleted immediately.
-    async fn obsolete(&self, _provider: &Provider, _entry_id: EntryId) -> Result<()> {
+    async fn obsolete(
+        &self,
+        provider: &Provider,
+        region_id: RegionId,
+        entry_id: EntryId,
+    ) -> Result<()> {
+        if let Some(collector) = self.client_manager.global_index_collector() {
+            let provider = provider
+                .as_kafka_provider()
+                .with_context(|| InvalidProviderSnafu {
+                    expected: KafkaProvider::type_name(),
+                    actual: provider.type_name(),
+                })?;
+            collector.truncate(provider, region_id, entry_id).await?;
+        }
         Ok(())
     }
 
diff --git a/src/log-store/src/kafka/worker.rs b/src/log-store/src/kafka/worker.rs
index 972d56d6f1c4..318ac1c8a587 100644
--- a/src/log-store/src/kafka/worker.rs
+++ b/src/log-store/src/kafka/worker.rs
@@ -82,6 +82,15 @@ pub(crate) struct TruncateIndexRequest {
     entry_id: EntryId,
 }
 
+impl TruncateIndexRequest {
+    pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
+        Self {
+            region_id,
+            entry_id,
+        }
+    }
+}
+
 pub(crate) struct ProduceRequest {
     region_id: RegionId,
     batch: Vec,
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index d2a210fb4203..b4b46966f498 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -419,7 +419,12 @@ impl LogStore for RaftEngineLogStore {
         }))
     }
 
-    async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<()> {
+    async fn obsolete(
+        &self,
+        provider: &Provider,
+        _region_id: RegionId,
+        entry_id: EntryId,
+    ) -> Result<()> {
         let ns = provider
             .as_raft_engine_provider()
             .with_context(|| InvalidProviderSnafu {
@@ -639,7 +644,8 @@ mod tests {
         let dir = create_temp_dir("raft-engine-logstore-test");
         let logstore = new_test_log_store(&dir).await;
 
-        let namespace_id = 42;
+        let region_id = RegionId::new(1, 1);
+        let namespace_id = region_id.as_u64();
         let namespace = Provider::raft_engine_provider(namespace_id);
         for id in 0..4096 {
             let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
@@ -647,7 +653,10 @@ mod tests {
         }
 
         let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
-        logstore.obsolete(&namespace, 4000).await.unwrap();
+        logstore
+            .obsolete(&namespace, region_id, 4000)
+            .await
+            .unwrap();
 
         tokio::time::sleep(Duration::from_secs(6)).await;
         let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
@@ -664,14 +673,15 @@ mod tests {
         let dir = create_temp_dir("raft-engine-logstore-test");
         let logstore = new_test_log_store(&dir).await;
 
-        let namespace_id = 42;
+        let region_id = RegionId::new(1, 1);
+        let namespace_id = region_id.as_u64();
         let namespace = Provider::raft_engine_provider(namespace_id);
         for id in 0..1024 {
             let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
             let _ = logstore.append(entry).await.unwrap();
         }
 
-        logstore.obsolete(&namespace, 100).await.unwrap();
+        logstore.obsolete(&namespace, region_id, 100).await.unwrap();
         assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
 
         let res = logstore.read(&namespace, 100).await.unwrap();
diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs
index de6ad67b3208..7413f52b2c05 100644
--- a/src/mito2/src/wal.rs
+++ b/src/mito2/src/wal.rs
@@ -89,7 +89,7 @@ impl Wal {
         move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
             Box::pin(async move {
                 store
-                    .obsolete(provider, last_entry_id)
+                    .obsolete(provider, region_id, last_entry_id)
                     .await
                     .map_err(BoxedError::new)
                     .context(DeleteWalSnafu { region_id })
@@ -142,7 +142,7 @@ impl Wal {
         provider: &Provider,
     ) -> Result<()> {
         self.store
-            .obsolete(provider, last_id)
+            .obsolete(provider, region_id, last_id)
             .await
             .map_err(BoxedError::new)
             .context(DeleteWalSnafu { region_id })
diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs
index 6dd11c2c8f64..7436dec06a56 100644
--- a/src/mito2/src/wal/raw_entry_reader.rs
+++ b/src/mito2/src/wal/raw_entry_reader.rs
@@ -168,6 +168,7 @@ mod tests {
         async fn obsolete(
             &self,
             _provider: &Provider,
+            _region_id: RegionId,
             _entry_id: EntryId,
         ) -> Result<(), Self::Error> {
             unreachable!()
diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs
index 347643982716..32ab95f5c8d6 100644
--- a/src/store-api/src/logstore.rs
+++ b/src/store-api/src/logstore.rs
@@ -62,7 +62,12 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
     /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
     /// so that the log store can safely delete those entries. This method does not guarantee
     /// that the obsolete entries are deleted immediately.
-    async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<(), Self::Error>;
+    async fn obsolete(
+        &self,
+        provider: &Provider,
+        region_id: RegionId,
+        entry_id: EntryId,
+    ) -> Result<(), Self::Error>;
 
     /// Makes an entry instance of the associated Entry type
     fn entry(
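
For reference, a minimal sketch of how a caller drives the revised `LogStore::obsolete` signature, mirroring the raft-engine test updated in this patch. The helper function, its name, and the module paths (`store_api::logstore`, `store_api::storage::RegionId`) are assumptions for illustration, not part of the change:

use store_api::logstore::{provider::Provider, EntryId, LogStore};
use store_api::storage::RegionId;

// Illustrative helper (hypothetical): marks a region's WAL entries up to
// `last_entry_id` as obsolete through the new three-argument `obsolete` call.
async fn truncate_region_wal<S: LogStore>(
    store: &S,
    provider: &Provider,
    region_id: RegionId,
    last_entry_id: EntryId,
) -> Result<(), S::Error> {
    // Deletion is not immediate: the log store only marks entries with ids
    // <= last_entry_id as safe to purge, as documented on the trait method.
    store.obsolete(provider, region_id, last_entry_id).await
}

With the Kafka backend, the same call now also forwards a TruncateIndexRequest for the region to its index worker (see GlobalIndexCollector::truncate above), while the raft-engine backend simply ignores the extra region_id.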