Skip to content

Commit

Permalink
Merge pull request #503 from Luap99/tcp-timeout
Browse files Browse the repository at this point in the history
coredns: work on tcp requests concurrently
  • Loading branch information
openshift-merge-bot[bot] authored Sep 2, 2024
2 parents 81fd123 + 39d0043 commit 07ece81
Showing 1 changed file with 101 additions and 44 deletions.
145 changes: 101 additions & 44 deletions src/dns/coredns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::io::Error;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::net::UdpSocket;

Expand All @@ -33,9 +34,14 @@ use tokio::net::UdpSocket;
const CONTAINER_TTL: u32 = 60;

pub struct CoreDns {
rx: flume::Receiver<()>, // kill switch receiver
inner: CoreDnsData,
}

#[derive(Clone)]
struct CoreDnsData {
network_name: String, // raw network name
backend: &'static ArcSwap<DNSBackend>, // server's data store
rx: flume::Receiver<()>, // kill switch receiver
no_proxy: bool, // do not forward to external resolvers
nameservers: Arc<Mutex<Vec<ScopedIp>>>, // host nameservers from resolv.conf
}
Expand All @@ -56,11 +62,13 @@ impl CoreDns {
nameservers: Arc<Mutex<Vec<ScopedIp>>>,
) -> Self {
CoreDns {
network_name,
backend,
rx,
no_proxy,
nameservers,
inner: CoreDnsData {
network_name,
backend,
no_proxy,
nameservers,
},
}
}

