Skip to content

Commit

Permalink
tmp: ready to move unit tests to an indie dir
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Jan 1, 2024
1 parent 7ac84f9 commit 68980c1
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 179 deletions.
56 changes: 56 additions & 0 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,60 @@ mod tests {
let manager = TopicManager::new(config, kv_backend);
manager.start().await.unwrap();
}

// Tests that the TopicManager allocates topics in a round-robin mannar.
// #[tokio::test]
// async fn test_kafka_alloc_topics() {
// let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
// .unwrap()
// .split(',')
// .map(ToString::to_string)
// .collect::<Vec<_>>();
// let config = MetaSrvKafkaConfig {
// topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
// replication_factor: broker_endpoints.len() as i16,
// broker_endpoints,
// ..Default::default()
// };
// let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
// let manager = KafkaTopicManager::new(config.clone(), kv_backend);
// manager.start().await.unwrap();

// // Topics should be created.
// let topics = (0..config.num_topics)
// .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
// .collect::<Vec<_>>();

// // Selects exactly the number of `num_topics` topics one by one.
// for expected in topics.iter() {
// let got = manager.select().unwrap();
// assert_eq!(got, expected);
// }

// // Selects exactly the number of `num_topics` topics in a batching manner.
// let got = manager
// .select_batch(config.num_topics)
// .unwrap()
// .into_iter()
// .map(ToString::to_string)
// .collect::<Vec<_>>();
// assert_eq!(got, topics);

// // Selects none.
// let got = manager.select_batch(config.num_topics).unwrap();
// assert!(got.is_empty());

// // Selects more than the number of `num_topics` topics.
// let got = manager
// .select_batch(2 * config.num_topics)
// .unwrap()
// .into_iter()
// .map(ToString::to_string)
// .collect::<Vec<_>>();
// let expected = vec![topics.clone(); 2]
// .into_iter()
// .flatten()
// .collect::<Vec<_>>();
// assert_eq!(got, expected);
// }
}
39 changes: 39 additions & 0 deletions src/common/meta/src/wal/options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,43 @@ mod tests {
.collect();
assert_eq!(got, expected);
}

// Tests that the wal options allocator could successfully allocate Kafka wal options.
// #[tokio::test]
// async fn test_kafka_options_allocator() {
// let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
// .unwrap()
// .split(',')
// .map(ToString::to_string)
// .collect::<Vec<_>>();
// let config = MetaSrvKafkaConfig {
// topic_name_prefix: "__test_kafka_options_allocator".to_string(),
// replication_factor: broker_endpoints.len() as i16,
// broker_endpoints,
// ..Default::default()
// };
// let wal_config = WalConfig::Kafka(config.clone());
// let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
// let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
// allocator.start().await.unwrap();

// let num_regions = 32;
// let regions = (0..num_regions).collect::<Vec<_>>();
// let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();

// // Topics should be allocated.
// let topics = (0..num_regions)
// .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
// .collect::<Vec<_>>();
// // Check the allocated wal options contain the expected topics.
// let expected = (0..num_regions)
// .map(|i| {
// let options = WalOptions::Kafka(KafkaWalOptions {
// topic: topics[i as usize].clone(),
// });
// (i, serde_json::to_string(&options).unwrap())
// })
// .collect::<HashMap<_, _>>();
// assert_eq!(got, expected);
// }
}
10 changes: 5 additions & 5 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ impl ClientManager {
/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
{
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
}
}
// Manullay releases the read lock.
drop(client_pool);

// Acquires the write lock.
let mut client_pool = self.client_pool.write().await;
Expand Down
7 changes: 2 additions & 5 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ mod tests {
use super::*;
use crate::get_broker_endpoints_from_env;
use crate::test_util::kafka::{
create_topics, entries_with_random_data, Affix, EntryBuilder, TopicDecorator,
create_topics, entries_with_random_data, new_namespace, Affix, EntryBuilder, TopicDecorator,
};

// Stores test context for a region.
Expand Down Expand Up @@ -325,10 +325,7 @@ mod tests {
(0..num_regions)
.map(|i| {
let topic = &topics[i % topics.len()];
let ns = NamespaceImpl {
region_id: i as u64,
topic: topic.to_string(),
};
let ns = new_namespace(topic, i as u64);
let entry_builder = EntryBuilder::new(ns.clone());
RegionContext {
ns,
Expand Down
31 changes: 5 additions & 26 deletions src/log-store/src/kafka/record_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,26 +145,12 @@ pub(crate) fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
#[cfg(test)]
mod tests {
use super::*;

fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
EntryImpl {
data: data.as_ref().to_vec(),
id: entry_id,
ns,
}
}
use crate::test_util::kafka::{entries_with_random_data, new_namespace, EntryBuilder};

#[test]
fn test_serde_record_meta() {
let ns = NamespaceImpl {
region_id: 1,
topic: "test_topic".to_string(),
};
let entries = vec![
new_test_entry(b"111", 1, ns.clone()),
new_test_entry(b"2222", 2, ns.clone()),
new_test_entry(b"33333", 3, ns.clone()),
];
let ns = new_namespace("test_topic", 1);
let entries = entries_with_random_data(3, &EntryBuilder::new(ns.clone()));
let meta = RecordMeta::new(ns, &entries);
let encoded = serde_json::to_vec(&meta).unwrap();
let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap();
Expand All @@ -173,15 +159,8 @@ mod tests {

#[test]
fn test_encdec_record() {
let ns = NamespaceImpl {
region_id: 1,
topic: "test_topic".to_string(),
};
let entries = vec![
new_test_entry(b"111", 1, ns.clone()),
new_test_entry(b"2222", 2, ns.clone()),
new_test_entry(b"33333", 3, ns.clone()),
];
let ns = new_namespace("test_topic", 1);
let entries = entries_with_random_data(3, &EntryBuilder::new(ns.clone()));
let record = encode_to_record(ns, entries.clone()).unwrap();
let decoded_entries = decode_from_record(record).unwrap();
assert_eq!(entries, decoded_entries);
Expand Down
10 changes: 9 additions & 1 deletion src/log-store/src/test_util/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod topic_decorator;
use common_config::wal::KafkaWalTopic as Topic;
use rskafka::client::ClientBuilder;

use crate::kafka::EntryImpl;
use crate::kafka::{EntryImpl, NamespaceImpl};
pub use crate::test_util::kafka::entry_builder::EntryBuilder;
pub use crate::test_util::kafka::topic_decorator::{Affix, TopicDecorator};

Expand Down Expand Up @@ -66,6 +66,14 @@ pub async fn create_topics(
topics
}

/// Creates a new namespace with the given topic and region id.
pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl {
NamespaceImpl {
topic: topic.to_string(),
region_id,
}
}

/// Builds a batch of entries each with random data.
pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec<EntryImpl> {
(0..batch_size)
Expand Down
142 changes: 0 additions & 142 deletions tests-integration/tests/wal.rs

This file was deleted.

0 comments on commit 68980c1

Please sign in to comment.