Skip to content

Commit

Permalink
Fix incoming TCP with multiple pods (metalbear-co#2079)
Browse files Browse the repository at this point in the history
* Subscription management fixed in IncomingProxy

* Docs

* SubscriptionsManager tests

* Changelog entry
  • Loading branch information
Razz4780 authored Nov 22, 2023
1 parent fa814c2 commit c9979a7
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 84 deletions.
1 change: 1 addition & 0 deletions changelog.d/2078.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed issues with mirroring incoming TCP connections when targeting multi-pod deployments.
10 changes: 6 additions & 4 deletions mirrord/intproxy/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ pub enum PortSubscription {
pub struct PortUnsubscribe {
/// Port on the remote pod that layer mirrored.
pub port: Port,
/// Local address on which the layer was listening.
pub listening_on: SocketAddr,
}

/// Messages sent by the internal proxy and handled by the layer.
Expand All @@ -210,9 +212,11 @@ pub enum ProxyToLayerMessage {
#[derive(Encode, Decode, Debug)]
pub enum IncomingResponse {
/// A response to layer's [`PortSubscribe`].
/// As a temporary workaround to [agent protocol](mirrord_protocol) limitations, the only error
/// returned here is
/// [`ResponseError::PortAlreadyStolen`](mirrord_protocol::error::ResponseError::PortAlreadyStolen).
/// Other errors will make the internal proxy terminate.
PortSubscribe(RemoteResult<()>),
/// A response to layer's [`PortUnsubscribe`].
PortUnsubscribe(RemoteResult<()>),
/// A response to layers' [`ConnMetadataRequest`].
ConnMetadata(ConnMetadataResponse),
}
Expand Down Expand Up @@ -383,9 +387,7 @@ impl_request!(

impl_request!(
req = PortUnsubscribe,
res = RemoteResult<()>,
req_path = LayerToProxyMessage::Incoming => IncomingRequest::PortUnsubscribe,
res_path = ProxyToLayerMessage::Incoming => IncomingResponse::PortUnsubscribe,
);

impl_request!(
Expand Down
2 changes: 2 additions & 0 deletions mirrord/intproxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ impl IntProxy {

/// Routes most messages from the agent to the correct background task.
/// Some messages are handled here.
#[tracing::instrument(level = "trace", skip(self), ret)]
async fn handle_agent_message(&mut self, message: DaemonMessage) -> Result<(), IntProxyError> {
self.task_txs
.ping_pong
Expand Down Expand Up @@ -335,6 +336,7 @@ impl IntProxy {
}

/// Routes a message from the layer to the correct background task.
#[tracing::instrument(level = "trace", skip(self), ret)]
async fn handle_layer_message(&self, message: FromLayer) -> Result<(), IntProxyError> {
let FromLayer {
message_id,
Expand Down
116 changes: 39 additions & 77 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use mirrord_intproxy_protocol::{
};
use mirrord_protocol::{
tcp::{DaemonTcp, HttpRequestFallback, HttpResponseFallback, NewTcpConnection},
ConnectionId, Port,
ConnectionId, ResponseError,
};
use thiserror::Error;
use tokio::net::TcpSocket;
Expand All @@ -22,19 +22,19 @@ use self::{
http_interceptor::{HttpInterceptor, HttpInterceptorError},
port_subscription_ext::PortSubscriptionExt,
raw_interceptor::RawInterceptor,
subscriptions::SubscriptionsManager,
};
use crate::{
background_tasks::{BackgroundTask, BackgroundTasks, MessageBus, TaskSender, TaskUpdate},
main_tasks::{LayerClosed, LayerForked, ToLayer},
remote_resources::RemoteResources,
request_queue::{RequestQueue, RequestQueueEmpty},
ProxyMessage,
};

mod http;
mod http_interceptor;
mod port_subscription_ext;
mod raw_interceptor;
mod subscriptions;

/// Common type for errors of the [`RawInterceptor`] and the [`HttpInterceptor`].
#[derive(Error, Debug)]
Expand Down Expand Up @@ -73,16 +73,14 @@ pub enum IncomingProxyError {
/// operate in the `steal` mode. This should never happen.
#[error("received TCP steal message while in mirror mode: {0:?}")]
ReceivedStealMessage(DaemonTcp),
/// The agent sent a response, but the corresponding [`RequestQueue`] was empty.
/// This should never happen.
#[error("{0}")]
RequestQueueEmpty(#[from] RequestQueueEmpty),
/// The agent sent an HTTP request with unsupported [`Version`].
/// [`Version::HTTP_3`] is currently not supported.
#[error("{0:?} is not supported")]
UnsupportedHttpVersion(Version),
#[error("{0}")]
Io(#[from] io::Error),
#[error("subscribing port failed: {0}")]
SubscriptionFailed(ResponseError),
}

/// Messages consumed by [`IncomingProxy`] running as a [`BackgroundTask`].
Expand Down Expand Up @@ -162,12 +160,8 @@ impl MetadataStore {
/// agent. If the agent closes the connection, the proxy shuts down the [`HttpInterceptor`].
#[derive(Default)]
pub struct IncomingProxy {
/// Remote ports subscribed by layers. Allows tracking across layer forks.
remote_subscriptions: RemoteResources<Port>,
/// Mapping from subscribed port on the remote target to layer's [`PortSubscribe`] request.
subscriptions: HashMap<Port, PortSubscribe>,
/// For matching agent's responses with layer's [`PortSubscribe`] requests.
subscribe_reqs: RequestQueue,
/// Active port subscriptions for all layers.
subscriptions: SubscriptionsManager,
/// [`TaskSender`]s for active [`RawInterceptor`]s.
interceptors_raw: HashMap<InterceptorId, InterceptorHandle<RawInterceptor>>,
/// [`TaskSender`]s for active [`HttpInterceptor`]s.
Expand All @@ -183,10 +177,7 @@ impl IncomingProxy {
/// [`BackgroundTasks`] struct.
const CHANNEL_SIZE: usize = 512;

/// Stores subscription request id and sends a corresponding request to the agent.
/// However, if a subscription for the same port already exists, this method does not make any
/// request to the agent. Instead, it immediately responds to the layer and starts
/// redirecting connections to the new listener.
/// Tries to register the new subscription in the [`SubscriptionsManager`].
#[tracing::instrument(level = "trace", skip(self, message_bus))]
async fn handle_port_subscribe(
&mut self,
Expand All @@ -195,46 +186,27 @@ impl IncomingProxy {
subscribe: PortSubscribe,
message_bus: &mut MessageBus<Self>,
) {
if let Some(old_subscription) = self
let msg = self
.subscriptions
.insert(subscribe.subscription.port(), subscribe.clone())
{
// Since this struct identifies listening sockets by their port (#1558), we can only
// forward the incoming traffic to one socket with that port.
tracing::info!(
"Received layer subscription message for port {}, listening on {}, while already listening on {}. Sending all incoming traffic to new socket.",
subscribe.subscription.port(),
subscribe.listening_on,
old_subscription.listening_on,
);

message_bus
.send(ToLayer {
message_id,
layer_id,
message: ProxyToLayerMessage::Incoming(IncomingResponse::PortSubscribe(Ok(()))),
})
.await;
.layer_subscribed(layer_id, message_id, subscribe);

return;
if let Some(msg) = msg {
message_bus.send(msg).await;
}

let msg = subscribe.subscription.agent_subscribe();
message_bus.send(ProxyMessage::ToAgent(msg)).await;

self.subscribe_reqs.insert(message_id, layer_id);
}

/// Sends a request to the agent to stop sending incoming connections for the specified port.
/// Tries to unregister the subscription from the [`SubscriptionManager`].
#[tracing::instrument(level = "trace", skip(self, message_bus))]
async fn handle_port_unsubscribe(
&mut self,
unsubscribe: PortUnsubscribe,
layer_id: LayerId,
request: PortUnsubscribe,
message_bus: &mut MessageBus<Self>,
) {
if let Some(subscription) = self.subscriptions.remove(&unsubscribe.port) {
let msg = subscription.subscription.wrap_agent_unsubscribe();
message_bus.send(ProxyMessage::ToAgent(msg)).await;
let msg = self.subscriptions.layer_unsubscribed(layer_id, request);

if let Some(msg) = msg {
message_bus.send(msg).await;
}
}

Expand All @@ -251,7 +223,7 @@ impl IncomingProxy {
let interceptor = match self.interceptors_http.entry(id) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
let Some(subscription) = self.subscriptions.get(&request.port()) else {
let Some(sub) = self.subscriptions.get(request.port()) else {
tracing::trace!(
"received a new http request for port {} that is no longer mirrored",
request.port()
Expand All @@ -262,14 +234,14 @@ impl IncomingProxy {

let version = request.version();
let interceptor = self.background_tasks.register(
HttpInterceptor::new(subscription.listening_on, version),
HttpInterceptor::new(sub.listening_on, version),
InterceptorId(request.connection_id()),
Self::CHANNEL_SIZE,
);

e.insert(InterceptorHandle {
tx: interceptor,
subscription: subscription.subscription.clone(),
subscription: sub.subscription.clone(),
})
}
};
Expand Down Expand Up @@ -325,12 +297,12 @@ impl IncomingProxy {
source_port,
local_address,
}) => {
let Some(subscription) = self.subscriptions.get(&destination_port) else {
let Some(sub) = self.subscriptions.get(destination_port) else {
tracing::trace!("received a new connection for port {destination_port} that is no longer mirrored");
return Ok(());
};

let interceptor_socket = match subscription.listening_on.ip() {
let interceptor_socket = match sub.listening_on.ip() {
addr @ IpAddr::V4(..) => {
let socket = TcpSocket::new_v4()?;
socket.bind(SocketAddr::new(addr, 0))?;
Expand All @@ -347,7 +319,7 @@ impl IncomingProxy {

self.metadata_store.expect(
ConnMetadataRequest {
listener_address: subscription.listening_on,
listener_address: sub.listening_on,
peer_address: interceptor_socket.local_addr()?,
},
id,
Expand All @@ -358,30 +330,24 @@ impl IncomingProxy {
);

let interceptor = self.background_tasks.register(
RawInterceptor::new(subscription.listening_on, interceptor_socket),
RawInterceptor::new(sub.listening_on, interceptor_socket),
id,
Self::CHANNEL_SIZE,
);
self.interceptors_raw.insert(
id,
InterceptorHandle {
tx: interceptor,
subscription: subscription.subscription.clone(),
subscription: sub.subscription.clone(),
},
);
}
DaemonTcp::SubscribeResult(res) => {
let (message_id, layer_id) = self.subscribe_reqs.get()?;

message_bus
.send(ToLayer {
message_id,
layer_id,
message: ProxyToLayerMessage::Incoming(IncomingResponse::PortSubscribe(
res.map(|_| ()),
)),
})
.await;
DaemonTcp::SubscribeResult(result) => {
let msgs = self.subscriptions.agent_responded(result)?;

for msg in msgs {
message_bus.send(msg).await;
}
}
}

Expand All @@ -390,18 +356,14 @@ impl IncomingProxy {

fn handle_layer_fork(&mut self, msg: LayerForked) {
let LayerForked { child, parent } = msg;
self.remote_subscriptions.clone_all(child, parent);
self.subscriptions.layer_forked(parent, child);
}

async fn handle_layer_close(&mut self, msg: LayerClosed, message_bus: &MessageBus<Self>) {
let LayerClosed { id } = msg;
for to_close in self.remote_subscriptions.remove_all(id) {
let Some(subscription) = self.subscriptions.remove(&to_close) else {
continue;
};
message_bus
.send(subscription.subscription.wrap_agent_unsubscribe())
.await;
let msgs = self.subscriptions.layer_closed(msg.id);

for msg in msgs {
message_bus.send(msg).await;
}
}

Expand Down Expand Up @@ -431,7 +393,7 @@ impl BackgroundTask for IncomingProxy {
},
Some(IncomingProxyMessage::LayerRequest(message_id, layer_id, req)) => match req {
IncomingRequest::PortSubscribe(subscribe) => self.handle_port_subscribe(message_id, layer_id, subscribe, message_bus).await,
IncomingRequest::PortUnsubscribe(unsubscribe) => self.handle_port_unsubscribe(unsubscribe, message_bus).await,
IncomingRequest::PortUnsubscribe(unsubscribe) => self.handle_port_unsubscribe(layer_id, unsubscribe, message_bus).await,
IncomingRequest::ConnMetadata(req) => {
let res = self.metadata_store.get(req);
message_bus.send(ToLayer { message_id, layer_id, message: ProxyToLayerMessage::Incoming(IncomingResponse::ConnMetadata(res)) }).await;
Expand Down
Loading

0 comments on commit c9979a7

Please sign in to comment.