Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache DNS responses for IpaClients #1496

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions ipa-core/src/net/client/dns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::{
future::Future,
io, iter, mem,
net::SocketAddr,
pin::Pin,
str::FromStr,
task::{Context, Poll},
};

use dashmap::DashMap;
use hyper_util::client::legacy::connect::dns::{GaiFuture, GaiResolver, Name};
use pin_project::pin_project;
use tower::Service;

use crate::sync::Arc;

/// Wrapper around Hyper's [`GaiResolver`] to cache the DNS response.
///
/// The nature of IPA dictates clients talking to a very limited set of services.
/// Assuming there are M shards in the system:
/// - Report collector talks to each shard on every helper (3*M)
/// - Shard talks to each other shard and to the shard with the same index on two other helpers (M+2)
///
/// So the memory usage is proportional to the number of shards. We need to cache the response,
/// because there is no other caching layer in the system:
/// - Hyper uses [`GaiResolver`] that is basically just a call to libc `getaddrinfo`
/// - Linux by default does not have any OS-level DNS caching enabled. There is [`nscd`], but
///   it is disabled by default and is claimed to be broken on some distributions [`issue`].
///
/// Given these constraints, it is probably much simpler to cache the DNS response in the
/// application layer. With the model where each instance just runs a single query it should
/// simplify things by a lot.
///
/// This struct does exactly that: a name is looked up in the cache first and only handed to
/// the wrapped resolver when no entry exists. Clones share one cache, because the map sits
/// behind an [`Arc`]. Nothing in this module evicts or refreshes an entry once it is stored.
///
/// [`nscd`]: https://linux.die.net/man/8/nscd
/// [`issue`]: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=335476
#[derive(Clone)]
pub(super) struct CachingGaiResolver {
/// Resolved addresses keyed by host name; shared between all clones of this resolver.
cache: Arc<DashMap<Name, Vec<SocketAddr>>>,
/// Underlying resolver, consulted only when `cache` has no entry for the requested name.
resolver: GaiResolver,
}

impl CachingGaiResolver {
    /// Creates a resolver with an empty cache. Every name is resolved through
    /// [`GaiResolver`] the first time it is requested.
    pub fn new() -> Self {
        Self::seeded(iter::empty())
    }

    /// Creates a resolver whose cache is pre-populated with the given
    /// `(host name, socket address)` pairs. Each name is mapped to exactly one address;
    /// if a name appears more than once, the last pair wins.
    ///
    /// ## Panics
    /// If a host name is not a valid domain name, or if an address cannot be parsed as
    /// a [`SocketAddr`] (`ip:port`).
    pub fn seeded<'a, I: IntoIterator<Item = (&'a str, &'a str)>>(items: I) -> Self {
        let cache = DashMap::default();
        for (name, addr) in items {
            let host = Name::from_str(name)
                .unwrap_or_else(|_| panic!("{name} is not a valid domain name"));
            // Report the offending input the same way the host name check does, instead of
            // failing with an anonymous `unwrap` panic.
            let socket_addr: SocketAddr = addr
                .parse()
                .unwrap_or_else(|e| panic!("{addr} is not a valid socket address: {e}"));
            cache.insert(host, vec![socket_addr]);
        }
        Self {
            cache: Arc::new(cache),
            resolver: GaiResolver::new(),
        }
    }
}

/// Iterator over the socket addresses produced by a single name lookup.
///
/// The [`Default`] value yields no addresses.
#[derive(Default)]
pub struct IpAddresses {
    /// Remaining addresses, yielded front to back.
    iter: std::vec::IntoIter<SocketAddr>,
}

impl Iterator for IpAddresses {
    type Item = SocketAddr;

    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }

    /// Forwards the exact remaining length of the underlying vector iterator, so that
    /// consumers such as `collect` can size their buffers up front instead of seeing
    /// the default `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}

/// Future returned by [`CachingGaiResolver`] when it is called as a [`Service`].
#[pin_project(project = ResolvingFutureEnumProj)]
pub enum ResolvingFuture {
/// Cache hit: the addresses are already known, so the first poll completes immediately.
/// Polling moves the addresses out and leaves an empty [`IpAddresses`] behind.
Ready(IpAddresses),
/// Cache miss: the in-flight lookup from the wrapped resolver, the name being resolved
/// and a handle to the shared cache. Once the lookup succeeds, its addresses are stored
/// in the cache under that name.
Pending(#[pin] GaiFuture, Name, Arc<DashMap<Name, Vec<SocketAddr>>>),
}

impl Future for ResolvingFuture {
    type Output = Result<IpAddresses, io::Error>;

    /// Drives the lookup to completion.
    ///
    /// * `Ready` completes on the first poll and hands out the cached addresses.
    /// * `Pending` polls the wrapped lookup. On success the addresses are stored in the
    ///   shared cache and returned; a resolver error is returned as is and nothing is
    ///   cached.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            ResolvingFutureEnumProj::Ready(addr) => Poll::Ready(Ok(mem::take(addr))),
            ResolvingFutureEnumProj::Pending(fut, name, cache) => {
                // `ready!` returns `Poll::Pending` early, `?` propagates the resolver error.
                let resolved = std::task::ready!(fut.poll(cx))?;
                let addrs = resolved.collect::<Vec<_>>();
                // This should probably be a trace span, once we have full confidence
                // that this module works fine
                tracing::info!("caching IP addresses for {name}: {addrs:?}");
                // Two lookups for the same name can be in flight at once: both see a cache
                // miss before either completes. The slower one then finds the entry
                // already present, which is expected and must not bring the client down,
                // so the entry is simply replaced with the newer answer.
                if cache.insert(name.clone(), addrs.clone()).is_some() {
                    tracing::debug!(
                        "{name} was already cached by a concurrent lookup, entry replaced"
                    );
                }
                Poll::Ready(Ok(IpAddresses {
                    iter: addrs.into_iter(),
                }))
            }
        }
    }
}

