Skip to content

Commit

Permalink
Remove intproxy dns cache (metalbear-co#2308)
Browse files Browse the repository at this point in the history
* Removed cache

* Changelog entry
  • Loading branch information
Razz4780 authored Mar 15, 2024
1 parent 60968f6 commit bf4a50b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 200 deletions.
1 change: 1 addition & 0 deletions changelog.d/+dns-cache.removed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed problematic DNS cache from internal proxy.
208 changes: 8 additions & 200 deletions mirrord/intproxy/src/proxies/simple.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! The most basic proxying logic. Handles cases when the only job to do in the internal proxy is to
//! pass requests and responses between the layer and the agent.
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;

use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage};
use mirrord_protocol::{
dns::{DnsLookup, GetAddrInfoRequest, GetAddrInfoResponse},
dns::{GetAddrInfoRequest, GetAddrInfoResponse},
file::{CloseDirRequest, CloseFileRequest, OpenDirResponse, OpenFileResponse},
ClientMessage, FileRequest, FileResponse, GetEnvVarsRequest, RemoteResult,
};
Expand Down Expand Up @@ -47,8 +47,6 @@ pub struct SimpleProxy {
addr_info_reqs: RequestQueue,
/// For [`GetEnvVarsRequest`]s.
get_env_reqs: RequestQueue,
/// Cache for resolved domain names.
addr_info_cache: AddrInfoCache,
}

