Skip to content

Commit

Permalink
Make filters generic over the packet buffer (#1054)
Browse files Browse the repository at this point in the history
* Make filters generic over data (packet) buffer

* Fix bug with alloc_sized

The reserve was being calculated incorrectly, since it's additional bytes from the current position

* Impl compress

* Fix lints

* Remove unstable flag
  • Loading branch information
Jake-Shadle authored Jan 8, 2025
1 parent 481dcd2 commit f06bf5a
Show file tree
Hide file tree
Showing 30 changed files with 431 additions and 351 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ concurrency:

env:
CARGO_TERM_COLOR: always
RUSTFLAGS: "-C target-feature=+aes,+vaes,+avx2"
RUSTFLAGS: "-C target-feature=+aes,+avx2"

jobs:
lint:
Expand Down
26 changes: 8 additions & 18 deletions benches/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,11 @@ mod decompress {
let compressor = Compressor::from(Mode::Snappy);

b.with_inputs(|| {
let mut packet = pool.clone().alloc_slice(&packet);
compressor.encode(pool.clone(), &mut packet).unwrap();
packet
let packet = pool.clone().alloc_slice(&packet);
compressor.encode(&packet).unwrap()
})
.input_counter(|buf| divan::counter::BytesCount::new(buf.len()))
.bench_local_refs(|buf| {
compressor.decode(pool.clone(), buf).unwrap();
})
.bench_local_refs(|buf| compressor.decode(buf).unwrap())
}

#[divan::bench(consts = PACKET_SIZES)]
Expand All @@ -52,14 +49,11 @@ mod decompress {
let compressor = Compressor::from(Mode::Lz4);

b.with_inputs(|| {
let mut packet = pool.clone().alloc_slice(&packet);
compressor.encode(pool.clone(), &mut packet).unwrap();
packet
let packet = pool.clone().alloc_slice(&packet);
compressor.encode(&packet).unwrap()
})
.input_counter(|buf| divan::counter::BytesCount::new(buf.len()))
.bench_local_refs(|buf| {
compressor.decode(pool.clone(), buf).unwrap();
})
.bench_local_refs(|buf| compressor.decode(buf).unwrap())
}
}

Expand All @@ -75,9 +69,7 @@ mod compress {

b.with_inputs(|| pool.clone().alloc_slice(&packet))
.input_counter(|buf| divan::counter::BytesCount::new(buf.len()))
.bench_local_refs(|buf| {
compressor.encode(pool.clone(), buf).unwrap();
})
.bench_local_refs(|buf| compressor.encode(buf).unwrap())
}

#[divan::bench(consts = PACKET_SIZES)]
Expand All @@ -88,8 +80,6 @@ mod compress {

b.with_inputs(|| pool.clone().alloc_slice(&packet))
.input_counter(|buf| divan::counter::BytesCount::new(buf.len()))
.bench_local_refs(|buf| {
compressor.encode(pool.clone(), buf).unwrap();
})
.bench_local_refs(|buf| compressor.encode(buf).unwrap())
}
}
5 changes: 3 additions & 2 deletions benches/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ fn token_router(b: Bencher, token_kind: &str) {
.counter(divan::counter::BytesCount::new(total_token_size))
.bench_local_values(|(cm, buffer, mut dest, metadata)| {
let mut rc = quilkin::filters::ReadContext {
endpoints: cm,
endpoints: &cm,
destinations: &mut dest,
source: quilkin::net::EndpointAddress::LOCALHOST,
contents: buffer,
metadata,
};

let _ = divan::black_box(filter.sync_read(&mut rc));
use quilkin::filters::Filter;
let _ = divan::black_box(filter.read(&mut rc));
})
}

Expand Down
2 changes: 1 addition & 1 deletion build/build-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ENV RUSTUP_HOME=/usr/local/rustup \
CARGO_HOME=/usr/local/cargo \
PATH=/usr/local/cargo/bin:$PATH \
CARGO_TERM_COLOR=always \
RUSTFLAGS="-C target-feature=+aes,+vaes,+avx2" \
RUSTFLAGS="-C target-feature=+aes,+avx2" \
LC_ALL=C.UTF-8 \
LANG=C.UTF-8

Expand Down
2 changes: 1 addition & 1 deletion cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ options:
- "REPOSITORY=${_REPOSITORY}"
- "BUILD_IMAGE_TAG=${_BUILD_IMAGE_TAG}"
- "CARGO_TERM_COLOR=always"
- 'RUSTFLAGS="-C target-feature=+aes,+vaes,+avx2"'
- 'RUSTFLAGS="-C target-feature=+aes,+avx2"'
machineType: E2_HIGHCPU_32
dynamic_substitutions: true
timeout: 7200s
Expand Down
9 changes: 3 additions & 6 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ impl DownstreamReceiveWorkerConfig {
return Err(PipelineError::NoUpstreamEndpoints);
}

