Skip to content

Commit

Permalink
Larger stealer buffers (metalbear-co#2278)
Browse files Browse the repository at this point in the history
* Increased buffer len to 64k

* Changelog entry

* Increased buffer size in internal proxy

* Updated changelog entry
  • Loading branch information
Razz4780 authored Feb 27, 2024
1 parent 9671f87 commit cff119d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/+increased-steal-buffers.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increased size of buffers used by TCP steal to read incoming streams (from 4k to 64k in the agent, from 1k to 64k in the internal proxy).
10 changes: 8 additions & 2 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ pub(crate) struct TcpConnectionStealer {
impl TcpConnectionStealer {
pub const TASK_NAME: &'static str = "Stealer";

/// Capacity of read buffers for [`ReaderStream`]s inside [`Self::read_streams`].
/// Essentialy - length of buffer for reading from tcp sockets with unfiltered steal.
const READ_STREAM_BUFFER_LEN: usize = 64 * 1024;

/// Initializes a new [`TcpConnectionStealer`] fields, but doesn't start the actual working
/// task (call [`TcpConnectionStealer::start`] to do so).
#[tracing::instrument(level = "trace")]
Expand Down Expand Up @@ -300,8 +304,10 @@ impl TcpConnectionStealer {

let (read_half, write_half) = tokio::io::split(stream);
self.write_streams.insert(connection_id, write_half);
self.read_streams
.insert(connection_id, ReaderStream::new(read_half));
self.read_streams.insert(
connection_id,
ReaderStream::with_capacity(read_half, Self::READ_STREAM_BUFFER_LEN),
);

self.connection_clients.insert(connection_id, client_id);
self.client_connections
Expand Down
10 changes: 6 additions & 4 deletions mirrord/intproxy/src/proxies/incoming/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use bytes::BytesMut;
use hyper::{StatusCode, Version};
use mirrord_protocol::tcp::{
HttpRequestFallback, HttpResponse, HttpResponseFallback, InternalHttpBody,
Expand Down Expand Up @@ -319,22 +320,23 @@ impl RawConnection {
mut self,
message_bus: &mut MessageBus<Interceptor>,
) -> InterceptorResult<Option<HttpConnection>> {
let mut buffer = vec![0; 1024];
let mut buf = BytesMut::with_capacity(64 * 1024);
let mut remote_closed = false;
let mut reading_closed = false;

loop {
tokio::select! {
biased;

res = self.stream.read(&mut buffer[..]), if !reading_closed => match res {
res = self.stream.read_buf(&mut buf), if !reading_closed => match res {
Err(e) if e.kind() == ErrorKind::WouldBlock => {},
Err(e) => break Err(e.into()),
Ok(0) => {
reading_closed = true;
}
Ok(read) => {
message_bus.send(MessageOut::Raw(buffer.get(..read).unwrap().to_vec())).await;
Ok(..) => {
message_bus.send(MessageOut::Raw(buf.to_vec())).await;
buf.clear();
}
},

Expand Down

0 comments on commit cff119d

Please sign in to comment.