From 81804ebdc1168e55b5be6038de2d5b1093d110ae Mon Sep 17 00:00:00 2001
From: niebayes
Date: Mon, 25 Dec 2023 21:05:52 +0800
Subject: [PATCH 01/30] test: add unit tests
---
Cargo.lock | 2 +
src/log-store/src/kafka.rs | 10 +-
tests-integration/Cargo.toml | 2 +
tests-integration/tests/wal.rs | 248 +++++++++++++++++++++++++++++++++
4 files changed, 257 insertions(+), 5 deletions(-)
create mode 100644 tests-integration/tests/wal.rs
diff --git a/Cargo.lock b/Cargo.lock
index ac08a49f6baa..885ca3e2e0e3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9481,6 +9481,7 @@ dependencies = [
"frontend",
"futures",
"itertools 0.10.5",
+ "log-store",
"meta-client",
"meta-srv",
"mysql_async",
@@ -9494,6 +9495,7 @@ dependencies = [
"prost 0.12.3",
"query",
"rand",
+ "rskafka",
"rstest",
"rstest_reuse",
"script",
diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs
index fefa7823c5c2..a08afc508eac 100644
--- a/src/log-store/src/kafka.rs
+++ b/src/log-store/src/kafka.rs
@@ -29,8 +29,8 @@ use crate::error::Error;
/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct NamespaceImpl {
- region_id: u64,
- topic: Topic,
+ pub region_id: u64,
+ pub topic: Topic,
}
impl Namespace for NamespaceImpl {
@@ -49,11 +49,11 @@ impl Display for NamespaceImpl {
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
/// Entry payload.
- data: Vec<u8>,
+ pub data: Vec<u8>,
/// The logical entry id.
- id: EntryId,
+ pub id: EntryId,
/// The namespace used to identify and isolate log entries from different regions.
- ns: NamespaceImpl,
+ pub ns: NamespaceImpl,
}
impl Entry for EntryImpl {
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 1ba6f8e05d36..8d311d3f7ea2 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -34,6 +34,7 @@ datatypes.workspace = true
dotenv = "0.15"
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
+log-store.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
mysql_async = { version = "0.33", default-features = false, features = [
@@ -44,6 +45,7 @@ once_cell.workspace = true
operator.workspace = true
query.workspace = true
rand.workspace = true
+rskafka.workspace = true
rstest = "0.17"
rstest_reuse = "0.5"
secrecy = "0.8"
diff --git a/tests-integration/tests/wal.rs b/tests-integration/tests/wal.rs
new file mode 100644
index 000000000000..30695217087d
--- /dev/null
+++ b/tests-integration/tests/wal.rs
@@ -0,0 +1,248 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use common_config::wal::KafkaConfig as DatanodeKafkaConfig;
+use common_config::{KafkaWalOptions, WalOptions};
+use common_meta::kv_backend::memory::MemoryKvBackend;
+use common_meta::kv_backend::KvBackendRef;
+use common_meta::wal::kafka::{
+ KafkaConfig as MetaSrvKafkaConfig, TopicManager as KafkaTopicManager,
+};
+use common_meta::wal::{allocate_region_wal_options, WalConfig, WalOptionsAllocator};
+use futures::StreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use log_store::kafka::{EntryImpl, NamespaceImpl};
+use rskafka::client::controller::ControllerClient;
+use rskafka::client::ClientBuilder;
+use store_api::logstore::entry::Id as EntryId;
+use store_api::logstore::LogStore;
+
+// Notice: the following tests are literally unit tests. They are placed here since
+// it seems too heavy to start a Kafka cluster for each unit test.
+
+// The key of an env variable that stores a comma-separated list of Kafka broker endpoints.
+const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";
+
+// Tests that the TopicManager allocates topics in a round-robin manner.
+#[tokio::test]
+async fn test_kafka_alloc_topics() {
+ let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+ .unwrap()
+ .split(',')
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+ let config = MetaSrvKafkaConfig {
+ topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
+ replication_factor: broker_endpoints.len() as i16,
+ broker_endpoints,
+ ..Default::default()
+ };
+ let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ let manager = KafkaTopicManager::new(config.clone(), kv_backend);
+ manager.start().await.unwrap();
+
+ // Topics should be created.
+ let topics = (0..config.num_topics)
+ .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+ .collect::<Vec<_>>();
+
+ // Selects exactly the number of `num_topics` topics one by one.
+ for expected in topics.iter() {
+ let got = manager.select().unwrap();
+ assert_eq!(got, expected);
+ }
+
+ // Selects exactly the number of `num_topics` topics in a batching manner.
+ let got = manager
+ .select_batch(config.num_topics)
+ .unwrap()
+ .into_iter()
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+ assert_eq!(got, topics);
+
+ // Selects none.
+ let got = manager.select_batch(config.num_topics).unwrap();
+ assert!(got.is_empty());
+
+ // Selects more than the number of `num_topics` topics.
+ let got = manager
+ .select_batch(2 * config.num_topics)
+ .unwrap()
+ .into_iter()
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+ let expected = vec![topics.clone(); 2]
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ assert_eq!(got, expected);
+}
+
+// Tests that the wal options allocator could successfully allocate Kafka wal options.
+#[tokio::test]
+async fn test_kafka_options_allocator() {
+ let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+ .unwrap()
+ .split(',')
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+ let config = MetaSrvKafkaConfig {
+ topic_name_prefix: "__test_kafka_options_allocator".to_string(),
+ replication_factor: broker_endpoints.len() as i16,
+ broker_endpoints,
+ ..Default::default()
+ };
+ let wal_config = WalConfig::Kafka(config.clone());
+ let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
+ allocator.start().await.unwrap();
+
+ let num_regions = 32;
+ let regions = (0..num_regions).collect::<Vec<_>>();
+ let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
+
+ // Topics should be allocated.
+ let topics = (0..num_regions)
+ .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+ .collect::<Vec<_>>();
+ // Check the allocated wal options contain the expected topics.
+ let expected = (0..num_regions)
+ .map(|i| {
+ let options = WalOptions::Kafka(KafkaWalOptions {
+ topic: topics[i as usize].clone(),
+ });
+ (i, serde_json::to_string(&options).unwrap())
+ })
+ .collect::<HashMap<_, _>>();
+ assert_eq!(got, expected);
+}
+
+fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
+ EntryImpl {
+ data: data.as_ref().to_vec(),
+ id: entry_id,
+ ns,
+ }
+}
+
+async fn create_topic(topic: &str, replication_factor: i16, client: &ControllerClient) {
+ client
+ .create_topic(topic, 1, replication_factor, 5_000)
+ .await
+ .unwrap();
+}
+
+async fn check_entries(
+ ns: &NamespaceImpl,
+ start_offset: EntryId,
+ expected: Vec<EntryImpl>,
+ logstore: &KafkaLogStore,
+) {
+ let mut stream = logstore.read(ns, start_offset).await.unwrap();
+ for entry in expected {
+ let got = stream.next().await.unwrap().unwrap();
+ assert_eq!(entry, got[0]);
+ }
+}
+
+// Tests that the Kafka log store is able to write and read log entries from Kafka.
+#[tokio::test]
+async fn test_kafka_log_store() {
+ let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+ .unwrap()
+ .split(',')
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+ let config = DatanodeKafkaConfig {
+ broker_endpoints,
+ ..Default::default()
+ };
+ let logstore = KafkaLogStore::try_new(&config).await.unwrap();
+
+ let client = ClientBuilder::new(config.broker_endpoints.clone())
+ .build()
+ .await
+ .unwrap()
+ .controller_client()
+ .unwrap();
+
+ // Appends one entry.
+ let topic = "__test_kafka_log_store_topic_append";
+ create_topic(topic, config.broker_endpoints.len() as i16, &client).await;
+ let ns = NamespaceImpl {
+ region_id: 0,
+ topic: topic.to_string(),
+ };
+ let entry = new_test_entry(b"0", 0, ns.clone());
+ let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id;
+ check_entries(&ns, last_entry_id, vec![entry], &logstore).await;
+
+ // Appends a batch of entries.
+ // Regions 1 and 2 are mapped to topic 1.
+ let topic = "__test_kafka_log_store_topic_append_batch_1";
+ create_topic(topic, config.broker_endpoints.len() as i16, &client).await;
+ let ns_1 = NamespaceImpl {
+ region_id: 1,
+ topic: topic.to_string(),
+ };
+ let ns_2 = NamespaceImpl {
+ region_id: 2,
+ topic: topic.to_string(),
+ };
+
+ // Region 3 is mapped to topic 2.
+ let topic = "__test_kafka_log_store_topic_append_batch_2";
+ create_topic(topic, config.broker_endpoints.len() as i16, &client).await;
+ let ns_3 = NamespaceImpl {
+ region_id: 3,
+ topic: topic.to_string(),
+ };
+
+ // Constructs a batch of entries.
+ let entries_1 = vec![
+ new_test_entry(b"1", 0, ns_1.clone()),
+ new_test_entry(b"1", 1, ns_1.clone()),
+ ];
+ let entries_2 = vec![
+ new_test_entry(b"2", 2, ns_2.clone()),
+ new_test_entry(b"2", 3, ns_2.clone()),
+ ];
+ let entries_3 = vec![
+ new_test_entry(b"3", 7, ns_3.clone()),
+ new_test_entry(b"3", 8, ns_3.clone()),
+ ];
+ let entries = vec![entries_1.clone(), entries_2.clone(), entries_3.clone()]
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+
+ let last_entry_ids = logstore
+ .append_batch(entries.clone())
+ .await
+ .unwrap()
+ .last_entry_ids;
+
+ // Reads entries from region 1.
+ check_entries(&ns_1, last_entry_ids[&1], entries_1, &logstore).await;
+ // Reads entries from region 2.
+ check_entries(&ns_2, last_entry_ids[&2], entries_2, &logstore).await;
+ // Reads entries from region 3.
+ check_entries(&ns_3, last_entry_ids[&3], entries_3, &logstore).await;
+}
+
+// TODO(niebayes): add more integration tests.
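Each test above re-derives the broker endpoints from the `GT_KAFKA_ENDPOINTS` env variable. Later patches in this series call a `get_broker_endpoints_from_env!` macro for the same job; its definition is not captured here, but a minimal sketch consistent with the call sites (an assumption, not the author's verbatim code) could be:

    #[macro_export]
    macro_rules! get_broker_endpoints_from_env {
        ($key:expr) => {{
            // Expects a comma-separated list, e.g. GT_KAFKA_ENDPOINTS="host1:9092,host2:9092".
            std::env::var($key)
                .unwrap()
                .split(',')
                .map(ToString::to_string)
                .collect::<Vec<_>>()
        }};
    }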
From a7f639ff50f51656a1697e10be5a54831603ee29 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Wed, 27 Dec 2023 19:04:24 +0800
Subject: [PATCH 02/30] feat: introduce kafka runtime backed by testcontainers
---
Cargo.lock | 93 ++++++++++++++++++-
tests-integration/Cargo.toml | 1 +
tests-integration/src/lib.rs | 1 +
tests-integration/src/wal_util.rs | 15 +++
tests-integration/src/wal_util/kafka.rs | 17 ++++
.../src/wal_util/kafka/config.rs | 93 +++++++++++++++++++
tests-integration/src/wal_util/kafka/image.rs | 77 +++++++++++++++
.../src/wal_util/kafka/runtime.rs | 38 ++++++++
8 files changed, 331 insertions(+), 4 deletions(-)
create mode 100644 tests-integration/src/wal_util.rs
create mode 100644 tests-integration/src/wal_util/kafka.rs
create mode 100644 tests-integration/src/wal_util/kafka/config.rs
create mode 100644 tests-integration/src/wal_util/kafka/image.rs
create mode 100644 tests-integration/src/wal_util/kafka/runtime.rs
diff --git a/Cargo.lock b/Cargo.lock
index 885ca3e2e0e3..d8b15f1bb700 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -972,6 +972,16 @@ dependencies = [
"generic-array",
]
+[[package]]
+name = "bollard-stubs"
+version = "1.42.0-rc.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864"
+dependencies = [
+ "serde",
+ "serde_with 1.14.0",
+]
+
[[package]]
name = "borsh"
version = "1.3.0"
@@ -1629,7 +1639,7 @@ dependencies = [
"rskafka",
"serde",
"serde_json",
- "serde_with",
+ "serde_with 3.4.0",
"toml 0.8.8",
]
@@ -1841,7 +1851,7 @@ dependencies = [
"rskafka",
"serde",
"serde_json",
- "serde_with",
+ "serde_with 3.4.0",
"snafu",
"store-api",
"strum 0.25.0",
@@ -2332,6 +2342,16 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "darling"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
+dependencies = [
+ "darling_core 0.13.4",
+ "darling_macro 0.13.4",
+]
+
[[package]]
name = "darling"
version = "0.14.4"
@@ -2352,6 +2372,20 @@ dependencies = [
"darling_macro 0.20.3",
]
+[[package]]
+name = "darling_core"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
+dependencies = [
+ "fnv",
+ "ident_case",
+ "proc-macro2",
+ "quote",
+ "strsim 0.10.0",
+ "syn 1.0.109",
+]
+
[[package]]
name = "darling_core"
version = "0.14.4"
@@ -2380,6 +2414,17 @@ dependencies = [
"syn 2.0.43",
]
+[[package]]
+name = "darling_macro"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
+dependencies = [
+ "darling_core 0.13.4",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "darling_macro"
version = "0.14.4"
@@ -4988,7 +5033,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
- "serde_with",
+ "serde_with 3.4.0",
"smallvec",
"snafu",
"store-api",
@@ -8391,6 +8436,16 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_with"
+version = "1.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff"
+dependencies = [
+ "serde",
+ "serde_with_macros 1.5.2",
+]
+
[[package]]
name = "serde_with"
version = "3.4.0"
@@ -8404,10 +8459,22 @@ dependencies = [
"indexmap 2.1.0",
"serde",
"serde_json",
- "serde_with_macros",
+ "serde_with_macros 3.4.0",
"time",
]
+[[package]]
+name = "serde_with_macros"
+version = "1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082"
+dependencies = [
+ "darling 0.13.4",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "serde_with_macros"
version = "3.4.0"
@@ -9448,6 +9515,23 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
+[[package]]
+name = "testcontainers"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f83d2931d7f521af5bae989f716c3fa43a6af9af7ec7a5e21b59ae40878cec00"
+dependencies = [
+ "bollard-stubs",
+ "futures",
+ "hex",
+ "hmac",
+ "log",
+ "rand",
+ "serde",
+ "serde_json",
+ "sha2",
+]
+
[[package]]
name = "tests-integration"
version = "0.5.0"
@@ -9511,6 +9595,7 @@ dependencies = [
"substrait 0.5.0",
"table",
"tempfile",
+ "testcontainers",
"time",
"tokio",
"tokio-postgres",
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 8d311d3f7ea2..377c428c2a55 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -64,6 +64,7 @@ sqlx = { version = "0.6", features = [
substrait.workspace = true
table.workspace = true
tempfile.workspace = true
+testcontainers = "0.15.0"
time = "0.3"
tokio.workspace = true
tonic.workspace = true
diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs
index b0b28ba4651c..8dc4bc5a9829 100644
--- a/tests-integration/src/lib.rs
+++ b/tests-integration/src/lib.rs
@@ -20,6 +20,7 @@ mod opentsdb;
mod otlp;
mod prom_store;
pub mod test_util;
+pub mod wal_util;
mod standalone;
#[cfg(test)]
diff --git a/tests-integration/src/wal_util.rs b/tests-integration/src/wal_util.rs
new file mode 100644
index 000000000000..28c04633b640
--- /dev/null
+++ b/tests-integration/src/wal_util.rs
@@ -0,0 +1,15 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod kafka;
diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs
new file mode 100644
index 000000000000..ee71c6410b1c
--- /dev/null
+++ b/tests-integration/src/wal_util/kafka.rs
@@ -0,0 +1,17 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod config;
+mod image;
+pub mod runtime;
diff --git a/tests-integration/src/wal_util/kafka/config.rs b/tests-integration/src/wal_util/kafka/config.rs
new file mode 100644
index 000000000000..4da810dfa93b
--- /dev/null
+++ b/tests-integration/src/wal_util/kafka/config.rs
@@ -0,0 +1,93 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use testcontainers::core::WaitFor;
+
+/// Through which port the Zookeeper node listens for external traffic, i.e. traffic from the Kafka node.
+pub const ZOOKEEPER_PORT: u16 = 2181;
+/// Through which port the Kafka node listens for internal traffic, i.e. traffic between Kafka nodes in the same Kafka cluster.
+pub const KAFAK_LISTENER_PORT: u16 = 19092;
+/// Through which port the Kafka node listens for external traffic, e.g. traffic from Kafka clients.
+pub const KAFKA_ADVERTISED_LISTENER_PORT: u16 = 9092;
+
+/// Configurations for a Kafka runtime.
+/// Since the runtime corresponds to a cluster with a single Kafka node and a single Zookeeper node, the ports are all singletons.
+pub struct Config {
+ /// The name of the Kafka image hosted in the docker hub.
+ pub image_name: String,
+ /// The tag of the kafka image hosted in the docker hub.
+/// Warning: please use a tag with long-term support. Do not use `latest` or any other tag
+/// whose underlying image may change without notice.
+ pub image_tag: String,
+ /// Through which port clients could connect with the runtime.
+ pub exposed_port: u16,
+ /// The runtime is regarded as ready to be used once all ready conditions are met.
+ /// Warning: be sure to update the conditions when necessary if the image is altered.
+ pub ready_conditions: Vec<WaitFor>,
+ /// The environment variables required to run the runtime.
+ /// Warning: be sure to update the environment variables when necessary if the image is altered.
+ pub env_vars: HashMap<String, String>,
+}
+
+impl Default for Config {
+ fn default() -> Self {
+ Self {
+ image_name: "confluentinc/cp-kafka".to_string(),
+ image_tag: "7.4.3".to_string(),
+ exposed_port: KAFKA_ADVERTISED_LISTENER_PORT,
+ // The runtime is ready to be used once this message is printed on stdout.
+ ready_conditions: vec![WaitFor::message_on_stdout(
+ "started (kafka.server.KafkaServer)",
+ )],
+ env_vars: build_env_vars(
+ ZOOKEEPER_PORT,
+ KAFAK_LISTENER_PORT,
+ KAFKA_ADVERTISED_LISTENER_PORT,
+ ),
+ }
+ }
+}
+
+fn build_env_vars(
+ zookeeper_port: u16,
+ kafka_listener_port: u16,
+ kafka_advertised_listener_port: u16,
+) -> HashMap<String, String> {
+ [
+ (
+ "KAFKA_ZOOKEEPER_CONNECT".to_string(),
+ format!("localhost:{zookeeper_port}"),
+ ),
+ (
+ "KAFKA_LISTENERS".to_string(),
+ format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},PLAINTEXT://0.0.0.0:{kafka_listener_port}"),
+ ),
+ (
+ "KAFKA_ADVERTISED_LISTENERS".to_string(),
+ format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},PLAINTEXT://localhost:{kafka_listener_port}",),
+ ),
+ (
+ "KAFKA_INTER_BROKER_LISTENER_NAME".to_string(),
+ "BROKER".to_string(),
+ ),
+ ("KAFKA_BROKER_ID".to_string(), "1".to_string()),
+ (
+ "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_string(),
+ "1".to_string(),
+ ),
+ ]
+ .into()
+}
diff --git a/tests-integration/src/wal_util/kafka/image.rs b/tests-integration/src/wal_util/kafka/image.rs
new file mode 100644
index 000000000000..90cb164efa4b
--- /dev/null
+++ b/tests-integration/src/wal_util/kafka/image.rs
@@ -0,0 +1,77 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use testcontainers::core::{ContainerState, ExecCommand, WaitFor};
+
+use crate::wal_util::kafka::config::{Config, ZOOKEEPER_PORT};
+
+#[derive(Debug, Clone, Default)]
+pub struct ImageArgs;
+
+impl testcontainers::ImageArgs for ImageArgs {
+ fn into_iterator(self) -> Box<dyn Iterator<Item = String>> {
+ Box::new(
+ vec![
+ "/bin/bash".to_string(),
+ "-c".to_string(),
+ format!(
+ r#"
+ echo 'clientPort={}' > zookeeper.properties;
+ echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
+ echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
+ zookeeper-server-start zookeeper.properties &
+ . /etc/confluent/docker/bash-config &&
+ /etc/confluent/docker/configure &&
+ /etc/confluent/docker/launch
+ "#,
+ ZOOKEEPER_PORT,
+ ),
+ ]
+ .into_iter(),
+ )
+ }
+}
+
+#[derive(Default)]
+pub struct Image {
+ config: Config,
+}
+
+impl testcontainers::Image for Image {
+ type Args = ImageArgs;
+
+ fn name(&self) -> String {
+ self.config.image_name.clone()
+ }
+
+ fn tag(&self) -> String {
+ self.config.image_tag.clone()
+ }
+
+ fn ready_conditions(&self) -> Vec {
+ self.config.ready_conditions.clone()
+ }
+
+ fn env_vars(&self) -> Box<dyn Iterator<Item = (&String, &String)> + '_> {
+ Box::new(self.config.env_vars.iter())
+ }
+
+ fn expose_ports(&self) -> Vec<u16> {
+ vec![self.config.exposed_port]
+ }
+
+ fn exec_after_start(&self, _cs: ContainerState) -> Vec<ExecCommand> {
+ vec![]
+ }
+}
diff --git a/tests-integration/src/wal_util/kafka/runtime.rs b/tests-integration/src/wal_util/kafka/runtime.rs
new file mode 100644
index 000000000000..24ff66de8c7e
--- /dev/null
+++ b/tests-integration/src/wal_util/kafka/runtime.rs
@@ -0,0 +1,38 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use testcontainers::clients::Cli as DockerCli;
+use testcontainers::Container;
+
+use crate::wal_util::kafka::image::Image;
+
+/// A runtime running a cluster consisting of a single Kafka node and a single ZooKeeper node.
+#[derive(Default)]
+pub struct Runtime {
+ docker: DockerCli,
+}
+
+impl Runtime {
+ /// Starts the runtime. The runtime terminates when the returned container is dropped.
+ pub async fn start(&self) -> Container<'_, Image> {
+ self.docker.run(Image::default())
+ }
+}
+
+#[macro_export]
+macro_rules! start_kafka {
+ () => {
+ let _ = Runtime::default().start();
+ };
+}
From 1e3cafe39fb6481187e65000fba6f3d7b4aacc69 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Wed, 27 Dec 2023 19:40:52 +0800
Subject: [PATCH 03/30] test: add test for kafka runtime
---
tests-integration/src/wal_util/kafka.rs | 7 +++
.../src/wal_util/kafka/runtime.rs | 54 +++++++++++++++++--
2 files changed, 56 insertions(+), 5 deletions(-)
diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs
index ee71c6410b1c..2e14f90fc499 100644
--- a/tests-integration/src/wal_util/kafka.rs
+++ b/tests-integration/src/wal_util/kafka.rs
@@ -15,3 +15,10 @@
pub mod config;
mod image;
pub mod runtime;
+
+#[macro_export]
+macro_rules! start_kafka {
+ () => {
+ let _ = $crate::wal_util::kafka::runtime::Runtime::default().start().await;
+ };
+}
diff --git a/tests-integration/src/wal_util/kafka/runtime.rs b/tests-integration/src/wal_util/kafka/runtime.rs
index 24ff66de8c7e..f593d6cf733f 100644
--- a/tests-integration/src/wal_util/kafka/runtime.rs
+++ b/tests-integration/src/wal_util/kafka/runtime.rs
@@ -30,9 +30,53 @@ impl Runtime {
}
}
-#[macro_export]
-macro_rules! start_kafka {
- () => {
- let _ = Runtime::default().start();
- };
+#[cfg(test)]
+mod tests {
+ use rskafka::chrono::Utc;
+ use rskafka::client::partition::UnknownTopicHandling;
+ use rskafka::client::ClientBuilder;
+ use rskafka::record::Record;
+
+ use crate::start_kafka;
+
+ #[tokio::test]
+ async fn test_runtime() {
+ start_kafka!();
+
+ let bootstrap_brokers = vec![9092.to_string()];
+ let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();
+
+ // Creates a topic.
+ let topic = "test_topic";
+ client
+ .controller_client()
+ .unwrap()
+ .create_topic(topic, 1, 1, 500)
+ .await
+ .unwrap();
+
+ // Produces a record.
+ let partition_client = client
+ .partition_client(topic, 0, UnknownTopicHandling::Error)
+ .await
+ .unwrap();
+ let produced = vec![Record {
+ key: Some(b"111".to_vec()),
+ value: Some(b"222".to_vec()),
+ timestamp: Utc::now(),
+ headers: Default::default(),
+ }];
+ let offset = partition_client
+ .produce(produced.clone(), Default::default())
+ .await
+ .unwrap()[0];
+
+ // Consumes the record.
+ let consumed = partition_client
+ .fetch_records(offset, 1..4096, 500)
+ .await
+ .unwrap()
+ .0;
+ assert_eq!(produced[0], consumed[0].record);
+ }
}
From 90fd40a577d149b5192827d5a8b93a416e7dd710 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Wed, 27 Dec 2023 19:43:22 +0800
Subject: [PATCH 04/30] fix: format
---
tests-integration/src/wal_util/kafka.rs | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs
index 2e14f90fc499..abde462d720d 100644
--- a/tests-integration/src/wal_util/kafka.rs
+++ b/tests-integration/src/wal_util/kafka.rs
@@ -19,6 +19,8 @@ pub mod runtime;
#[macro_export]
macro_rules! start_kafka {
() => {
- let _ = $crate::wal_util::kafka::runtime::Runtime::default().start().await;
+ let _ = $crate::wal_util::kafka::runtime::Runtime::default()
+ .start()
+ .await;
};
}
From 796aeae9e73574ae2d4f84d03af786e0c0626421 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Thu, 28 Dec 2023 15:25:28 +0800
Subject: [PATCH 05/30] chore: make kafka image ready to be used
---
tests-integration/src/wal_util/kafka.rs | 10 ---
.../src/wal_util/kafka/config.rs | 30 +++++--
tests-integration/src/wal_util/kafka/image.rs | 87 ++++++++++++++++++-
.../src/wal_util/kafka/runtime.rs | 82 -----------------
4 files changed, 107 insertions(+), 102 deletions(-)
delete mode 100644 tests-integration/src/wal_util/kafka/runtime.rs
diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs
index abde462d720d..d4d0f8773d91 100644
--- a/tests-integration/src/wal_util/kafka.rs
+++ b/tests-integration/src/wal_util/kafka.rs
@@ -14,13 +14,3 @@
pub mod config;
mod image;
-pub mod runtime;
-
-#[macro_export]
-macro_rules! start_kafka {
- () => {
- let _ = $crate::wal_util::kafka::runtime::Runtime::default()
- .start()
- .await;
- };
-}
diff --git a/tests-integration/src/wal_util/kafka/config.rs b/tests-integration/src/wal_util/kafka/config.rs
index 4da810dfa93b..ca5136aec1ae 100644
--- a/tests-integration/src/wal_util/kafka/config.rs
+++ b/tests-integration/src/wal_util/kafka/config.rs
@@ -16,15 +16,15 @@ use std::collections::HashMap;
use testcontainers::core::WaitFor;
-/// Through which port the Zookeeper node listens for external traffic, i.e. traffic from the Kafka node.
+/// Through which port the Zookeeper node listens for external traffic, e.g. traffic from the Kafka node.
pub const ZOOKEEPER_PORT: u16 = 2181;
/// Through which port the Kafka node listens for internal traffic, i.e. traffic between Kafka nodes in the same Kafka cluster.
-pub const KAFAK_LISTENER_PORT: u16 = 19092;
+pub const KAFKA_LISTENER_PORT: u16 = 19092;
/// Through which port the Kafka node listens for external traffic, e.g. traffic from Kafka clients.
pub const KAFKA_ADVERTISED_LISTENER_PORT: u16 = 9092;
/// Configurations for a Kafka runtime.
-/// Since the runtime corresponds to a cluster with a single Kafka node and a single Zookeeper node, the ports are all singletons.
+#[derive(Debug, Clone)]
pub struct Config {
/// The name of the Kafka image hosted in the docker hub.
pub image_name: String,
@@ -32,7 +32,10 @@ pub struct Config {
/// Warning: please use a tag with long-term support. Do not use `latest` or any other tag
/// whose underlying image may change without notice.
pub image_tag: String,
- /// Through which port clients could connect with the runtime.
+ /// The runtime runs in a docker container and has its own network. To be reachable from the host machine,
+ /// the runtime must expose an internal port. For example, assume the runtime has an internal port 9092,
+ /// and `exposed_port` is set to 9092; the host machine can then get a mapped external port with
+ /// `container.get_host_port_ipv4(exposed_port)`. With the mapped port, the host machine can connect to the runtime.
pub exposed_port: u16,
/// The runtime is regarded as ready to be used once all ready conditions are met.
/// Warning: be sure to update the conditions when necessary if the image is altered.
@@ -42,6 +45,15 @@ pub struct Config {
pub env_vars: HashMap<String, String>,
}
+impl Config {
+ pub fn with_exposed_port(port: u16) -> Self {
+ Self {
+ exposed_port: port,
+ ..Default::default()
+ }
+ }
+}
+
impl Default for Config {
fn default() -> Self {
Self {
@@ -54,7 +66,7 @@ impl Default for Config {
)],
env_vars: build_env_vars(
ZOOKEEPER_PORT,
- KAFAK_LISTENER_PORT,
+ KAFKA_LISTENER_PORT,
KAFKA_ADVERTISED_LISTENER_PORT,
),
}
@@ -73,11 +85,15 @@ fn build_env_vars(
),
(
"KAFKA_LISTENERS".to_string(),
- format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},PLAINTEXT://0.0.0.0:{kafka_listener_port}"),
+ format!("PLAINTEXT://0.0.0.0:{kafka_advertised_listener_port},BROKER://0.0.0.0:{kafka_listener_port}"),
),
(
"KAFKA_ADVERTISED_LISTENERS".to_string(),
- format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},PLAINTEXT://localhost:{kafka_listener_port}",),
+ format!("PLAINTEXT://localhost:{kafka_advertised_listener_port},BROKER://localhost:{kafka_listener_port}",),
+ ),
+ (
+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_string(),
+ "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_string(),
),
(
"KAFKA_INTER_BROKER_LISTENER_NAME".to_string(),
diff --git a/tests-integration/src/wal_util/kafka/image.rs b/tests-integration/src/wal_util/kafka/image.rs
index 90cb164efa4b..78f513dd8406 100644
--- a/tests-integration/src/wal_util/kafka/image.rs
+++ b/tests-integration/src/wal_util/kafka/image.rs
@@ -14,7 +14,9 @@
use testcontainers::core::{ContainerState, ExecCommand, WaitFor};
-use crate::wal_util::kafka::config::{Config, ZOOKEEPER_PORT};
+use crate::wal_util::kafka::config::{
+ Config, KAFKA_ADVERTISED_LISTENER_PORT, KAFKA_LISTENER_PORT, ZOOKEEPER_PORT,
+};
#[derive(Debug, Clone, Default)]
pub struct ImageArgs;
@@ -48,6 +50,13 @@ pub struct Image {
config: Config,
}
+impl Image {
+ #[allow(unused)]
+ pub fn new(config: Config) -> Self {
+ Self { config }
+ }
+}
+
impl testcontainers::Image for Image {
type Args = ImageArgs;
@@ -71,7 +80,79 @@ impl testcontainers::Image for Image {
vec![self.config.exposed_port]
}
- fn exec_after_start(&self, _cs: ContainerState) -> Vec<ExecCommand> {
- vec![]
+ fn exec_after_start(&self, cs: ContainerState) -> Vec<ExecCommand> {
+ let mut commands = vec![];
+ let cmd = format!(
+ "kafka-configs --alter --bootstrap-server 0.0.0.0:{} --entity-type brokers --entity-name 1 --add-config advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
+ KAFKA_LISTENER_PORT,
+ cs.host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT),
+ );
+ let ready_conditions = vec![WaitFor::message_on_stdout(
+ "Checking need to trigger auto leader balancing",
+ )];
+ commands.push(ExecCommand {
+ cmd,
+ ready_conditions,
+ });
+ commands
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use chrono::TimeZone;
+ use rskafka::chrono::Utc;
+ use rskafka::client::partition::UnknownTopicHandling;
+ use rskafka::client::ClientBuilder;
+ use rskafka::record::Record;
+ use testcontainers::clients::Cli as DockerCli;
+
+ use crate::wal_util::kafka::config::{Config, KAFKA_ADVERTISED_LISTENER_PORT};
+ use crate::wal_util::kafka::image::Image;
+
+ #[tokio::test]
+ async fn test_image() {
+ // Starts a Kafka container.
+ let port = KAFKA_ADVERTISED_LISTENER_PORT;
+ let config = Config::with_exposed_port(port);
+ let docker = DockerCli::default();
+ let container = docker.run(Image::new(config));
+
+ // Creates a Kafka client.
+ let bootstrap_brokers = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))];
+ let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();
+
+ // Creates a topic.
+ let topic = "test_topic";
+ client
+ .controller_client()
+ .unwrap()
+ .create_topic(topic, 1, 1, 500)
+ .await
+ .unwrap();
+
+ // Produces a record.
+ let partition_client = client
+ .partition_client(topic, 0, UnknownTopicHandling::Error)
+ .await
+ .unwrap();
+ let produced = vec![Record {
+ key: Some(b"111".to_vec()),
+ value: Some(b"222".to_vec()),
+ timestamp: Utc.timestamp_millis_opt(42).unwrap(),
+ headers: Default::default(),
+ }];
+ let offset = partition_client
+ .produce(produced.clone(), Default::default())
+ .await
+ .unwrap()[0];
+
+ // Consumes the record.
+ let consumed = partition_client
+ .fetch_records(offset, 1..4096, 500)
+ .await
+ .unwrap()
+ .0;
+ assert_eq!(produced[0], consumed[0].record);
}
}
diff --git a/tests-integration/src/wal_util/kafka/runtime.rs b/tests-integration/src/wal_util/kafka/runtime.rs
deleted file mode 100644
index f593d6cf733f..000000000000
--- a/tests-integration/src/wal_util/kafka/runtime.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use testcontainers::clients::Cli as DockerCli;
-use testcontainers::Container;
-
-use crate::wal_util::kafka::image::Image;
-
-/// A runtime running a cluster consisting of a single Kafka node and a single ZooKeeper node.
-#[derive(Default)]
-pub struct Runtime {
- docker: DockerCli,
-}
-
-impl Runtime {
- /// Starts the runtime. The runtime terminates when the returned container is dropped.
- pub async fn start(&self) -> Container<'_, Image> {
- self.docker.run(Image::default())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use rskafka::chrono::Utc;
- use rskafka::client::partition::UnknownTopicHandling;
- use rskafka::client::ClientBuilder;
- use rskafka::record::Record;
-
- use crate::start_kafka;
-
- #[tokio::test]
- async fn test_runtime() {
- start_kafka!();
-
- let bootstrap_brokers = vec![9092.to_string()];
- let client = ClientBuilder::new(bootstrap_brokers).build().await.unwrap();
-
- // Creates a topic.
- let topic = "test_topic";
- client
- .controller_client()
- .unwrap()
- .create_topic(topic, 1, 1, 500)
- .await
- .unwrap();
-
- // Produces a record.
- let partition_client = client
- .partition_client(topic, 0, UnknownTopicHandling::Error)
- .await
- .unwrap();
- let produced = vec![Record {
- key: Some(b"111".to_vec()),
- value: Some(b"222".to_vec()),
- timestamp: Utc::now(),
- headers: Default::default(),
- }];
- let offset = partition_client
- .produce(produced.clone(), Default::default())
- .await
- .unwrap()[0];
-
- // Consumes the record.
- let consumed = partition_client
- .fetch_records(offset, 1..4096, 500)
- .await
- .unwrap()
- .0;
- assert_eq!(produced[0], consumed[0].record);
- }
-}
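The crux of this patch is `exec_after_start`: Docker maps the container's advertised listener port (9092) to a random host port, and the exec command re-advertises that mapped port so clients on the host can complete the bootstrap handshake. A condensed sketch of the resulting connection flow (paths and visibility simplified; an illustration, not verbatim project code):

    let docker = DockerCli::default();
    // Keep `container` alive: the Kafka node stops when it is dropped.
    let container = docker.run(Image::new(Config::default()));
    // The host port Docker mapped to the container's 9092; this is the same
    // port the broker was told to advertise in `exec_after_start`.
    let port = container.get_host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT);
    let client = ClientBuilder::new(vec![format!("127.0.0.1:{port}")])
        .build()
        .await
        .unwrap();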
From 18513936a5c9df64baabeb52bebd17d1f7532bab Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 29 Dec 2023 01:23:00 +0800
Subject: [PATCH 06/30] feat: add entry builder
---
Cargo.lock | 2 +
src/common/test-util/Cargo.toml | 2 +
src/common/test-util/src/lib.rs | 1 +
src/common/test-util/src/wal.rs | 15 +++
src/common/test-util/src/wal/kafka.rs | 17 +++
.../test-util/src/wal/kafka/entry_builder.rs | 110 ++++++++++++++++++
src/log-store/src/kafka.rs | 2 +-
src/store-api/src/logstore.rs | 6 +-
tests-integration/src/wal_util.rs | 5 +
tests-integration/src/wal_util/kafka.rs | 2 +-
.../src/wal_util/kafka/config.rs | 9 --
tests-integration/src/wal_util/kafka/image.rs | 12 +-
12 files changed, 165 insertions(+), 18 deletions(-)
create mode 100644 src/common/test-util/src/wal.rs
create mode 100644 src/common/test-util/src/wal/kafka.rs
create mode 100644 src/common/test-util/src/wal/kafka/entry_builder.rs
diff --git a/Cargo.lock b/Cargo.lock
index d8b15f1bb700..09fbe9275128 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1983,8 +1983,10 @@ dependencies = [
name = "common-test-util"
version = "0.5.0"
dependencies = [
+ "log-store",
"once_cell",
"rand",
+ "store-api",
"tempfile",
]
diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml
index 60e854740643..a1f8def92d01 100644
--- a/src/common/test-util/Cargo.toml
+++ b/src/common/test-util/Cargo.toml
@@ -5,6 +5,8 @@ edition.workspace = true
license.workspace = true
[dependencies]
+log-store.workspace = true
once_cell.workspace = true
rand.workspace = true
tempfile.workspace = true
+store-api.workspace = true
diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs
index ef6ff4696861..f4a9c747e518 100644
--- a/src/common/test-util/src/lib.rs
+++ b/src/common/test-util/src/lib.rs
@@ -20,6 +20,7 @@ use std::sync::LazyLock;
pub mod ports;
pub mod temp_dir;
+pub mod wal;
// Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the
// workspace, see https://github.com/rust-lang/cargo/issues/3946.
diff --git a/src/common/test-util/src/wal.rs b/src/common/test-util/src/wal.rs
new file mode 100644
index 000000000000..28c04633b640
--- /dev/null
+++ b/src/common/test-util/src/wal.rs
@@ -0,0 +1,15 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod kafka;
diff --git a/src/common/test-util/src/wal/kafka.rs b/src/common/test-util/src/wal/kafka.rs
new file mode 100644
index 000000000000..bd615f8a4af3
--- /dev/null
+++ b/src/common/test-util/src/wal/kafka.rs
@@ -0,0 +1,17 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod entry_builder;
+
+pub use crate::wal::kafka::entry_builder::EntryBuilder;
diff --git a/src/common/test-util/src/wal/kafka/entry_builder.rs b/src/common/test-util/src/wal/kafka/entry_builder.rs
new file mode 100644
index 000000000000..09910c6c7077
--- /dev/null
+++ b/src/common/test-util/src/wal/kafka/entry_builder.rs
@@ -0,0 +1,110 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
+use std::sync::Mutex;
+
+use log_store::kafka::{EntryImpl, NamespaceImpl};
+use rand::rngs::ThreadRng;
+use rand::seq::SliceRandom;
+use rand::{thread_rng, Rng};
+use store_api::logstore::EntryId;
+
+const DEFAULT_DATA: &[u8; 10] = b"[greptime]";
+
+/// A builder for building entries for a namespace.
+pub struct EntryBuilder {
+ /// The namespace of the entries.
+ ns: NamespaceImpl,
+ /// The next entry id to allocate. It starts from 0 by default.
+ next_entry_id: AtomicEntryId,
+ /// A generator for supporting random data generation.
+ /// Wrapped with a Mutex to make the builder thread-safe.
+ rng: Mutex<ThreadRng>,
+}
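The remainder of this patch, along with patches 07 through 13, is missing from this capture; the `rng` field above and the sketch below are reconstructions from the visible fields and imports, not the author's verbatim code:

    impl EntryBuilder {
        /// Creates a builder for entries of the given namespace.
        pub fn new(ns: NamespaceImpl) -> Self {
            Self {
                ns,
                next_entry_id: AtomicEntryId::new(0),
                rng: Mutex::new(thread_rng()),
            }
        }

        /// Builds an entry with random payload data and the next entry id.
        pub fn with_random_data(&self) -> EntryImpl {
            let len = self.rng.lock().unwrap().gen_range(1..=DEFAULT_DATA.len());
            EntryImpl {
                data: DEFAULT_DATA[..len].to_vec(),
                id: self.alloc_entry_id(),
                ns: self.ns.clone(),
            }
        }

        /// Allocates the next monotonically increasing entry id.
        fn alloc_entry_id(&self) -> EntryId {
            self.next_entry_id.fetch_add(1, Ordering::Relaxed)
        }
    }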
From 68980c1d12cf66ad94718940bdbd38e90b47f2c8 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Mon, 1 Jan 2024 18:34:19 +0800
Subject: [PATCH 14/30] tmp: ready to move unit tests to an indie dir
---
.../meta/src/wal/kafka/topic_manager.rs | 56 +++++++
src/common/meta/src/wal/options_allocator.rs | 39 +++++
src/log-store/src/kafka/client_manager.rs | 10 +-
src/log-store/src/kafka/log_store.rs | 7 +-
src/log-store/src/kafka/record_utils.rs | 31 +---
src/log-store/src/test_util/kafka.rs | 10 +-
tests-integration/tests/wal.rs | 142 ------------------
7 files changed, 116 insertions(+), 179 deletions(-)
delete mode 100644 tests-integration/tests/wal.rs
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index 80aaa90d402f..a7519493ea63 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -295,4 +295,60 @@ mod tests {
let manager = TopicManager::new(config, kv_backend);
manager.start().await.unwrap();
}
+
+ // Tests that the TopicManager allocates topics in a round-robin manner.
+ // #[tokio::test]
+ // async fn test_kafka_alloc_topics() {
+ // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+ // .unwrap()
+ // .split(',')
+ // .map(ToString::to_string)
+ // .collect::<Vec<_>>();
+ // let config = MetaSrvKafkaConfig {
+ // topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
+ // replication_factor: broker_endpoints.len() as i16,
+ // broker_endpoints,
+ // ..Default::default()
+ // };
+ // let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ // let manager = KafkaTopicManager::new(config.clone(), kv_backend);
+ // manager.start().await.unwrap();
+
+ // // Topics should be created.
+ // let topics = (0..config.num_topics)
+ // .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+ // .collect::<Vec<_>>();
+
+ // // Selects exactly the number of `num_topics` topics one by one.
+ // for expected in topics.iter() {
+ // let got = manager.select().unwrap();
+ // assert_eq!(got, expected);
+ // }
+
+ // // Selects exactly the number of `num_topics` topics in a batching manner.
+ // let got = manager
+ // .select_batch(config.num_topics)
+ // .unwrap()
+ // .into_iter()
+ // .map(ToString::to_string)
+ // .collect::<Vec<_>>();
+ // assert_eq!(got, topics);
+
+ // // Selects none.
+ // let got = manager.select_batch(config.num_topics).unwrap();
+ // assert!(got.is_empty());
+
+ // // Selects more than the number of `num_topics` topics.
+ // let got = manager
+ // .select_batch(2 * config.num_topics)
+ // .unwrap()
+ // .into_iter()
+ // .map(ToString::to_string)
+ // .collect::<Vec<_>>();
+ // let expected = vec![topics.clone(); 2]
+ // .into_iter()
+ // .flatten()
+ // .collect::<Vec<_>>();
+ // assert_eq!(got, expected);
+ // }
}
diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs
index 6c2702053b87..b6f6a99ba089 100644
--- a/src/common/meta/src/wal/options_allocator.rs
+++ b/src/common/meta/src/wal/options_allocator.rs
@@ -128,4 +128,43 @@ mod tests {
.collect();
assert_eq!(got, expected);
}
+
+ // Tests that the wal options allocator could successfully allocate Kafka wal options.
+ // #[tokio::test]
+ // async fn test_kafka_options_allocator() {
+ // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+ // .unwrap()
+ // .split(',')
+ // .map(ToString::to_string)
+ // .collect::<Vec<_>>();
+ // let config = MetaSrvKafkaConfig {
+ // topic_name_prefix: "__test_kafka_options_allocator".to_string(),
+ // replication_factor: broker_endpoints.len() as i16,
+ // broker_endpoints,
+ // ..Default::default()
+ // };
+ // let wal_config = WalConfig::Kafka(config.clone());
+ // let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ // let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
+ // allocator.start().await.unwrap();
+
+ // let num_regions = 32;
+ // let regions = (0..num_regions).collect::<Vec<_>>();
+ // let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
+
+ // // Topics should be allocated.
+ // let topics = (0..num_regions)
+ // .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+ // .collect::<Vec<_>>();
+ // // Check the allocated wal options contain the expected topics.
+ // let expected = (0..num_regions)
+ // .map(|i| {
+ // let options = WalOptions::Kafka(KafkaWalOptions {
+ // topic: topics[i as usize].clone(),
+ // });
+ // (i, serde_json::to_string(&options).unwrap())
+ // })
+ // .collect::<HashMap<_, _>>();
+ // assert_eq!(got, expected);
+ // }
}
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 3eb6312b2f72..27b01035c710 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -98,12 +98,12 @@ impl ClientManager {
/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
- let client_pool = self.client_pool.read().await;
- if let Some(client) = client_pool.get(topic) {
- return Ok(client.clone());
+ {
+ let client_pool = self.client_pool.read().await;
+ if let Some(client) = client_pool.get(topic) {
+ return Ok(client.clone());
+ }
}
- // Manullay releases the read lock.
- drop(client_pool);
// Acquires the write lock.
let mut client_pool = self.client_pool.write().await;
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 30ea652c51f7..9177ca423247 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -293,7 +293,7 @@ mod tests {
use super::*;
use crate::get_broker_endpoints_from_env;
use crate::test_util::kafka::{
- create_topics, entries_with_random_data, Affix, EntryBuilder, TopicDecorator,
+ create_topics, entries_with_random_data, new_namespace, Affix, EntryBuilder, TopicDecorator,
};
// Stores test context for a region.
@@ -325,10 +325,7 @@ mod tests {
(0..num_regions)
.map(|i| {
let topic = &topics[i % topics.len()];
- let ns = NamespaceImpl {
- region_id: i as u64,
- topic: topic.to_string(),
- };
+ let ns = new_namespace(topic, i as u64);
let entry_builder = EntryBuilder::new(ns.clone());
RegionContext {
ns,
diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs
index 547e2c7b3112..2307020fa37d 100644
--- a/src/log-store/src/kafka/record_utils.rs
+++ b/src/log-store/src/kafka/record_utils.rs
@@ -145,26 +145,12 @@ pub(crate) fn decode_from_record(record: Record) -> Result<Vec<EntryImpl>> {
#[cfg(test)]
mod tests {
use super::*;
-
- fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
- EntryImpl {
- data: data.as_ref().to_vec(),
- id: entry_id,
- ns,
- }
- }
+ use crate::test_util::kafka::{entries_with_random_data, new_namespace, EntryBuilder};
#[test]
fn test_serde_record_meta() {
- let ns = NamespaceImpl {
- region_id: 1,
- topic: "test_topic".to_string(),
- };
- let entries = vec![
- new_test_entry(b"111", 1, ns.clone()),
- new_test_entry(b"2222", 2, ns.clone()),
- new_test_entry(b"33333", 3, ns.clone()),
- ];
+ let ns = new_namespace("test_topic", 1);
+ let entries = entries_with_random_data(3, &EntryBuilder::new(ns.clone()));
let meta = RecordMeta::new(ns, &entries);
let encoded = serde_json::to_vec(&meta).unwrap();
let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap();
@@ -173,15 +159,8 @@ mod tests {
#[test]
fn test_encdec_record() {
- let ns = NamespaceImpl {
- region_id: 1,
- topic: "test_topic".to_string(),
- };
- let entries = vec![
- new_test_entry(b"111", 1, ns.clone()),
- new_test_entry(b"2222", 2, ns.clone()),
- new_test_entry(b"33333", 3, ns.clone()),
- ];
+ let ns = new_namespace("test_topic", 1);
+ let entries = entries_with_random_data(3, &EntryBuilder::new(ns.clone()));
let record = encode_to_record(ns, entries.clone()).unwrap();
let decoded_entries = decode_from_record(record).unwrap();
assert_eq!(entries, decoded_entries);
diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs
index 3f926e6c1c8b..1bd9a997c203 100644
--- a/src/log-store/src/test_util/kafka.rs
+++ b/src/log-store/src/test_util/kafka.rs
@@ -18,7 +18,7 @@ pub mod topic_decorator;
use common_config::wal::KafkaWalTopic as Topic;
use rskafka::client::ClientBuilder;
-use crate::kafka::EntryImpl;
+use crate::kafka::{EntryImpl, NamespaceImpl};
pub use crate::test_util::kafka::entry_builder::EntryBuilder;
pub use crate::test_util::kafka::topic_decorator::{Affix, TopicDecorator};
@@ -66,6 +66,14 @@ pub async fn create_topics(
topics
}
+/// Creates a new namespace with the given topic and region id.
+pub fn new_namespace(topic: &str, region_id: u64) -> NamespaceImpl {
+ NamespaceImpl {
+ topic: topic.to_string(),
+ region_id,
+ }
+}
+
/// Builds a batch of entries each with random data.
pub fn entries_with_random_data(batch_size: usize, builder: &EntryBuilder) -> Vec<EntryImpl> {
(0..batch_size)
diff --git a/tests-integration/tests/wal.rs b/tests-integration/tests/wal.rs
deleted file mode 100644
index 8bf09f55c550..000000000000
--- a/tests-integration/tests/wal.rs
+++ /dev/null
@@ -1,142 +0,0 @@
-// // Copyright 2023 Greptime Team
-// //
-// // Licensed under the Apache License, Version 2.0 (the "License");
-// // you may not use this file except in compliance with the License.
-// // You may obtain a copy of the License at
-// //
-// // http://www.apache.org/licenses/LICENSE-2.0
-// //
-// // Unless required by applicable law or agreed to in writing, software
-// // distributed under the License is distributed on an "AS IS" BASIS,
-// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// // See the License for the specific language governing permissions and
-// // limitations under the License.
-
-// use std::collections::HashMap;
-// use std::sync::Arc;
-
-// use common_config::wal::KafkaConfig as DatanodeKafkaConfig;
-// use common_config::{KafkaWalOptions, WalOptions};
-// use common_meta::kv_backend::memory::MemoryKvBackend;
-// use common_meta::kv_backend::KvBackendRef;
-// use common_meta::wal::kafka::{
-// KafkaConfig as MetaSrvKafkaConfig, TopicManager as KafkaTopicManager,
-// };
-// use common_meta::wal::{allocate_region_wal_options, WalConfig, WalOptionsAllocator};
-// use futures::StreamExt;
-// use log_store::error::Result as LogStoreResult;
-// use log_store::kafka::log_store::KafkaLogStore;
-// use log_store::kafka::{EntryImpl, NamespaceImpl};
-// use rskafka::client::controller::ControllerClient;
-// use rskafka::client::ClientBuilder;
-// use store_api::logstore::entry::Id as EntryId;
-// use store_api::logstore::LogStore;
-// use tests_integration::wal_util::{DockerCli, KafkaImage, DEFAULT_EXPOSED_PORT};
-
-// // Notice: the following tests are literally unit tests. They are placed here since
-// // it seems too heavy to start a Kafka cluster for each unit test.
-
-// // The key of an env variable that stores a series of Kafka broker endpoints.
-// const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";
-
-// // Tests that the TopicManager allocates topics in a round-robin manner.
-// #[tokio::test]
-// async fn test_kafka_alloc_topics() {
-// let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
-// .unwrap()
-// .split(',')
-// .map(ToString::to_string)
-// .collect::<Vec<_>>();
-// let config = MetaSrvKafkaConfig {
-// topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
-// replication_factor: broker_endpoints.len() as i16,
-// broker_endpoints,
-// ..Default::default()
-// };
-// let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-// let manager = KafkaTopicManager::new(config.clone(), kv_backend);
-// manager.start().await.unwrap();
-
-// // Topics should be created.
-// let topics = (0..config.num_topics)
-// .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
-// .collect::<Vec<_>>();
-
-// // Selects exactly the number of `num_topics` topics one by one.
-// for expected in topics.iter() {
-// let got = manager.select().unwrap();
-// assert_eq!(got, expected);
-// }
-
-// // Selects exactly the number of `num_topics` topics in a batching manner.
-// let got = manager
-// .select_batch(config.num_topics)
-// .unwrap()
-// .into_iter()
-// .map(ToString::to_string)
-// .collect::<Vec<_>>();
-// assert_eq!(got, topics);
-
-// // Selects none.
-// let got = manager.select_batch(config.num_topics).unwrap();
-// assert!(got.is_empty());
-
-// // Selects more than the number of `num_topics` topics.
-// let got = manager
-// .select_batch(2 * config.num_topics)
-// .unwrap()
-// .into_iter()
-// .map(ToString::to_string)
-// .collect::<Vec<_>>();
-// let expected = vec![topics.clone(); 2]
-// .into_iter()
-// .flatten()
-// .collect::<Vec<_>>();
-// assert_eq!(got, expected);
-// }
-
-// // Tests that the wal options allocator could successfully allocate Kafka wal options.
-// #[tokio::test]
-// async fn test_kafka_options_allocator() {
-// let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
-// .unwrap()
-// .split(',')
-// .map(ToString::to_string)
-// .collect::<Vec<_>>();
-// let config = MetaSrvKafkaConfig {
-// topic_name_prefix: "__test_kafka_options_allocator".to_string(),
-// replication_factor: broker_endpoints.len() as i16,
-// broker_endpoints,
-// ..Default::default()
-// };
-// let wal_config = WalConfig::Kafka(config.clone());
-// let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-// let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
-// allocator.start().await.unwrap();
-
-// let num_regions = 32;
-// let regions = (0..num_regions).collect::<Vec<_>>();
-// let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
-
-// // Topics should be allocated.
-// let topics = (0..num_regions)
-// .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
-//         .collect::<Vec<_>>();
-// // Check the allocated wal options contain the expected topics.
-// let expected = (0..num_regions)
-// .map(|i| {
-// let options = WalOptions::Kafka(KafkaWalOptions {
-// topic: topics[i as usize].clone(),
-// });
-// (i, serde_json::to_string(&options).unwrap())
-// })
-//         .collect::<HashMap<_, _>>();
-// assert_eq!(got, expected);
-// }
-
-// async fn create_topic(topic: &str, replication_factor: i16, client: &ControllerClient) {
-// client
-// .create_topic(topic, 1, replication_factor, 500)
-// .await
-// .unwrap();
-// }
From d6f2b34807a9af5a239e3619f6bc389e472586ac Mon Sep 17 00:00:00 2001
From: niebayes
Date: Tue, 2 Jan 2024 10:38:08 +0800
Subject: [PATCH 15/30] test: update unit tests for client manager
---
README.md | 3 --
src/log-store/src/kafka/client_manager.rs | 43 ++++++++++++++---------
src/log-store/src/kafka/log_store.rs | 4 +--
3 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/README.md b/README.md
index 9dd8c54c7e78..415608ad3c83 100644
--- a/README.md
+++ b/README.md
@@ -27,9 +27,6 @@
-> [!WARNING]
-> Our default branch has changed from `develop` to `main` (issue [#3025](https://github.com/GreptimeTeam/greptimedb/issues/3025)). Please update your local repository to use the `main` branch.
-
## What is GreptimeDB
GreptimeDB is an open-source time-series database with a special focus on
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 27b01035c710..dbee6dc872e0 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -143,20 +143,13 @@ mod tests {
use crate::get_broker_endpoints_from_env;
use crate::test_util::kafka::{create_topics, Affix, TopicDecorator};
- /// Checks clients for the given topics are created.
- async fn ensure_clients_exist(topics: &[Topic], client_manager: &ClientManager) {
- let client_pool = client_manager.client_pool.read().await;
- let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
- assert!(all_exist);
- }
-
- async fn test_which(test_name: &str) {
- // Creates a collection of topics in Kafka.
+    /// Prepares for a test by creating a collection of topics and a client manager.
+    async fn prepare(test_name: &str, num_topics: usize) -> (ClientManager, Vec<Topic>) {
let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
let decorator = TopicDecorator::default()
.with_prefix(Affix::Fixed(test_name.to_string()))
.with_suffix(Affix::TimeNow);
- let topics = create_topics(256, decorator, &broker_endpoints, None).await;
+ let topics = create_topics(num_topics, decorator, &broker_endpoints, None).await;
let config = KafkaConfig {
broker_endpoints,
@@ -164,36 +157,54 @@ mod tests {
};
let manager = ClientManager::try_new(&config).await.unwrap();
+ (manager, topics)
+ }
+
+    /// Checks that clients for the given topics are created.
+ async fn ensure_clients_exist(topics: &[Topic], client_manager: &ClientManager) {
+ let client_pool = client_manager.client_pool.read().await;
+ let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
+ assert!(all_exist);
+ }
+
+ async fn test_with(test_name: &str) {
+ let (manager, topics) = prepare(test_name, 128).await;
+        // Assigns multiple regions to each topic.
+        let region_topic = (0..512)
+            .map(|region_id| (region_id, &topics[region_id % topics.len()]))
+            .collect::<HashMap<_, _>>();
+
+ // Gets the assigned topic for each region and then gets the associated client.
match test_name {
"test_sequential" => {
// Gets all clients sequentially.
- for topic in topics.iter() {
+ for (_, topic) in region_topic {
manager.get_or_insert(topic).await.unwrap();
}
}
"test_parallel" => {
// Gets all clients in parallel.
- let tasks = topics
- .iter()
+ let tasks = region_topic
+ .values()
.map(|topic| manager.get_or_insert(topic))
                 .collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
}
_ => unreachable!(),
}
-
+ // Ensures all clients are created successfully.
ensure_clients_exist(&topics, &manager).await;
}
/// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_sequential() {
- test_which("test_sequential").await;
+ test_with("test_sequential").await;
}
/// Sends `get_or_insert` requests in parallel to the client manager, and checks if it could handle them correctly.
#[tokio::test]
async fn test_parallel() {
- test_which("test_parallel").await;
+ test_with("test_parallel").await;
}
}
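
For context, the region-to-topic assignment exercised by `test_with` above is plain modulo mapping. A minimal standalone sketch of the idea (illustrative names, not part of the patch):

    use std::collections::HashMap;

    /// Spreads region ids round-robin across the topic pool by taking
    /// `region_id % topics.len()`. Panics on an empty pool, as the test would.
    fn assign_regions(num_regions: usize, topics: &[String]) -> HashMap<usize, &String> {
        (0..num_regions)
            .map(|region_id| (region_id, &topics[region_id % topics.len()]))
            .collect()
    }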
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 9177ca423247..4703c5bd2b96 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic, WalOptions};
-use common_telemetry::debug;
+use common_telemetry::{debug, warn};
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
@@ -178,7 +178,7 @@ impl LogStore for KafkaLogStore {
// Abort if there're no new entries.
// FIXME(niebayes): how come this case happens?
if start_offset > end_offset {
- debug!(
+ warn!(
"No new entries for ns {} in range [{}, {}]",
ns, start_offset, end_offset
);
From b06dc94af1e6e720215bcbd524097018c464f22c Mon Sep 17 00:00:00 2001
From: niebayes
Date: Wed, 3 Jan 2024 01:31:49 +0800
Subject: [PATCH 16/30] test: add unit tests for meta srv remote wal
---
Cargo.lock | 5 +
src/common/meta/Cargo.toml | 1 +
.../meta/src/wal/kafka/topic_manager.rs | 123 +++++++----------
.../meta/src/wal/kafka/topic_selector.rs | 8 ++
src/common/meta/src/wal/options_allocator.rs | 83 ++++++------
src/common/test-util/Cargo.toml | 4 +
src/common/test-util/src/wal.rs | 5 -
src/common/test-util/src/wal/kafka.rs | 50 +++++++
.../src/wal}/kafka/topic_decorator.rs | 0
src/log-store/src/kafka/client_manager.rs | 8 +-
src/log-store/src/kafka/log_store.rs | 12 +-
src/log-store/src/test_util/kafka.rs | 128 +++++++++++-------
.../src/test_util/kafka/entry_builder.rs | 100 --------------
13 files changed, 251 insertions(+), 276 deletions(-)
rename src/{log-store/src/test_util => common/test-util/src/wal}/kafka/topic_decorator.rs (100%)
delete mode 100644 src/log-store/src/test_util/kafka/entry_builder.rs
diff --git a/Cargo.lock b/Cargo.lock
index 663fc1788c0b..355b828073e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1837,6 +1837,7 @@ dependencies = [
"common-recordbatch",
"common-runtime",
"common-telemetry",
+ "common-test-util",
"common-time",
"datatypes",
"derive_builder 0.12.0",
@@ -1984,9 +1985,13 @@ dependencies = [
name = "common-test-util"
version = "0.5.0"
dependencies = [
+ "chrono",
+ "common-config",
+ "futures",
"once_cell",
"rand",
"rskafka",
+ "store-api",
"tempfile",
"testcontainers",
"tokio",
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 5a15581f41c6..5f34915f9728 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -23,6 +23,7 @@ common-procedure.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
+common-test-util.workspace = true
common-time.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs
index a7519493ea63..64f0e7b31e93 100644
--- a/src/common/meta/src/wal/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal/kafka/topic_manager.rs
@@ -47,8 +47,7 @@ const DEFAULT_PARTITION: i32 = 0;
/// Manages topic initialization and selection.
pub struct TopicManager {
config: KafkaConfig,
- // TODO(niebayes): maybe add a guard to ensure all topics in the topic pool are created.
-    topic_pool: Vec<Topic>,
+    pub(crate) topic_pool: Vec<Topic>,
topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}
@@ -242,7 +241,11 @@ impl TopicManager {
mod tests {
use std::env;
+ use chrono::format::Fixed;
use common_telemetry::info;
+ use common_test_util::get_broker_endpoints;
+ use common_test_util::wal::kafka::topic_decorator::{Affix, TopicDecorator};
+ use common_test_util::wal::kafka::{create_topics, BROKER_ENDPOINTS_KEY};
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -273,82 +276,56 @@ mod tests {
assert_eq!(topics, restored_topics);
}
+ /// Tests that the topic manager could allocate topics correctly.
#[tokio::test]
- async fn test_topic_manager() {
- let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
- common_telemetry::init_default_ut_logging();
+ async fn test_alloc_topics() {
+ let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);
+ // Constructs topics that should be created.
+ let mut decorator = TopicDecorator::default()
+ .with_prefix(Affix::Fixed("test_alloc_topics".to_string()))
+ .with_suffix(Affix::TimeNow);
+ let topics = (0..256)
+ .map(|i| decorator.decorate(&format!("topic_{i}")))
+            .collect::<Vec<_>>();
- if endpoints.is_empty() {
- info!("The endpoints is empty, skipping the test.");
- return;
- }
- // TODO: supports topic prefix
- let kv_backend = Arc::new(MemoryKvBackend::new());
+ // Creates a topic manager.
let config = KafkaConfig {
- replication_factor: 1,
- broker_endpoints: endpoints
- .split(',')
- .map(|s| s.to_string())
-                .collect::<Vec<_>>(),
+ replication_factor: broker_endpoints.len() as i16,
+ broker_endpoints,
..Default::default()
};
- let manager = TopicManager::new(config, kv_backend);
+ let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ let mut manager = TopicManager::new(config.clone(), kv_backend);
+ // Replaces the default topic pool with the constructed topics.
+ manager.topic_pool = topics.clone();
manager.start().await.unwrap();
- }
-    // Tests that the TopicManager allocates topics in a round-robin manner.
- // #[tokio::test]
- // async fn test_kafka_alloc_topics() {
- // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
- // .unwrap()
- // .split(',')
- // .map(ToString::to_string)
-    //         .collect::<Vec<_>>();
- // let config = MetaSrvKafkaConfig {
- // topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
- // replication_factor: broker_endpoints.len() as i16,
- // broker_endpoints,
- // ..Default::default()
- // };
- // let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
- // let manager = KafkaTopicManager::new(config.clone(), kv_backend);
- // manager.start().await.unwrap();
-
- // // Topics should be created.
- // let topics = (0..config.num_topics)
- // .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
-    //         .collect::<Vec<_>>();
-
- // // Selects exactly the number of `num_topics` topics one by one.
- // for expected in topics.iter() {
- // let got = manager.select().unwrap();
- // assert_eq!(got, expected);
- // }
-
- // // Selects exactly the number of `num_topics` topics in a batching manner.
- // let got = manager
- // .select_batch(config.num_topics)
- // .unwrap()
- // .into_iter()
- // .map(ToString::to_string)
-    //         .collect::<Vec<_>>();
- // assert_eq!(got, topics);
-
- // // Selects none.
- // let got = manager.select_batch(config.num_topics).unwrap();
- // assert!(got.is_empty());
-
- // // Selects more than the number of `num_topics` topics.
- // let got = manager
- // .select_batch(2 * config.num_topics)
- // .unwrap()
- // .into_iter()
- // .map(ToString::to_string)
-    //         .collect::<Vec<_>>();
- // let expected = vec![topics.clone(); 2]
- // .into_iter()
- // .flatten()
-    //         .collect::<Vec<_>>();
- // assert_eq!(got, expected);
- // }
+ // Selects exactly the number of `num_topics` topics one by one.
+ for expected in topics.iter() {
+ let got = manager.select().unwrap();
+ assert_eq!(got, expected);
+ }
+
+ // Selects exactly the number of `num_topics` topics in a batching manner.
+ let got = manager
+ .select_batch(topics.len())
+ .unwrap()
+ .into_iter()
+ .map(ToString::to_string)
+            .collect::<Vec<_>>();
+ assert_eq!(got, topics);
+
+ // Selects more than the number of `num_topics` topics.
+ let got = manager
+ .select_batch(2 * topics.len())
+ .unwrap()
+ .into_iter()
+ .map(ToString::to_string)
+            .collect::<Vec<_>>();
+ let expected = vec![topics.clone(); 2]
+ .into_iter()
+ .flatten()
+            .collect::<Vec<_>>();
+ assert_eq!(got, expected);
+ }
}
diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs
index fe7517bfd0b5..df4feb8b2e12 100644
--- a/src/common/meta/src/wal/kafka/topic_selector.rs
+++ b/src/common/meta/src/wal/kafka/topic_selector.rs
@@ -60,6 +60,14 @@ impl TopicSelector for RoundRobinTopicSelector {
mod tests {
use super::*;
+ /// Tests that a selector behaves as expected when the given topic pool is empty.
+ #[test]
+ fn test_empty_topic_pool() {
+ let topic_pool = vec![];
+ let selector = RoundRobinTopicSelector::default();
+ assert!(selector.select(&topic_pool).is_err());
+ }
+
#[test]
fn test_round_robin_topic_selector() {
let topic_pool: Vec<_> = [0, 1, 2].into_iter().map(|v| v.to_string()).collect();
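
For reference, the behavior these two tests pin down can be sketched independently of the real `RoundRobinTopicSelector` (a simplified stand-in, assuming the selector advances an atomic cursor over the pool):

    use std::sync::atomic::{AtomicUsize, Ordering};

    #[derive(Default)]
    struct RoundRobinSketch {
        cursor: AtomicUsize,
    }

    impl RoundRobinSketch {
        /// Errors on an empty pool; otherwise yields topics in cyclic order.
        fn select<'a>(&self, pool: &'a [String]) -> Result<&'a String, &'static str> {
            if pool.is_empty() {
                return Err("empty topic pool");
            }
            let i = self.cursor.fetch_add(1, Ordering::Relaxed) % pool.len();
            Ok(&pool[i])
        }
    }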
diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs
index b6f6a99ba089..44a15c69e3fd 100644
--- a/src/common/meta/src/wal/options_allocator.rs
+++ b/src/common/meta/src/wal/options_allocator.rs
@@ -105,11 +105,15 @@ pub fn allocate_region_wal_options(
#[cfg(test)]
mod tests {
+ use common_test_util::get_broker_endpoints;
+ use common_test_util::wal::kafka::topic_decorator::{Affix, TopicDecorator};
+ use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;
+
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
+ use crate::wal::kafka::KafkaConfig;
// Tests the wal options allocator could successfully allocate raft-engine wal options.
- // Note: tests for allocator with kafka are integration tests.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
@@ -130,41 +134,44 @@ mod tests {
}
// Tests that the wal options allocator could successfully allocate Kafka wal options.
- // #[tokio::test]
- // async fn test_kafka_options_allocator() {
- // let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
- // .unwrap()
- // .split(',')
- // .map(ToString::to_string)
-    //         .collect::<Vec<_>>();
- // let config = MetaSrvKafkaConfig {
- // topic_name_prefix: "__test_kafka_options_allocator".to_string(),
- // replication_factor: broker_endpoints.len() as i16,
- // broker_endpoints,
- // ..Default::default()
- // };
- // let wal_config = WalConfig::Kafka(config.clone());
- // let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
- // let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
- // allocator.start().await.unwrap();
-
- // let num_regions = 32;
- // let regions = (0..num_regions).collect::>();
- // let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
-
- // // Topics should be allocated.
- // let topics = (0..num_regions)
- // .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
-    //         .collect::<Vec<_>>();
- // // Check the allocated wal options contain the expected topics.
- // let expected = (0..num_regions)
- // .map(|i| {
- // let options = WalOptions::Kafka(KafkaWalOptions {
- // topic: topics[i as usize].clone(),
- // });
- // (i, serde_json::to_string(&options).unwrap())
- // })
-    //         .collect::<HashMap<_, _>>();
- // assert_eq!(got, expected);
- // }
+ #[tokio::test]
+ async fn test_allocator_with_kafka() {
+ let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);
+ // Constructs topics that should be created.
+ let mut decorator = TopicDecorator::default()
+ .with_prefix(Affix::Fixed("test_allocator_with_kafka".to_string()))
+ .with_suffix(Affix::TimeNow);
+ let topics = (0..256)
+ .map(|i| decorator.decorate(&format!("topic_{i}")))
+            .collect::<Vec<_>>();
+
+ // Creates a topic manager.
+ let config = KafkaConfig {
+ replication_factor: broker_endpoints.len() as i16,
+ broker_endpoints,
+ ..Default::default()
+ };
+ let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+ let mut topic_manager = KafkaTopicManager::new(config.clone(), kv_backend);
+
+ // Creates an options allocator.
+ let wal_config = WalConfig::Kafka(config.clone());
+ let allocator = WalOptionsAllocator::Kafka(topic_manager);
+ allocator.start().await.unwrap();
+
+ let num_regions = 32;
+        let regions = (0..num_regions).collect::<Vec<_>>();
+ let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
+
+ // Check the allocated wal options contain the expected topics.
+ let expected = (0..num_regions)
+ .map(|i| {
+ let options = WalOptions::Kafka(KafkaWalOptions {
+ topic: topics[i as usize].clone(),
+ });
+ (i, serde_json::to_string(&options).unwrap())
+ })
+            .collect::<HashMap<_, _>>();
+ assert_eq!(got, expected);
+ }
}
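
For context, `allocate_region_wal_options` is asserted against a map from region number to a JSON-serialized `WalOptions`. A minimal sketch of building one expected entry, mirroring the test body above (topic name and region type are illustrative):

    use common_config::{KafkaWalOptions, WalOptions};

    // One (region, serialized options) pair of the expected map.
    let options = WalOptions::Kafka(KafkaWalOptions {
        topic: "test_allocator_with_kafka_topic_0".to_string(),
    });
    let entry = (0u32, serde_json::to_string(&options).unwrap());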
diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml
index e3caa859d21f..0884c626c2cf 100644
--- a/src/common/test-util/Cargo.toml
+++ b/src/common/test-util/Cargo.toml
@@ -5,9 +5,13 @@ edition.workspace = true
license.workspace = true
[dependencies]
+chrono.workspace = true
+common-config.workspace = true
+futures.workspace = true
once_cell.workspace = true
rand.workspace = true
rskafka.workspace = true
+store-api.workspace = true
tempfile.workspace = true
testcontainers = "0.15.0"
tokio.workspace = true
diff --git a/src/common/test-util/src/wal.rs b/src/common/test-util/src/wal.rs
index c1085a8f1e17..28c04633b640 100644
--- a/src/common/test-util/src/wal.rs
+++ b/src/common/test-util/src/wal.rs
@@ -13,8 +13,3 @@
// limitations under the License.
pub mod kafka;
-
-pub use testcontainers::clients::Cli as DockerCli;
-
-pub use crate::wal::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT as DEFAULT_EXPOSED_PORT;
-pub use crate::wal::kafka::image::Image as KafkaImage;
diff --git a/src/common/test-util/src/wal/kafka.rs b/src/common/test-util/src/wal/kafka.rs
index 427cbea89a71..db8fde3ea4f4 100644
--- a/src/common/test-util/src/wal/kafka.rs
+++ b/src/common/test-util/src/wal/kafka.rs
@@ -14,5 +14,55 @@
pub mod config;
pub mod image;
+pub mod topic_decorator;
+
+use common_config::wal::KafkaWalTopic as Topic;
+use rskafka::client::ClientBuilder;
+
+use crate::wal::kafka::topic_decorator::TopicDecorator;
pub const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";
+
+/// Gets broker endpoints from environment variables with the given key.
+/// Returns ["localhost:9092"] if no environment variable is set for broker endpoints.
+#[macro_export]
+macro_rules! get_broker_endpoints {
+ ($key:expr) => {{
+ let broker_endpoints = std::env::var($key)
+ .unwrap_or("localhost:9092".to_string())
+ .split(',')
+ .map(ToString::to_string)
+            .collect::<Vec<_>>();
+ assert!(!broker_endpoints.is_empty());
+ broker_endpoints
+ }};
+}
+
+/// Creates `num_topics` topics, each derived from the seed topic and decorated with the given `TopicDecorator`.
+/// A default seed `topic` is used if the provided seed is None.
+pub async fn create_topics(
+ num_topics: usize,
+ mut decorator: TopicDecorator,
+ broker_endpoints: &[String],
+ seed: Option<&str>,
+) -> Vec<Topic> {
+ assert!(!broker_endpoints.is_empty());
+
+ let client = ClientBuilder::new(broker_endpoints.to_vec())
+ .build()
+ .await
+ .unwrap();
+ let ctrl_client = client.controller_client().unwrap();
+
+ let seed = seed.unwrap_or("topic");
+ let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
+ .map(|i| {
+ let topic = decorator.decorate(&format!("{seed}_{i}"));
+ let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
+ (topic, task)
+ })
+ .unzip();
+ futures::future::try_join_all(tasks).await.unwrap();
+
+ topics
+}
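
Usage sketches for the two helpers above (hypothetical snippets; they assume a reachable Kafka broker and run inside an async test):

    // Reads GT_KAFKA_ENDPOINTS, falling back to ["localhost:9092"] when unset.
    let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);

    // Creates 8 uniquely named topics derived from the default seed "topic".
    let decorator = TopicDecorator::default()
        .with_prefix(Affix::Fixed("my_test".to_string()))
        .with_suffix(Affix::TimeNow);
    let topics = create_topics(8, decorator, &broker_endpoints, None).await;
    assert_eq!(topics.len(), 8);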
diff --git a/src/log-store/src/test_util/kafka/topic_decorator.rs b/src/common/test-util/src/wal/kafka/topic_decorator.rs
similarity index 100%
rename from src/log-store/src/test_util/kafka/topic_decorator.rs
rename to src/common/test-util/src/wal/kafka/topic_decorator.rs
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index dbee6dc872e0..d114bffa0813 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -137,15 +137,15 @@ impl ClientManager {
#[cfg(test)]
mod tests {
- use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;
+ use common_test_util::get_broker_endpoints;
+ use common_test_util::wal::kafka::topic_decorator::{Affix, TopicDecorator};
+ use common_test_util::wal::kafka::{create_topics, BROKER_ENDPOINTS_KEY};
use super::*;
- use crate::get_broker_endpoints_from_env;
- use crate::test_util::kafka::{create_topics, Affix, TopicDecorator};
    /// Prepares for a test by creating a collection of topics and a client manager.
    async fn prepare(test_name: &str, num_topics: usize) -> (ClientManager, Vec<Topic>) {
- let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
+ let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);
let decorator = TopicDecorator::default()
.with_prefix(Affix::Fixed(test_name.to_string()))
.with_suffix(Affix::TimeNow);
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 4703c5bd2b96..c1d663a62804 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -287,14 +287,13 @@ impl LogStore for KafkaLogStore {
#[cfg(test)]
mod tests {
- use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;
+ use common_test_util::get_broker_endpoints;
+ use common_test_util::wal::kafka::topic_decorator::{Affix, TopicDecorator};
+ use common_test_util::wal::kafka::{create_topics, BROKER_ENDPOINTS_KEY};
use rand::seq::IteratorRandom;
use super::*;
- use crate::get_broker_endpoints_from_env;
- use crate::test_util::kafka::{
- create_topics, entries_with_random_data, new_namespace, Affix, EntryBuilder, TopicDecorator,
- };
+ use crate::test_util::kafka::{entries_with_random_data, new_namespace, EntryBuilder};
// Stores test context for a region.
struct RegionContext {
@@ -305,7 +304,7 @@ mod tests {
    /// Prepares for a test by constructing a log store and creating a collection of topics.
    async fn prepare(test_name: &str, num_topics: usize) -> (KafkaLogStore, Vec<Topic>) {
- let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
+ let broker_endpoints = get_broker_endpoints!(BROKER_ENDPOINTS_KEY);
let decorator = TopicDecorator::default()
.with_prefix(Affix::Fixed(test_name.to_string()))
.with_suffix(Affix::TimeNow);
@@ -434,6 +433,7 @@ mod tests {
test_with("test_multi_appends", 32, 256, 16, false).await;
}
+ // TODO(niebayes): implement.
// / Appends a way-too large entry and checks the log store could handle it correctly.
// #[tokio::test]
// async fn test_way_too_large() {}
diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs
index 1bd9a997c203..cdce23d8b156 100644
--- a/src/log-store/src/test_util/kafka.rs
+++ b/src/log-store/src/test_util/kafka.rs
@@ -11,66 +11,86 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
+use std::sync::Mutex;
-pub mod entry_builder;
-pub mod topic_decorator;
-
-use common_config::wal::KafkaWalTopic as Topic;
-use rskafka::client::ClientBuilder;
+use rand::rngs::ThreadRng;
+use rand::seq::SliceRandom;
+use rand::{thread_rng, Rng};
+use store_api::logstore::EntryId;
use crate::kafka::{EntryImpl, NamespaceImpl};
-pub use crate::test_util::kafka::entry_builder::EntryBuilder;
-pub use crate::test_util::kafka::topic_decorator::{Affix, TopicDecorator};
-/// Gets broker endpoints from environment variables with the given key.
-/// Returns ["localhost:9092"] if no environment variables set for broker endpoints.
-#[macro_export]
-macro_rules! get_broker_endpoints_from_env {
- ($key:expr) => {{
- let broker_endpoints = std::env::var($key)
- .unwrap_or("localhost:9092".to_string())
- .split(',')
- .map(ToString::to_string)
- .collect::>();
- assert!(!broker_endpoints.is_empty());
- broker_endpoints
- }};
+/// A builder for building entries for a namespace.
+pub struct EntryBuilder {
+ /// The namespace of the entries.
+ ns: NamespaceImpl,
+ /// The next entry id to allocate. It starts from 0 by default.
+ next_entry_id: AtomicEntryId,
+ /// A generator for supporting random data generation.
+ /// Wrapped with Mutex