let cm = config.clusters.clone_value();
let filters = config.filters.load();
let mut context = ReadContext::new(
config.clusters.clone_value(),
packet.source.into(),
packet.contents,
destinations,
);
let mut context =
ReadContext::new(&cm, packet.source.into(), packet.contents, destinations);
filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext { contents, .. } = context;
Expand Down
10 changes: 8 additions & 2 deletions src/config/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,17 @@ impl<T: JsonSchema + Default> JsonSchema for Slot<T> {
}

impl<T: crate::filters::Filter + Default> crate::filters::Filter for Slot<T> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
fn read<P: crate::filters::Packet>(
&self,
ctx: &mut ReadContext<'_, P>,
) -> Result<(), FilterError> {
self.load().read(ctx)
}

fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
fn write<P: crate::filters::Packet>(
&self,
ctx: &mut WriteContext<P>,
) -> Result<(), FilterError> {
self.load().write(ctx)
}
}
Expand Down
17 changes: 14 additions & 3 deletions src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod token_router;
pub mod prelude {
pub use super::{
ConvertProtoConfigError, CreateFilterArgs, CreationError, Filter, FilterError,
FilterInstance, ReadContext, StaticFilter, WriteContext,
FilterInstance, Packet, ReadContext, StaticFilter, WriteContext,
};
}

Expand Down Expand Up @@ -179,6 +179,17 @@ where
}
}

pub trait Packet: Sized {
fn as_slice(&self) -> &[u8];
fn as_mut_slice(&mut self) -> &mut [u8];
fn set_len(&mut self, len: usize);
fn remove_head(&mut self, length: usize);
fn remove_tail(&mut self, length: usize);
fn extend_head(&mut self, bytes: &[u8]);
fn extend_tail(&mut self, bytes: &[u8]);
fn alloc_sized(&self, size: usize) -> Option<Self>;
}

