Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(remote_wal): add unit tests for kafka remote wal #2993

Merged
merged 34 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
81804eb
test: add unit tests
niebayes Dec 25, 2023
a7f639f
feat: introduce kafka runtime backed by testcontainers
niebayes Dec 27, 2023
1e3cafe
test: add test for kafka runtime
niebayes Dec 27, 2023
90fd40a
fix: format
niebayes Dec 27, 2023
796aeae
chore: make kafka image ready to be used
niebayes Dec 28, 2023
1851393
feat: add entry builder
niebayes Dec 28, 2023
ef031ee
tmp
niebayes Dec 29, 2023
f7954dc
test: add unit tests for client manager
niebayes Dec 30, 2023
0b503a5
test: add some unit tests for kafka log store
niebayes Dec 30, 2023
3d489ff
chore: resolve some todos
niebayes Dec 31, 2023
21fbddc
chore: resolve some todos
niebayes Dec 31, 2023
84541ed
test: add unit tests for kafka log store
niebayes Jan 1, 2024
a676ba8
chore: add deprecate develop branch warning
waynexia Dec 28, 2023
7ac84f9
chore: merge with branch main
niebayes Jan 1, 2024
68980c1
tmp: ready to move unit tests to an indie dir
niebayes Jan 1, 2024
d6f2b34
test: update unit tests for client manager
niebayes Jan 2, 2024
b06dc94
test: add unit tests for meta srv remote wal
niebayes Jan 2, 2024
070404b
fix: license
niebayes Jan 2, 2024
40eccc9
fix: test
niebayes Jan 3, 2024
815e03a
refactor: kafka image
niebayes Jan 3, 2024
1c504aa
doc: add doc example for kafka image
niebayes Jan 3, 2024
fd31088
chore: migrate kafka image to an indie PR
niebayes Jan 3, 2024
883c6d1
fix: conflicts
niebayes Jan 5, 2024
d38a16f
fix: CR
niebayes Jan 5, 2024
826d5f2
fix: conflicts
niebayes Jan 5, 2024
cf68a87
fix: CR
niebayes Jan 5, 2024
5de92f2
fix: test
niebayes Jan 6, 2024
ae057d0
fix: CR
niebayes Jan 8, 2024
b017551
fix: update Cargo.toml
niebayes Jan 8, 2024
48d967c
fix: CR
niebayes Jan 8, 2024
006b27a
fix: conflicts
niebayes Jan 8, 2024
c117dd8
feat: skip test if no endpoints env
niebayes Jan 8, 2024
8076038
fix: format
niebayes Jan 8, 2024
341ae67
test: rewrite parallel test with barrier
niebayes Jan 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub struct KafkaConfig {
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress log entries.
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
Expand All @@ -27,6 +28,7 @@ common-time.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures-util.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
Expand All @@ -51,3 +53,4 @@ chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }
uuid.workspace = true
1 change: 0 additions & 1 deletion src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub mod sequence;
pub mod state_store;
pub mod table_name;
pub mod util;
#[allow(unused)]
pub mod wal;

pub type ClusterId = u64;
Expand Down
5 changes: 1 addition & 4 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ use std::collections::HashMap;

use common_config::wal::StandaloneWalConfig;
use common_config::WAL_OPTIONS_KEY;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
use store_api::storage::{RegionId, RegionNumber};

