Skip to content

Commit

Permalink
Improve intproxy deps (metalbear-co#2057)
Browse files Browse the repository at this point in the history
* IntProxy protocol extracted to a separate crate

* mirrord-intproxy-protocol deps updated

* async support feature-gated

* mirrord-intproxy deps updated

* mirrord-console deps updated

* layer deps updated

* Changelog entry
  • Loading branch information
Razz4780 authored Nov 6, 2023
1 parent b050ef3 commit 915e347
Show file tree
Hide file tree
Showing 35 changed files with 228 additions and 191 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"rust-analyzer.cargo.features": [
"binary"
"binary",
]
}
19 changes: 13 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2039.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved crates structure around internal proxy and mirrord console.
2 changes: 1 addition & 1 deletion mirrord/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mirrord-progress = { path = "../progress" }
mirrord-kube = { path = "../kube" }
mirrord-config = { path = "../config" }
mirrord-protocol = { path = "../protocol" }
mirrord-console = { path = "../console" }
mirrord-console = { path = "../console", features = ["async-logger"] }
mirrord-analytics = { path = "../analytics" }
mirrord-intproxy = { path = "../intproxy" }

Expand Down
19 changes: 11 additions & 8 deletions mirrord/console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ required-features = ["binary"]

[features]
default = []
binary = ["dep:tracing", "dep:tracing-subscriber"]
binary = ["dep:tracing", "dep:tracing-subscriber", "dep:tokio", "mirrord-intproxy-protocol/codec-async"]
async-logger = ["mirrord-intproxy-protocol/codec-async", "dep:tokio", "dep:drain", "dep:tokio-util"]

[dependencies]
mirrord-intproxy = { path = "../intproxy" }
tokio.workspace = true
tracing = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
log = { version = "0.4", features = ["std"] }
mirrord-intproxy-protocol = { path = "../intproxy/protocol", features = ["codec"] }

bincode.workspace = true
log = { version = "0.4", features = ["std"] }
miette = "5"
thiserror.workspace = true
drain.workspace = true
tokio-util.workspace = true

tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
drain = { workspace = true, optional = true }
tokio-util = { workspace = true, optional = true }
2 changes: 1 addition & 1 deletion mirrord/console/src/async_logger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use drain::Watch;
use log::LevelFilter;
use mirrord_intproxy::codec::AsyncEncoder;
use mirrord_intproxy_protocol::codec::AsyncEncoder;
use tokio::{
io::BufWriter,
net::TcpStream,
Expand Down
2 changes: 1 addition & 1 deletion mirrord/console/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;

use miette::Diagnostic;
use mirrord_intproxy::codec::CodecError;
use mirrord_intproxy_protocol::codec::CodecError;
use thiserror::Error;

pub type Result<T, E = ConsoleError> = std::result::Result<T, E>;
Expand Down
2 changes: 2 additions & 0 deletions mirrord/console/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#![warn(clippy::indexing_slicing)]

#[cfg(feature = "async-logger")]
pub mod async_logger;
pub mod error;
pub mod logger;
pub mod protocol;

#[cfg(feature = "async-logger")]
pub use async_logger::init_async_logger;
pub use logger::init_logger;
2 changes: 1 addition & 1 deletion mirrord/console/src/logger.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{io::BufWriter, net::TcpStream, sync::Mutex};

use log::LevelFilter;
use mirrord_intproxy::codec::SyncEncoder;
use mirrord_intproxy_protocol::codec::SyncEncoder;

use crate::{
error::Result,
Expand Down
2 changes: 1 addition & 1 deletion mirrord/console/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bincode::Decode;
use mirrord_console::protocol::{Hello, Record};
use mirrord_intproxy::codec::AsyncDecoder;
use mirrord_intproxy_protocol::codec::AsyncDecoder;
use tokio::{
io::BufReader,
net::{TcpListener, TcpStream},
Expand Down
6 changes: 1 addition & 5 deletions mirrord/intproxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@ mirrord-config = { path = "../config" }
mirrord-kube = { path = "../kube" }
mirrord-operator = { path = "../operator", features = ["client"] }
mirrord-protocol = { path = "../protocol" }
mirrord-intproxy-protocol = { path = "./protocol", features = ["codec-async"] }

bincode.workspace = true
futures.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true
hyper = { workspace = true, features = ["client", "http1", "http2"] }
hyper-util.workspace = true
http-body-util.workspace = true
fancy-regex.workspace = true
bytes.workspace = true

rand = "0.8"
27 changes: 27 additions & 0 deletions mirrord/intproxy/protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "mirrord-intproxy-protocol"
version.workspace = true
authors.workspace = true
description.workspace = true
documentation.workspace = true
readme.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
publish.workspace = true
edition.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mirrord-protocol = { path = "../../protocol" }

bincode.workspace = true
thiserror = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }

[features]
codec = ["dep:thiserror"]
codec-async = ["codec", "dep:tokio"]
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Custom codec used in `layer <-> proxy` communication.
//! Supports both synchronous (required by the layer) and asynchronous (convenient for the proxy)
//! IO.
//! IO. Asynchronous IO is feature-gated with the `codec-async` feature.
//!
//! An encoded message consists of two parts:
//! * prefix: 4 bytes containing payload length in bytes (big-endian [`u32`])
Expand All @@ -16,10 +16,11 @@ use bincode::{
Decode, Encode,
};
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
};

#[cfg(feature = "codec-async")]
mod codec_async;
#[cfg(feature = "codec-async")]
pub use codec_async::*;

/// Errors that can occur when using this codec.
#[derive(Error, Debug)]
Expand Down Expand Up @@ -164,119 +165,3 @@ pub fn make_sync_framed<T1: Encode, T2: Decode>(

Ok((sender, receiver))
}

/// Handles sending messages of type `T` through the underlying [AsyncWrite] of type `W`.
#[derive(Debug)]
pub struct AsyncEncoder<T, W> {
buffer: Vec<u8>,
writer: W,
_phantom: std::marker::PhantomData<fn() -> T>,
}

impl<T, W> AsyncEncoder<T, W> {
/// Wraps the underlying IO handler.
pub fn new(writer: W) -> Self {
Self {
buffer: Vec::with_capacity(BUFFER_SIZE),
writer,
_phantom: Default::default(),
}
}

/// Unwraps the underlying IO handler.
pub fn into_inner(self) -> W {
self.writer
}
}

impl<T, W> AsyncEncoder<T, W>
where
T: Encode,
W: AsyncWrite + Unpin,
{
/// Encodes the given value into the inner IO handler.
pub async fn send(&mut self, value: &T) -> Result<()> {
self.buffer.resize(PREFIX_BYTES, 0);
let bytes: u32 =
bincode::encode_into_std_write(value, &mut self.buffer, bincode::config::standard())?
.try_into()?;
self.buffer
.get_mut(..PREFIX_BYTES)
.expect("buffer to short")
.copy_from_slice(&bytes.to_be_bytes());

self.writer.write_all(&self.buffer).await?;

Ok(())
}

/// Flushes the inner IO handler.
pub async fn flush(&mut self) -> Result<()> {
self.writer.flush().await.map_err(Into::into)
}
}

/// Handles receiving messages of type `T` from the underlying [AsyncRead] of type `W`.
#[derive(Debug)]
pub struct AsyncDecoder<T, R> {
buffer: Vec<u8>,
reader: R,
_phantom: std::marker::PhantomData<fn() -> T>,
}

impl<T, R> AsyncDecoder<T, R> {
/// Wraps the underlying IO handler.
pub fn new(reader: R) -> Self {
Self {
buffer: Vec::with_capacity(BUFFER_SIZE),
reader,
_phantom: Default::default(),
}
}

/// Unwraps the underlying IO handler.
pub fn into_inner(self) -> R {
self.reader
}
}

impl<T, R> AsyncDecoder<T, R>
where
T: Decode,
R: AsyncRead + Unpin,
{
/// Decodes the next message from the underlying IO handler.
/// Does not read any excessive bytes.
pub async fn receive(&mut self) -> Result<Option<T>> {
let mut len_buffer = [0; 4];
match self.reader.read_exact(&mut len_buffer).await {
Ok(..) => {}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => Err(e)?,
}
let len = u32::from_be_bytes(len_buffer);

self.buffer.resize(len as usize, 0);
self.reader.read_exact(&mut self.buffer).await?;

let value = bincode::decode_from_slice(&self.buffer, bincode::config::standard())?.0;

Ok(Some(value))
}
}

/// Creates a new pair of [`AsyncEncoder`] and [`AsyncDecoder`], using the given asynchronous
/// [`TcpStream`](tokio::net::TcpStream).
pub fn make_async_framed<T1: Encode, T2: Decode>(
stream: tokio::net::TcpStream,
) -> (
AsyncEncoder<T1, OwnedWriteHalf>,
AsyncDecoder<T2, OwnedReadHalf>,
) {
let (reader, writer) = stream.into_split();

let sender = AsyncEncoder::new(writer);
let receiver = AsyncDecoder::new(reader);

(sender, receiver)
}
Loading

0 comments on commit 915e347

Please sign in to comment.