/// Trait for routing and manipulating packets.
///
/// An implementation of [`Filter`] provides a `read` and a `write` method. Both
Expand Down Expand Up @@ -208,7 +219,7 @@ pub trait Filter: Send + Sync {
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
/// instead. By default, the context passes through unchanged.
fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> {
fn read<P: Packet>(&self, _: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
Ok(())
}

Expand All @@ -218,7 +229,7 @@ pub trait Filter: Send + Sync {
///
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
fn write(&self, _: &mut WriteContext) -> Result<(), FilterError> {
fn write<P: Packet>(&self, _: &mut WriteContext<P>) -> Result<(), FilterError> {
Ok(())
}
}
45 changes: 30 additions & 15 deletions src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod regex;

use crate::generated::quilkin::filters::capture::v1alpha1 as proto;

use crate::{filters::prelude::*, net::endpoint::metadata, pool::PoolBuffer};
use crate::{filters::prelude::*, net::endpoint::metadata};

/// The default key under which the [`Capture`] filter puts the
/// byte slices it extracts from each packet.
Expand All @@ -37,7 +37,7 @@ pub use self::{
pub trait CaptureStrategy {
/// Capture packet data from the contents, and optionally returns a value if
/// anything was captured.
fn capture(&self, contents: &mut PoolBuffer) -> Option<metadata::Value>;
fn capture(&self, contents: &[u8]) -> Option<(metadata::Value, isize)>;
}

pub struct Capture {
Expand All @@ -58,16 +58,25 @@ impl Capture {

impl Filter for Capture {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
let capture = self.capture.capture(&mut ctx.contents);
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
let capture = self.capture.capture(ctx.contents.as_slice());
ctx.metadata.insert(
self.is_present_key,
metadata::Value::Bool(capture.is_some()),
);

if let Some(value) = capture {
if let Some((value, remove)) = capture {
tracing::trace!(key=%self.metadata_key, %value, "captured value");
ctx.metadata.insert(self.metadata_key, value);

if remove != 0 {
if remove < 0 {
ctx.contents.remove_head(remove.unsigned_abs());
} else {
ctx.contents.remove_tail(remove as _);
}
}

Ok(())
} else {
tracing::trace!(key = %self.metadata_key, "No value captured");
Expand Down Expand Up @@ -163,7 +172,7 @@ mod tests {
let mut dest = Vec::new();
assert!(filter
.read(&mut ReadContext::new(
endpoints.into(),
&endpoints,
(std::net::Ipv4Addr::LOCALHOST, 80).into(),
alloc_buffer(b"abc"),
&mut dest,
Expand All @@ -189,8 +198,8 @@ mod tests {
let end = Regex {
pattern: ::regex::bytes::Regex::new(".{3}$").unwrap(),
};
let mut contents = alloc_buffer(b"helloabc");
let result = end.capture(&mut contents).unwrap();
let contents = alloc_buffer(b"helloabc");
let result = end.capture(&contents).unwrap().0;
assert_eq!(Value::Bytes(b"abc".to_vec().into()), result);
assert_eq!(b"helloabc", &*contents);
}
Expand All @@ -202,14 +211,17 @@ mod tests {
remove: false,
};
let mut contents = alloc_buffer(b"helloabc");
let result = end.capture(&mut contents).unwrap();
let (result, remove) = end.capture(&contents).unwrap();
assert_eq!(Value::Bytes(b"abc".to_vec().into()), result);
assert_eq!(remove, 0);
assert_eq!(b"helloabc", &*contents);

end.remove = true;

let result = end.capture(&mut contents).unwrap();
let (result, remove) = end.capture(&contents).unwrap();
assert_eq!(Value::Bytes(b"abc".to_vec().into()), result);
assert_eq!(remove, 3);
contents.remove_tail(remove as _);
assert_eq!(b"hello", &*contents);
}

Expand All @@ -221,14 +233,17 @@ mod tests {
};
let mut contents = alloc_buffer(b"abchello");

let result = beg.capture(&mut contents);
assert_eq!(Some(Value::Bytes(b"abc".to_vec().into())), result);
let (result, remove) = beg.capture(&contents).unwrap();
assert_eq!(Value::Bytes(b"abc".to_vec().into()), result);
assert_eq!(remove, 0);
assert_eq!(b"abchello", &*contents);

beg.remove = true;

let result = beg.capture(&mut contents);
assert_eq!(Some(Value::Bytes(b"abc".to_vec().into())), result);
let (result, remove) = beg.capture(&contents).unwrap();
assert_eq!(Value::Bytes(b"abc".to_vec().into()), result);
assert_eq!(remove, -3);
contents.remove_head(remove.unsigned_abs());
assert_eq!(b"hello", &*contents);
}

Expand All @@ -241,7 +256,7 @@ mod tests {
);
let mut dest = Vec::new();
let mut context = ReadContext::new(
endpoints.into(),
&endpoints,
"127.0.0.1:80".parse().unwrap(),
alloc_buffer(b"helloabc"),
&mut dest,
Expand Down
35 changes: 17 additions & 18 deletions src/filters/capture/affix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

use crate::{net::endpoint::metadata::Value, pool::PoolBuffer};
use crate::net::endpoint::metadata::Value;
use bytes::Bytes;

/// Returns whether the capture size is bigger than the packet size.
Expand All @@ -34,15 +34,18 @@ pub struct Prefix {
}

impl super::CaptureStrategy for Prefix {
fn capture(&self, contents: &mut PoolBuffer) -> Option<Value> {
fn capture(&self, contents: &[u8]) -> Option<(Value, isize)> {
is_valid_size(contents, self.size).then(|| {
if self.remove {
Value::Bytes(Bytes::copy_from_slice(
contents.split_prefix(self.size as _),
))
} else {
Value::Bytes(Bytes::copy_from_slice(&contents[..self.size as _]))
}
let value = Value::Bytes(Bytes::copy_from_slice(&contents[..self.size as _]));

(
value,
if self.remove {
-(self.size as isize)
} else {
0
},
)
})
}
}
Expand All @@ -58,16 +61,12 @@ pub struct Suffix {
}

impl super::CaptureStrategy for Suffix {
fn capture(&self, contents: &mut PoolBuffer) -> Option<Value> {
fn capture(&self, contents: &[u8]) -> Option<(Value, isize)> {
is_valid_size(contents, self.size).then(|| {
if self.remove {
Value::Bytes(Bytes::copy_from_slice(
contents.split_suffix(self.size as _),
))
} else {
let index = contents.len() - self.size as usize;
Value::Bytes(Bytes::copy_from_slice(&contents[index..]))
}
let index = contents.len() - self.size as usize;
let value = Value::Bytes(Bytes::copy_from_slice(&contents[index..]));

(value, if self.remove { self.size as isize } else { 0 })
})
}
}
8 changes: 4 additions & 4 deletions src/filters/capture/regex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

use crate::{net::endpoint::metadata::Value, pool::PoolBuffer};
use crate::net::endpoint::metadata::Value;

/// Capture from the start of the packet.
#[derive(serde::Serialize, serde::Deserialize, Debug, schemars::JsonSchema)]
Expand All @@ -26,17 +26,17 @@ pub struct Regex {
}

impl super::CaptureStrategy for Regex {
fn capture(&self, contents: &mut PoolBuffer) -> Option<Value> {
fn capture(&self, contents: &[u8]) -> Option<(Value, isize)> {
let matches = self
.pattern
.find_iter(contents)
.map(|mat| Value::Bytes(bytes::Bytes::copy_from_slice(mat.as_bytes())))
.collect::<Vec<_>>();

if matches.len() > 1 {
Some(Value::List(matches))
Some((Value::List(matches), 0))
} else {
matches.into_iter().next()
matches.into_iter().next().map(|v| (v, 0))
}
}
}
Expand Down
Loading

0 comments on commit f06bf5a

Please sign in to comment.