use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::Topic as KafkaWalTopic;
pub use crate::wal::options_allocator::{
Expand All @@ -43,7 +40,7 @@ pub enum WalConfig {
impl From<StandaloneWalConfig> for WalConfig {
fn from(value: StandaloneWalConfig) -> Self {
match value {
StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine,
StandaloneWalConfig::RaftEngine(_) => WalConfig::RaftEngine,
StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig {
broker_endpoints: config.base.broker_endpoints,
num_topics: config.num_topics,
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod topic;
pub mod topic_manager;
pub mod topic_selector;

use std::time::Duration;

use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
use common_config::wal::StandaloneWalConfig;
use serde::{Deserialize, Serialize};

pub use crate::wal::kafka::topic::Topic;
Expand Down
33 changes: 33 additions & 0 deletions src/common/meta/src/wal/kafka/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::warn;
use futures_util::future::BoxFuture;

/// Runs `test` against the Kafka brokers listed in the `GT_KAFKA_ENDPOINTS` environment variable.
///
/// The variable is read as a comma-separated list of broker endpoints. Each entry is trimmed
/// and blank entries are dropped. If the variable is unset, is not valid unicode, or yields no
/// usable endpoint (e.g. it is set to an empty string), a warning is logged and `test` is
/// not invoked, so the calling test passes without touching Kafka.
///
/// `test` receives the parsed endpoints and returns the boxed future that performs the
/// actual test; that future is awaited before this function returns.
pub async fn run_test_with_kafka_wal<F>(test: F)
where
    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
    // An unset variable is treated the same as an empty one: both mean "no Kafka available".
    let endpoints = std::env::var("GT_KAFKA_ENDPOINTS")
        .unwrap_or_default()
        .split(',')
        .map(str::trim)
        // Splitting an empty string yields a single empty item; drop such entries so that
        // the test is skipped instead of being run against an empty endpoint.
        .filter(|endpoint| !endpoint.is_empty())
        .map(str::to_string)
        .collect::<Vec<_>>();

    if endpoints.is_empty() {
        warn!("The endpoints is empty, skipping the test");
        return;
    }

    test(endpoints).await
}
1 change: 1 addition & 0 deletions src/common/meta/src/wal/kafka/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
/// A Kafka WAL topic.
/// Publishers publish log entries to a topic while subscribers pull log entries from it.
/// A topic is currently just a string, but it may become a richer type in the future.
// TODO(niebayes): remove the Topic alias.
pub type Topic = String;
92 changes: 60 additions & 32 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::{debug, error, info};
use common_telemetry::{error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use snafu::{ensure, AsErrorSource, ResultExt};
use snafu::{ensure, ResultExt};

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
Expand All @@ -47,9 +46,8 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
// TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created.
topic_pool: Vec<Topic>,
topic_selector: TopicSelectorRef,
pub(crate) topic_pool: Vec<Topic>,
pub(crate) topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}

Expand Down Expand Up @@ -168,7 +166,7 @@ impl TopicManager {
vec![Record {
key: None,
value: None,
timestamp: rskafka::chrono::Utc::now(),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}],
Compression::NoCompression,
Expand Down Expand Up @@ -240,13 +238,9 @@ impl TopicManager {

#[cfg(test)]
mod tests {
use std::env;

use common_telemetry::info;

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::{self};
use crate::wal::kafka::test_util::run_test_with_kafka_wal;

// Tests that topics can be successfully persisted into the kv backend and can be successfully restored from the kv backend.
#[tokio::test]
Expand All @@ -273,26 +267,60 @@ mod tests {
assert_eq!(topics, restored_topics);
}

/// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
async fn test_topic_manager() {
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();

if endpoints.is_empty() {
info!("The endpoints is empty, skipping the test.");
return;
}
// TODO: supports topic prefix
let kv_backend = Arc::new(MemoryKvBackend::new());
let config = KafkaConfig {
replication_factor: 1,
broker_endpoints: endpoints
.split(',')
.map(|s| s.to_string())
.collect::<Vec<_>>(),
..Default::default()
};
let manager = TopicManager::new(config, kv_backend);
manager.start().await.unwrap();
async fn test_alloc_topics() {
    // The helper passes the Kafka broker endpoints to the closure below.
    run_test_with_kafka_wal(|broker_endpoints| {
        Box::pin(async {
            // Constructs the topics that should be created.
            // NOTE(review): the UUID suffix presumably keeps topic names from colliding
            // across runs against the same brokers — confirm.
            let topics = (0..256)
                .map(|i| format!("test_alloc_topics_{}_{}", i, uuid::Uuid::new_v4()))
                .collect::<Vec<_>>();

            // Creates a topic manager.
            let config = KafkaConfig {
                replication_factor: broker_endpoints.len() as i16,
                broker_endpoints,
                ..Default::default()
            };
            let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
            // `config` is not used afterwards, so it is moved instead of cloned.
            let mut manager = TopicManager::new(config, kv_backend);
            // Replaces the default topic pool with the constructed topics.
            manager.topic_pool = topics.clone();
            // Replaces the default selector with an unshuffled round-robin selector; the
            // assertions below compare the selected topics against `topics` in order.
            manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());
            manager.start().await.unwrap();

            // Selects `topics.len()` topics one by one.
            let got = (0..topics.len())
                .map(|_| manager.select().unwrap())
                .cloned()
                .collect::<Vec<_>>();
            assert_eq!(got, topics);

            // Selects `topics.len()` topics in a single batch.
            let got = manager
                .select_batch(topics.len())
                .unwrap()
                .into_iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>();
            assert_eq!(got, topics);

            // Selects twice as many topics as the pool holds; the expected result is the
            // whole pool repeated twice.
            let got = manager
                .select_batch(2 * topics.len())
                .unwrap()
                .into_iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>();
            let expected = vec![topics.clone(); 2]
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();
            assert_eq!(got, expected);
        })
    })
    .await;
}
}
9 changes: 8 additions & 1 deletion src/common/meta/src/wal/kafka/topic_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use rand::Rng;
use serde::{Deserialize, Serialize};
use snafu::ensure;

use crate::error::{EmptyTopicPoolSnafu, Result};
Expand Down Expand Up @@ -60,6 +59,14 @@ impl TopicSelector for RoundRobinTopicSelector {
mod tests {
use super::*;

/// Checks that selecting from an empty topic pool yields an error.
#[test]
fn test_empty_topic_pool() {
    let selector = RoundRobinTopicSelector::default();
    let empty_pool = Vec::new();
    let selected = selector.select(&empty_pool);
    assert!(selected.is_err());
}

#[test]
fn test_round_robin_topic_selector() {
let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
Expand Down
51 changes: 49 additions & 2 deletions src/common/meta/src/wal/options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ pub fn allocate_region_wal_options(
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::wal::kafka::test_util::run_test_with_kafka_wal;
use crate::wal::kafka::topic_selector::RoundRobinTopicSelector;
use crate::wal::kafka::KafkaConfig;

// Tests the wal options allocator could successfully allocate raft-engine wal options.
// Note: tests for allocator with kafka are integration tests.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = WalConfig::RaftEngine;
let mut allocator = WalOptionsAllocator::new(wal_config, kv_backend);
let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
allocator.start().await.unwrap();

let num_regions = 32;
Expand All @@ -128,4 +130,49 @@ mod tests {
.collect();
assert_eq!(got, expected);
}

// Tests that the wal options allocator could successfully allocate Kafka wal options.
#[tokio::test]
async fn test_allocator_with_kafka() {
    // The helper passes the Kafka broker endpoints to the closure below.
    run_test_with_kafka_wal(|broker_endpoints| {
        Box::pin(async {
            // Constructs the topics that should be created.
            // NOTE(review): the UUID suffix presumably keeps topic names from colliding
            // across runs against the same brokers — confirm.
            let topics = (0..256)
                .map(|i| format!("test_allocator_with_kafka_{}_{}", i, uuid::Uuid::new_v4()))
                .collect::<Vec<_>>();

            // Creates a topic manager.
            let config = KafkaConfig {
                replication_factor: broker_endpoints.len() as i16,
                broker_endpoints,
                ..Default::default()
            };
            let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
            // `config` is not used afterwards, so it is moved instead of cloned.
            let mut topic_manager = KafkaTopicManager::new(config, kv_backend);
            // Replaces the default topic pool with the constructed topics.
            topic_manager.topic_pool = topics.clone();
            // Replaces the default selector with an unshuffled round-robin selector; the
            // expected options below assume region `i` is assigned `topics[i]`.
            topic_manager.topic_selector = Arc::new(RoundRobinTopicSelector::default());

            // Creates an options allocator.
            let allocator = WalOptionsAllocator::Kafka(topic_manager);
            allocator.start().await.unwrap();

            let num_regions = 32;
            let regions = (0..num_regions).collect::<Vec<_>>();
            let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();

            // Checks that each region maps to the JSON-encoded Kafka wal options carrying
            // the topic at the same index.
            let expected = (0..num_regions)
                .map(|i| {
                    let options = WalOptions::Kafka(KafkaWalOptions {
                        topic: topics[i as usize].clone(),
                    });
                    (i, serde_json::to_string(&options).unwrap())
                })
                .collect::<HashMap<_, _>>();
            assert_eq!(got, expected);
        })
    })
    .await;
}
}
4 changes: 2 additions & 2 deletions src/log-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ common-macro.workspace = true
common-meta.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
dashmap.workspace = true
futures-util.workspace = true
futures.workspace = true
itertools.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rskafka.workspace = true
Expand All @@ -39,5 +37,7 @@ tokio.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
itertools.workspace = true
rand.workspace = true
rand_distr = "0.4"
uuid.workspace = true
Loading
Loading