From 2ad2f4e526b05dfa93f8cc41d934017c11faaa87 Mon Sep 17 00:00:00 2001 From: niebayes Date: Wed, 3 Jan 2024 12:12:31 +0800 Subject: [PATCH 1/2] test: add kafka image for testing purpose --- Cargo.lock | 95 ++++++++- src/common/test-util/Cargo.toml | 3 + src/common/test-util/src/lib.rs | 1 + src/common/test-util/src/wal.rs | 15 ++ src/common/test-util/src/wal/kafka.rs | 15 ++ src/common/test-util/src/wal/kafka/image.rs | 219 ++++++++++++++++++++ 6 files changed, 344 insertions(+), 4 deletions(-) create mode 100644 src/common/test-util/src/wal.rs create mode 100644 src/common/test-util/src/wal/kafka.rs create mode 100644 src/common/test-util/src/wal/kafka/image.rs diff --git a/Cargo.lock b/Cargo.lock index b042227a293b..db5678a1e986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -972,6 +972,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bollard-stubs" +version = "1.42.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864" +dependencies = [ + "serde", + "serde_with 1.14.0", +] + [[package]] name = "borsh" version = "1.3.0" @@ -1630,7 +1640,7 @@ dependencies = [ "rskafka", "serde", "serde_json", - "serde_with", + "serde_with 3.4.0", "toml 0.8.8", ] @@ -1842,7 +1852,7 @@ dependencies = [ "rskafka", "serde", "serde_json", - "serde_with", + "serde_with 3.4.0", "snafu", "store-api", "strum 0.25.0", @@ -1976,7 +1986,10 @@ version = "0.5.0" dependencies = [ "once_cell", "rand", + "rskafka", "tempfile", + "testcontainers", + "tokio", ] [[package]] @@ -2334,6 +2347,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + [[package]] name = "darling" version = "0.14.4" @@ -2354,6 +2377,20 @@ dependencies = [ "darling_macro 0.20.3", ] +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", +] + [[package]] name = "darling_core" version = "0.14.4" @@ -2382,6 +2419,17 @@ dependencies = [ "syn 2.0.43", ] +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core 0.13.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.14.4" @@ -4991,7 +5039,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with", + "serde_with 3.4.0", "smallvec", "snafu", "store-api", @@ -8405,6 +8453,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros 1.5.2", +] + [[package]] name = "serde_with" version = "3.4.0" @@ -8418,10 +8476,22 @@ dependencies = [ "indexmap 2.1.0", "serde", "serde_json", - "serde_with_macros", + "serde_with_macros 3.4.0", "time", ] +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling 0.13.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "serde_with_macros" version = "3.4.0" @@ -9463,6 +9533,23 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" +[[package]] +name = "testcontainers" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d2931d7f521af5bae989f716c3fa43a6af9af7ec7a5e21b59ae40878cec00" +dependencies = [ + "bollard-stubs", + "futures", + "hex", + "hmac", + "log", + "rand", + "serde", + "serde_json", + "sha2", +] + [[package]] name = "tests-integration" version = "0.5.0" diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index 60e854740643..e3caa859d21f 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -7,4 +7,7 @@ license.workspace = true [dependencies] once_cell.workspace = true rand.workspace = true +rskafka.workspace = true tempfile.workspace = true +testcontainers = "0.15.0" +tokio.workspace = true diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs index ef6ff4696861..f4a9c747e518 100644 --- a/src/common/test-util/src/lib.rs +++ b/src/common/test-util/src/lib.rs @@ -20,6 +20,7 @@ use std::sync::LazyLock; pub mod ports; pub mod temp_dir; +pub mod wal; // Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the // workspace, see https://github.com/rust-lang/cargo/issues/3946. diff --git a/src/common/test-util/src/wal.rs b/src/common/test-util/src/wal.rs new file mode 100644 index 000000000000..28c04633b640 --- /dev/null +++ b/src/common/test-util/src/wal.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod kafka; diff --git a/src/common/test-util/src/wal/kafka.rs b/src/common/test-util/src/wal/kafka.rs new file mode 100644 index 000000000000..4b00725547b1 --- /dev/null +++ b/src/common/test-util/src/wal/kafka.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod image; diff --git a/src/common/test-util/src/wal/kafka/image.rs b/src/common/test-util/src/wal/kafka/image.rs new file mode 100644 index 000000000000..ff801ea3dfec --- /dev/null +++ b/src/common/test-util/src/wal/kafka/image.rs @@ -0,0 +1,219 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use testcontainers::core::{ContainerState, ExecCommand, WaitFor}; + +const IMAGE_NAME: &str = "confluentinc/cp-kafka"; +const IMAGE_TAG: &str = "7.4.3"; +/// Through which port the Zookeeper node listens for external traffics, e.g. traffics from the Kafka node. +const ZOOKEEPER_PORT: u16 = 2181; +/// Through which port the Kafka node listens for internal traffics, i.e. traffics between Kafka nodes in the same Kafka cluster. +const KAFKA_LISTENER_PORT: u16 = 19092; +/// Through which port the Kafka node listens for external traffics, e.g. traffics from Kafka clients. +const KAFKA_ADVERTISED_LISTENER_PORT: u16 = 9092; + +#[derive(Debug, Clone, Default)] +pub struct ImageArgs; + +impl testcontainers::ImageArgs for ImageArgs { + fn into_iterator(self) -> Box> { + Box::new( + vec![ + "/bin/bash".to_string(), + "-c".to_string(), + format!( + r#" + echo 'clientPort={}' > zookeeper.properties; + echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties; + echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties; + zookeeper-server-start zookeeper.properties & + . /etc/confluent/docker/bash-config && + /etc/confluent/docker/configure && + /etc/confluent/docker/launch + "#, + ZOOKEEPER_PORT, + ), + ] + .into_iter(), + ) + } +} + +/// # Example +/// ```rust +/// // Starts a Kafka container. +/// let port = KAFKA_ADVERTISED_LISTENER_PORT; +/// let docker = DockerCli::default(); +/// let container = docker.run(Image::default()); +/// // Gets the broker endpoints of the containerized Kafka node. +/// let broker_endpoints = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))]; +/// // Do something with the broker endpoints, for e.g. building a Kafka client. +/// let client = ClientBuilder::new(broker_endpoints).build().await.unwrap(); +/// ``` +pub struct Image { + env_vars: HashMap, +} + +impl Default for Image { + fn default() -> Self { + Self { + env_vars: build_env_vars(), + } + } +} + +impl testcontainers::Image for Image { + type Args = ImageArgs; + + /// The name of the Kafka image hosted in the docker hub. + fn name(&self) -> String { + IMAGE_NAME.to_string() + } + + /// The tag of the kafka image hosted in the docker hub. + /// Warning: please use a tag with long-term support. Do not use `latest` or any other tags that + /// the underlying image may suddenly change. + fn tag(&self) -> String { + IMAGE_TAG.to_string() + } + + /// The runtime is regarded ready to be used if all ready conditions are met. + /// Warning: be sure to update the conditions when necessary if the image is altered. + fn ready_conditions(&self) -> Vec { + vec![WaitFor::message_on_stdout( + "started (kafka.server.KafkaServer)", + )] + } + + /// The environment variables required to run the runtime. + /// Warning: be sure to update the environment variables when necessary if the image is altered. + fn env_vars(&self) -> Box + '_> { + Box::new(self.env_vars.iter()) + } + + /// The runtime is running in a docker container and has its own network. In order to be used by the host machine, + /// the runtime must expose an internal port. For e.g. assume the runtime has an internal port 9092, + /// and the `exposed_port` is set to 9092, then the host machine can get a mapped external port with + /// `container.get_host_port_ipv4(exposed_port)`. With the mapped port, the host machine could connect with the runtime. + fn expose_ports(&self) -> Vec { + vec![KAFKA_ADVERTISED_LISTENER_PORT] + } + + /// Specifies a collection of commands to be executed when the container is started. + fn exec_after_start(&self, cs: ContainerState) -> Vec { + let mut commands = vec![]; + let cmd = format!( + "kafka-configs --alter --bootstrap-server 0.0.0.0:{} --entity-type brokers --entity-name 1 --add-config advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]", + KAFKA_LISTENER_PORT, + cs.host_port_ipv4(KAFKA_ADVERTISED_LISTENER_PORT), + ); + let ready_conditions = vec![WaitFor::message_on_stdout( + "Checking need to trigger auto leader balancing", + )]; + commands.push(ExecCommand { + cmd, + ready_conditions, + }); + commands + } +} + +fn build_env_vars() -> HashMap { + [ + ( + "KAFKA_ZOOKEEPER_CONNECT".to_string(), + format!("localhost:{ZOOKEEPER_PORT}"), + ), + ( + "KAFKA_LISTENERS".to_string(), + format!("PLAINTEXT://0.0.0.0:{KAFKA_ADVERTISED_LISTENER_PORT},BROKER://0.0.0.0:{KAFKA_LISTENER_PORT}"), + ), + ( + "KAFKA_ADVERTISED_LISTENERS".to_string(), + format!("PLAINTEXT://localhost:{KAFKA_ADVERTISED_LISTENER_PORT},BROKER://localhost:{KAFKA_LISTENER_PORT}",), + ), + ( + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_string(), + "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_string(), + ), + ( + "KAFKA_INTER_BROKER_LISTENER_NAME".to_string(), + "BROKER".to_string(), + ), + ("KAFKA_BROKER_ID".to_string(), "1".to_string()), + ( + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_string(), + "1".to_string(), + ), + ] + .into() +} + +#[cfg(test)] +mod tests { + use rskafka::chrono::{TimeZone, Utc}; + use rskafka::client::partition::UnknownTopicHandling; + use rskafka::client::ClientBuilder; + use rskafka::record::Record; + use testcontainers::clients::Cli as DockerCli; + + use super::*; + + #[tokio::test] + async fn test_image() { + // Starts a Kafka container. + let port = KAFKA_ADVERTISED_LISTENER_PORT; + let docker = DockerCli::default(); + let container = docker.run(Image::default()); + + // Creates a Kafka client. + let broker_endpoints = vec![format!("127.0.0.1:{}", container.get_host_port_ipv4(port))]; + let client = ClientBuilder::new(broker_endpoints).build().await.unwrap(); + + // Creates a topic. + let topic = "test_topic"; + client + .controller_client() + .unwrap() + .create_topic(topic, 1, 1, 500) + .await + .unwrap(); + + // Produces a record. + let partition_client = client + .partition_client(topic, 0, UnknownTopicHandling::Error) + .await + .unwrap(); + let produced = vec![Record { + key: Some(b"111".to_vec()), + value: Some(b"222".to_vec()), + timestamp: Utc.timestamp_millis_opt(42).unwrap(), + headers: Default::default(), + }]; + let offset = partition_client + .produce(produced.clone(), Default::default()) + .await + .unwrap()[0]; + + // Consumes the record. + let consumed = partition_client + .fetch_records(offset, 1..4096, 500) + .await + .unwrap() + .0; + assert_eq!(produced[0], consumed[0].record); + } +} From 96ea2e740e5f7ced5012ba44bf0efba1ce6584b7 Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 9 Jan 2024 11:35:25 +0800 Subject: [PATCH 2/2] chore: move kafka image to tests-integration --- Cargo.lock | 5 ++--- src/common/test-util/Cargo.toml | 3 --- src/common/test-util/src/lib.rs | 1 - src/common/test-util/src/wal.rs | 15 --------------- src/common/test-util/src/wal/kafka.rs | 15 --------------- tests-integration/Cargo.toml | 2 ++ tests-integration/src/test_util.rs | 2 ++ .../src/test_util/kafka_image.rs | 0 8 files changed, 6 insertions(+), 37 deletions(-) delete mode 100644 src/common/test-util/src/wal.rs delete mode 100644 src/common/test-util/src/wal/kafka.rs rename src/common/test-util/src/wal/kafka/image.rs => tests-integration/src/test_util/kafka_image.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index db5678a1e986..8989d34cc341 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,10 +1986,7 @@ version = "0.5.0" dependencies = [ "once_cell", "rand", - "rskafka", "tempfile", - "testcontainers", - "tokio", ] [[package]] @@ -9596,6 +9593,7 @@ dependencies = [ "prost 0.12.3", "query", "rand", + "rskafka", "rstest", "rstest_reuse", "script", @@ -9611,6 +9609,7 @@ dependencies = [ "substrait 0.5.0", "table", "tempfile", + "testcontainers", "time", "tokio", "tokio-postgres", diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index e3caa859d21f..60e854740643 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -7,7 +7,4 @@ license.workspace = true [dependencies] once_cell.workspace = true rand.workspace = true -rskafka.workspace = true tempfile.workspace = true -testcontainers = "0.15.0" -tokio.workspace = true diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs index f4a9c747e518..ef6ff4696861 100644 --- a/src/common/test-util/src/lib.rs +++ b/src/common/test-util/src/lib.rs @@ -20,7 +20,6 @@ use std::sync::LazyLock; pub mod ports; pub mod temp_dir; -pub mod wal; // Rust is working on an env possibly named `CARGO_WORKSPACE_DIR` to find the root path to the // workspace, see https://github.com/rust-lang/cargo/issues/3946. diff --git a/src/common/test-util/src/wal.rs b/src/common/test-util/src/wal.rs deleted file mode 100644 index 28c04633b640..000000000000 --- a/src/common/test-util/src/wal.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod kafka; diff --git a/src/common/test-util/src/wal/kafka.rs b/src/common/test-util/src/wal/kafka.rs deleted file mode 100644 index 4b00725547b1..000000000000 --- a/src/common/test-util/src/wal/kafka.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod image; diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1ba6f8e05d36..157c19a2f4d8 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -62,6 +62,7 @@ sqlx = { version = "0.6", features = [ substrait.workspace = true table.workspace = true tempfile.workspace = true +testcontainers = "0.15.0" time = "0.3" tokio.workspace = true tonic.workspace = true @@ -77,6 +78,7 @@ opentelemetry-proto.workspace = true partition.workspace = true paste.workspace = true prost.workspace = true +rskafka.workspace = true script.workspace = true session = { workspace = true, features = ["testing"] } store-api.workspace = true diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 04e31d91ca3f..46393762f8e2 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod kafka_image; + use std::env; use std::fmt::Display; use std::net::SocketAddr; diff --git a/src/common/test-util/src/wal/kafka/image.rs b/tests-integration/src/test_util/kafka_image.rs similarity index 100% rename from src/common/test-util/src/wal/kafka/image.rs rename to tests-integration/src/test_util/kafka_image.rs