Skip to content

Commit

Permalink
refactor: Remove pipeline logging (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Jan 14, 2025
1 parent 38b9dda commit 146c92c
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 169 deletions.
14 changes: 0 additions & 14 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,6 @@ trace_test!(uring_receiver, {

let (mut packet_rx, endpoint) = sb.server("server");

let (error_sender, mut error_receiver) = tokio::sync::mpsc::channel::<proxy::ErrorMap>(20);

tokio::task::spawn(
async move {
while let Some(errors) = error_receiver.recv().await {
for error in errors.keys() {
tracing::error!(%error, "error sent from DownstreamReceiverWorker");
}
}
}
.instrument(tracing::debug_span!("error rx")),
);

let config = std::sync::Arc::new(quilkin::Config::default_non_agent());
config
.clusters
Expand All @@ -115,7 +102,6 @@ trace_test!(uring_receiver, {
worker_id: 1,
port: addr.port(),
config: config.clone(),
error_sender,
buffer_pool: quilkin::test::BUFFER_POOL.clone(),
sessions: proxy::SessionPool::new(
config,
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod packet_router;
mod sessions;

use super::RunArgs;
pub use error::{ErrorMap, PipelineError};
pub use error::PipelineError;
pub use sessions::SessionPool;
use std::{
net::SocketAddr,
Expand Down
96 changes: 1 addition & 95 deletions src/components/proxy/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ pub enum PipelineError {
Filter(crate::filters::FilterError),
Session(super::sessions::SessionError),
Io(std::io::Error),
ChannelClosed,
ChannelFull,
/// This occurs if a receive task has accumulated so many errors that the
/// error details had to be dropped in order to reduce memory pressure
AccumulatorOverflow,
}

impl PipelineError {
Expand All @@ -36,9 +31,6 @@ impl PipelineError {
Self::Filter(fe) => fe.discriminant(),
Self::Session(_) => "session",
Self::Io(_) => "io",
Self::ChannelClosed => "channel closed",
Self::ChannelFull => "channel full",
Self::AccumulatorOverflow => "error accumulator overflow",
}
}
}
Expand All @@ -52,9 +44,6 @@ impl fmt::Display for PipelineError {
Self::Filter(fe) => write!(f, "filter {fe}"),
Self::Session(session) => write!(f, "session error: {session}"),
Self::Io(io) => write!(f, "OS level error: {io}"),
Self::ChannelClosed => f.write_str("channel closed"),
Self::ChannelFull => f.write_str("channel full"),
Self::AccumulatorOverflow => f.write_str("error accumulator overflow"),
}
}
}
Expand All @@ -78,9 +67,6 @@ impl PartialEq for PipelineError {
(Self::Filter(fa), Self::Filter(fb)) => fa.eq(fb),
(Self::Session(sa), Self::Session(sb)) => sa.eq(sb),
(Self::Io(ia), Self::Io(ib)) => ia.kind().eq(&ib.kind()),
(Self::ChannelClosed, Self::ChannelClosed) => true,
(Self::ChannelFull, Self::ChannelFull) => true,
(Self::AccumulatorOverflow, Self::AccumulatorOverflow) => true,
_ => false,
}
}
Expand All @@ -97,87 +83,7 @@ impl Hash for PipelineError {
Self::Filter(fe) => Hash::hash(&fe, state),
Self::Session(se) => Hash::hash(&se, state),
Self::Io(io) => Hash::hash(&io.kind(), state),
Self::NoUpstreamEndpoints
| Self::ChannelClosed
| Self::ChannelFull
| Self::AccumulatorOverflow => {}
}
}
}

pub type ErrorMap = gxhash::HashMap<PipelineError, u64>;

pub type ErrorSender = tokio::sync::mpsc::Sender<ErrorMap>;
//pub type ErrorReceiver = tokio::sync::mpsc::Receiver<ErrorMap>;

/// The soft cap of errors after which we try to send them to the collation task
const CAP_ERRORS: usize = 10 * 1024;
/// The maximum errors that can be accumulated before being dropped and lost
const MAX_ERRORS: usize = 100 * 1024;
const MAX_ELAPSED: std::time::Duration = std::time::Duration::from_secs(5);

use std::time::Instant;

/// Accumulates errors on downstream receiver tasks before sending them for collation
///
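/// If many errors occur and they cannot be sent for collation, the
/// accumulated details are dropped and replaced with a single
/// `PipelineError::AccumulatorOverflow` entry.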
pub struct ErrorAccumulator {
map: ErrorMap,
tx: ErrorSender,
oldest: Instant,
}

impl ErrorAccumulator {
pub fn new(tx: ErrorSender) -> Self {
Self {
map: <_>::default(),
tx,
oldest: Instant::now(),
}
}

pub fn maybe_send(&mut self) -> bool {
if self.map.is_empty() || self.oldest.elapsed() < MAX_ELAPSED {
return false;
}

self.do_send()
}

fn do_send(&mut self) -> bool {
let Ok(permit) = self.tx.try_reserve() else {
return false;
};

#[allow(clippy::mutable_key_type)]
let map = std::mem::take(&mut self.map);
permit.send(map);
true
}

pub fn push_error(&mut self, error: PipelineError) {
if self.map.is_empty() {
self.oldest = Instant::now();
}

*self.map.entry(error).or_default() += 1;

if self.map.len() >= CAP_ERRORS {
if self.do_send() {
return;
}

// If we failed to send and we've reached our max capacity, reset and
// note the fact that we did so
if self.map.len() >= MAX_ERRORS {
self.map.clear();
self.map.insert(PipelineError::AccumulatorOverflow, 1);
self.oldest = Instant::now();
}
}

if self.oldest.elapsed() >= MAX_ELAPSED {
self.do_send();
Self::NoUpstreamEndpoints => {}
}
}
}
61 changes: 11 additions & 50 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
metrics, Config,
};
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::mpsc;

#[cfg(target_os = "linux")]
mod io_uring;
Expand Down Expand Up @@ -76,7 +75,6 @@ impl<P: PacketMut> DownstreamPacket<P> {
worker_id: usize,
config: &Arc<Config>,
sessions: &S,
error_acc: &mut super::error::ErrorAccumulator,
destinations: &mut Vec<crate::net::EndpointAddress>,
) {
tracing::trace!(
Expand All @@ -87,17 +85,19 @@ impl<P: PacketMut> DownstreamPacket<P> {
);

let timer = metrics::processing_time(metrics::READ).start_timer();
match self.process_inner(config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
}
Err(error) => {
let discriminant = error.discriminant();
if let Err(error) = self.process_inner(config, sessions, destinations) {
let discriminant = error.discriminant();

// We only want to mark potential I/O errors as errors, as they
// can indicate something wrong with the system. Error variants
// from packets being bad aren't errors from quilkin's perspective.
if matches!(
error,
PipelineError::Io(_) | PipelineError::Filter(crate::filters::FilterError::Io(_))
) {
metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc();

error_acc.push_error(error);
}
metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
}

timer.stop_and_record();
Expand Down Expand Up @@ -149,7 +149,6 @@ pub struct DownstreamReceiveWorkerConfig {
pub port: u16,
pub config: Arc<Config>,
pub sessions: Arc<SessionPool>,
pub error_sender: super::error::ErrorSender,
pub buffer_pool: Arc<crate::collections::BufferPool>,
}

Expand All @@ -165,8 +164,6 @@ pub async fn spawn_receivers(
sessions: &Arc<SessionPool>,
buffer_pool: Arc<crate::collections::BufferPool>,
) -> crate::Result<()> {
let (error_sender, mut error_receiver) = mpsc::channel(128);

let port = crate::net::socket_port(&socket);

for (worker_id, ws) in worker_sends.into_iter().enumerate() {
Expand All @@ -175,47 +172,11 @@ pub async fn spawn_receivers(
port,
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
buffer_pool: buffer_pool.clone(),
};

worker.spawn(ws).await?;
}

drop(error_sender);

tokio::spawn(async move {
let mut log_task = tokio::time::interval(std::time::Duration::from_secs(5));

#[allow(clippy::mutable_key_type)]
let mut pipeline_errors = super::error::ErrorMap::default();

#[allow(clippy::mutable_key_type)]
fn report(errors: &mut super::error::ErrorMap) {
for (error, instances) in errors.drain() {
tracing::warn!(%error, %instances, "pipeline report");
}
}

loop {
tokio::select! {
_ = log_task.tick() => {
report(&mut pipeline_errors);
}
received = error_receiver.recv() => {
let Some(errors) = received else {
report(&mut pipeline_errors);
tracing::info!("pipeline reporting task closed");
return;
};

for (k, v) in errors {
*pipeline_errors.entry(k).or_default() += v;
}
}
}
}
});

Ok(())
}
2 changes: 0 additions & 2 deletions src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ impl super::DownstreamReceiveWorkerConfig {
port,
config,
sessions,
error_sender,
buffer_pool,
} = self;

Expand All @@ -39,7 +38,6 @@ impl super::DownstreamReceiveWorkerConfig {
io_uring::PacketProcessorCtx::Router {
config,
sessions,
error_acc: super::super::error::ErrorAccumulator::new(error_sender),
worker_id,
destinations: Vec::with_capacity(1),
},
Expand Down
4 changes: 0 additions & 4 deletions src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ impl super::DownstreamReceiveWorkerConfig {
port,
config,
sessions,
error_sender,
buffer_pool,
} = self;

Expand Down Expand Up @@ -95,8 +94,6 @@ impl super::DownstreamReceiveWorkerConfig {
}
}

let mut error_acc =
crate::components::proxy::error::ErrorAccumulator::new(error_sender);
let mut destinations = Vec::with_capacity(1);

loop {
Expand Down Expand Up @@ -127,7 +124,6 @@ impl super::DownstreamReceiveWorkerConfig {
worker_id,
&config,
&sessions,
&mut error_acc,
&mut destinations,
);
}
Expand Down
4 changes: 1 addition & 3 deletions src/net/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ pub enum PacketProcessorCtx {
Router {
config: Arc<crate::config::Config>,
sessions: Arc<proxy::SessionPool>,
error_acc: proxy::error::ErrorAccumulator,
worker_id: usize,
destinations: Vec<crate::net::EndpointAddress>,
},
Expand All @@ -235,7 +234,6 @@ fn process_packet(
config,
sessions,
worker_id,
error_acc,
destinations,
} => {
let received_at = UtcTimestamp::now();
Expand All @@ -250,7 +248,7 @@ fn process_packet(
source: packet.source,
};

ds_packet.process(*worker_id, config, sessions, error_acc, destinations);
ds_packet.process(*worker_id, config, sessions, destinations);
}
PacketProcessorCtx::SessionPool { pool, port, .. } => {
let mut last_received_at = None;
Expand Down

0 comments on commit 146c92c

Please sign in to comment.