diff --git a/Cargo.lock b/Cargo.lock index c23e83d19aa8..0839b0595bb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2109,6 +2109,8 @@ name = "common-wal" version = "0.6.0" dependencies = [ "common-base", + "common-error", + "common-macro", "common-telemetry", "futures-util", "humantime-serde", @@ -2116,6 +2118,8 @@ dependencies = [ "serde", "serde_json", "serde_with", + "snafu", + "tokio", "toml 0.8.8", ] diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 32af562e30f8..cfe1cbf65602 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -340,6 +340,9 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display("Failed to resolve Kafka broker endpoint."))] + ResolveKafkaEndpoint { source: common_wal::error::Error }, + #[snafu(display("Failed to build a Kafka controller client"))] BuildKafkaCtrlClient { location: Location, @@ -425,6 +428,7 @@ impl ErrorExt for Error { | BuildKafkaClient { .. } | BuildKafkaCtrlClient { .. } | BuildKafkaPartitionClient { .. } + | ResolveKafkaEndpoint { .. } | ProduceRecord { .. } | CreateKafkaWalTopic { .. } | EmptyTopicPool { .. } diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 63944be13c05..d6ee3e774600 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{ BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu, - ProduceRecordSnafu, Result, + ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, }; use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; @@ -117,7 +117,10 @@ impl TopicManager { base: self.config.backoff.base as f64, deadline: self.config.backoff.deadline, }; - let client = ClientBuilder::new(self.config.broker_endpoints.clone()) + let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints) + .await + .context(ResolveKafkaEndpointSnafu)?; + let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() .await diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 3b84673bb1ee..a39baf438f19 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -12,12 +12,16 @@ workspace = true [dependencies] common-base.workspace = true +common-error.workspace = true +common-macro.workspace = true common-telemetry.workspace = true futures-util.workspace = true humantime-serde.workspace = true rskafka.workspace = true serde.workspace = true serde_with.workspace = true +snafu.workspace = true +tokio.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/src/common/wal/src/error.rs b/src/common/wal/src/error.rs new file mode 100644 index 000000000000..147eeb293da1 --- /dev/null +++ b/src/common/wal/src/error.rs @@ -0,0 +1,33 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_macro::stack_trace_debug; +use snafu::Snafu; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to resolve endpoint {:?}", broker_endpoint))] + ResolveEndpoint { + broker_endpoint: String, + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))] + EndpointIPV4NotFound { broker_endpoint: String }, +} + +pub type Result = std::result::Result; diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index 88d67ee3e0bb..086846ab3960 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -12,9 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(assert_matches)] + +use std::net::SocketAddr; + +use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result}; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use tokio::net; pub mod config; +pub mod error; pub mod options; #[cfg(any(test, feature = "testing"))] pub mod test_util; @@ -30,3 +38,52 @@ pub enum TopicSelectorType { #[default] RoundRobin, } + +pub async fn resolve_to_ipv4>(endpoints: &[T]) -> Result> { + futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await +} + +async fn resolve_to_ipv4_one>(endpoint: T) -> Result { + let endpoint = endpoint.as_ref(); + net::lookup_host(endpoint) + .await + .context(ResolveEndpointSnafu { + broker_endpoint: endpoint, + })? + .find(SocketAddr::is_ipv4) + .map(|addr| addr.to_string()) + .context(EndpointIPV4NotFoundSnafu { + broker_endpoint: endpoint, + }) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + use crate::error::Error; + + // test for resolve_broker_endpoint + #[tokio::test] + async fn test_valid_host() { + let host = "localhost:9092"; + let got = resolve_to_ipv4_one(host).await; + assert_eq!(got.unwrap(), "127.0.0.1:9092"); + } + + #[tokio::test] + async fn test_valid_host_ipv6() { + // the host is valid, it is an IPv6 address, but we only accept IPv4 addresses + let host = "::1:9092"; + let got = resolve_to_ipv4_one(host).await; + assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. }); + } + + #[tokio::test] + async fn test_invalid_host() { + let host = "non-exist-host:9092"; + let got = resolve_to_ipv4_one(host).await; + assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. }); + } +} diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 0f5beaa16bb6..edb06b42ca7f 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -108,6 +108,9 @@ pub enum Error { error: rskafka::client::error::Error, }, + #[snafu(display("Failed to resolve Kafka broker endpoint."))] + ResolveKafkaEndpoint { source: common_wal::error::Error }, + #[snafu(display( "Failed to build a Kafka partition client, topic: {}, partition: {}", topic, diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 9e082decb851..1708efed1d09 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -24,7 +24,9 @@ use rskafka::BackoffConfig; use snafu::ResultExt; use tokio::sync::RwLock; -use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; +use crate::error::{ + BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, +}; // Each topic only has one partition for now. // The `DEFAULT_PARTITION` refers to the index of the partition. @@ -80,7 +82,10 @@ impl ClientManager { base: config.backoff.base as f64, deadline: config.backoff.deadline, }; - let client = ClientBuilder::new(config.broker_endpoints.clone()) + let broker_endpoints = common_wal::resolve_to_ipv4(&config.broker_endpoints) + .await + .context(ResolveKafkaEndpointSnafu)?; + let client = ClientBuilder::new(broker_endpoints) .backoff_config(backoff_config) .build() .await