impl Service<Name> for CachingGaiResolver {
    type Response = IpAddresses;
    type Error = io::Error;
    type Future = ResolvingFuture;

    /// This resolver never applies backpressure and reports readiness unconditionally.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Answers from the cache when `req` is known, otherwise starts a lookup on the wrapped
    /// resolver and hands the returned future everything it needs to cache the outcome.
    fn call(&mut self, req: Name) -> Self::Future {
        match self.cache.get(&req) {
            // Cache hit: copy the stored addresses out of the map.
            Some(cached) => ResolvingFuture::Ready(IpAddresses {
                iter: cached.to_vec().into_iter(),
            }),
            // Cache miss: delegate to the real resolver.
            None => {
                let lookup = self.resolver.call(req.clone());
                let shared_cache = Arc::clone(&self.cache);
                ResolvingFuture::Pending(lookup, req, shared_cache)
            }
        }
    }
}

#[cfg(all(test, unit_test))]
mod tests {
    use std::str::FromStr;

    use hyper_util::client::legacy::connect::dns::Name;
    use tower::Service;

    use crate::{
        net::client::dns::{IpAddresses, ResolvingFuture},
        test_executor::run,
    };

    /// A seeded entry is returned as is for a name that is not expected to resolve.
    #[test]
    fn cache_data() {
        let host = Name::from_str("www.notadomain.com").unwrap();
        let mut caching =
            super::CachingGaiResolver::seeded([(host.as_str(), "172.20.0.254:8000")]);
        let lookup = caching.call(host);
        let resolved: Vec<String> = futures::executor::block_on(lookup)
            .unwrap()
            .map(|addr| addr.to_string())
            .collect();
        assert_eq!(vec!["172.20.0.254:8000"], resolved);
    }

    /// The first lookup goes through the real resolver; the second one must be a cache hit.
    #[test]
    fn calls_real_resolver() {
        fn assert_localhost_present(input: IpAddresses) {
            let input: Vec<String> = input.map(|addr| addr.to_string()).collect();
            assert!(
                input.iter().any(|addr| addr == "127.0.0.1:0"),
                "{input:?} does not include localhost"
            );
        }

        run(|| async move {
            let name = Name::from_str("localhost").unwrap();
            let mut resolver = super::CachingGaiResolver::new();
            let first = resolver.call(name.clone()).await.unwrap();
            assert_localhost_present(first);

            // call again and make sure it is ready
            match resolver.call(name.clone()) {
                ResolvingFuture::Ready(cached) => assert_localhost_present(cached),
                ResolvingFuture::Pending(..) => panic!("{name} hasn't been cached"),
            }
        });
    }
}
17 changes: 12 additions & 5 deletions ipa-core/src/net/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod dns;

use std::{
collections::HashMap,
future::Future,
Expand Down Expand Up @@ -36,10 +38,15 @@ use crate::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
TransportIdentity,
},
net::{error::ShardQueryStatusMismatchError, http_serde, Error, CRYPTO_PROVIDER},
net::{
client::dns::CachingGaiResolver, error::ShardQueryStatusMismatchError, http_serde, Error,
CRYPTO_PROVIDER,
},
protocol::{Gate, QueryId},
};

type DnsResolver = CachingGaiResolver;

#[derive(Default)]
pub enum ClientIdentity<F: ConnectionFlavor = Helper> {
/// Claim the specified helper identity without any additional authentication.
Expand Down Expand Up @@ -179,7 +186,7 @@ async fn response_to_bytes(resp: ResponseFromEndpoint) -> Result<Bytes, Error> {
/// client can be configured to talk to all three helpers.
#[derive(Debug, Clone)]
pub struct IpaHttpClient<F: ConnectionFlavor> {
client: Client<HttpsConnector<HttpConnector>, Body>,
client: Client<HttpsConnector<HttpConnector<DnsResolver>>, Body>,
scheme: uri::Scheme,
authority: uri::Authority,
auth_header: Option<(HeaderName, HeaderValue)>,
Expand Down Expand Up @@ -277,7 +284,7 @@ impl<F: ConnectionFlavor> IpaHttpClient<F> {
fn new_internal<C: HyperClientConfigurator>(
runtime: IpaRuntime,
addr: Uri,
connector: HttpsConnector<HttpConnector>,
connector: HttpsConnector<HttpConnector<DnsResolver>>,
auth_header: Option<(HeaderName, HeaderValue)>,
conf: &C,
) -> Self {
Expand Down Expand Up @@ -536,8 +543,8 @@ impl IpaHttpClient<Shard> {
}
}

fn make_http_connector() -> HttpConnector {
let mut connector = HttpConnector::new();
fn make_http_connector() -> HttpConnector<DnsResolver> {
let mut connector = HttpConnector::new_with_resolver(DnsResolver::new());
// IPA uses HTTP2 and it is sensitive to those delays especially in high-latency network
// configurations.
connector.set_nodelay(true);
Expand Down
Loading