Expand All @@ -86,12 +94,12 @@ impl CoreDns {
continue;
}
};
self.process_message(msg_received, &sender_original, Protocol::Udp);
Self::process_message(&self.inner, msg_received, &sender_original, Protocol::Udp).await;
},
res = tcp_listener.accept() => {
match res {
Ok((sock,addr)) => {
self.process_tcp_stream(sock, addr).await
tokio::spawn(Self::process_tcp_stream(self.inner.clone(), sock, addr));
}
Err(e) => {
error!("Failed to accept new tcp connection: {e}");
Expand All @@ -104,17 +112,34 @@ impl CoreDns {
Ok(())
}

async fn process_tcp_stream(&self, stream: tokio::net::TcpStream, peer: SocketAddr) {
async fn process_tcp_stream(
data: CoreDnsData,
stream: tokio::net::TcpStream,
peer: SocketAddr,
) {
let (mut hickory_stream, sender_original) =
TcpStream::from_stream(AsyncIoTokioAsStd(stream), peer);

while let Some(message) = hickory_stream.next().await {
self.process_message(message, &sender_original, Protocol::Tcp)
// It is possible for a client to keep the tcp socket open forever and never send any data,
// we do not want this so add a 3s timeout then we close the socket.
match tokio::time::timeout(Duration::from_secs(3), hickory_stream.next()).await {
Ok(message) => {
if let Some(msg) = message {
Self::process_message(&data, msg, &sender_original, Protocol::Tcp).await;
// The API is a bit strange, first time we call next we get the message,
// but we must call again to send our reply back
hickory_stream.next().await;
}
}
Err(_) => debug!(
"Tcp connection {} was cancelled after 3s as it took to long to receive message",
peer
),
}
}

fn process_message(
&self,
async fn process_message(
data: &CoreDnsData,
msg_received: Result<SerialMessage, Error>,
sender_original: &BufDnsStreamHandle,
proto: Protocol,
Expand All @@ -126,7 +151,7 @@ impl CoreDns {
return;
}
};
let backend = self.backend.load();
let backend = data.backend.load();
let src_address = msg.addr();
let mut sender = sender_original.with_remote_addr(src_address);
let (request_name, record_type, mut req) = match parse_dns_msg(msg) {
Expand All @@ -139,7 +164,7 @@ impl CoreDns {
let request_name_string = request_name.to_string();

// Create debug and trace info for key parameters.
trace!("server network name: {:?}", self.network_name);
trace!("server network name: {:?}", data.network_name);
debug!("request source address: {:?}", src_address);
trace!("requested record type: {:?}", record_type);
debug!(
Expand All @@ -161,7 +186,7 @@ impl CoreDns {
if let Some(msg) = reply_ip(
&request_name_string,
&request_name,
&self.network_name,
&data.network_name,
record_type,
&backend,
src_address,
Expand All @@ -181,7 +206,7 @@ impl CoreDns {
};

// are we allowed to forward?
if self.no_proxy
if data.no_proxy
|| backend.ctr_is_internal(&src_address.ip())
|| request_name_string.ends_with(&backend.search_domain)
|| request_name_string.matches('.').count() == 1
Expand Down Expand Up @@ -210,41 +235,73 @@ impl CoreDns {
}
// Use host resolvers if no custom resolvers are set for the container.
if nameservers.is_empty() {
nameservers.clone_from(&self.nameservers.lock().expect("lock nameservers"));
nameservers.clone_from(&data.nameservers.lock().expect("lock nameservers"));
}

tokio::spawn(async move {
// forward dns request to hosts's /etc/resolv.conf
for nameserver in &nameservers {
let addr = SocketAddr::new(nameserver.into(), 53);
let (client, handle) = match proto {
Protocol::Udp => {
let stream = UdpClientStream::<UdpSocket>::new(addr);
let (cl, bg) = AsyncClient::connect(stream).await?;
let handle = tokio::spawn(bg);
(cl, handle)
match proto {
Protocol::Udp => {
tokio::spawn(Self::forward_to_servers(
nameservers,
sender,
src_address,
req,
proto,
));
}
Protocol::Tcp => {
// we already spawned a new future when we read the message so there is no need to spawn another one
Self::forward_to_servers(nameservers, sender, src_address, req, proto).await;
}
}
}
}

async fn forward_to_servers(
nameservers: Vec<ScopedIp>,
mut sender: BufDnsStreamHandle,
src_address: SocketAddr,
req: Message,
proto: Protocol,
) {
// forward dns request to hosts's /etc/resolv.conf
for nameserver in &nameservers {
let addr = SocketAddr::new(nameserver.into(), 53);
let (client, handle) = match proto {
Protocol::Udp => {
let stream = UdpClientStream::<UdpSocket>::new(addr);
let (cl, bg) = match AsyncClient::connect(stream).await {
Ok(a) => a,
Err(e) => {
debug!("Failed to connect to {addr}: {e}");
continue;
}
Protocol::Tcp => {
let (stream, sender) = TcpClientStream::<
AsyncIoTokioAsStd<tokio::net::TcpStream>,
>::new(addr);
let (cl, bg) = AsyncClient::new(stream, sender, None).await?;
let handle = tokio::spawn(bg);
(cl, handle)
};
let handle = tokio::spawn(bg);
(cl, handle)
}
Protocol::Tcp => {
let (stream, sender) =
TcpClientStream::<AsyncIoTokioAsStd<tokio::net::TcpStream>>::new(addr);
let (cl, bg) = match AsyncClient::new(stream, sender, None).await {
Ok(a) => a,
Err(e) => {
debug!("Failed to connect to {addr}: {e}");
continue;
}
};
let handle = tokio::spawn(bg);
(cl, handle)
}
};

if let Some(resp) = forward_dns_req(client, req.clone()).await {
if reply(&mut sender, src_address, &resp).is_some() {
// request resolved from following resolver so
// break and don't try other resolvers
break;
}
}
drop(handle);
if let Some(resp) = forward_dns_req(client, req.clone()).await {
if reply(&mut sender, src_address, &resp).is_some() {
// request resolved from following resolver so
// break and don't try other resolvers
break;
}
Ok::<(), std::io::Error>(())
});
}
handle.abort();
}
}
}
Expand Down

0 comments on commit 07ece81

Please sign in to comment.