2024-11-22 weekly sync #479

Merged
2 changes: 1 addition & 1 deletion .bleep
@@ -1 +1 @@
ef8c591ce876042b96a4efe6331606f73e2f124d
89319e9383d6f99066dfeace750a553d45e1c167
70 changes: 46 additions & 24 deletions pingora-cache/src/lib.rs
@@ -125,8 +125,12 @@ pub enum NoCacheReason {
///
/// This happens when the cache predictor predicted that this request is not cacheable, but
/// the response turns out to be OK to cache. However, it might be too large to re-enable caching
/// for this request.
/// for this request
Deferred,
/// Due to the proxy upstream filter declining to send the current request upstream
DeclinedToUpstream,
/// Due to the upstream being unreachable or otherwise erroring during proxying
UpstreamError,
/// The writer of the cache lock sees that the request is not cacheable (could be OriginNotCache)
CacheLockGiveUp,
/// This request waited too long for the writer of the cache lock to finish, so this request will
@@ -147,6 +151,8 @@ impl NoCacheReason {
StorageError => "StorageError",
InternalError => "InternalError",
Deferred => "Deferred",
DeclinedToUpstream => "DeclinedToUpstream",
UpstreamError => "UpstreamError",
CacheLockGiveUp => "CacheLockGiveUp",
CacheLockTimeout => "CacheLockTimeout",
Custom(s) => s,
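
The two new arms keep `as_str` exhaustive over the added variants. As a minimal sketch of how a caller might branch on the new reasons (the helper function and log messages below are illustrative, not part of this PR):

```rust
use pingora_cache::NoCacheReason;

// Hypothetical logging helper; only NoCacheReason and as_str() come from
// pingora-cache, the rest is illustrative.
fn log_bypass(reason: &NoCacheReason) {
    match reason {
        // new in this PR: the upstream filter refused to send the request upstream
        NoCacheReason::DeclinedToUpstream => {
            log::info!("cache bypass: request declined from going upstream")
        }
        // new in this PR: the upstream was unreachable or errored mid-proxy
        NoCacheReason::UpstreamError => {
            log::warn!("cache bypass: upstream error")
        }
        other => log::debug!("cache bypass: {}", other.as_str()),
    }
}
```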
@@ -299,38 +305,52 @@ impl HttpCache {
.is_some()
}

/// Release the cache lock if the current request is a cache writer.
///
/// Generally, callers should prefer `disable` when the cache lock should be released
/// due to an error, since that also clears all cache context. This function releases
/// the cache lock while keeping the cache around for reading, e.g. when serving stale.
pub fn release_write_lock(&mut self, reason: NoCacheReason) {
use NoCacheReason::*;
if let Some(inner) = self.inner.as_mut() {
let lock = inner.lock.take();
if let Some(Locked::Write(_r)) = lock {
let lock_status = match reason {
// let the next request try to fetch it
InternalError | StorageError | Deferred | UpstreamError => {
LockStatus::TransientError
}
// depends on why the proxy upstream filter declined the request;
// for now still allow the next request to try to acquire it, to avoid a thundering herd
DeclinedToUpstream => LockStatus::TransientError,
// no need for the lock anymore
OriginNotCache | ResponseTooLarge => LockStatus::GiveUp,
// not sure which LockStatus makes sense; treat it as GiveUp for now
Custom(_) => LockStatus::GiveUp,
// should never happen, NeverEnabled shouldn't hold a lock
NeverEnabled => panic!("NeverEnabled holds a write lock"),
CacheLockGiveUp | CacheLockTimeout => {
panic!("CacheLock* are for cache lock readers only")
}
};
inner
.cache_lock
.unwrap()
.release(inner.key.as_ref().unwrap(), lock_status);
}
}
}

/// Disable caching
pub fn disable(&mut self, reason: NoCacheReason) {
use NoCacheReason::*;
match self.phase {
CachePhase::Disabled(_) => {
// replace reason
self.phase = CachePhase::Disabled(reason);
}
_ => {
self.phase = CachePhase::Disabled(reason);
if let Some(inner) = self.inner.as_mut() {
let lock = inner.lock.take();
if let Some(Locked::Write(_r)) = lock {
let lock_status = match reason {
// let the next request try to fetch it
InternalError | StorageError | Deferred => LockStatus::TransientError,
// no need for the lock anymore
OriginNotCache | ResponseTooLarge => LockStatus::GiveUp,
// not sure which LockStatus make sense, we treat it as GiveUp for now
Custom(_) => LockStatus::GiveUp,
// should never happen, NeverEnabled shouldn't hold a lock
NeverEnabled => panic!("NeverEnabled holds a write lock"),
CacheLockGiveUp | CacheLockTimeout => {
panic!("CacheLock* are for cache lock readers only")
}
};
inner
.cache_lock
.unwrap()
.release(inner.key.as_ref().unwrap(), lock_status);
}
}
self.release_write_lock(reason);
// log initial disable reason
self.inner_mut()
.traces
@@ -824,6 +844,8 @@ impl HttpCache {
CachePhase::Stale => {
// replace cache meta header
self.inner_mut().meta.as_mut().unwrap().0.header = header;
// upstream request done, release write lock
self.release_write_lock(reason);
}
_ => panic!("wrong phase {:?}", self.phase),
}
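
The split between `disable` and the new `release_write_lock` follows the doc comment above: `disable` tears down all cache context (and now delegates the lock release), while `release_write_lock` alone keeps the cached copy readable. A sketch of the intended call pattern (the error-handler function and its arguments are hypothetical):

```rust
use pingora_cache::{HttpCache, NoCacheReason};

// Hypothetical upstream-failure path in a proxy: if a stale copy can be
// served, release only the write lock so the cache stays readable;
// otherwise disable caching entirely, which also releases the lock.
fn on_upstream_error(cache: &mut HttpCache, can_serve_stale: bool) {
    if can_serve_stale {
        // maps to LockStatus::TransientError internally, so the next
        // request may retry the fetch
        cache.release_write_lock(NoCacheReason::UpstreamError);
    } else {
        cache.disable(NoCacheReason::UpstreamError);
    }
}
```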
2 changes: 1 addition & 1 deletion pingora-cache/src/predictor.rs
@@ -120,7 +120,7 @@ where
// CacheLockGiveUp: the writer will set OriginNotCache (if applicable)
// readers don't need to do it
NeverEnabled | StorageError | InternalError | Deferred | CacheLockGiveUp
| CacheLockTimeout => {
| CacheLockTimeout | DeclinedToUpstream | UpstreamError => {
return None;
}
// Skip certain NoCacheReason::Custom according to user
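
Neither new reason reflects the cacheability of the response itself, so the predictor should not learn from them. A condensed restatement of the match above (hypothetical standalone function; the real code also consults a user callback for `Custom` reasons):

```rust
use pingora_cache::NoCacheReason::{self, *};

// Condensed sketch of the predictor rule above: only reasons that describe
// the response (e.g. OriginNotCache) should mark a request uncacheable;
// transport-level outcomes such as DeclinedToUpstream and UpstreamError
// say nothing about the response and are ignored.
fn remember_as_uncacheable(reason: &NoCacheReason) -> bool {
    !matches!(
        reason,
        NeverEnabled
            | StorageError
            | InternalError
            | Deferred
            | CacheLockGiveUp
            | CacheLockTimeout
            | DeclinedToUpstream
            | UpstreamError
    )
}
```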
158 changes: 14 additions & 144 deletions pingora-cache/src/put.rs
@@ -351,20 +351,17 @@ mod parse_response {
ErrorType::{self, *},
};

pub const INVALID_CHUNK: ErrorType = ErrorType::new("InvalidChunk");
pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody");

const MAX_HEADERS: usize = 256;
const INIT_HEADER_BUF_SIZE: usize = 4096;
const CHUNK_DELIMITER_SIZE: usize = 2; // \r\n

#[derive(Debug, Clone, Copy, PartialEq)]
enum ParseState {
Init,
PartialHeader,
PartialBodyContentLength(usize, usize),
PartialChunkedBody(usize),
PartialHttp10Body(usize),
PartialBody(usize),
Done(usize),
Invalid(httparse::Error),
}
@@ -379,9 +376,7 @@
fn read_body(&self) -> bool {
matches!(
self,
Self::PartialBodyContentLength(..)
| Self::PartialChunkedBody(_)
| Self::PartialHttp10Body(_)
Self::PartialBodyContentLength(..) | Self::PartialBody(_)
)
}
}
@@ -509,49 +504,15 @@
}
Ok(Some(self.buf.split_to(end).freeze()))
}
PartialChunkedBody(seen) => {
let parsed = httparse::parse_chunk_size(&self.buf).map_err(|e| {
self.state = Done(seen);
Error::explain(INVALID_CHUNK, format!("Invalid chunked encoding: {e:?}"))
})?;
match parsed {
httparse::Status::Complete((header_len, body_len)) => {
// 4\r\nRust\r\n: header: "4\r\n", body: "Rust", "\r\n"
let total_chunk_size =
header_len + body_len as usize + CHUNK_DELIMITER_SIZE;
if self.buf.len() < total_chunk_size {
// wait for the full chunk to be read
// Note that we have to buffer the entire chunk in this design
Ok(None)
} else {
if body_len == 0 {
self.state = Done(seen);
} else {
self.state = PartialChunkedBody(seen + body_len as usize);
}
let mut chunk_bytes = self.buf.split_to(total_chunk_size);
let mut chunk_body = chunk_bytes.split_off(header_len);
chunk_body.truncate(body_len as usize);
// Note that the final 0 sized chunk will return an empty Bytes
// instead of not None
Ok(Some(chunk_body.freeze()))
}
}
httparse::Status::Partial => {
// not even a full chunk, continue waiting for more data
Ok(None)
}
}
}
PartialHttp10Body(seen) => {
self.state = PartialHttp10Body(seen + self.buf.len());
PartialBody(seen) => {
self.state = PartialBody(seen + self.buf.len());
Ok(Some(self.buf.split().freeze()))
}
}
}

pub fn finish(&mut self) -> Result<()> {
if let ParseState::PartialHttp10Body(seen) = self.state {
if let ParseState::PartialBody(seen) = self.state {
self.state = ParseState::Done(seen);
}
if !self.state.is_done() {
@@ -572,12 +533,6 @@
// these status codes cannot have body by definition
return ParseState::Done(0);
}
if let Some(encoding) = resp.headers.get(http::header::TRANSFER_ENCODING) {
// TODO: case sensitive?
if encoding.as_bytes() == b"chunked" {
return ParseState::PartialChunkedBody(0);
}
}
if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) {
// ignore invalid header value
if let Some(cl) = std::str::from_utf8(cl.as_bytes())
@@ -591,7 +546,10 @@
};
}
}
ParseState::PartialHttp10Body(0)
// HTTP/1.0 and chunked encoding are both treated as PartialBody
// The response body payload should _not_ be chunked encoded
// even if the Transfer-Encoding: chunked header is added
ParseState::PartialBody(0)
}

#[cfg(test)]
Expand Down Expand Up @@ -684,7 +642,7 @@ mod parse_response {

#[test]
fn test_body_chunked() {
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n";
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nrust";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();

@@ -700,14 +658,6 @@
assert_eq!(data.as_ref().unwrap(), "rust");
assert!(!eos);

let output = parser.inject_data(b"0\r\n\r\n").unwrap();
assert_eq!(output.len(), 1);
let HttpTask::Body(data, eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(data.as_ref().unwrap(), "");
assert!(eos);

parser.finish().unwrap();
}

@@ -755,8 +705,8 @@
}

#[test]
fn test_body_chunked_early() {
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n";
fn test_body_chunked_partial_chunk() {
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nru";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();

@@ -769,75 +719,15 @@
let HttpTask::Body(data, eos) = &output[1] else {
panic!("{:?}", output);
};
assert_eq!(data.as_ref().unwrap(), "rust");
assert_eq!(data.as_ref().unwrap(), "ru");
assert!(!eos);

parser.finish().unwrap_err();
}

#[test]
fn test_body_chunked_partial_chunk() {
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nru";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();

assert_eq!(output.len(), 1);
let HttpTask::Header(header, _eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(header.status, 200);

let output = parser.inject_data(b"st\r\n").unwrap();
assert_eq!(output.len(), 1);
let HttpTask::Body(data, eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(data.as_ref().unwrap(), "rust");
assert!(!eos);
}

#[test]
fn test_body_chunked_partial_chunk_head() {
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();

assert_eq!(output.len(), 1);
let HttpTask::Header(header, _eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(header.status, 200);

let output = parser.inject_data(b"\nrust\r\n").unwrap();
assert_eq!(output.len(), 1);
let HttpTask::Body(data, eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(data.as_ref().unwrap(), "rust");
assert!(!eos);
}

#[test]
fn test_body_chunked_many_chunks() {
let input =
b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n1\r\ny\r\n";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();

assert_eq!(output.len(), 3);
let HttpTask::Header(header, _eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(header.status, 200);
let HttpTask::Body(data, eos) = &output[1] else {
panic!("{:?}", output);
};
assert!(!eos);
assert_eq!(data.as_ref().unwrap(), "rust");
let HttpTask::Body(data, eos) = &output[2] else {
panic!("{:?}", output);
};
assert_eq!(data.as_ref().unwrap(), "y");
assert_eq!(data.as_ref().unwrap(), "st\r\n");
assert!(!eos);
}

@@ -928,25 +818,5 @@ mod parse_response {
assert!(output.is_empty());
parser.finish().unwrap();
}

#[test]
fn test_no_body_chunked() {
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n0\r\n\r\n";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();

assert_eq!(output.len(), 2);
let HttpTask::Header(header, _eos) = &output[0] else {
panic!("{:?}", output);
};
assert_eq!(header.status, 200);

let HttpTask::Body(data, eos) = &output[1] else {
panic!("{:?}", output);
};
assert_eq!(data.as_ref().unwrap(), "");
assert!(eos);
parser.finish().unwrap();
}
}
}
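
The removal of the chunked-body states means the put parser now treats any body without a `Content-Length` as an opaque `PartialBody` stream that ends only at `finish()`. A sketch of the resulting behavior, in the style of the tests above (assuming the module-private `ResponseParse` is in scope):

```rust
// Even with Transfer-Encoding: chunked in the header, the payload is passed
// through verbatim: cache put expects an already de-chunked body.
let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nrust";
let mut parser = ResponseParse::new();
let output = parser.inject_data(input).unwrap();
// output[0]: HttpTask::Header(200), output[1]: HttpTask::Body("rust", eos = false)
assert_eq!(output.len(), 2);
// With no Content-Length, only finish() marks the end of the body stream.
parser.finish().unwrap();
```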
2 changes: 0 additions & 2 deletions pingora-core/Cargo.toml
@@ -35,7 +35,6 @@ bytes = { workspace = true }
http = { workspace = true }
log = { workspace = true }
h2 = { workspace = true }
lru = { workspace = true }
clap = { version = "3.2.25", features = ["derive"] }
once_cell = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
@@ -44,7 +43,6 @@ strum = "0.26.2"
strum_macros = "0.26.2"
libc = "0.2.70"
chrono = { version = "~0.4.31", features = ["alloc"], default-features = false }
thread_local = "1.0"
prometheus = "0.13"
sentry = { version = "0.26", features = [
"backtrace",
5 changes: 5 additions & 0 deletions pingora-core/src/protocols/http/v2/client.rs
@@ -307,6 +307,11 @@ impl Http2Session {
.or_err(ReadError, "while reading h2 trailers")
}

/// The request header, if it has already been sent
pub fn request_header(&self) -> Option<&RequestHeader> {
self.req_sent.as_deref()
}

/// The response header, if it has already been read
pub fn response_header(&self) -> Option<&ResponseHeader> {
self.response_header.as_ref()
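
The new accessor mirrors the existing `response_header`; a minimal sketch of using both for debug logging (the helper below is hypothetical):

```rust
use pingora_core::protocols::http::v2::client::Http2Session;

// Hypothetical debug helper: after the request is sent, its header can be
// read back and logged alongside the response header.
fn log_exchange(session: &Http2Session) {
    if let Some(req) = session.request_header() {
        log::debug!("sent {} {}", req.method, req.uri);
    }
    if let Some(resp) = session.response_header() {
        log::debug!("got status {}", resp.status);
    }
}
```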