Skip to content

Commit

Permalink
incoming http docs - intproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
Razz4780 committed Oct 17, 2023
1 parent c7255ad commit 9e9863f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 25 deletions.
10 changes: 5 additions & 5 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ struct InterceptorHandle<I: BackgroundTask> {
/// agent. If the agent closes the connection, the proxy shuts down the [`HttpInterceptor`].
#[derive(Default)]
pub struct IncomingProxy {
/// For keeping track of active subscriptions across forks.
subscribed_ports: RemoteResources<Port>,
/// For keeping track of active port subscriptions on the agent side.
remote_subscriptions: RemoteResources<Port>,
/// Mapping from subscribed port on the remote target to layer's [`PortSubscribe`] request.
subscriptions: HashMap<Port, PortSubscribe>,
/// For matching agent's responses with layer's [`PortSubscribe`] requests.
Expand Down Expand Up @@ -187,7 +187,7 @@ impl IncomingProxy {
return;
}

let msg = subscribe.subscription.wrap_agent_subscribe();
let msg = subscribe.subscription.agent_subscribe();
message_bus.send(ProxyMessage::ToAgent(msg)).await;

self.subscribe_reqs.insert(message_id, layer_id);
Expand Down Expand Up @@ -331,12 +331,12 @@ impl IncomingProxy {

fn handle_layer_fork(&mut self, msg: LayerForked) {
let LayerForked { child, parent } = msg;
self.subscribed_ports.clone_all(child, parent);
self.remote_subscriptions.clone_all(child, parent);
}

async fn handle_layer_close(&mut self, msg: LayerClosed, message_bus: &mut MessageBus<Self>) {

Check failure on line 337 in mirrord/intproxy/src/proxies/incoming.rs

View workflow job for this annotation

GitHub Actions / macos_tests

this argument is a mutable reference, but not used mutably
let LayerClosed { id } = msg;
for to_close in self.subscribed_ports.remove_all(id) {
for to_close in self.remote_subscriptions.remove_all(id) {
let Some(subscription) = self.subscriptions.remove(&to_close) else {
continue;
};
Expand Down
9 changes: 7 additions & 2 deletions mirrord/intproxy/src/proxies/incoming/http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! # PR NOTE

Check warning on line 1 in mirrord/intproxy/src/proxies/incoming/http.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/mirrord/mirrord/mirrord/intproxy/src/proxies/incoming/http.rs
//!
//! Copied from old `HttpV` trait and its implementations (`HttpV1` and `HttpV2`).
//! A bit simplified - turned into the [`HttpConnection`] enum after removing unnecessary associated types.
use std::convert::Infallible;

use bytes::Bytes;
Expand All @@ -14,12 +19,12 @@ use tokio::net::TcpStream;
use super::http_interceptor::HttpInterceptorError;

/// Handles the differences between hyper's HTTP/1 and HTTP/2 connections.
pub enum HttpConnector {
pub enum HttpConnection {
V1(http1::SendRequest<BoxBody<Bytes, Infallible>>),
V2(http2::SendRequest<BoxBody<Bytes, Infallible>>),
}

impl HttpConnector {
impl HttpConnection {
pub async fn handshake(
version: Version,
target_stream: TcpStream,
Expand Down
27 changes: 16 additions & 11 deletions mirrord/intproxy/src/proxies/incoming/http_interceptor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
//! [`BackgroundTask`] used by [`Incoming`](super::IncomingProxy) to manage a single

Check warning on line 1 in mirrord/intproxy/src/proxies/incoming/http_interceptor.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/mirrord/mirrord/mirrord/intproxy/src/proxies/incoming/http_interceptor.rs
//! intercepted HTTP connection.
//!
//! # PR NOTE
//!
//! Most logic copied from old HTTP `ConnectionTask`.
//! Added sending remote peer's address in `connect_and_send_source`.
use std::{io, net::SocketAddr};

Expand All @@ -10,7 +15,7 @@ use mirrord_protocol::tcp::{
use thiserror::Error;
use tokio::net::TcpStream;

use super::{http::HttpConnector, InterceptorMessageOut};
use super::{http::HttpConnection, InterceptorMessageOut};
use crate::{
background_tasks::{BackgroundTask, MessageBus},
codec::{AsyncEncoder, CodecError},
Expand Down Expand Up @@ -55,9 +60,9 @@ impl HttpInterceptor {

impl HttpInterceptor {
/// Prepares an HTTP connection.
async fn connect_to_application(&self) -> Result<HttpConnector, HttpInterceptorError> {
async fn connect_to_application(&self) -> Result<HttpConnection, HttpInterceptorError> {
let target_stream = self.connect_and_send_source().await?;
HttpConnector::handshake(self.version, target_stream).await
HttpConnection::handshake(self.version, target_stream).await
}

/// Connects to the local listener and sends it encoded address of the remote peer.
Expand Down Expand Up @@ -163,20 +168,20 @@ impl HttpInterceptor {
async fn send_to_user(
&mut self,
request: HttpRequestFallback,
connector: &mut HttpConnector,
connection: &mut HttpConnection,
) -> Result<HttpResponseFallback, HttpInterceptorError> {
let response = connector.send_request(request.clone()).await;
let response = connection.send_request(request.clone()).await;
let response = self.handle_response(request, response).await;

// Retry once if the connection was closed.
if let Err(HttpInterceptorError::ConnectionClosedTooSoon(request)) = response {
tracing::trace!("Request {request:#?} connection was closed too soon, retrying once!");

// Create a new connector for this second attempt.
let new_connector = self.connect_to_application().await?;
*connector = new_connector;
// Create a new connection for this second attempt.
let new_connection = self.connect_to_application().await?;
*connection = new_connection;

let response = connector.send_request(request.clone()).await;
let response = connection.send_request(request.clone()).await;
self.handle_response(request, response).await
} else {
response
Expand All @@ -190,10 +195,10 @@ impl BackgroundTask for HttpInterceptor {
type MessageOut = InterceptorMessageOut;

async fn run(mut self, message_bus: &mut MessageBus<Self>) -> Result<(), Self::Error> {
let mut connector = self.connect_to_application().await?;
let mut connection = self.connect_to_application().await?;

while let Some(request) = message_bus.recv().await {
let response = self.send_to_user(request, &mut connector).await?;
let response = self.send_to_user(request, &mut connection).await?;

message_bus
.send(InterceptorMessageOut::Http(response))
Expand Down
22 changes: 15 additions & 7 deletions mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use mirrord_protocol::{
use super::InterceptorMessageOut;
use crate::protocol::PortSubscription;

/// Retrieves subscribed port from the given [`StealType`].
fn get_port(steal_type: &StealType) -> Port {
match steal_type {
StealType::All(port) => *port,
Expand All @@ -16,19 +17,23 @@ fn get_port(steal_type: &StealType) -> Port {
}

Check warning on line 17 in mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/mirrord/mirrord/mirrord/intproxy/src/proxies/incoming/port_subscription_ext.rs
}

/// Trait for [`PortSubscription`] that handles differences in [`mirrord_protocol`] between the `steal` and the `mirror` flow.
/// Allows to unify logic for both flows.
pub trait PortSubscriptionExt {
/// Returns the subscribed port.
fn port(&self) -> Port;

/// Returns a correct port subscription request.
fn wrap_agent_subscribe(&self) -> ClientMessage;
/// Returns a subscribe request to be sent to the agent.
fn agent_subscribe(&self) -> ClientMessage;

/// Returns a port unsubscription request correct for this mode.
/// Returns an unsubscribe request to be sent to the agent.
fn wrap_agent_unsubscribe(&self) -> ClientMessage;

/// Returns a connection unsubscription request correct for this mode.
/// Returns an unsubscribe connection request to be sent to the agent.
fn wrap_agent_unsubscribe_connection(&self, connection_id: ConnectionId) -> ClientMessage;

/// Returns a message to be sent to the agent in response to data coming from an interceptor.
/// [`None`] means that the data should be discarded.
fn wrap_response(
&self,
res: InterceptorMessageOut,
Expand All @@ -44,7 +49,8 @@ impl PortSubscriptionExt for PortSubscription {
}
}

fn wrap_agent_subscribe(&self) -> ClientMessage {
/// [`LayerTcp::PortSubscribe`] or [`LayerTcpSteal::PortSubscribe`].
fn agent_subscribe(&self) -> ClientMessage {
match self {
Self::Mirror(port) => ClientMessage::Tcp(LayerTcp::PortSubscribe(*port)),
Self::Steal(steal_type) => {
Expand All @@ -53,7 +59,7 @@ impl PortSubscriptionExt for PortSubscription {
}
}

/// Returns a port unsubscription request correct for this mode.
/// [`LayerTcp::PortUnsubscribe`] or [`LayerTcpSteal::PortUnsubscribe`].
fn wrap_agent_unsubscribe(&self) -> ClientMessage {
match self {
Self::Mirror(port) => ClientMessage::Tcp(LayerTcp::PortUnsubscribe(*port)),
Expand All @@ -63,7 +69,7 @@ impl PortSubscriptionExt for PortSubscription {
}
}

/// Returns a connection unsubscription request correct for this mode.
/// [`LayerTcp::ConnectionUnsubscribe`] or [`LayerTcpSteal::ConnectionUnsubscribe`].
fn wrap_agent_unsubscribe_connection(&self, connection_id: ConnectionId) -> ClientMessage {
match self {
Self::Mirror(..) => ClientMessage::Tcp(LayerTcp::ConnectionUnsubscribe(connection_id)),
Expand All @@ -73,6 +79,8 @@ impl PortSubscriptionExt for PortSubscription {
}
}

/// Always [`None`] for the `mirror` mode - data coming from the layer is discarded.
/// Corrent [`LayerTcpSteal`] variant for the `steal` mode.
fn wrap_response(
&self,
res: InterceptorMessageOut,
Expand Down

0 comments on commit 9e9863f

Please sign in to comment.