Skip to content

Commit

Permalink
chore: remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 13, 2024
1 parent 3adb382 commit 8d7bcc9
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 41 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum MetasrvWalConfig {
Kafka(MetasrvKafkaConfig),
}

#[allow(clippy::large_enum_variant)]
/// Wal configurations for datanode.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
Expand Down
2 changes: 0 additions & 2 deletions src/log-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ protobuf-build = { version = "0.15", default-features = false, features = [
workspace = true

[dependencies]
arrow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
Expand All @@ -32,7 +31,6 @@ futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
object-store.workspace = true
parquet.workspace = true
pin-project.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }
Expand Down
8 changes: 4 additions & 4 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
// limitations under the License.

pub(crate) mod client_manager;
// TODO(weny): remove it
#[allow(dead_code)]
pub(crate) mod consumer;
#[allow(unused)]
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
// TODO(weny): remove it
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;

Expand Down
4 changes: 3 additions & 1 deletion src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ impl ClientManager {

let (tx, rx) = OrderedBatchProducer::channel();
let index_collector = if let Some(global_collector) = self.global_index_collector.as_ref() {
global_collector.provider_level_index_collector(provider.clone(), tx.clone())
global_collector
.provider_level_index_collector(provider.clone(), tx.clone())
.await
} else {
Box::new(NoopCollector)
};
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/kafka/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod encoder;
mod iterator;

pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector, ProviderLevelIndexCollector};
pub(crate) use collector::{IndexCollector, NoopCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
Expand Down
10 changes: 4 additions & 6 deletions src/log-store/src/kafka/index/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;

use bytes::{BufMut, Bytes, BytesMut};
use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::Writer;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
Expand Down Expand Up @@ -189,11 +186,12 @@ impl GlobalIndexCollector {

impl GlobalIndexCollector {
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub(crate) fn provider_level_index_collector(
pub(crate) async fn provider_level_index_collector(
&self,
provider: Arc<KafkaProvider>,
sender: Sender<WorkerRequest>,
) -> Box<dyn IndexCollector> {
self.providers.lock().await.insert(provider.clone(), sender);
Box::new(ProviderLevelIndexCollector {
indexes: Default::default(),
provider,
Expand Down Expand Up @@ -269,5 +267,5 @@ impl IndexCollector for NoopCollector {

fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}

fn dump(&mut self, encoder: &dyn IndexEncoder) {}
fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}
15 changes: 2 additions & 13 deletions src/log-store/src/kafka/index/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,9 @@
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::sync::{Arc, Mutex};

use arrow::array::{
Array, ArrayBuilder, ArrayData, ArrayRef, ListArray, ListBuilder, PrimitiveArray, RecordBatch,
StringArray, StructArray, StructBuilder, UInt64Array, UInt64Builder,
};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field, Fields, Schema, UInt64Type};
use arrow::util::pretty::pretty_format_batches;
use std::sync::Mutex;

use delta_encoding::{DeltaDecoderExt, DeltaEncoderExt};
use parquet::arrow::ArrowWriter;
use parquet::file::page_index::index_reader;
use parquet::schema::types::{Type, TypePtr};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
Expand Down
10 changes: 3 additions & 7 deletions src/log-store/src/kafka/index/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{max, min};
use std::cmp::min;
use std::collections::VecDeque;
use std::iter::Peekable;
use std::marker::PhantomData;
use std::ops::{Add, Mul, Range, Sub};
use std::ops::Range;

use chrono::format::Item;
use itertools::Itertools;
use store_api::logstore::EntryId;

use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
Expand Down Expand Up @@ -197,7 +193,7 @@ mod tests {

#[test]
fn test_region_wal_range() {
let mut range = RegionWalRange::new(0..1024, 1024);
let range = RegionWalRange::new(0..1024, 1024);
assert_eq!(
range.next_batch_hint(10),
Some(NextBatchHint {
Expand Down
13 changes: 8 additions & 5 deletions src/log-store/src/test_util/log_store_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng

/// Create a [KafkaLogStore].
pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
KafkaLogStore::try_new(&DatanodeKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
KafkaLogStore::try_new(
&DatanodeKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
..Default::default()
},
..Default::default()
})
None,
)
.await
.unwrap()
}

0 comments on commit 8d7bcc9

Please sign in to comment.