diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 80aaa90d402f..a7519493ea63 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -295,4 +295,60 @@ mod tests { let manager = TopicManager::new(config, kv_backend); manager.start().await.unwrap(); } + + // Tests that the TopicManager allocates topics in a round-robin mannar. + // #[tokio::test] + // async fn test_kafka_alloc_topics() { + // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY) + // .unwrap() + // .split(',') + // .map(ToString::to_string) + // .collect::>(); + // let config = MetaSrvKafkaConfig { + // topic_name_prefix: "__test_kafka_alloc_topics".to_string(), + // replication_factor: broker_endpoints.len() as i16, + // broker_endpoints, + // ..Default::default() + // }; + // let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + // let manager = KafkaTopicManager::new(config.clone(), kv_backend); + // manager.start().await.unwrap(); + + // // Topics should be created. + // let topics = (0..config.num_topics) + // .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) + // .collect::>(); + + // // Selects exactly the number of `num_topics` topics one by one. + // for expected in topics.iter() { + // let got = manager.select().unwrap(); + // assert_eq!(got, expected); + // } + + // // Selects exactly the number of `num_topics` topics in a batching manner. + // let got = manager + // .select_batch(config.num_topics) + // .unwrap() + // .into_iter() + // .map(ToString::to_string) + // .collect::>(); + // assert_eq!(got, topics); + + // // Selects none. + // let got = manager.select_batch(config.num_topics).unwrap(); + // assert!(got.is_empty()); + + // // Selects more than the number of `num_topics` topics. + // let got = manager + // .select_batch(2 * config.num_topics) + // .unwrap() + // .into_iter() + // .map(ToString::to_string) + // .collect::>(); + // let expected = vec![topics.clone(); 2] + // .into_iter() + // .flatten() + // .collect::>(); + // assert_eq!(got, expected); + // } } diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index 6c2702053b87..b6f6a99ba089 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -128,4 +128,43 @@ mod tests { .collect(); assert_eq!(got, expected); } + + // Tests that the wal options allocator could successfully allocate Kafka wal options. + // #[tokio::test] + // async fn test_kafka_options_allocator() { + // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY) + // .unwrap() + // .split(',') + // .map(ToString::to_string) + // .collect::>(); + // let config = MetaSrvKafkaConfig { + // topic_name_prefix: "__test_kafka_options_allocator".to_string(), + // replication_factor: broker_endpoints.len() as i16, + // broker_endpoints, + // ..Default::default() + // }; + // let wal_config = WalConfig::Kafka(config.clone()); + // let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; + // let allocator = WalOptionsAllocator::new(wal_config, kv_backend); + // allocator.start().await.unwrap(); + + // let num_regions = 32; + // let regions = (0..num_regions).collect::>(); + // let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap(); + + // // Topics should be allocated. + // let topics = (0..num_regions) + // .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) + // .collect::>(); + // // Check the allocated wal options contain the expected topics. + // let expected = (0..num_regions) + // .map(|i| { + // let options = WalOptions::Kafka(KafkaWalOptions { + // topic: topics[i as usize].clone(), + // }); + // (i, serde_json::to_string(&options).unwrap()) + // }) + // .collect::>(); + // assert_eq!(got, expected); + // } } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 3eb6312b2f72..27b01035c710 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -98,12 +98,12 @@ impl ClientManager { /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { - let client_pool = self.client_pool.read().await; - if let Some(client) = client_pool.get(topic) { - return Ok(client.clone()); + { + let client_pool = self.client_pool.read().await; + if let Some(client) = client_pool.get(topic) { + return Ok(client.clone()); + } } - // Manullay releases the read lock. - drop(client_pool); // Acquires the write lock. let mut client_pool = self.client_pool.write().await; diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 30ea652c51f7..9177ca423247 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -293,7 +293,7 @@ mod tests { use super::*; use crate::get_broker_endpoints_from_env; use crate::test_util::kafka::{ - create_topics, entries_with_random_data, Affix, EntryBuilder, TopicDecorator, + create_topics, entries_with_random_data, new_namespace, Affix, EntryBuilder, TopicDecorator, }; // Stores test context for a region. @@ -325,10 +325,7 @@ mod tests { (0..num_regions) .map(|i| { let topic = &topics[i % topics.len()]; - let ns = NamespaceImpl { - region_id: i as u64, - topic: topic.to_string(), - }; + let ns = new_namespace(topic, i as u64); let entry_builder = EntryBuilder::new(ns.clone()); RegionContext { ns, diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs index 547e2c7b3112..2307020fa37d 100644 --- a/src/log-store/src/kafka/record_utils.rs +++ b/src/log-store/src/kafka/record_utils.rs @@ -145,26 +145,12 @@ pub(crate) fn decode_from_record(record: Record) -> Result> { #[cfg(test)] mod tests { use super::*; - - fn new_test_entry>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl { - EntryImpl { - data: data.as_ref().to_vec(), - id: entry_id, - ns, - } - } + use crate::test_util::kafka::{entries_with_random_data, new_namespace, EntryBuilder}; #[test] fn test_serde_record_meta() { - let ns = NamespaceImpl { - region_id: 1, - topic: "test_topic".to_string(), - }; - let entries = vec![ - new_test_entry(b"111", 1, ns.clone()), - new_test_entry(b"2222", 2, ns.clone()), - new_test_entry(b"33333", 3, ns.clone()), - ]; + let ns = new_namespace("test_topic", 1); + let entries = entries_with_random_data(3, &EntryBuilder::new(ns.clone())); let meta = RecordMeta::new(ns, &entries); let encoded = serde_json::to_vec(&meta).unwrap(); let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap(); @@ -173,15 +159,8 @@ mod tests { #[test] fn test_encdec_record() { - let ns = NamespaceImpl { - region_id: 1, - topic: "test_topic".to_string(), - }; - let entries = vec![ - new_test_entry(b"111", 1, ns.clone()), - new_test_entry(b"2222", 2, ns.clone()), - new_test_entry(b"33333", 3, ns.clone()), - ]; + let ns = new_namespace("test_topic", 1); + let entries = entries_with_random_data(3, &EntryBuilder::new(ns.clone())); let record = encode_to_record(ns, entries.clone()).unwrap(); let decoded_entries = decode_from_record(record).unwrap(); assert_eq!(entries, decoded_entries); diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs index 3f926e6c1c8b..1bd9a997c203 100644 --- a/src/log-store/src/test_util/kafka.rs +++ b/src/log-store/src/test_util/kafka.rs @@ -18,7 +18,7 @@ pub mod topic_decorator; use common_config::wal::KafkaWalTopic as Topic; use rskafka::client::ClientBuilder; -use crate::kafka::EntryImpl; +use crate::kafka::{EntryImpl, NamespaceImpl}; pub use crate::test_util::kafka::entry_builder::EntryBuilder; pub use crate::test_util::kafka::topic_decorator::{Affix, TopicDecorator}; @@ -66,6 +66,14 @@ pub async fn create_topics( topics } +/// Creates a new namespace with the given topic and region id. +pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl { + NamespaceImpl { + topic: topic.to_string(), + region_id, + } +} + /// Builds a batch of entries each with random data. pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec { (0..batch_size) diff --git a/tests-integration/tests/wal.rs b/tests-integration/tests/wal.rs deleted file mode 100644 index 8bf09f55c550..000000000000 --- a/tests-integration/tests/wal.rs +++ /dev/null @@ -1,142 +0,0 @@ -// // Copyright 2023 Greptime Team -// // -// // Licensed under the Apache License, Version 2.0 (the "License"); -// // you may not use this file except in compliance with the License. -// // You may obtain a copy of the License at -// // -// // http://www.apache.org/licenses/LICENSE-2.0 -// // -// // Unless required by applicable law or agreed to in writing, software -// // distributed under the License is distributed on an "AS IS" BASIS, -// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// // See the License for the specific language governing permissions and -// // limitations under the License. - -// use std::collections::HashMap; -// use std::sync::Arc; - -// use common_config::wal::KafkaConfig as DatanodeKafkaConfig; -// use common_config::{KafkaWalOptions, WalOptions}; -// use common_meta::kv_backend::memory::MemoryKvBackend; -// use common_meta::kv_backend::KvBackendRef; -// use common_meta::wal::kafka::{ -// KafkaConfig as MetaSrvKafkaConfig, TopicManager as KafkaTopicManager, -// }; -// use common_meta::wal::{allocate_region_wal_options, WalConfig, WalOptionsAllocator}; -// use futures::StreamExt; -// use log_store::error::Result as LogStoreResult; -// use log_store::kafka::log_store::KafkaLogStore; -// use log_store::kafka::{EntryImpl, NamespaceImpl}; -// use rskafka::client::controller::ControllerClient; -// use rskafka::client::ClientBuilder; -// use store_api::logstore::entry::Id as EntryId; -// use store_api::logstore::LogStore; -// use tests_integration::wal_util::{DockerCli, KafkaImage, DEFAULT_EXPOSED_PORT}; - -// // Notice: the following tests are literally unit tests. They are placed at here since -// // it seems too heavy to start a Kafka cluster for each unit test. - -// // The key of an env variable that stores a series of Kafka broker endpoints. -// const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS"; - -// // Tests that the TopicManager allocates topics in a round-robin mannar. -// #[tokio::test] -// async fn test_kafka_alloc_topics() { -// let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY) -// .unwrap() -// .split(',') -// .map(ToString::to_string) -// .collect::>(); -// let config = MetaSrvKafkaConfig { -// topic_name_prefix: "__test_kafka_alloc_topics".to_string(), -// replication_factor: broker_endpoints.len() as i16, -// broker_endpoints, -// ..Default::default() -// }; -// let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; -// let manager = KafkaTopicManager::new(config.clone(), kv_backend); -// manager.start().await.unwrap(); - -// // Topics should be created. -// let topics = (0..config.num_topics) -// .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) -// .collect::>(); - -// // Selects exactly the number of `num_topics` topics one by one. -// for expected in topics.iter() { -// let got = manager.select().unwrap(); -// assert_eq!(got, expected); -// } - -// // Selects exactly the number of `num_topics` topics in a batching manner. -// let got = manager -// .select_batch(config.num_topics) -// .unwrap() -// .into_iter() -// .map(ToString::to_string) -// .collect::>(); -// assert_eq!(got, topics); - -// // Selects none. -// let got = manager.select_batch(config.num_topics).unwrap(); -// assert!(got.is_empty()); - -// // Selects more than the number of `num_topics` topics. -// let got = manager -// .select_batch(2 * config.num_topics) -// .unwrap() -// .into_iter() -// .map(ToString::to_string) -// .collect::>(); -// let expected = vec![topics.clone(); 2] -// .into_iter() -// .flatten() -// .collect::>(); -// assert_eq!(got, expected); -// } - -// // Tests that the wal options allocator could successfully allocate Kafka wal options. -// #[tokio::test] -// async fn test_kafka_options_allocator() { -// let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY) -// .unwrap() -// .split(',') -// .map(ToString::to_string) -// .collect::>(); -// let config = MetaSrvKafkaConfig { -// topic_name_prefix: "__test_kafka_options_allocator".to_string(), -// replication_factor: broker_endpoints.len() as i16, -// broker_endpoints, -// ..Default::default() -// }; -// let wal_config = WalConfig::Kafka(config.clone()); -// let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; -// let allocator = WalOptionsAllocator::new(wal_config, kv_backend); -// allocator.start().await.unwrap(); - -// let num_regions = 32; -// let regions = (0..num_regions).collect::>(); -// let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap(); - -// // Topics should be allocated. -// let topics = (0..num_regions) -// .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix)) -// .collect::>(); -// // Check the allocated wal options contain the expected topics. -// let expected = (0..num_regions) -// .map(|i| { -// let options = WalOptions::Kafka(KafkaWalOptions { -// topic: topics[i as usize].clone(), -// }); -// (i, serde_json::to_string(&options).unwrap()) -// }) -// .collect::>(); -// assert_eq!(got, expected); -// } - -// async fn create_topic(topic: &str, replication_factor: i16, client: &ControllerClient) { -// client -// .create_topic(topic, 1, replication_factor, 500) -// .await -// .unwrap(); -// }