impl BackgroundTask for SimpleProxy {
Expand Down Expand Up @@ -134,31 +132,14 @@ impl BackgroundTask for SimpleProxy {
.await;
}
SimpleProxyMessage::AddrInfoReq(message_id, session_id, req) => {
if let Some(cached_response) = self.addr_info_cache.get_cached(&req.node) {
message_bus
.send(ProxyMessage::ToLayer(ToLayer {
message_id,
layer_id: session_id,
message: ProxyToLayerMessage::GetAddrInfo(GetAddrInfoResponse(Ok(
cached_response,
))),
}))
.await;
} else {
self.addr_info_reqs.insert(message_id, session_id);
self.addr_info_cache.request_sent(req.node.clone());
message_bus
.send(ProxyMessage::ToAgent(ClientMessage::GetAddrInfoRequest(
req,
)))
.await;
}
self.addr_info_reqs.insert(message_id, session_id);
message_bus
.send(ProxyMessage::ToAgent(ClientMessage::GetAddrInfoRequest(
req,
)))
.await;
}
SimpleProxyMessage::AddrInfoRes(res) => {
if let Ok(lookup) = res.0.as_ref() {
self.addr_info_cache.response_came(lookup.clone());
}

let (message_id, layer_id) = self.addr_info_reqs.get()?;
message_bus
.send(ToLayer {
Expand Down Expand Up @@ -206,176 +187,3 @@ impl BackgroundTask for SimpleProxy {
Ok(())
}
}

/// Cache for successful [`DnsLookup`]s done by the agent.
/// DNS mappings should not change very often, so it should be safe to cache these for the time of
/// one mirrord session.
#[derive(Default)]
struct AddrInfoCache {
/// [`GetAddrInfoRequest::node`]s from the DNS requests that still require a response from the
/// agent.
outstanding_requests: VecDeque<String>,
/// Agent's successful responses to previous requests.
responses: HashMap<String, DnsLookup>,
}

impl AddrInfoCache {
/// Notifies this cache that a new [`GetAddrInfoRequest`] has been sent.
fn request_sent(&mut self, host: String) {
self.outstanding_requests.push_back(host);
}

/// Notifies this cache that the agent has responded with success to a [`GetAddrInfoRequest`].
/// The given [`DnsLookup`] is matched against the `host` from the oldest incomplete
/// [`GetAddrInfoRequest`] this struct knows of.
///
/// # Panic
///
/// This method panics if this struct does not know of any incomplete [`GetAddrInfoRequest`].
fn response_came(&mut self, lookup: DnsLookup) {
let host = self
.outstanding_requests
.pop_front()
.expect("received too many GetAddrInfoResponses");

self.responses.insert(host, lookup);
}

/// Returns a cached response for the given `host`.
#[tracing::instrument(level = "trace", name = "get_cached_dns_response", skip(self), ret)]
fn get_cached(&mut self, host: &str) -> Option<DnsLookup> {
self.responses.get(host).cloned()
}
}

#[cfg(test)]
mod test {
use core::fmt;
use std::net::IpAddr;

use mirrord_protocol::dns::LookupRecord;

use super::*;
use crate::background_tasks::{BackgroundTasks, TaskUpdate};

fn assert_is_dns_req<E: fmt::Debug>(update: TaskUpdate<ProxyMessage, E>, expected_node: &str) {
match update {
TaskUpdate::Message(ProxyMessage::ToAgent(ClientMessage::GetAddrInfoRequest(
GetAddrInfoRequest { node },
))) => {
assert_eq!(node, expected_node);
}
other => panic!("unexpected request: {other:?}"),
}
}

fn assert_is_dns_res<E: fmt::Debug>(
update: TaskUpdate<ProxyMessage, E>,
expected_ip: IpAddr,
expected_message_id: u64,
expected_layer_id: LayerId,
) {
match update {
TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer {
message_id,
layer_id,
message: ProxyToLayerMessage::GetAddrInfo(res),
})) => {
assert_eq!(res.0.unwrap().0.first().unwrap().ip, expected_ip,);
assert_eq!(message_id, expected_message_id);
assert_eq!(layer_id, expected_layer_id);
}
other => panic!("unexpected message {other:?}"),
}
}

#[tokio::test]
async fn dns_cache() {
let mut bg_tasks: BackgroundTasks<(), ProxyMessage, RequestQueueEmpty> =
BackgroundTasks::default();
let tx = bg_tasks.register(SimpleProxy::default(), (), 16);

// Simulate two concurrent DNS requests from the user app.
tx.send(SimpleProxyMessage::AddrInfoReq(
0,
LayerId(0),
GetAddrInfoRequest {
node: "node1".to_string(),
},
))
.await;
tx.send(SimpleProxyMessage::AddrInfoReq(
1,
LayerId(0),
GetAddrInfoRequest {
node: "node2".to_string(),
},
))
.await;

// Check that the proxy sent requests to the agent.
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_req(update, "node1");
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_req(update, "node2");

// Simulate agent responses.
tx.send(SimpleProxyMessage::AddrInfoRes(GetAddrInfoResponse(Ok(
DnsLookup(vec![LookupRecord {
name: "node1.some.name".to_string(),
ip: "1.1.1.1".parse().unwrap(),
}]),
))))
.await;
tx.send(SimpleProxyMessage::AddrInfoRes(GetAddrInfoResponse(Ok(
DnsLookup(vec![LookupRecord {
name: "node2.some.name".to_string(),
ip: "2.2.2.2".parse().unwrap(),
}]),
))))
.await;

// Check that the proxy sent responses to the user app.
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "1.1.1.1".parse().unwrap(), 0, LayerId(0));
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "2.2.2.2".parse().unwrap(), 1, LayerId(0));

// Simulate three DNS requests from the user app.
// The requested names where already resolved before,
// so the proxy should use the cache and not sent any request to the agent.

tx.send(SimpleProxyMessage::AddrInfoReq(
3,
LayerId(0),
GetAddrInfoRequest {
node: "node1".to_string(),
},
))
.await;
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "1.1.1.1".parse().unwrap(), 3, LayerId(0));

tx.send(SimpleProxyMessage::AddrInfoReq(
4,
LayerId(0),
GetAddrInfoRequest {
node: "node1".to_string(),
},
))
.await;
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "1.1.1.1".parse().unwrap(), 4, LayerId(0));

tx.send(SimpleProxyMessage::AddrInfoReq(
5,
LayerId(0),
GetAddrInfoRequest {
node: "node2".to_string(),
},
))
.await;
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "2.2.2.2".parse().unwrap(), 5, LayerId(0));
}
}

0 comments on commit bf4a50b

Please sign in to comment.