Skip to content

Commit

Permalink
feat: Support automatic DNS lookup for kafka bootstrap servers (#3379)
Browse files Browse the repository at this point in the history
* feat: Support automatic DNS lookup for kafka bootstrap servers

* Revert "feat: Support automatic DNS lookup for kafka bootstrap servers"

This reverts commit 5baed7b.

* feat: Support automatic DNS lookup for Kafka broker

* fix: resolve broker endpoint in client manager

* fix: apply clippy lints

* refactor: slimplify the code with clippy hint

* refactor: move resolve_broker_endpoint to common/wal/src/lib.rs

* test: add mock test for resolver_broker_endpoint

* refactor: accept niebayes's advice

* refactor: rename EndpointIpNotFound to EndpointIPV4NotFound

* refactor: remove mock test and simplify the implementation

* docs: add comments about test_vallid_host_ipv6

* Apply suggestions from code review

Co-authored-by: niebayes <[email protected]>

* move more common code

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
Co-authored-by: tison <[email protected]>
Co-authored-by: niebayes <[email protected]>
  • Loading branch information
3 people authored Feb 29, 2024
1 parent a3533c4 commit a500252
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 4 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
Expand Down Expand Up @@ -425,6 +428,7 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, Result,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
Expand Down Expand Up @@ -117,7 +117,10 @@ impl TopicManager {
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
let client = ClientBuilder::new(self.config.broker_endpoints.clone())
let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
.build()
.await
Expand Down
4 changes: 4 additions & 0 deletions src/common/wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ workspace = true

[dependencies]
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_with.workspace = true
snafu.workspace = true
tokio.workspace = true

[dev-dependencies]
serde_json.workspace = true
Expand Down
33 changes: 33 additions & 0 deletions src/common/wal/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_macro::stack_trace_debug;
use snafu::Snafu;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to resolve endpoint {:?}", broker_endpoint))]
ResolveEndpoint {
broker_endpoint: String,
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))]
EndpointIPV4NotFound { broker_endpoint: String },
}

pub type Result<T> = std::result::Result<T, Error>;
57 changes: 57 additions & 0 deletions src/common/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]

use std::net::SocketAddr;

use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::net;

pub mod config;
pub mod error;
pub mod options;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand All @@ -30,3 +38,52 @@ pub enum TopicSelectorType {
#[default]
RoundRobin,
}

pub async fn resolve_to_ipv4<T: AsRef<str>>(endpoints: &[T]) -> Result<Vec<String>> {
futures_util::future::try_join_all(endpoints.iter().map(resolve_to_ipv4_one)).await
}

async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
let endpoint = endpoint.as_ref();
net::lookup_host(endpoint)
.await
.context(ResolveEndpointSnafu {
broker_endpoint: endpoint,
})?
.find(SocketAddr::is_ipv4)
.map(|addr| addr.to_string())
.context(EndpointIPV4NotFoundSnafu {
broker_endpoint: endpoint,
})
}

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

use super::*;
use crate::error::Error;

// test for resolve_broker_endpoint
#[tokio::test]
async fn test_valid_host() {
let host = "localhost:9092";
let got = resolve_to_ipv4_one(host).await;
assert_eq!(got.unwrap(), "127.0.0.1:9092");
}

#[tokio::test]
async fn test_valid_host_ipv6() {
// the host is valid, it is an IPv6 address, but we only accept IPv4 addresses
let host = "::1:9092";
let got = resolve_to_ipv4_one(host).await;
assert_matches!(got.unwrap_err(), Error::EndpointIPV4NotFound { .. });
}

#[tokio::test]
async fn test_invalid_host() {
let host = "non-exist-host:9092";
let got = resolve_to_ipv4_one(host).await;
assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
}
}
3 changes: 3 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
Expand Down
9 changes: 7 additions & 2 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::RwLock;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result,
};

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
Expand Down Expand Up @@ -80,7 +82,10 @@ impl ClientManager {
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
let client = ClientBuilder::new(config.broker_endpoints.clone())
let broker_endpoints = common_wal::resolve_to_ipv4(&config.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
.build()
.await
Expand Down

0 comments on commit a500252

Please sign in to comment.