From d8ab395141315734f82aff97fc718d742f16e766 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Tue, 6 Aug 2024 19:39:38 +0000 Subject: [PATCH] feat: introduce the `CollectionTask` --- Cargo.lock | 17 +- src/log-store/Cargo.toml | 4 + src/log-store/src/error.rs | 26 ++- src/log-store/src/kafka/index.rs | 4 +- src/log-store/src/kafka/index/collector.rs | 146 ++++++++++++++-- src/log-store/src/kafka/index/encoder.rs | 193 +++++++++++++++++++++ src/log-store/test.parquet | Bin 0 -> 1981 bytes 7 files changed, 375 insertions(+), 15 deletions(-) create mode 100644 src/log-store/src/kafka/index/encoder.rs create mode 100644 src/log-store/test.parquet diff --git a/Cargo.lock b/Cargo.lock index b6eed498cfde..b0df9122c7f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3155,6 +3155,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "delta-encoding" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f8513a5eeb3d7b9149563409dc4ab6fd9de5767fd285af5b4d0ee1b778fbce0" +dependencies = [ + "num-traits", +] + [[package]] name = "der" version = "0.5.1" @@ -4680,7 +4689,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -5765,6 +5774,7 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" name = "log-store" version = "0.9.1" dependencies = [ + "arrow", "async-stream", "async-trait", "bytes", @@ -5778,10 +5788,13 @@ dependencies = [ "common-test-util", "common-time", "common-wal", + "delta-encoding", "futures", "futures-util", "itertools 0.10.5", "lazy_static", + "object-store", + "parquet", "pin-project", "prometheus", "protobuf", @@ -8457,7 +8470,7 @@ dependencies = [ "indoc", "libc", "memoffset 0.9.1", - "parking_lot 0.11.2", + "parking_lot 0.12.3", "portable-atomic", "pyo3-build-config", "pyo3-ffi", diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 6a84965974eb..fd3df5cda6fe 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -13,6 +13,7 @@ protobuf-build = { version = "0.15", default-features = false, features = [ workspace = true [dependencies] +arrow.workspace = true async-stream.workspace = true async-trait.workspace = true bytes.workspace = true @@ -25,10 +26,13 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true +delta-encoding = "0.4" futures.workspace = true futures-util.workspace = true itertools.workspace = true lazy_static.workspace = true +object-store.workspace = true +parquet.workspace = true pin-project.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 706f91e9d836..2b5d13f86000 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -265,7 +265,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to send produce request"))] + #[snafu(display("Failed to wait for ProduceResultReceiver"))] WaitProduceResultReceiver { #[snafu(implicit)] location: Location, @@ -273,6 +273,30 @@ pub enum Error { error: tokio::sync::oneshot::error::RecvError, }, + #[snafu(display("Failed to wait for result of DumpIndex"))] + WaitDumpIndex { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: tokio::sync::oneshot::error::RecvError, + }, + + #[snafu(display("Failed to create writer"))] + CreateWriter { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: 
object_store::Error,
+    },
+
+    #[snafu(display("Failed to write index"))]
+    WriteIndex {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: object_store::Error,
+    },
+
     #[snafu(display(
         "The length of meta if exceeded the limit: {}, actual: {}",
         limit,
diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs
index 1c646376165b..567f34f3cb9e 100644
--- a/src/log-store/src/kafka/index.rs
+++ b/src/log-store/src/kafka/index.rs
@@ -13,11 +13,13 @@
 // limitations under the License.
 
 mod collector;
+mod encoder;
 mod iterator;
 
 pub(crate) use collector::{
-    GlobalIndexCollector, IndexCollector, IndexEncoder, NoopCollector, ProviderLevelIndexCollector,
+    GlobalIndexCollector, IndexCollector, NoopCollector, ProviderLevelIndexCollector,
 };
+pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
 pub(crate) use iterator::{
     MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
     RegionWalVecIndex,
diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs
index b830a2b46d13..d3367d8fca89 100644
--- a/src/log-store/src/kafka/index/collector.rs
+++ b/src/log-store/src/kafka/index/collector.rs
@@ -14,13 +14,14 @@
 
 use std::collections::{BTreeSet, HashMap};
 use std::io::Write;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
-use bytes::buf::Writer;
 use bytes::{BufMut, Bytes, BytesMut};
-use common_telemetry::tracing::error;
+use common_telemetry::{error, info};
 use futures::future::try_join_all;
+use object_store::Writer;
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
 use store_api::logstore::provider::KafkaProvider;
@@ -31,14 +32,10 @@ use tokio::sync::mpsc::Sender;
 use tokio::sync::Mutex as TokioMutex;
 
 use crate::error::{self, Result};
+use crate::kafka::index::encoder::IndexEncoder;
+use crate::kafka::index::JsonIndexEncoder;
 use crate::kafka::worker::{DumpIndexRequest, WorkerRequest};
 
-pub trait IndexEncoder: Send + Sync {
-    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
-
-    fn finish(&self) -> Result<Vec<u8>>;
-}
-
 /// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
 pub trait IndexCollector: Send + Sync {
     /// Appends an [`EntryId`] for a specific region.
@@ -58,9 +55,136 @@ pub trait IndexCollector: Send + Sync {
 /// The [`GlobalIndexCollector`] struct is responsible for managing index entries
 /// across multiple providers.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone)]
 pub struct GlobalIndexCollector {
     providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
+    task: CollectionTask,
+}
+
+#[derive(Debug, Clone)]
+pub struct CollectionTask {
+    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
+    dump_index_interval: Duration,
+    checkpoint_interval: Duration,
+    operator: object_store::ObjectStore,
+    path: String,
+    running: Arc<AtomicBool>,
+}
+
+impl CollectionTask {
+    async fn dump_index(
+        providers: &Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
+        operator: &object_store::ObjectStore,
+        path: &str,
+    ) -> Result<()> {
+        let encoder = Arc::new(JsonIndexEncoder::default());
+        let receivers = {
+            let providers = providers.lock().await;
+            let mut receivers = Vec::with_capacity(providers.len());
+            for (provider, sender) in providers.iter() {
+                let (req, rx) = DumpIndexRequest::new(encoder.clone());
+                receivers.push(rx);
+                if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
+                    error!(
+                        "BackgroundProducerWorker is stopped, topic: {}",
+                        provider.topic
+                    )
+                }
+            }
+            receivers
+        };
+        try_join_all(receivers)
+            .await
+            .context(error::WaitDumpIndexSnafu)?;
+        let bytes = encoder.finish()?;
+        let mut writer = operator
+            .writer(path)
+            .await
+            .context(error::CreateWriterSnafu)?;
+        writer.write(bytes).await.context(error::WriteIndexSnafu)?;
+        writer.close().await.context(error::WriteIndexSnafu)?;
+
+        Ok(())
+    }
+
+    async fn checkpoint(
+        providers: &Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
+    ) {
+        for (provider, sender) in providers.lock().await.iter() {
+            if sender.send(WorkerRequest::Checkpoint).await.is_err() {
+                error!(
+                    "BackgroundProducerWorker is stopped, topic: {}",
+                    provider.topic
+                )
+            }
+        }
+    }
+
+    /// The background task performs two main operations:
+    /// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
+    /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
+    fn run(&mut self) {
+        let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
+        let mut checkpoint_interval = tokio::time::interval(self.checkpoint_interval);
+        let providers = self.providers.clone();
+        let path = self.path.to_string();
+        let operator = self.operator.clone();
+        let running = self.running.clone();
+        common_runtime::spawn_global(async move {
+            loop {
+                if !running.load(Ordering::Relaxed) {
+                    info!("Shutting down the index collection task");
+                    break;
+                }
+                tokio::select! {
+                    _ = dump_index_interval.tick() => {
+                        if let Err(err) = CollectionTask::dump_index(&providers, &operator, &path).await {
+                            error!(err; "Failed to persist the WAL index");
+                        }
+                    },
+                    _ = checkpoint_interval.tick() => {
+                        CollectionTask::checkpoint(&providers).await;
+                    }
+                }
+            }
+        });
+    }
+}
+
+impl Drop for CollectionTask {
+    fn drop(&mut self) {
+        self.running.store(false, Ordering::Relaxed);
+    }
+}
+
+impl GlobalIndexCollector {
+    /// Constructs a [`GlobalIndexCollector`].
+    ///
+    /// This method initializes a `GlobalIndexCollector` instance and starts a background task
+    /// for managing WAL (Write-Ahead Log) indexes.
+    ///
+    /// The background task performs two main operations:
+    /// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
+    /// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
+    pub fn new(
+        dump_index_interval: Duration,
+        checkpoint_interval: Duration,
+        operator: object_store::ObjectStore,
+        path: String,
+    ) -> Self {
+        let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
+            Arc::new(Default::default());
+        let mut task = CollectionTask {
+            providers: providers.clone(),
+            dump_index_interval,
+            checkpoint_interval,
+            operator,
+            path,
+            running: Arc::new(AtomicBool::new(true)),
+        };
+        task.run();
+        Self { providers, task }
+    }
 }
 
 impl GlobalIndexCollector {
@@ -83,8 +207,8 @@ impl GlobalIndexCollector {
 /// latest [`EntryId`] across all regions.
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct RegionIndexes {
-    regions: HashMap<RegionId, BTreeSet<EntryId>>,
-    latest_entry_id: EntryId,
+    pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
+    pub(crate) latest_entry_id: EntryId,
 }
 
 impl RegionIndexes {
diff --git a/src/log-store/src/kafka/index/encoder.rs b/src/log-store/src/kafka/index/encoder.rs
new file mode 100644
index 000000000000..f2124862530c
--- /dev/null
+++ b/src/log-store/src/kafka/index/encoder.rs
@@ -0,0 +1,193 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{BTreeSet, HashMap};
+use std::fs::File;
+use std::sync::{Arc, Mutex};
+
+use arrow::array::{
+    Array, ArrayBuilder, ArrayData, ArrayRef, ListArray, ListBuilder, PrimitiveArray, RecordBatch,
+    StringArray, StructArray, StructBuilder, UInt64Array, UInt64Builder,
+};
+use arrow::buffer::OffsetBuffer;
+use arrow::datatypes::{DataType, Field, Fields, Schema, UInt64Type};
+use arrow::util::pretty::pretty_format_batches;
+use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
+use parquet::arrow::ArrowWriter;
+use parquet::file::page_index::index_reader;
+use parquet::schema::types::{Type, TypePtr};
+use serde::{Deserialize, Serialize};
+use snafu::ResultExt;
+use store_api::logstore::provider::KafkaProvider;
+use store_api::storage::RegionId;
+
+use crate::error::{self, Result};
+use crate::kafka::index::collector::RegionIndexes;
+
+/// Converts a [`RegionIndexes`] instance into a [`DeltaEncodedRegionIndexes`].
+///
+/// This conversion encodes the index values using delta encoding to reduce storage space.
+impl From<&RegionIndexes> for DeltaEncodedRegionIndexes {
+    fn from(value: &RegionIndexes) -> Self {
+        let mut regions = HashMap::with_capacity(value.regions.len());
+        for (region_id, indexes) in value.regions.iter() {
+            let indexes = indexes.iter().copied().deltas().collect();
+            regions.insert(*region_id, indexes);
+        }
+        Self {
+            regions,
+            last_index: value.latest_entry_id,
+        }
+    }
+}
+
+/// Represents the delta-encoded version of region indexes for efficient storage.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct DeltaEncodedRegionIndexes {
+    regions: HashMap<RegionId, Vec<u64>>,
+    last_index: u64,
+}
+
+impl DeltaEncodedRegionIndexes {
+    /// Retrieves the original (decoded) index values for a given region.
+    fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
+        let decoded = self
+            .regions
+            .get(&region_id)
+            .map(|delta| delta.iter().copied().original().collect::<BTreeSet<_>>());
+
+        decoded
+    }
+
+    /// Retrieves the last index.
+    fn last_index(&self) -> u64 {
+        self.last_index
+    }
+}
+
+pub trait IndexEncoder: Send + Sync {
+    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes);
+
+    fn finish(&self) -> Result<Vec<u8>>;
+}
+
+/// The [`DatanodeWalIndexes`] structure holds the WAL indexes for a datanode.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub(crate) struct DatanodeWalIndexes(HashMap<String, DeltaEncodedRegionIndexes>);
+
+impl DatanodeWalIndexes {
+    fn insert(&mut self, topic: String, region_index: &RegionIndexes) {
+        self.0.insert(topic, region_index.into());
+    }
+
+    fn encode(&mut self) -> Result<Vec<u8>> {
+        let value = serde_json::to_vec(&self.0).context(error::EncodeJsonSnafu);
+        self.0.clear();
+        value
+    }
+
+    fn decode(byte: &[u8]) -> Result<Self> {
+        serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
+    }
+
+    /// Retrieves the delta-encoded region indexes for a given `provider`.
+    pub(crate) fn provider(&self, provider: &KafkaProvider) -> Option<&DeltaEncodedRegionIndexes> {
+        self.0.get(&provider.topic)
+    }
+}
+
+/// [`JsonIndexEncoder`] encodes [`RegionIndexes`] into JSON format.
+#[derive(Debug, Default)]
+pub(crate) struct JsonIndexEncoder {
+    buf: Mutex<DatanodeWalIndexes>,
+}
+
+impl IndexEncoder for JsonIndexEncoder {
+    fn encode(&self, provider: &KafkaProvider, region_index: &RegionIndexes) {
+        self.buf
+            .lock()
+            .unwrap()
+            .insert(provider.topic.to_string(), region_index);
+    }
+
+    fn finish(&self) -> Result<Vec<u8>> {
+        let mut buf = self.buf.lock().unwrap();
+        buf.encode()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::{BTreeSet, HashMap, HashSet};
+
+    use store_api::logstore::provider::KafkaProvider;
+    use store_api::storage::RegionId;
+
+    use super::{DatanodeWalIndexes, IndexEncoder, JsonIndexEncoder};
+    use crate::kafka::index::collector::RegionIndexes;
+
+    #[test]
+    fn test_json_index_encoder() {
+        let encoder = JsonIndexEncoder::default();
+        let topic_1 = KafkaProvider::new("my_topic_1".to_string());
+        let region_1_indexes = BTreeSet::from([1u64, 2, 4, 5, 20]);
+        let region_2_indexes = BTreeSet::from([4u64, 12, 43, 54, 75]);
+        encoder.encode(
+            &topic_1,
+            &RegionIndexes {
+                regions: HashMap::from([
+                    (RegionId::new(1, 1), region_1_indexes.clone()),
+                    (RegionId::new(1, 2), region_2_indexes.clone()),
+                ]),
+                latest_entry_id: 1024,
+            },
+        );
+        let topic_2 = KafkaProvider::new("my_topic_2".to_string());
+        encoder.encode(
+            &topic_2,
+            &RegionIndexes {
+                regions: HashMap::from([
+                    (
+                        RegionId::new(1, 1),
+                        BTreeSet::from([1024u64, 1025, 1026, 1028, 2048]),
+                    ),
+                    (RegionId::new(1, 2), BTreeSet::from([1512])),
+                ]),
+                latest_entry_id: 2048,
+            },
+        );
+
+        let bytes = encoder.finish().unwrap();
+        let datanode_index = DatanodeWalIndexes::decode(&bytes).unwrap();
+        assert_eq!(
+            datanode_index
+                .provider(&topic_1)
+                .unwrap()
+                .region(RegionId::new(1, 1))
+                .unwrap(),
+            region_1_indexes,
+        );
+        assert_eq!(
+            datanode_index
                .provider(&topic_1)
+                .unwrap()
+                .region(RegionId::new(1, 2))
+                .unwrap(),
+            region_2_indexes,
+        );
+        assert!(datanode_index
+            .provider(&KafkaProvider::new("my_topic_3".to_string()))
+            .is_none());
+    }
+}
diff --git a/src/log-store/test.parquet b/src/log-store/test.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..a0f24431038c39b7be6915663c0e2f89d17dc681
GIT binary patch
literal 1981
[1981 bytes of base85-encoded binary payload omitted]
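
A minimal sketch of how a datanode could wire up the collector introduced above. The `build_object_store` helper, the interval values, and the index path are illustrative assumptions for this sketch; only `GlobalIndexCollector::new` itself comes from this patch:

    use std::time::Duration;

    fn start_index_collection() -> GlobalIndexCollector {
        // Assumed helper: building an `object_store::ObjectStore` depends on the
        // deployment's backend (fs, s3, ...) and is outside the scope of this patch.
        let operator: object_store::ObjectStore = build_object_store();

        GlobalIndexCollector::new(
            // Persist the WAL index document once per minute (assumed value).
            Duration::from_secs(60),
            // Ask every BackgroundProducerWorker to checkpoint every ten seconds (assumed value).
            Duration::from_secs(10),
            operator,
            // Assumed layout for the dumped index document.
            "wal/index/datanode-0.json".to_string(),
        )
    }

Dropping the returned collector drops the `CollectionTask` inside it; the task's `Drop` impl stores `false` into the shared `running` flag, and the spawned loop observes the flag and breaks on its next iteration. Since `CollectionTask` derives `Clone` and the flag is shared, dropping any clone stops the task for every clone.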
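
The delta encoding applied by `From<&RegionIndexes> for DeltaEncodedRegionIndexes` is easiest to see on concrete numbers. The round trip below uses only the two iterator adapters from the `delta-encoding` crate that the new `encoder.rs` already relies on: `deltas()` keeps the first value as-is (a delta against an implicit zero) and replaces every later value with the gap to its predecessor, while `original()` restores the entry ids with a running sum:

    use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};

    fn main() {
        // Sorted entry ids for one region, as stored in the `BTreeSet`.
        let entry_ids: Vec<u64> = vec![1, 2, 4, 5, 20];

        // Encode: [1, 2, 4, 5, 20] -> [1, 1, 2, 1, 15]. Mostly-contiguous entry
        // ids collapse into runs of small numbers, which keeps the serialized
        // index compact.
        let encoded: Vec<u64> = entry_ids.iter().copied().deltas().collect();
        assert_eq!(encoded, vec![1, 1, 2, 1, 15]);

        // Decode: the running sum restores the original entry ids.
        let decoded: Vec<u64> = encoded.iter().copied().original().collect();
        assert_eq!(decoded, entry_ids);
    }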
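
On disk, `JsonIndexEncoder::finish` serializes a map from topic name to its `DeltaEncodedRegionIndexes`. Assuming `RegionId` serializes through its underlying `u64` representation (so `RegionId::new(1, 1)` becomes the object key "4294967297"), an assumption about serde behavior rather than something this patch pins down, the `my_topic_1` entry from the unit test would come out roughly as:

    {
      "my_topic_1": {
        "regions": {
          "4294967297": [1, 1, 2, 1, 15],
          "4294967298": [4, 8, 31, 11, 21]
        },
        "last_index": 1024
      }
    }

Note that `DatanodeWalIndexes::encode` clears the buffered map after serializing, so each dump written by the `CollectionTask` starts from a fresh document.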