From 8b486cf538ebef5a3100196b9833b78de6fc4b00 Mon Sep 17 00:00:00 2001 From: Edward Wang Date: Mon, 11 Nov 2024 10:11:48 -0800 Subject: [PATCH 1/6] Check content-length on end of stream h2 header The h2 crate does not appear to check for content-length underflow if only a header frame is received with END_STREAM. --- .bleep | 2 +- pingora-core/src/protocols/http/v2/client.rs | 5 ++++ pingora-proxy/src/proxy_h2.rs | 26 ++++++++++++++++++++ pingora-proxy/tests/test_basic.rs | 24 ++++++++++++++++++ 4 files changed, 56 insertions(+), 1 deletion(-) diff --git a/.bleep b/.bleep index 22125bdd3..ce76b7233 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -ef8c591ce876042b96a4efe6331606f73e2f124d \ No newline at end of file +92b5cf31efbead84c6e0ff2f4769cf540df3516e diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 7cc112d9b..5491d4f48 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -307,6 +307,11 @@ impl Http2Session { .or_err(ReadError, "while reading h2 trailers") } + /// The request header if it is already sent + pub fn request_header(&self) -> Option<&RequestHeader> { + self.req_sent.as_deref() + } + /// The response header if it is already read pub fn response_header(&self) -> Option<&ResponseHeader> { self.response_header.as_ref() diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index fad91c252..ba832b247 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -15,6 +15,7 @@ use super::*; use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache}; use crate::proxy_common::*; +use http::{header::CONTENT_LENGTH, Method, StatusCode}; use pingora_core::protocols::http::v2::client::{write_body, Http2Session}; // add scheme and authority as required by h2 lib @@ -571,6 +572,31 @@ pub(crate) async fn pipe_2to1_response( match client.check_response_end_or_error() { Ok(eos) => { + // XXX: the h2 crate won't check for content-length underflow + // if a header frame with END_STREAM is sent without data frames + // As stated by RFC, "204 or 304 responses contain no content, + // as does the response to a HEAD request" + // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1 + let req_header = client.request_header().expect("must have sent req"); + if eos + && req_header.method != Method::HEAD + && resp_header.status != StatusCode::NO_CONTENT + && resp_header.status != StatusCode::NOT_MODIFIED + // RFC technically allows for leading zeroes + // https://datatracker.ietf.org/doc/html/rfc9110#name-content-length + && resp_header + .headers + .get(CONTENT_LENGTH) + .is_some_and(|cl| cl.as_bytes().iter().any(|b| *b != b'0')) + { + let _ = tx + .send(HttpTask::Failed( + Error::explain(H2Error, "non-zero content-length on EOS headers frame") + .into_up(), + )) + .await; + return Ok(()); + } tx.send(HttpTask::Header(resp_header, eos)) .await .or_err(InternalError, "sending h2 headers to pipe")?; diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs index e305b5e32..ca1788c2e 100644 --- a/pingora-proxy/tests/test_basic.rs +++ b/pingora-proxy/tests/test_basic.rs @@ -269,6 +269,30 @@ async fn test_h2_to_h1_upload() { assert_eq!(body, payload); } +#[tokio::test] +#[cfg(feature = "any_tls")] +async fn test_h2_head() { + init(); + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .build() + .unwrap(); + + let res = client + .head("https://127.0.0.1:6150/set_content_length") + .header("sni", "openrusty.org") + .header("x-h2", "true") + .header("x-set-content-length", "11") + .send() + .await + .unwrap(); + assert_eq!(res.status(), reqwest::StatusCode::OK); + assert_eq!(res.version(), reqwest::Version::HTTP_2); + let body = res.text().await.unwrap(); + // should not be any body, despite content-length + assert_eq!(body, ""); +} + #[cfg(unix)] #[tokio::test] async fn test_simple_proxy_uds() { From cb3f7e89f31668587550c4c487140263fb6dfa29 Mon Sep 17 00:00:00 2001 From: Ivan Babrou Date: Sun, 3 Nov 2024 20:40:16 +0000 Subject: [PATCH 2/6] Prune unused deps and run cargo-machete in ci Includes-commit: 7cee7a7e368a6e445903af1d59d92c680948ed2b Replicated-from: https://github.com/cloudflare/pingora/pull/454 --- .bleep | 2 +- pingora-core/Cargo.toml | 2 -- pingora-openssl/Cargo.toml | 1 - pingora-proxy/Cargo.toml | 1 - tinyufo/Cargo.toml | 1 - 5 files changed, 1 insertion(+), 6 deletions(-) diff --git a/.bleep b/.bleep index ce76b7233..be76ff380 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -92b5cf31efbead84c6e0ff2f4769cf540df3516e +6b27f3641cf486ae8742129b9e4e336d825218ea diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index 529e3f52b..350be2f89 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -35,7 +35,6 @@ bytes = { workspace = true } http = { workspace = true } log = { workspace = true } h2 = { workspace = true } -lru = { workspace = true } clap = { version = "3.2.25", features = ["derive"] } once_cell = { workspace = true } serde = { version = "1.0", features = ["derive"] } @@ -44,7 +43,6 @@ strum = "0.26.2" strum_macros = "0.26.2" libc = "0.2.70" chrono = { version = "~0.4.31", features = ["alloc"], default-features = false } -thread_local = "1.0" prometheus = "0.13" sentry = { version = "0.26", features = [ "backtrace", diff --git a/pingora-openssl/Cargo.toml b/pingora-openssl/Cargo.toml index adea1162b..3ddb59e72 100644 --- a/pingora-openssl/Cargo.toml +++ b/pingora-openssl/Cargo.toml @@ -19,7 +19,6 @@ path = "src/lib.rs" [dependencies] openssl-sys = "0.9" openssl = { version = "0.10", features = ["vendored"] } -openssl-src = { version = "300", features = ["weak-crypto"] } tokio-openssl = { version = "0.6" } libc = "0.2.70" foreign-types = { version = "0.3"} diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index 4784dea2e..731579dfc 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -20,7 +20,6 @@ path = "src/lib.rs" [dependencies] pingora-error = { version = "0.4.0", path = "../pingora-error" } pingora-core = { version = "0.4.0", path = "../pingora-core", default-features = false } -pingora-timeout = { version = "0.4.0", path = "../pingora-timeout" } pingora-cache = { version = "0.4.0", path = "../pingora-cache", default-features = false } tokio = { workspace = true, features = ["macros", "net"] } pingora-http = { version = "0.4.0", path = "../pingora-http" } diff --git a/tinyufo/Cargo.toml b/tinyufo/Cargo.toml index 9846c10d7..335dfc5bc 100644 --- a/tinyufo/Cargo.toml +++ b/tinyufo/Cargo.toml @@ -18,7 +18,6 @@ path = "src/lib.rs" [dependencies] ahash = { workspace = true } flurry = "0.5" -parking_lot = "0" crossbeam-queue = "0" crossbeam-skiplist = "0" From e22b65c917a28366ed788a4ddb24bc3ea7a6f095 Mon Sep 17 00:00:00 2001 From: Edward Wang Date: Tue, 12 Nov 2024 12:57:32 -0800 Subject: [PATCH 3/6] Only cache GET response after bypass After a request is predicted uncacheable and bypasses cache, it may still be cached if the response is a 200. However, HEAD responses are not normally cacheable as they don't include the actual response body. --- .bleep | 2 +- pingora-proxy/src/proxy_cache.rs | 10 ++++--- pingora-proxy/tests/test_upstream.rs | 43 ++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/.bleep b/.bleep index be76ff380..765b13505 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -6b27f3641cf486ae8742129b9e4e336d825218ea +6638da6bb3588e4924e2952b14d72f3b2deb8ab3 diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs index f8b003548..4369ec8bb 100644 --- a/pingora-proxy/src/proxy_cache.rs +++ b/pingora-proxy/src/proxy_cache.rs @@ -13,7 +13,7 @@ // limitations under the License. use super::*; -use http::StatusCode; +use http::{Method, StatusCode}; use pingora_cache::key::CacheHashKey; use pingora_cache::lock::LockStatus; use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE; @@ -450,14 +450,16 @@ impl HttpProxy { session.cache.response_became_cacheable(); - if meta.response_header().status == StatusCode::OK { + if session.req_header().method == Method::GET + && meta.response_header().status == StatusCode::OK + { self.inner.cache_miss(session, ctx); } else { // we've allowed caching on the next request, // but do not cache _this_ request if bypassed and not 200 // (We didn't run upstream request cache filters to strip range or condition headers, - // so this could be an uncacheable response e.g. 206 or 304. - // Exclude all non-200 for simplicity, may expand allowable codes in the future.) + // so this could be an uncacheable response e.g. 206 or 304 or HEAD. + // Exclude all non-200/GET for simplicity, may expand allowable codes in the future.) fill_cache = false; session.cache.disable(NoCacheReason::Deferred); } diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index d391b9d98..b2d957586 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -826,6 +826,49 @@ mod test_cache { assert_eq!(headers["x-cache-status"], "deferred"); } + #[tokio::test] + async fn test_bypassed_head() { + init(); + + let url = "http://127.0.0.1:6148/unique/test_bypassed_head/cache_control"; + + // uncacheable, should bypass + let res = reqwest::Client::new() + .get(url) + .header("set-cache-control", "private, max-age=0") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "no-cache"); + assert_eq!(res.text().await.unwrap(), "hello world"); + + // we bypass cache for this next request, becomes cacheable + let res = reqwest::Client::new() + .head(url) + .header("set-cache-control", "public, max-age=10") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + // should not cache the response + assert_eq!(headers["x-cache-status"], "deferred"); + + // MISS + let res = reqwest::Client::new() + .get(url) + .header("set-cache-control", "public, max-age=10") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello world"); + } + #[tokio::test] async fn test_eviction() { init(); From e90ffb527017a22adad51ae65580d88ce0b167dd Mon Sep 17 00:00:00 2001 From: Edward Wang Date: Thu, 14 Nov 2024 17:14:12 -0800 Subject: [PATCH 4/6] Don't parse CachePut payloads as chunked encoding The PUT request body itself may be chunked, but normally clients send the body that ought to be cached without chunked encoding applied. --- .bleep | 2 +- pingora-cache/src/put.rs | 158 ++++----------------------------------- 2 files changed, 15 insertions(+), 145 deletions(-) diff --git a/.bleep b/.bleep index 765b13505..5b88e4a1a 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -6638da6bb3588e4924e2952b14d72f3b2deb8ab3 +ab4b4c023bc3b26dfe253cf43ce410ccd76a6b27 diff --git a/pingora-cache/src/put.rs b/pingora-cache/src/put.rs index 3f864599c..851bb4c69 100644 --- a/pingora-cache/src/put.rs +++ b/pingora-cache/src/put.rs @@ -351,20 +351,17 @@ mod parse_response { ErrorType::{self, *}, }; - pub const INVALID_CHUNK: ErrorType = ErrorType::new("InvalidChunk"); pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody"); const MAX_HEADERS: usize = 256; const INIT_HEADER_BUF_SIZE: usize = 4096; - const CHUNK_DELIMITER_SIZE: usize = 2; // \r\n #[derive(Debug, Clone, Copy, PartialEq)] enum ParseState { Init, PartialHeader, PartialBodyContentLength(usize, usize), - PartialChunkedBody(usize), - PartialHttp10Body(usize), + PartialBody(usize), Done(usize), Invalid(httparse::Error), } @@ -379,9 +376,7 @@ mod parse_response { fn read_body(&self) -> bool { matches!( self, - Self::PartialBodyContentLength(..) - | Self::PartialChunkedBody(_) - | Self::PartialHttp10Body(_) + Self::PartialBodyContentLength(..) | Self::PartialBody(_) ) } } @@ -509,49 +504,15 @@ mod parse_response { } Ok(Some(self.buf.split_to(end).freeze())) } - PartialChunkedBody(seen) => { - let parsed = httparse::parse_chunk_size(&self.buf).map_err(|e| { - self.state = Done(seen); - Error::explain(INVALID_CHUNK, format!("Invalid chunked encoding: {e:?}")) - })?; - match parsed { - httparse::Status::Complete((header_len, body_len)) => { - // 4\r\nRust\r\n: header: "4\r\n", body: "Rust", "\r\n" - let total_chunk_size = - header_len + body_len as usize + CHUNK_DELIMITER_SIZE; - if self.buf.len() < total_chunk_size { - // wait for the full chunk to be read - // Note that we have to buffer the entire chunk in this design - Ok(None) - } else { - if body_len == 0 { - self.state = Done(seen); - } else { - self.state = PartialChunkedBody(seen + body_len as usize); - } - let mut chunk_bytes = self.buf.split_to(total_chunk_size); - let mut chunk_body = chunk_bytes.split_off(header_len); - chunk_body.truncate(body_len as usize); - // Note that the final 0 sized chunk will return an empty Bytes - // instead of not None - Ok(Some(chunk_body.freeze())) - } - } - httparse::Status::Partial => { - // not even a full chunk, continue waiting for more data - Ok(None) - } - } - } - PartialHttp10Body(seen) => { - self.state = PartialHttp10Body(seen + self.buf.len()); + PartialBody(seen) => { + self.state = PartialBody(seen + self.buf.len()); Ok(Some(self.buf.split().freeze())) } } } pub fn finish(&mut self) -> Result<()> { - if let ParseState::PartialHttp10Body(seen) = self.state { + if let ParseState::PartialBody(seen) = self.state { self.state = ParseState::Done(seen); } if !self.state.is_done() { @@ -572,12 +533,6 @@ mod parse_response { // these status codes cannot have body by definition return ParseState::Done(0); } - if let Some(encoding) = resp.headers.get(http::header::TRANSFER_ENCODING) { - // TODO: case sensitive? - if encoding.as_bytes() == b"chunked" { - return ParseState::PartialChunkedBody(0); - } - } if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) { // ignore invalid header value if let Some(cl) = std::str::from_utf8(cl.as_bytes()) @@ -591,7 +546,10 @@ mod parse_response { }; } } - ParseState::PartialHttp10Body(0) + // HTTP/1.0 and chunked encoding are both treated as PartialBody + // The response body payload should _not_ be chunked encoded + // even if the Transfer-Encoding: chunked header is added + ParseState::PartialBody(0) } #[cfg(test)] @@ -684,7 +642,7 @@ mod parse_response { #[test] fn test_body_chunked() { - let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n"; + let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nrust"; let mut parser = ResponseParse::new(); let output = parser.inject_data(input).unwrap(); @@ -700,14 +658,6 @@ mod parse_response { assert_eq!(data.as_ref().unwrap(), "rust"); assert!(!eos); - let output = parser.inject_data(b"0\r\n\r\n").unwrap(); - assert_eq!(output.len(), 1); - let HttpTask::Body(data, eos) = &output[0] else { - panic!("{:?}", output); - }; - assert_eq!(data.as_ref().unwrap(), ""); - assert!(eos); - parser.finish().unwrap(); } @@ -755,8 +705,8 @@ mod parse_response { } #[test] - fn test_body_chunked_early() { - let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n"; + fn test_body_chunked_partial_chunk() { + let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nru"; let mut parser = ResponseParse::new(); let output = parser.inject_data(input).unwrap(); @@ -769,75 +719,15 @@ mod parse_response { let HttpTask::Body(data, eos) = &output[1] else { panic!("{:?}", output); }; - assert_eq!(data.as_ref().unwrap(), "rust"); + assert_eq!(data.as_ref().unwrap(), "ru"); assert!(!eos); - parser.finish().unwrap_err(); - } - - #[test] - fn test_body_chunked_partial_chunk() { - let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nru"; - let mut parser = ResponseParse::new(); - let output = parser.inject_data(input).unwrap(); - - assert_eq!(output.len(), 1); - let HttpTask::Header(header, _eos) = &output[0] else { - panic!("{:?}", output); - }; - assert_eq!(header.status, 200); - let output = parser.inject_data(b"st\r\n").unwrap(); assert_eq!(output.len(), 1); let HttpTask::Body(data, eos) = &output[0] else { panic!("{:?}", output); }; - assert_eq!(data.as_ref().unwrap(), "rust"); - assert!(!eos); - } - - #[test] - fn test_body_chunked_partial_chunk_head() { - let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r"; - let mut parser = ResponseParse::new(); - let output = parser.inject_data(input).unwrap(); - - assert_eq!(output.len(), 1); - let HttpTask::Header(header, _eos) = &output[0] else { - panic!("{:?}", output); - }; - assert_eq!(header.status, 200); - - let output = parser.inject_data(b"\nrust\r\n").unwrap(); - assert_eq!(output.len(), 1); - let HttpTask::Body(data, eos) = &output[0] else { - panic!("{:?}", output); - }; - assert_eq!(data.as_ref().unwrap(), "rust"); - assert!(!eos); - } - - #[test] - fn test_body_chunked_many_chunks() { - let input = - b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n1\r\ny\r\n"; - let mut parser = ResponseParse::new(); - let output = parser.inject_data(input).unwrap(); - - assert_eq!(output.len(), 3); - let HttpTask::Header(header, _eos) = &output[0] else { - panic!("{:?}", output); - }; - assert_eq!(header.status, 200); - let HttpTask::Body(data, eos) = &output[1] else { - panic!("{:?}", output); - }; - assert!(!eos); - assert_eq!(data.as_ref().unwrap(), "rust"); - let HttpTask::Body(data, eos) = &output[2] else { - panic!("{:?}", output); - }; - assert_eq!(data.as_ref().unwrap(), "y"); + assert_eq!(data.as_ref().unwrap(), "st\r\n"); assert!(!eos); } @@ -928,25 +818,5 @@ mod parse_response { assert!(output.is_empty()); parser.finish().unwrap(); } - - #[test] - fn test_no_body_chunked() { - let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n0\r\n\r\n"; - let mut parser = ResponseParse::new(); - let output = parser.inject_data(input).unwrap(); - - assert_eq!(output.len(), 2); - let HttpTask::Header(header, _eos) = &output[0] else { - panic!("{:?}", output); - }; - assert_eq!(header.status, 200); - - let HttpTask::Body(data, eos) = &output[1] else { - panic!("{:?}", output); - }; - assert_eq!(data.as_ref().unwrap(), ""); - assert!(eos); - parser.finish().unwrap(); - } } } From a03a03d444a2bda2618d8e5724948cec7603c5f8 Mon Sep 17 00:00:00 2001 From: Georg Lauterbach <44545919+georglauterbach@users.noreply.github.com> Date: Fri, 15 Nov 2024 10:42:00 +0000 Subject: [PATCH 5/6] update `impl UniqueID` to use correct return type Includes-commit: d26d344e15bd411dd298b32ecc3e60ac233b0866 Replicated-from: https://github.com/cloudflare/pingora/pull/471 Co-authored-by: Matthew Gumport --- .bleep | 2 +- pingora-core/src/protocols/tls/rustls/stream.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bleep b/.bleep index 5b88e4a1a..1f8104d5c 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -ab4b4c023bc3b26dfe253cf43ce410ccd76a6b27 +4cba0f37f5682058c05904a3c5a395919a8ac498 diff --git a/pingora-core/src/protocols/tls/rustls/stream.rs b/pingora-core/src/protocols/tls/rustls/stream.rs index a2c681123..7361bdb88 100644 --- a/pingora-core/src/protocols/tls/rustls/stream.rs +++ b/pingora-core/src/protocols/tls/rustls/stream.rs @@ -21,7 +21,7 @@ use std::time::{Duration, SystemTime}; use crate::listeners::tls::Acceptor; use crate::protocols::raw_connect::ProxyDigest; -use crate::protocols::{tls::SslDigest, Peek, TimingDigest}; +use crate::protocols::{tls::SslDigest, Peek, TimingDigest, UniqueIDType}; use crate::protocols::{ GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, UniqueID, ALPN, }; @@ -216,7 +216,7 @@ impl UniqueID for TlsStream where T: UniqueID, { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.tls.stream.as_ref().unwrap().get_ref().0.id() } } From 725329fa717b878615e2d207165e0573bcd09ee5 Mon Sep 17 00:00:00 2001 From: Edward Wang Date: Mon, 11 Nov 2024 18:09:55 -0800 Subject: [PATCH 6/6] Release cache lock after proxy upstream filter, serve stale The cache locks may be held after serving stale, proxy upstream filter, or revalidate uncacheable resulting in dangling cache locks. Also only disable cache on final error if cache was not already disabled, and add DeclinedToUpstream / UpstreamError no cache reasons. --- .bleep | 2 +- pingora-cache/src/lib.rs | 70 +++++++++++++++++++++----------- pingora-cache/src/predictor.rs | 2 +- pingora-proxy/src/lib.rs | 17 +++++++- pingora-proxy/src/proxy_cache.rs | 17 +++++++- pingora-proxy/src/proxy_h2.rs | 1 + 6 files changed, 81 insertions(+), 28 deletions(-) diff --git a/.bleep b/.bleep index 1f8104d5c..d1591c7e5 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -4cba0f37f5682058c05904a3c5a395919a8ac498 +89319e9383d6f99066dfeace750a553d45e1c167 diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs index 6bebbd316..6ac49a590 100644 --- a/pingora-cache/src/lib.rs +++ b/pingora-cache/src/lib.rs @@ -125,8 +125,12 @@ pub enum NoCacheReason { /// /// This happens when the cache predictor predicted that this request is not cacheable, but /// the response turns out to be OK to cache. However, it might be too large to re-enable caching - /// for this request. + /// for this request Deferred, + /// Due to the proxy upstream filter declining the current request from going upstream + DeclinedToUpstream, + /// Due to the upstream being unreachable or otherwise erroring during proxying + UpstreamError, /// The writer of the cache lock sees that the request is not cacheable (Could be OriginNotCache) CacheLockGiveUp, /// This request waited too long for the writer of the cache lock to finish, so this request will @@ -147,6 +151,8 @@ impl NoCacheReason { StorageError => "StorageError", InternalError => "InternalError", Deferred => "Deferred", + DeclinedToUpstream => "DeclinedToUpstream", + UpstreamError => "UpstreamError", CacheLockGiveUp => "CacheLockGiveUp", CacheLockTimeout => "CacheLockTimeout", Custom(s) => s, @@ -299,9 +305,44 @@ impl HttpCache { .is_some() } + /// Release the cache lock if the current request is a cache writer. + /// + /// Generally callers should prefer using `disable` when a cache lock should be released + /// due to an error to clear all cache context. This function is for releasing the cache lock + /// while still keeping the cache around for reading, e.g. when serving stale. + pub fn release_write_lock(&mut self, reason: NoCacheReason) { + use NoCacheReason::*; + if let Some(inner) = self.inner.as_mut() { + let lock = inner.lock.take(); + if let Some(Locked::Write(_r)) = lock { + let lock_status = match reason { + // let the next request try to fetch it + InternalError | StorageError | Deferred | UpstreamError => { + LockStatus::TransientError + } + // depends on why the proxy upstream filter declined the request, + // for now still allow next request try to acquire to avoid thundering herd + DeclinedToUpstream => LockStatus::TransientError, + // no need for the lock anymore + OriginNotCache | ResponseTooLarge => LockStatus::GiveUp, + // not sure which LockStatus make sense, we treat it as GiveUp for now + Custom(_) => LockStatus::GiveUp, + // should never happen, NeverEnabled shouldn't hold a lock + NeverEnabled => panic!("NeverEnabled holds a write lock"), + CacheLockGiveUp | CacheLockTimeout => { + panic!("CacheLock* are for cache lock readers only") + } + }; + inner + .cache_lock + .unwrap() + .release(inner.key.as_ref().unwrap(), lock_status); + } + } + } + /// Disable caching pub fn disable(&mut self, reason: NoCacheReason) { - use NoCacheReason::*; match self.phase { CachePhase::Disabled(_) => { // replace reason @@ -309,28 +350,7 @@ impl HttpCache { } _ => { self.phase = CachePhase::Disabled(reason); - if let Some(inner) = self.inner.as_mut() { - let lock = inner.lock.take(); - if let Some(Locked::Write(_r)) = lock { - let lock_status = match reason { - // let the next request try to fetch it - InternalError | StorageError | Deferred => LockStatus::TransientError, - // no need for the lock anymore - OriginNotCache | ResponseTooLarge => LockStatus::GiveUp, - // not sure which LockStatus make sense, we treat it as GiveUp for now - Custom(_) => LockStatus::GiveUp, - // should never happen, NeverEnabled shouldn't hold a lock - NeverEnabled => panic!("NeverEnabled holds a write lock"), - CacheLockGiveUp | CacheLockTimeout => { - panic!("CacheLock* are for cache lock readers only") - } - }; - inner - .cache_lock - .unwrap() - .release(inner.key.as_ref().unwrap(), lock_status); - } - } + self.release_write_lock(reason); // log initial disable reason self.inner_mut() .traces @@ -824,6 +844,8 @@ impl HttpCache { CachePhase::Stale => { // replace cache meta header self.inner_mut().meta.as_mut().unwrap().0.header = header; + // upstream request done, release write lock + self.release_write_lock(reason); } _ => panic!("wrong phase {:?}", self.phase), } diff --git a/pingora-cache/src/predictor.rs b/pingora-cache/src/predictor.rs index df8f37437..d131c20d3 100644 --- a/pingora-cache/src/predictor.rs +++ b/pingora-cache/src/predictor.rs @@ -120,7 +120,7 @@ where // CacheLockGiveUp: the writer will set OriginNotCache (if applicable) // readers don't need to do it NeverEnabled | StorageError | InternalError | Deferred | CacheLockGiveUp - | CacheLockTimeout => { + | CacheLockTimeout | DeclinedToUpstream | UpstreamError => { return None; } // Skip certain NoCacheReason::Custom according to user diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index f13a1f796..731cdb419 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -535,6 +535,10 @@ impl HttpProxy { if !proxy_to_upstream { // The hook can choose to write its own response, but if it doesn't, we respond // with a generic 502 + if session.cache.enabled() { + // drop the cache lock that this request may be holding onto + session.cache.disable(NoCacheReason::DeclinedToUpstream); + } if session.response_written().is_none() { match session.write_response_header_ref(&BAD_GATEWAY).await { Ok(()) => {} @@ -557,6 +561,10 @@ impl HttpProxy { /* else continue */ } Err(e) => { + if session.cache.enabled() { + session.cache.disable(NoCacheReason::InternalError); + } + self.handle_error( &mut session, &mut ctx, @@ -621,7 +629,14 @@ impl HttpProxy { if let Some(e) = final_error.as_ref() { // If we have errored and are still holding a cache lock, release it. - session.cache.disable(NoCacheReason::InternalError); + if session.cache.enabled() { + let reason = if *e.esource() == ErrorSource::Upstream { + NoCacheReason::UpstreamError + } else { + NoCacheReason::InternalError + }; + session.cache.disable(reason); + } let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await; // final error will have > 0 status unless downstream connection is dead diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs index 4369ec8bb..8957bc763 100644 --- a/pingora-proxy/src/proxy_cache.rs +++ b/pingora-proxy/src/proxy_cache.rs @@ -667,8 +667,18 @@ impl HttpProxy { None, None, ); - self.inner + if self + .inner .should_serve_stale(session, ctx, Some(&http_status_error)) + { + // no more need to keep the write lock + session + .cache + .release_write_lock(NoCacheReason::UpstreamError); + true + } else { + false + } } else { false // not 304, not stale if error status code } @@ -712,6 +722,11 @@ impl HttpProxy { self.inner.request_summary(session, ctx) ); + // no more need to hang onto the cache lock + session + .cache + .release_write_lock(NoCacheReason::UpstreamError); + Some(self.proxy_cache_hit(session, ctx).await) } diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index ba832b247..a27c559b4 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -409,6 +409,7 @@ impl HttpProxy { .cache_http_task(session, &task, ctx, serve_from_cache) .await { + session.cache.disable(NoCacheReason::StorageError); if serve_from_cache.is_miss_body() { // if the response stream cache body during miss but write fails, it has to // give up the entire request