
Commit

test: update unit tests for client manager
niebayes committed Jan 2, 2024
1 parent 68980c1 · commit d6f2b34
Showing 3 changed files with 29 additions and 21 deletions.
README.md (3 changes: 0 additions & 3 deletions)
```diff
@@ -27,9 +27,6 @@
 <a href="https://greptime.com/slack"><img src="https://img.shields.io/badge/slack-GreptimeDB-0abd59?logo=slack" alt="slack" /></a>
 </p>
 
-> [!WARNING]
-> Our default branch has changed from `develop` to `main` (issue [#3025](https://github.com/GreptimeTeam/greptimedb/issues/3025)). Please update your local repository to use the `main` branch.
-
 ## What is GreptimeDB
 
 GreptimeDB is an open-source time-series database with a special focus on
```
src/log-store/src/kafka/client_manager.rs (43 changes: 27 additions & 16 deletions)
```diff
@@ -143,57 +143,68 @@ mod tests {
     use crate::get_broker_endpoints_from_env;
     use crate::test_util::kafka::{create_topics, Affix, TopicDecorator};
 
-    /// Checks that clients for the given topics are created.
-    async fn ensure_clients_exist(topics: &[Topic], client_manager: &ClientManager) {
-        let client_pool = client_manager.client_pool.read().await;
-        let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
-        assert!(all_exist);
-    }
-
-    async fn test_which(test_name: &str) {
-        // Creates a collection of topics in Kafka.
+    /// Prepares for a test in which a collection of topics and a client manager are created.
+    async fn prepare(test_name: &str, num_topics: usize) -> (ClientManager, Vec<Topic>) {
         let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
         let decorator = TopicDecorator::default()
             .with_prefix(Affix::Fixed(test_name.to_string()))
             .with_suffix(Affix::TimeNow);
-        let topics = create_topics(256, decorator, &broker_endpoints, None).await;
+        let topics = create_topics(num_topics, decorator, &broker_endpoints, None).await;
 
         let config = KafkaConfig {
             broker_endpoints,
             ..Default::default()
         };
         let manager = ClientManager::try_new(&config).await.unwrap();
 
+        (manager, topics)
+    }
+
+    /// Checks that clients for the given topics are created.
+    async fn ensure_clients_exist(topics: &[Topic], client_manager: &ClientManager) {
+        let client_pool = client_manager.client_pool.read().await;
+        let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
+        assert!(all_exist);
+    }
+
+    async fn test_with(test_name: &str) {
+        let (manager, topics) = prepare(test_name, 128).await;
+        // Assigns multiple regions to a topic.
+        let region_topic = (0..512)
+            .map(|region_id| (region_id, &topics[region_id % topics.len()]))
+            .collect::<HashMap<_, _>>();
+
+        // Gets the assigned topic for each region and then gets the associated client.
         match test_name {
             "test_sequential" => {
                 // Gets all clients sequentially.
-                for topic in topics.iter() {
+                for (_, topic) in region_topic {
                     manager.get_or_insert(topic).await.unwrap();
                 }
             }
             "test_parallel" => {
                 // Gets all clients in parallel.
-                let tasks = topics
-                    .iter()
+                let tasks = region_topic
+                    .values()
                     .map(|topic| manager.get_or_insert(topic))
                     .collect::<Vec<_>>();
                 futures::future::try_join_all(tasks).await.unwrap();
             }
             _ => unreachable!(),
         }
 
         // Ensures all clients are created successfully.
         ensure_clients_exist(&topics, &manager).await;
     }
 
     /// Sends `get_or_insert` requests sequentially to the client manager, and checks that it handles them correctly.
     #[tokio::test]
     async fn test_sequential() {
-        test_which("test_sequential").await;
+        test_with("test_sequential").await;
     }
 
     /// Sends `get_or_insert` requests in parallel to the client manager, and checks that it handles them correctly.
     #[tokio::test]
     async fn test_parallel() {
-        test_which("test_parallel").await;
+        test_with("test_parallel").await;
     }
 }
```
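To make the new test concrete: 512 regions are assigned round-robin onto 128 topics, so each topic is shared by exactly four regions, and a pool keyed by topic should deduplicate down to one client per topic. The following is a minimal, self-contained sketch of just that assignment arithmetic; it does not touch the real `ClientManager`, and the topic names are invented for illustration.

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    // Stand-ins for the topics `prepare` would create; the names are made up.
    let topics: Vec<String> = (0..128).map(|i| format!("test_topic_{i}")).collect();

    // Round-robin assignment, mirroring the test: region i -> topics[i % topics.len()].
    let region_topic: HashMap<usize, &String> = (0..512)
        .map(|region_id| (region_id, &topics[region_id % topics.len()]))
        .collect();

    // 512 regions share only 128 distinct topics, so a client pool keyed by
    // topic should converge to exactly topics.len() clients.
    let distinct: HashSet<&String> = region_topic.values().copied().collect();
    assert_eq!(region_topic.len(), 512);
    assert_eq!(distinct.len(), topics.len());
}
```

Because duplicate topics now reach `get_or_insert` from many regions, both the sequential and the parallel paths exercise the pool's deduplication rather than a one-client-per-call pattern.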
src/log-store/src/kafka/log_store.rs (4 changes: 2 additions & 2 deletions)
```diff
@@ -16,7 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic, WalOptions};
-use common_telemetry::debug;
+use common_telemetry::{debug, warn};
 use futures_util::StreamExt;
 use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
 use rskafka::client::partition::OffsetAt;
@@ -178,7 +178,7 @@ impl LogStore for KafkaLogStore {
         // Abort if there're no new entries.
         // FIXME(niebayes): how come this case happens?
         if start_offset > end_offset {
-            debug!(
+            warn!(
                 "No new entries for ns {} in range [{}, {}]",
                 ns, start_offset, end_offset
             );
```
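For context on the second hunk: the imports suggest `start_offset` and `end_offset` are resolved through rskafka's partition client. The sketch below is a hypothetical illustration, not the actual `KafkaLogStore` code; the helper name `partition_offset_range` is invented, while `PartitionClient::get_offset` and `OffsetAt` are real rskafka APIs.

```rust
use rskafka::client::partition::{OffsetAt, PartitionClient};

// Hypothetical helper (not the actual KafkaLogStore code): resolves the
// offset range a reader would scan before applying the guard shown above.
async fn partition_offset_range(client: &PartitionClient) -> (i64, i64) {
    let start_offset = client.get_offset(OffsetAt::Earliest).await.unwrap();
    // In the Kafka protocol the "latest" offset is the log-end offset, i.e.
    // one past the last written record. Whether end_offset is taken as this
    // value or as latest - 1 determines exactly when start_offset > end_offset
    // holds, which may bear on the FIXME above.
    let end_offset = client.get_offset(OffsetAt::Latest).await.unwrap();
    (start_offset, end_offset)
}
```

Raising the log from `debug!` to `warn!` makes this still-unexplained empty-range case visible at default log levels, consistent with the open FIXME.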
