From 8977c62f8d356dc6afc0eef7a13ffa8db04ce91a Mon Sep 17 00:00:00 2001
From: WenyXu
Date: Tue, 13 Aug 2024 08:02:07 +0000
Subject: [PATCH] chore: remove unused code

---
 Cargo.lock                                    |  2 --
 src/common/wal/src/config.rs                  |  1 +
 src/log-store/Cargo.toml                      |  4 +---
 src/log-store/src/kafka.rs                    |  8 ++++----
 src/log-store/src/kafka/client_manager.rs     |  4 +++-
 src/log-store/src/kafka/index.rs              |  2 +-
 src/log-store/src/kafka/index/collector.rs    | 10 ++++------
 src/log-store/src/kafka/index/encoder.rs      | 15 ++-------------
 src/log-store/src/kafka/index/iterator.rs     | 10 +++-------
 src/log-store/src/test_util/log_store_util.rs | 13 ++++++++-----
 10 files changed, 27 insertions(+), 42 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8243c9efa735..7a3e48e6456a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5796,7 +5796,6 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
 name = "log-store"
 version = "0.9.1"
 dependencies = [
- "arrow",
  "async-stream",
  "async-trait",
  "bytes",
@@ -5816,7 +5815,6 @@ dependencies = [
  "itertools 0.10.5",
  "lazy_static",
  "object-store",
- "parquet",
  "pin-project",
  "prometheus",
  "protobuf",
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index fa24a9970cfc..90f3e44f9c4a 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -30,6 +30,7 @@ pub enum MetasrvWalConfig {
     Kafka(MetasrvKafkaConfig),
 }
 
+#[allow(clippy::large_enum_variant)]
 /// Wal configurations for datanode.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
 #[serde(tag = "provider", rename_all = "snake_case")]
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index fd3df5cda6fe..12979e3c5afd 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -13,7 +13,6 @@ protobuf-build = { version = "0.15", default-features = false, features = [
 workspace = true
 
 [dependencies]
-arrow.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 bytes.workspace = true
@@ -27,12 +26,11 @@ common-telemetry.workspace = true
 common-time.workspace = true
 common-wal.workspace = true
 delta-encoding = "0.4"
-futures.workspace = true
 futures-util.workspace = true
+futures.workspace = true
 itertools.workspace = true
 lazy_static.workspace = true
 object-store.workspace = true
-parquet.workspace = true
 pin-project.workspace = true
 prometheus.workspace = true
 protobuf = { version = "2", features = ["bytes"] }
diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs
index 551e3cc4c518..ad73b770398c 100644
--- a/src/log-store/src/kafka.rs
+++ b/src/log-store/src/kafka.rs
@@ -13,15 +13,15 @@
 // limitations under the License.
 
 pub(crate) mod client_manager;
-// TODO(weny): remove it
-#[allow(dead_code)]
 pub(crate) mod consumer;
-#[allow(unused)]
+/// TODO(weny): remove it.
+#[allow(dead_code)]
+#[allow(unused_imports)]
 pub(crate) mod index;
 pub mod log_store;
 pub(crate) mod producer;
 pub(crate) mod util;
-// TODO(weny): remove it
+/// TODO(weny): remove it.
 #[allow(dead_code)]
 pub(crate) mod worker;
 
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index d965edfe5569..a2feb2201134 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -151,7 +151,9 @@ impl ClientManager {
         let (tx, rx) = OrderedBatchProducer::channel();
         let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref()
         {
-            global_collector.provider_level_index_collector(provider.clone(), tx.clone())
+            global_collector
+                .provider_level_index_collector(provider.clone(), tx.clone())
+                .await
         } else {
             Box::new(NoopCollector)
         };
diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs
index 7e38f14d9a61..1bd0f3e621e8 100644
--- a/src/log-store/src/kafka/index.rs
+++ b/src/log-store/src/kafka/index.rs
@@ -17,7 +17,7 @@ mod encoder;
 mod iterator;
 
 pub use collector::GlobalIndexCollector;
-pub(crate) use collector::{IndexCollector, NoopCollector, ProviderLevelIndexCollector};
+pub(crate) use collector::{IndexCollector, NoopCollector};
 pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
 pub(crate) use iterator::{
     MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
diff --git a/src/log-store/src/kafka/index/collector.rs b/src/log-store/src/kafka/index/collector.rs
index 76f74d52f59b..228362e7934b 100644
--- a/src/log-store/src/kafka/index/collector.rs
+++ b/src/log-store/src/kafka/index/collector.rs
@@ -13,15 +13,12 @@
 // limitations under the License.
 
 use std::collections::{BTreeSet, HashMap};
-use std::io::Write;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::time::Duration;
 
-use bytes::{BufMut, Bytes, BytesMut};
 use common_telemetry::{error, info};
 use futures::future::try_join_all;
-use object_store::Writer;
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
 use store_api::logstore::provider::KafkaProvider;
@@ -189,11 +186,12 @@ impl GlobalIndexCollector {
 
 impl GlobalIndexCollector {
     /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
-    pub(crate) fn provider_level_index_collector(
+    pub(crate) async fn provider_level_index_collector(
         &self,
         provider: Arc<KafkaProvider>,
         sender: Sender<WorkerRequest>,
     ) -> Box<dyn IndexCollector> {
+        self.providers.lock().await.insert(provider.clone(), sender);
         Box::new(ProviderLevelIndexCollector {
             indexes: Default::default(),
             provider,
@@ -269,5 +267,5 @@ impl IndexCollector for NoopCollector {
 
     fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}
 
-    fn dump(&mut self, encoder: &dyn IndexEncoder) {}
+    fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
 }
diff --git a/src/log-store/src/kafka/index/encoder.rs b/src/log-store/src/kafka/index/encoder.rs
index f2124862530c..bfd11a982c14 100644
--- a/src/log-store/src/kafka/index/encoder.rs
+++ b/src/log-store/src/kafka/index/encoder.rs
@@ -13,20 +13,9 @@
 // limitations under the License.
 
 use std::collections::{BTreeSet, HashMap};
-use std::fs::File;
-use std::sync::{Arc, Mutex};
-
-use arrow::array::{
-    Array, ArrayBuilder, ArrayData, ArrayRef, ListArray, ListBuilder, PrimitiveArray, RecordBatch,
-    StringArray, StructArray, StructBuilder, UInt64Array, UInt64Builder,
-};
-use arrow::buffer::OffsetBuffer;
-use arrow::datatypes::{DataType, Field, Fields, Schema, UInt64Type};
-use arrow::util::pretty::pretty_format_batches;
+use std::sync::Mutex;
+
 use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
-use parquet::arrow::ArrowWriter;
-use parquet::file::page_index::index_reader;
-use parquet::schema::types::{Type, TypePtr};
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
 use store_api::logstore::provider::KafkaProvider;
diff --git a/src/log-store/src/kafka/index/iterator.rs b/src/log-store/src/kafka/index/iterator.rs
index 8a33cf1d9a3d..7df2518752bd 100644
--- a/src/log-store/src/kafka/index/iterator.rs
+++ b/src/log-store/src/kafka/index/iterator.rs
@@ -12,14 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::cmp::{max, min};
+use std::cmp::min;
 use std::collections::VecDeque;
-use std::iter::Peekable;
-use std::marker::PhantomData;
-use std::ops::{Add, Mul, Range, Sub};
+use std::ops::Range;
 
-use chrono::format::Item;
-use itertools::Itertools;
 use store_api::logstore::EntryId;
 
 use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
@@ -197,7 +193,7 @@ mod tests {
 
     #[test]
     fn test_region_wal_range() {
-        let mut range = RegionWalRange::new(0..1024, 1024);
+        let range = RegionWalRange::new(0..1024, 1024);
         assert_eq!(
             range.next_batch_hint(10),
             Some(NextBatchHint {
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index f78b5a965d0c..b1fd183fbacc 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -34,13 +34,16 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng
 
 /// Create a [KafkaLogStore].
 pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
-    KafkaLogStore::try_new(&DatanodeKafkaConfig {
-        connection: KafkaConnectionConfig {
-            broker_endpoints,
+    KafkaLogStore::try_new(
+        &DatanodeKafkaConfig {
+            connection: KafkaConnectionConfig {
+                broker_endpoints,
+                ..Default::default()
+            },
             ..Default::default()
         },
-        ..Default::default()
-    })
+        None,
+    )
    .await
    .unwrap()
 }
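
Note on the `provider_level_index_collector` change above: the method now registers the per-provider sender in the collector's provider map before returning, and that map is behind an async lock, which is why the method became `async` and the call site in `client_manager.rs` gained a `.await`. Below is a minimal, self-contained sketch of that pattern using plain `tokio` types; the `Registry`/`register` names and the `String`-keyed map are hypothetical stand-ins, not the GreptimeDB API (the real code presumably keys by `Arc<KafkaProvider>` and stores worker-request senders).

```rust
// Sketch only: registering a per-provider sender behind an async mutex.
// Requires the `tokio` crate with the "macros", "rt" and "sync" features.
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};

#[derive(Default)]
struct Registry {
    // Keyed by provider name here; the patch keys by the provider handle itself.
    senders: Mutex<HashMap<String, mpsc::Sender<String>>>,
}

impl Registry {
    // Async because the tokio mutex must be awaited before inserting,
    // mirroring why `provider_level_index_collector` became `async`.
    async fn register(&self, provider: Arc<String>, sender: mpsc::Sender<String>) {
        self.senders
            .lock()
            .await
            .insert(provider.as_ref().clone(), sender);
    }
}

#[tokio::main]
async fn main() {
    let registry = Registry::default();
    let (tx, _rx) = mpsc::channel(8);
    // The caller must `.await` the registration, just like the updated
    // call site in `client_manager.rs`.
    registry.register(Arc::new("topic_0".to_string()), tx).await;
    assert_eq!(registry.senders.lock().await.len(), 1);
}
```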