Skip to content

Commit

Permalink
Test + tracing (metalbear-co#2304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Razz4780 authored Mar 14, 2024
1 parent d887558 commit ed89787
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 3 deletions.
1 change: 1 addition & 0 deletions changelog.d/+dns-cache-test.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a unit test to internal proxy DNS cache. Added more tracing to internal proxy.
11 changes: 8 additions & 3 deletions mirrord/intproxy/src/agent_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ impl AgentConnection {

Ok(Self { agent_tx, agent_rx })
}

#[tracing::instrument(level = "trace", name = "send_agent_message", skip(self), ret)]
async fn send(&self, msg: ClientMessage) -> Result<(), AgentChannelError> {
self.agent_tx.send(msg).await.map_err(|_| AgentChannelError)
}
}

/// This error occurs when the [`AgentConnection`] fails to communicate with the inner
Expand All @@ -133,9 +138,9 @@ impl BackgroundTask for AgentConnection {
break Ok(());
},
Some(msg) => {
if self.agent_tx.send(msg).await.is_err() {
tracing::error!("failed to send message to the agent, inner task down");
break Err(AgentChannelError);
if let Err(error) = self.send(msg).await {
tracing::error!(%error, "failed to send message to the agent");
break Err(error);
}
}
},
Expand Down
1 change: 1 addition & 0 deletions mirrord/intproxy/src/layer_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl LayerConnection {
}
}

#[tracing::instrument(level = "trace", name = "send_layer_message", skip(self), fields(layer_id = self.layer_id.0), ret)]
async fn send_and_flush(
&mut self,
msg: &LocalMessage<ProxyToLayerMessage>,
Expand Down
133 changes: 133 additions & 0 deletions mirrord/intproxy/src/proxies/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,140 @@ impl AddrInfoCache {
}

/// Returns a cached response for the given `host`.
#[tracing::instrument(level = "trace", name = "get_cached_dns_response", skip(self), ret)]
fn get_cached(&mut self, host: &str) -> Option<DnsLookup> {
self.responses.get(host).cloned()
}
}

#[cfg(test)]
mod test {
use core::fmt;
use std::net::IpAddr;

use mirrord_protocol::dns::LookupRecord;

use super::*;
use crate::background_tasks::{BackgroundTasks, TaskUpdate};

fn assert_is_dns_req<E: fmt::Debug>(update: TaskUpdate<ProxyMessage, E>, expected_node: &str) {
match update {
TaskUpdate::Message(ProxyMessage::ToAgent(ClientMessage::GetAddrInfoRequest(
GetAddrInfoRequest { node },
))) => {
assert_eq!(node, expected_node);
}
other => panic!("unexpected request: {other:?}"),
}
}

fn assert_is_dns_res<E: fmt::Debug>(
update: TaskUpdate<ProxyMessage, E>,
expected_ip: IpAddr,
expected_message_id: u64,
expected_layer_id: LayerId,
) {
match update {
TaskUpdate::Message(ProxyMessage::ToLayer(ToLayer {
message_id,
layer_id,
message: ProxyToLayerMessage::GetAddrInfo(res),
})) => {
assert_eq!(res.0.unwrap().0.first().unwrap().ip, expected_ip,);
assert_eq!(message_id, expected_message_id);
assert_eq!(layer_id, expected_layer_id);
}
other => panic!("unexpected message {other:?}"),
}
}

#[tokio::test]
async fn dns_cache() {
let mut bg_tasks: BackgroundTasks<(), ProxyMessage, RequestQueueEmpty> =
BackgroundTasks::default();
let tx = bg_tasks.register(SimpleProxy::default(), (), 16);

// Simulate two concurrent DNS requests from the user app.
tx.send(SimpleProxyMessage::AddrInfoReq(
0,
LayerId(0),
GetAddrInfoRequest {
node: "node1".to_string(),
},
))
.await;
tx.send(SimpleProxyMessage::AddrInfoReq(
1,
LayerId(0),
GetAddrInfoRequest {
node: "node2".to_string(),
},
))
.await;

// Check that the proxy sent requests to the agent.
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_req(update, "node1");
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_req(update, "node2");

// Simulate agent responses.
tx.send(SimpleProxyMessage::AddrInfoRes(GetAddrInfoResponse(Ok(
DnsLookup(vec![LookupRecord {
name: "node1.some.name".to_string(),
ip: "1.1.1.1".parse().unwrap(),
}]),
))))
.await;
tx.send(SimpleProxyMessage::AddrInfoRes(GetAddrInfoResponse(Ok(
DnsLookup(vec![LookupRecord {
name: "node2.some.name".to_string(),
ip: "2.2.2.2".parse().unwrap(),
}]),
))))
.await;

// Check that the proxy sent responses to the user app.
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "1.1.1.1".parse().unwrap(), 0, LayerId(0));
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "2.2.2.2".parse().unwrap(), 1, LayerId(0));

// Simulate three DNS requests from the user app.
// The requested names where already resolved before,
// so the proxy should use the cache and not sent any request to the agent.

tx.send(SimpleProxyMessage::AddrInfoReq(
3,
LayerId(0),
GetAddrInfoRequest {
node: "node1".to_string(),
},
))
.await;
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "1.1.1.1".parse().unwrap(), 3, LayerId(0));

tx.send(SimpleProxyMessage::AddrInfoReq(
4,
LayerId(0),
GetAddrInfoRequest {
node: "node1".to_string(),
},
))
.await;
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "1.1.1.1".parse().unwrap(), 4, LayerId(0));

tx.send(SimpleProxyMessage::AddrInfoReq(
5,
LayerId(0),
GetAddrInfoRequest {
node: "node2".to_string(),
},
))
.await;
let ((), update) = bg_tasks.next().await.unwrap();
assert_is_dns_res(update, "2.2.2.2".parse().unwrap(), 5, LayerId(0));
}
}

0 comments on commit ed89787

Please sign in to comment.