feat: introduce the CollectionTask
WenyXu committed Aug 8, 2024
1 parent 4a8c063 commit d8ab395
Showing 7 changed files with 375 additions and 15 deletions.
17 changes: 15 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions src/log-store/Cargo.toml
@@ -13,6 +13,7 @@ protobuf-build = { version = "0.15", default-features = false, features = [
workspace = true

[dependencies]
arrow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
@@ -25,10 +26,13 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
object-store.workspace = true
parquet.workspace = true
pin-project.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }
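
The new dependencies hint at the index format: delta-encoding compresses runs of entry ids, while arrow and parquet presumably serve a columnar on-disk layout. A minimal sketch of the delta-encoding crate's iterator-adapter API follows; the `deltas`/`original` adapters are my reading of version 0.4, not code from this commit:

use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};

fn main() {
    // Monotonically increasing WAL entry ids compress well as deltas.
    let entry_ids: Vec<u64> = vec![10, 11, 12, 20, 21];

    // Encode: keep the first value, then successive differences.
    let deltas: Vec<u64> = entry_ids.iter().copied().deltas().collect();
    assert_eq!(deltas, vec![10, 1, 1, 8, 1]);

    // Decode: a running sum restores the original ids.
    let restored: Vec<u64> = deltas.into_iter().original().collect();
    assert_eq!(restored, entry_ids);
}
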
26 changes: 25 additions & 1 deletion src/log-store/src/error.rs
@@ -265,14 +265,38 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to send produce request"))]
#[snafu(display("Failed to wait for ProduceResultReceiver"))]
WaitProduceResultReceiver {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display("Failed to wait for result of DumpIndex"))]
WaitDumpIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::sync::oneshot::error::RecvError,
},

#[snafu(display("Failed to create writer"))]
CreateWriter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},

#[snafu(display("Failed to write index"))]
WriteIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
},

#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,
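
The new variants follow the crate's existing snafu pattern: the call site attaches a context selector, and snafu fills in the source error and the implicit location. Condensed from `CollectionTask::dump_index` later in this diff (the function name here is mine):

use snafu::ResultExt;

use crate::error::{self, Result};

// Condensed sketch of the index write path; mirrors dump_index below.
async fn persist_index(
    operator: &object_store::ObjectStore,
    path: &str,
    bytes: Vec<u8>,
) -> Result<()> {
    let mut writer = operator
        .writer(path)
        .await
        .context(error::CreateWriterSnafu)?; // -> Error::CreateWriter
    writer.write(bytes).await.context(error::WriteIndexSnafu)?; // -> Error::WriteIndex
    writer.close().await.context(error::WriteIndexSnafu)?;
    Ok(())
}
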
4 changes: 3 additions & 1 deletion src/log-store/src/kafka/index.rs
@@ -13,11 +13,13 @@
// limitations under the License.

mod collector;
mod encoder;
mod iterator;

pub(crate) use collector::{
GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector,
GlobalIndexCollector, IndexCollector, NoopCollector, ProviderLevelIndexCollector,
};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
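
For orientation: the `IndexEncoder` trait re-exported here moves out of `collector.rs` (its old definition appears as removed lines in the next file), and `JsonIndexEncoder` is the new JSON implementation of it. Judging from those removed lines, the relocated trait keeps this shape; the doc comments are editorial:

// Shape reproduced from the definition removed from collector.rs below;
// the relocated version may differ in detail.
pub trait IndexEncoder: Send + Sync {
    /// Records the collected region indexes for one Kafka provider (topic).
    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);

    /// Serializes everything recorded so far into a single byte buffer.
    fn finish(&self) -> Result<Vec<u8>>;
}
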
146 changes: 135 additions & 11 deletions src/log-store/src/kafka/index/collector.rs
@@ -14,13 +14,14 @@

use std::collections::{BTreeSet, HashMap};
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bytes::buf::Writer;
use bytes::{BufMut, Bytes, BytesMut};
use common_telemetry::tracing::error;
use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::Writer;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
@@ -31,14 +32,10 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;

use crate::error::{self, Result};
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::worker::{DumpIndexRequest, WorkerRequest};

pub trait IndexEncoder: Send + Sync {
fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);

fn finish(&self) -> Result<Vec<u8>>;
}

/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
pub trait IndexCollector: Send + Sync {
/// Appends an [`EntryId`] for a specific region.
@@ -58,9 +55,136 @@ pub trait IndexCollector: Send + Sync {

/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct GlobalIndexCollector {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
task: CollectionTask,
}

#[derive(Debug, Clone)]
pub struct CollectionTask {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
dump_index_interval: Duration,
checkpoint_interval: Duration,
operator: object_store::ObjectStore,
path: String,
running: Arc<AtomicBool>,
}

impl CollectionTask {
async fn dump_index(
providers: &Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
operator: &object_store::ObjectStore,
path: &str,
) -> Result<()> {
let encoder = Arc::new(JsonIndexEncoder::default());
let receivers = {
let providers = providers.lock().await;
let mut receivers = Vec::with_capacity(providers.len());
for (provider, sender) in providers.iter() {
let (req, rx) = DumpIndexRequest::new(encoder.clone());
receivers.push(rx);
if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
error!(
"BackgroundProducerWorker is stopped, topic: {}",
provider.topic
)
}
}
receivers
};
try_join_all(receivers)
.await
.context(error::WaitDumpIndexSnafu)?;
let bytes = encoder.finish()?;
let mut writer = operator
.writer(path)
.await
.context(error::CreateWriterSnafu)?;
writer.write(bytes).await.context(error::WriteIndexSnafu)?;
writer.close().await.context(error::WriteIndexSnafu)?;

Ok(())
}

async fn checkpoint(
providers: &Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
) {
for (provider, sender) in providers.lock().await.iter() {
if sender.send(WorkerRequest::Checkpoint).await.is_err() {
error!(
"BackgroundProducerWorker is stopped, topic: {}",
provider.topic
)
}
}
}

/// The background task performs two main operations:
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
fn run(&mut self) {
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
let mut checkpoint_interval = tokio::time::interval(self.checkpoint_interval);
let providers = self.providers.clone();
let path = self.path.to_string();
let operator = self.operator.clone();
let running = self.running.clone();
common_runtime::spawn_global(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown the index collection task");
break;
}
select! {
_ = dump_index_interval.tick() => {
if let Err(err) = CollectionTask::dump_index(&providers, &operator, &path).await {
error!(err; "Failed to persist the WAL index");
}
},
_ = checkpoint_interval.tick() => {
CollectionTask::checkpoint(&providers).await;
}
}
}
});
}
}

impl Drop for CollectionTask {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}

impl GlobalIndexCollector {
/// Constructs a [`GlobalIndexCollector`].
///
/// This method initializes a `GlobalIndexCollector` instance and starts a background task
/// for managing WAL (Write-Ahead Logging) indexes.
///
/// The background task performs two main operations:
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
pub fn new(
dump_index_interval: Duration,
checkpoint_interval: Duration,
operator: object_store::ObjectStore,
path: String,
) -> Self {
let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
Arc::new(Default::default());
let mut task = CollectionTask {
providers: providers.clone(),
dump_index_interval,
checkpoint_interval,
operator,
path,
running: Arc::new(AtomicBool::new(true)),
};
task.run();
Self { providers, task }
}
}

impl GlobalIndexCollector {
@@ -83,8 +207,8 @@ impl GlobalIndexCollector {
/// latest [`EntryId`] across all regions.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
regions: HashMap<RegionId, BTreeSet<EntryId>>,
latest_entry_id: EntryId,
pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
pub(crate) latest_entry_id: EntryId,
}

impl RegionIndexes {
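
Putting it together: constructing a `GlobalIndexCollector` spawns the `CollectionTask` loop on the global runtime, and the two intervals drive `dump_index` and `checkpoint` independently through `select!`. A hedged usage sketch follows; the interval values and path are illustrative (the commit defines no defaults), and `object_store` stands for any configured `object_store::ObjectStore`:

use std::time::Duration;

let collector = GlobalIndexCollector::new(
    Duration::from_secs(60),             // dump_index_interval (illustrative)
    Duration::from_secs(10),             // checkpoint_interval (illustrative)
    object_store,                        // assumed: an ObjectStore built elsewhere
    "remote_wal/index.json".to_string(), // hypothetical destination path
);

// Dropping the collector drops its CollectionTask, whose Drop impl stores
// `false` into `running`. The loop reads the flag only at the top of each
// iteration, so shutdown takes effect after the next interval tick rather
// than immediately.
drop(collector);
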
(Diffs for the remaining two files did not load.)