From 9e9863f61f06bd6931d4cbf344b59e9efd416717 Mon Sep 17 00:00:00 2001 From: Razz4780 Date: Tue, 17 Oct 2023 15:18:43 +0200 Subject: [PATCH] incoming http docs - intproxy --- mirrord/intproxy/src/proxies/incoming.rs | 10 +++---- mirrord/intproxy/src/proxies/incoming/http.rs | 9 +++++-- .../src/proxies/incoming/http_interceptor.rs | 27 +++++++++++-------- .../proxies/incoming/port_subscription_ext.rs | 22 ++++++++++----- 4 files changed, 43 insertions(+), 25 deletions(-) diff --git a/mirrord/intproxy/src/proxies/incoming.rs b/mirrord/intproxy/src/proxies/incoming.rs index 303d480cc02..10a6985291e 100644 --- a/mirrord/intproxy/src/proxies/incoming.rs +++ b/mirrord/intproxy/src/proxies/incoming.rs @@ -132,8 +132,8 @@ struct InterceptorHandle { /// agent. If the agent closes the connection, the proxy shuts down the [`HttpInterceptor`]. #[derive(Default)] pub struct IncomingProxy { - /// For keeping track of active subscriptions across forks. - subscribed_ports: RemoteResources, + /// For keeping track of active port subscriptions on the agent side. + remote_subscriptions: RemoteResources, /// Mapping from subscribed port on the remote target to layer's [`PortSubscribe`] request. subscriptions: HashMap, /// For matching agent's responses with layer's [`PortSubscribe`] requests. @@ -187,7 +187,7 @@ impl IncomingProxy { return; } - let msg = subscribe.subscription.wrap_agent_subscribe(); + let msg = subscribe.subscription.agent_subscribe(); message_bus.send(ProxyMessage::ToAgent(msg)).await; self.subscribe_reqs.insert(message_id, layer_id); @@ -331,12 +331,12 @@ impl IncomingProxy { fn handle_layer_fork(&mut self, msg: LayerForked) { let LayerForked { child, parent } = msg; - self.subscribed_ports.clone_all(child, parent); + self.remote_subscriptions.clone_all(child, parent); } async fn handle_layer_close(&mut self, msg: LayerClosed, message_bus: &mut MessageBus) { let LayerClosed { id } = msg; - for to_close in self.subscribed_ports.remove_all(id) { + for to_close in self.remote_subscriptions.remove_all(id) { let Some(subscription) = self.subscriptions.remove(&to_close) else { continue; }; diff --git a/mirrord/intproxy/src/proxies/incoming/http.rs b/mirrord/intproxy/src/proxies/incoming/http.rs index 9dcadec3659..407e7dfeeee 100644 --- a/mirrord/intproxy/src/proxies/incoming/http.rs +++ b/mirrord/intproxy/src/proxies/incoming/http.rs @@ -1,3 +1,8 @@ +//! # PR NOTE +//! +//! Copied from old `HttpV` trait and its implementations (`HttpV1` and `HttpV2`). +//! A bit simplified - turned into the [`HttpConnection`] enum after removing unnecessary associated types. + use std::convert::Infallible; use bytes::Bytes; @@ -14,12 +19,12 @@ use tokio::net::TcpStream; use super::http_interceptor::HttpInterceptorError; /// Handles the differences between hyper's HTTP/1 and HTTP/2 connections. -pub enum HttpConnector { +pub enum HttpConnection { V1(http1::SendRequest>), V2(http2::SendRequest>), } -impl HttpConnector { +impl HttpConnection { pub async fn handshake( version: Version, target_stream: TcpStream, diff --git a/mirrord/intproxy/src/proxies/incoming/http_interceptor.rs b/mirrord/intproxy/src/proxies/incoming/http_interceptor.rs index 209d30c38d6..64dc4d2ea05 100644 --- a/mirrord/intproxy/src/proxies/incoming/http_interceptor.rs +++ b/mirrord/intproxy/src/proxies/incoming/http_interceptor.rs @@ -1,5 +1,10 @@ //! [`BackgroundTask`] used by [`Incoming`](super::IncomingProxy) to manage a single //! intercepted HTTP connection. +//! +//! # PR NOTE +//! +//! Most logic copied from old HTTP `ConnectionTask`. +//! Added sending remote peer's address in `connect_and_send_source`. use std::{io, net::SocketAddr}; @@ -10,7 +15,7 @@ use mirrord_protocol::tcp::{ use thiserror::Error; use tokio::net::TcpStream; -use super::{http::HttpConnector, InterceptorMessageOut}; +use super::{http::HttpConnection, InterceptorMessageOut}; use crate::{ background_tasks::{BackgroundTask, MessageBus}, codec::{AsyncEncoder, CodecError}, @@ -55,9 +60,9 @@ impl HttpInterceptor { impl HttpInterceptor { /// Prepares an HTTP connection. - async fn connect_to_application(&self) -> Result { + async fn connect_to_application(&self) -> Result { let target_stream = self.connect_and_send_source().await?; - HttpConnector::handshake(self.version, target_stream).await + HttpConnection::handshake(self.version, target_stream).await } /// Connects to the local listener and sends it encoded address of the remote peer. @@ -163,20 +168,20 @@ impl HttpInterceptor { async fn send_to_user( &mut self, request: HttpRequestFallback, - connector: &mut HttpConnector, + connection: &mut HttpConnection, ) -> Result { - let response = connector.send_request(request.clone()).await; + let response = connection.send_request(request.clone()).await; let response = self.handle_response(request, response).await; // Retry once if the connection was closed. if let Err(HttpInterceptorError::ConnectionClosedTooSoon(request)) = response { tracing::trace!("Request {request:#?} connection was closed too soon, retrying once!"); - // Create a new connector for this second attempt. - let new_connector = self.connect_to_application().await?; - *connector = new_connector; + // Create a new connection for this second attempt. + let new_connection = self.connect_to_application().await?; + *connection = new_connection; - let response = connector.send_request(request.clone()).await; + let response = connection.send_request(request.clone()).await; self.handle_response(request, response).await } else { response @@ -190,10 +195,10 @@ impl BackgroundTask for HttpInterceptor { type MessageOut = InterceptorMessageOut; async fn run(mut self, message_bus: &mut MessageBus) -> Result<(), Self::Error> { - let mut connector = self.connect_to_application().await?; + let mut connection = self.connect_to_application().await?; while let Some(request) = message_bus.recv().await { - let response = self.send_to_user(request, &mut connector).await?; + let response = self.send_to_user(request, &mut connection).await?; message_bus .send(InterceptorMessageOut::Http(response)) diff --git a/mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs b/mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs index 42f44f3d55d..ccd3e2bce7b 100644 --- a/mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs +++ b/mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs @@ -8,6 +8,7 @@ use mirrord_protocol::{ use super::InterceptorMessageOut; use crate::protocol::PortSubscription; +/// Retrieves subscribed port from the given [`StealType`]. fn get_port(steal_type: &StealType) -> Port { match steal_type { StealType::All(port) => *port, @@ -16,19 +17,23 @@ fn get_port(steal_type: &StealType) -> Port { } } +/// Trait for [`PortSubscription`] that handles differences in [`mirrord_protocol`] between the `steal` and the `mirror` flow. +/// Allows to unify logic for both flows. pub trait PortSubscriptionExt { + /// Returns the subscribed port. fn port(&self) -> Port; - /// Returns a correct port subscription request. - fn wrap_agent_subscribe(&self) -> ClientMessage; + /// Returns a subscribe request to be sent to the agent. + fn agent_subscribe(&self) -> ClientMessage; - /// Returns a port unsubscription request correct for this mode. + /// Returns an unsubscribe request to be sent to the agent. fn wrap_agent_unsubscribe(&self) -> ClientMessage; - /// Returns a connection unsubscription request correct for this mode. + /// Returns an unsubscribe connection request to be sent to the agent. fn wrap_agent_unsubscribe_connection(&self, connection_id: ConnectionId) -> ClientMessage; /// Returns a message to be sent to the agent in response to data coming from an interceptor. + /// [`None`] means that the data should be discarded. fn wrap_response( &self, res: InterceptorMessageOut, @@ -44,7 +49,8 @@ impl PortSubscriptionExt for PortSubscription { } } - fn wrap_agent_subscribe(&self) -> ClientMessage { + /// [`LayerTcp::PortSubscribe`] or [`LayerTcpSteal::PortSubscribe`]. + fn agent_subscribe(&self) -> ClientMessage { match self { Self::Mirror(port) => ClientMessage::Tcp(LayerTcp::PortSubscribe(*port)), Self::Steal(steal_type) => { @@ -53,7 +59,7 @@ impl PortSubscriptionExt for PortSubscription { } } - /// Returns a port unsubscription request correct for this mode. + /// [`LayerTcp::PortUnsubscribe`] or [`LayerTcpSteal::PortUnsubscribe`]. fn wrap_agent_unsubscribe(&self) -> ClientMessage { match self { Self::Mirror(port) => ClientMessage::Tcp(LayerTcp::PortUnsubscribe(*port)), @@ -63,7 +69,7 @@ impl PortSubscriptionExt for PortSubscription { } } - /// Returns a connection unsubscription request correct for this mode. + /// [`LayerTcp::ConnectionUnsubscribe`] or [`LayerTcpSteal::ConnectionUnsubscribe`]. fn wrap_agent_unsubscribe_connection(&self, connection_id: ConnectionId) -> ClientMessage { match self { Self::Mirror(..) => ClientMessage::Tcp(LayerTcp::ConnectionUnsubscribe(connection_id)), @@ -73,6 +79,8 @@ impl PortSubscriptionExt for PortSubscription { } } + /// Always [`None`] for the `mirror` mode - data coming from the layer is discarded. + /// Corrent [`LayerTcpSteal`] variant for the `steal` mode. fn wrap_response( &self, res: InterceptorMessageOut,