Skip to content

Commit

Permalink
feat: truncate indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 13, 2024
1 parent 8d7bcc9 commit 660bff0
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 10 deletions.
4 changes: 4 additions & 0 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl ClientManager {

Ok(Client { client, producer })
}

pub(crate) fn global_index_collector(&self) -> Option<&GlobalIndexCollector> {
self.global_index_collector.as_ref()
}
}

#[cfg(test)]
Expand Down
26 changes: 25 additions & 1 deletion src/log-store/src/kafka/index/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::sync::Mutex as TokioMutex;
use crate::error::{self, Result};
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::worker::{DumpIndexRequest, WorkerRequest};
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};

/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
pub trait IndexCollector: Send + Sync {
Expand Down Expand Up @@ -197,6 +197,30 @@ impl GlobalIndexCollector {
provider,
})
}

/// Truncates the index for a specific region up to a given [`EntryId`].
///
/// It removes all [`EntryId`]s smaller than `entry_id`.
pub(crate) async fn truncate(
&self,
provider: &Arc<KafkaProvider>,
region_id: RegionId,
entry_id: EntryId,
) -> Result<()> {
if let Some(sender) = self.providers.lock().await.get(provider).cloned() {
if sender
.send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
region_id, entry_id,
)))
.await
.is_err()
{
return error::OrderedBatchProducerStoppedSnafu {}.fail();
}
}

Ok(())
}
}

/// The [`RegionIndexes`] struct maintains indexes for a collection of regions.
Expand Down
16 changes: 15 additions & 1 deletion src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,21 @@ impl LogStore for KafkaLogStore {
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, _provider: &Provider, _entry_id: EntryId) -> Result<()> {
async fn obsolete(
&self,
provider: &Provider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<()> {
if let Some(collector) = self.client_manager.global_index_collector() {
let provider = provider
.as_kafka_provider()
.with_context(|| InvalidProviderSnafu {
expected: KafkaProvider::type_name(),
actual: provider.type_name(),
})?;
collector.truncate(provider, region_id, entry_id).await?;
}
Ok(())
}

Expand Down
9 changes: 9 additions & 0 deletions src/log-store/src/kafka/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ pub(crate) struct TruncateIndexRequest {
entry_id: EntryId,
}

impl TruncateIndexRequest {
pub fn new(region_id: RegionId, entry_id: EntryId) -> Self {
Self {
region_id,
entry_id,
}
}
}

pub(crate) struct ProduceRequest {
region_id: RegionId,
batch: Vec<Record>,
Expand Down
20 changes: 15 additions & 5 deletions src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,12 @@ impl LogStore for RaftEngineLogStore {
}))
}

async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<()> {
async fn obsolete(
&self,
provider: &Provider,
_region_id: RegionId,
entry_id: EntryId,
) -> Result<()> {
let ns = provider
.as_raft_engine_provider()
.with_context(|| InvalidProviderSnafu {
Expand Down Expand Up @@ -639,15 +644,19 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = new_test_log_store(&dir).await;

let namespace_id = 42;
let region_id = RegionId::new(1, 1);
let namespace_id = region_id.as_u64();
let namespace = Provider::raft_engine_provider(namespace_id);
for id in 0..4096 {
let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
let _ = logstore.append(entry).await.unwrap();
}

let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
logstore.obsolete(&namespace, 4000).await.unwrap();
logstore
.obsolete(&namespace, region_id, 4000)
.await
.unwrap();

tokio::time::sleep(Duration::from_secs(6)).await;
let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
Expand All @@ -664,14 +673,15 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = new_test_log_store(&dir).await;

let namespace_id = 42;
let region_id = RegionId::new(1, 1);
let namespace_id = region_id.as_u64();
let namespace = Provider::raft_engine_provider(namespace_id);
for id in 0..1024 {
let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
let _ = logstore.append(entry).await.unwrap();
}

logstore.obsolete(&namespace, 100).await.unwrap();
logstore.obsolete(&namespace, region_id, 100).await.unwrap();
assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());

let res = logstore.read(&namespace, 100).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<S: LogStore> Wal<S> {
move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
store
.obsolete(provider, last_entry_id)
.obsolete(provider, region_id, last_entry_id)
.await
.map_err(BoxedError::new)
.context(DeleteWalSnafu { region_id })
Expand Down Expand Up @@ -142,7 +142,7 @@ impl<S: LogStore> Wal<S> {
provider: &Provider,
) -> Result<()> {
self.store
.obsolete(provider, last_id)
.obsolete(provider, region_id, last_id)
.await
.map_err(BoxedError::new)
.context(DeleteWalSnafu { region_id })
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ mod tests {
async fn obsolete(
&self,
_provider: &Provider,
_region_id: RegionId,
_entry_id: EntryId,
) -> Result<(), Self::Error> {
unreachable!()
Expand Down
7 changes: 6 additions & 1 deletion src/store-api/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
/// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
/// so that the log store can safely delete those entries. This method does not guarantee
/// that the obsolete entries are deleted immediately.
async fn obsolete(&self, provider: &Provider, entry_id: EntryId) -> Result<(), Self::Error>;
async fn obsolete(
&self,
provider: &Provider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<(), Self::Error>;

/// Makes an entry instance of the associated Entry type
fn entry(
Expand Down

0 comments on commit 660bff0

Please sign in to comment.