diff --git a/docs/reference/components/sources/http.cue b/docs/reference/components/sources/http.cue index 48f16c9763517..0a039c58b75db 100644 --- a/docs/reference/components/sources/http.cue +++ b/docs/reference/components/sources/http.cue @@ -107,8 +107,38 @@ components: sources: http: { } } } + path: { + common: false + description: "The URL path on which log event POST requests shall be sent." + required: false + type: string: { + default: "/" + examples: ["/event/path", "/logs"] + syntax: "literal" + } + } + strict_path: { + common: false + description: """ + If set to `true`, only requests using the exact URL path specified in `path` will be accepted; + otherwise requests sent to a URL path that starts with the value of `path` will be accepted. + With `strict_path` set to `false` and `path` set to `""`, the configured HTTP source will + accept requests from any URL path. + """ + required: false + type: bool: default: true + } + path_key: { + common: false + description: "The event key in which the requested URL path used to send the request will be stored." + required: false + type: string: { + default: "path" + examples: ["vector_http_path"] + syntax: "literal" + } + } } - output: logs: { text: { description: "An individual line from a `text/plain` request" @@ -122,6 +152,14 @@ components: sources: http: { syntax: "literal" } } + path: { + description: "The HTTP path the event was received from. The key can be changed using the `path_key` configuration setting" + required: true + type: string: { + examples: ["/", "/logs/event712"] + syntax: "literal" + } + } timestamp: fields._current_timestamp } } @@ -135,6 +173,14 @@ components: sources: http: { required: false type: "*": {} } + path: { + description: "The HTTP path the event was received from. The key can be changed using the `path_key` configuration setting" + required: true + type: string: { + examples: ["/", "/logs/event712"] + syntax: "literal" + } + } timestamp: fields._current_timestamp } } @@ -142,6 +188,7 @@ components: sources: http: { examples: [ { + _path: "/" _line: "Hello world" _user_agent: "my-service/v2.1" title: "text/plain" @@ -152,24 +199,28 @@ components: sources: http: { headers: ["User-Agent"] } input: """ - ```http - Content-Type: text/plain - User-Agent: \( _user_agent ) - X-Forwarded-For: \( _values.local_host ) + ```http + POST \( _path ) HTTP/1.1 + Content-Type: text/plain + User-Agent: \( _user_agent ) + X-Forwarded-For: \( _values.local_host ) - \( _line ) - ``` - """ + \( _line ) + ``` + """ output: [{ log: { host: _values.local_host message: _line timestamp: _values.current_timestamp + path: _path "User-Agent": _user_agent } }] }, { + _path: "/events" + _path_key: "vector_http_path" _line: "{\"key\": \"val\"}" _user_agent: "my-service/v2.1" title: "application/json" @@ -178,21 +229,24 @@ components: sources: http: { address: "0.0.0.0:\(_port)" encoding: "json" headers: ["User-Agent"] + _path: _path + path_key: _path_key } input: """ - ```http - Content-Type: application/json - User-Agent: \( _user_agent ) - X-Forwarded-For: \( _values.local_host ) - - \( _line ) - ``` - """ + ```http + POST \( _path ) HTTP/1.1 + Content-Type: application/json + User-Agent: \( _user_agent ) + X-Forwarded-For: \( _values.local_host ) + \( _line ) + ``` + """ output: [{ log: { host: _values.local_host key: "val" timestamp: _values.current_timestamp + _path_key: _path "User-Agent": _user_agent } }] diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index c807a061c491c..f8b97b10fff08 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -62,6 +62,7 @@ impl HttpSource for LogplexSource { body: Bytes, header_map: HeaderMap, query_parameters: HashMap, + _full_path: &str, ) -> Result, ErrorMessage> { decode_message(body, header_map) .map(|events| add_query_parameters(events, &self.query_parameters, query_parameters)) @@ -81,7 +82,15 @@ impl SourceConfig for LogplexConfig { let source = LogplexSource { query_parameters: self.query_parameters.clone(), }; - source.run(self.address, "events", &self.tls, &self.auth, out, shutdown) + source.run( + self.address, + "events", + true, + &self.tls, + &self.auth, + out, + shutdown, + ) } fn output_type(&self) -> DataType { diff --git a/src/sources/http.rs b/src/sources/http.rs index 773f9720aaae3..7204b6954d7d8 100644 --- a/src/sources/http.rs +++ b/src/sources/http.rs @@ -30,6 +30,12 @@ pub struct SimpleHttpConfig { query_parameters: Vec, tls: Option, auth: Option, + #[serde(default = "crate::serde::default_true")] + strict_path: bool, + #[serde(default = "default_path")] + path: String, + #[serde(default = "default_path_key")] + path_key: String, } inventory::submit! { @@ -39,22 +45,34 @@ inventory::submit! { impl GenerateConfig for SimpleHttpConfig { fn generate_config() -> toml::Value { toml::Value::try_from(Self { - address: "0.0.0.0:80".parse().unwrap(), + address: "0.0.0.0:8080".parse().unwrap(), encoding: Default::default(), headers: Vec::new(), query_parameters: Vec::new(), tls: None, auth: None, + path_key: "path".to_string(), + path: "/".to_string(), + strict_path: true, }) .unwrap() } } +pub fn default_path() -> String { + "/".to_string() +} + +pub fn default_path_key() -> String { + "path".to_string() +} + #[derive(Clone)] struct SimpleHttpSource { encoding: Encoding, headers: Vec, query_parameters: Vec, + path_key: String, } #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative, Copy)] @@ -73,10 +91,12 @@ impl HttpSource for SimpleHttpSource { body: Bytes, header_map: HeaderMap, query_parameters: HashMap, + request_path: &str, ) -> Result, ErrorMessage> { decode_body(body, self.encoding) .map(|events| add_headers(events, &self.headers, header_map)) .map(|events| add_query_parameters(events, &self.query_parameters, query_parameters)) + .map(|events| add_path(events, self.path_key.as_str(), request_path)) .map(|mut events| { // Add source type let key = log_schema().source_type_key(); @@ -102,8 +122,17 @@ impl SourceConfig for SimpleHttpConfig { encoding: self.encoding, headers: self.headers.clone(), query_parameters: self.query_parameters.clone(), + path_key: self.path_key.clone(), }; - source.run(self.address, "", &self.tls, &self.auth, out, shutdown) + source.run( + self.address, + &self.path.as_str(), + self.strict_path, + &self.tls, + &self.auth, + out, + shutdown, + ) } fn output_type(&self) -> DataType { @@ -119,6 +148,16 @@ impl SourceConfig for SimpleHttpConfig { } } +fn add_path(mut events: Vec, key: &str, path: &str) -> Vec { + for event in events.iter_mut() { + event + .as_mut_log() + .insert(key, Value::from(path.to_string())); + } + + events +} + fn add_headers( mut events: Vec, headers_config: &[String], @@ -261,9 +300,14 @@ mod tests { encoding: Encoding, headers: Vec, query_parameters: Vec, + path_key: &str, + path: &str, + strict_path: bool, ) -> (mpsc::Receiver, SocketAddr) { let (sender, recv) = Pipeline::new_test(); let address = next_addr(); + let path = path.to_owned(); + let path_key = path_key.to_owned(); tokio::spawn(async move { SimpleHttpConfig { address, @@ -272,6 +316,9 @@ mod tests { query_parameters, tls: None, auth: None, + strict_path, + path_key, + path, } .build( "default", @@ -322,6 +369,17 @@ mod tests { .as_u16() } + async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 { + reqwest::Client::new() + .post(&format!("http://{}{}", address, path)) + .body(body.to_owned()) + .send() + .await + .unwrap() + .status() + .as_u16() + } + async fn send_bytes(address: SocketAddr, body: Vec, headers: HeaderMap) -> u16 { reqwest::Client::new() .post(&format!("http://{}/", address)) @@ -340,7 +398,7 @@ mod tests { let body = "test body\n\ntest body 2"; - let (rx, addr) = source(Encoding::default(), vec![], vec![]).await; + let (rx, addr) = source(Encoding::default(), vec![], vec![], "http_path", "/", true).await; assert_eq!(200, send(addr, body).await); @@ -351,6 +409,7 @@ mod tests { assert_eq!(log[log_schema().message_key()], "test body".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } { let event = events.remove(0); @@ -358,6 +417,7 @@ mod tests { assert_eq!(log[log_schema().message_key()], "test body 2".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } } @@ -368,7 +428,7 @@ mod tests { //same as above test but with a newline at the end let body = "test body\n\ntest body 2\n"; - let (rx, addr) = source(Encoding::default(), vec![], vec![]).await; + let (rx, addr) = source(Encoding::default(), vec![], vec![], "http_path", "/", true).await; assert_eq!(200, send(addr, body).await); @@ -379,6 +439,7 @@ mod tests { assert_eq!(log[log_schema().message_key()], "test body".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } { let event = events.remove(0); @@ -386,6 +447,7 @@ mod tests { assert_eq!(log[log_schema().message_key()], "test body 2".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } } @@ -393,7 +455,7 @@ mod tests { async fn http_json_parsing() { trace_init(); - let (rx, addr) = source(Encoding::Json, vec![], vec![]).await; + let (rx, addr) = source(Encoding::Json, vec![], vec![], "http_path", "/", true).await; assert_eq!(400, send(addr, "{").await); //malformed assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value @@ -418,7 +480,7 @@ mod tests { async fn http_json_values() { trace_init(); - let (rx, addr) = source(Encoding::Json, vec![], vec![]).await; + let (rx, addr) = source(Encoding::Json, vec![], vec![], "http_path", "/", true).await; assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await); assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await); @@ -430,6 +492,7 @@ mod tests { assert_eq!(log["key"], "value".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } { let event = events.remove(0); @@ -437,6 +500,7 @@ mod tests { assert_eq!(log["key2"], "value2".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } } @@ -444,7 +508,7 @@ mod tests { async fn http_json_dotted_keys() { trace_init(); - let (rx, addr) = source(Encoding::Json, vec![], vec![]).await; + let (rx, addr) = source(Encoding::Json, vec![], vec![], "http_path", "/", true).await; assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await); assert_eq!( @@ -471,7 +535,7 @@ mod tests { async fn http_ndjson() { trace_init(); - let (rx, addr) = source(Encoding::Ndjson, vec![], vec![]).await; + let (rx, addr) = source(Encoding::Ndjson, vec![], vec![], "http_path", "/", true).await; assert_eq!(400, send(addr, r#"[{"key":"value"}]"#).await); //one object per line @@ -487,6 +551,7 @@ mod tests { assert_eq!(log["key1"], "value1".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } { let event = events.remove(0); @@ -494,6 +559,7 @@ mod tests { assert_eq!(log["key2"], "value2".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); } } @@ -513,6 +579,9 @@ mod tests { "AbsentHeader".to_string(), ], vec![], + "http_path", + "/", + true, ) .await; @@ -529,6 +598,7 @@ mod tests { assert_eq!(log["User-Agent"], "test_client".into()); assert_eq!(log["Upgrade-Insecure-Requests"], "false".into()); assert_eq!(log["AbsentHeader"], Value::Null); + assert_eq!(log["http_path"], "/".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); } @@ -545,6 +615,9 @@ mod tests { "region".to_string(), "absent".to_string(), ], + "http_path", + "/", + true, ) .await; @@ -561,6 +634,7 @@ mod tests { assert_eq!(log["source"], "staging".into()); assert_eq!(log["region"], "gb".into()); assert_eq!(log["absent"], Value::Null); + assert_eq!(log["http_path"], "/".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); } @@ -583,7 +657,7 @@ mod tests { let mut headers = HeaderMap::new(); headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap()); - let (rx, addr) = source(Encoding::default(), vec![], vec![]).await; + let (rx, addr) = source(Encoding::default(), vec![], vec![], "http_path", "/", true).await; assert_eq!(200, send_bytes(addr, body, headers).await); @@ -594,6 +668,96 @@ mod tests { assert_eq!(log[log_schema().message_key()], "test body".into()); assert!(log.get(log_schema().timestamp_key()).is_some()); assert_eq!(log[log_schema().source_type_key()], "http".into()); + assert_eq!(log["http_path"], "/".into()); + } + } + + #[tokio::test] + async fn http_path() { + trace_init(); + let (rx, addr) = source( + Encoding::Ndjson, + vec![], + vec![], + "vector_http_path", + "/event/path", + true, + ) + .await; + + assert_eq!( + 200, + send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await + ); + + let mut events = collect_n(rx, 1).await; + { + let event = events.remove(0); + let log = event.as_log(); + assert_eq!(log["key1"], "value1".into()); + assert_eq!(log["vector_http_path"], "/event/path".into()); + assert!(log.get(log_schema().timestamp_key()).is_some()); + assert_eq!(log[log_schema().source_type_key()], "http".into()); } } + + #[tokio::test] + async fn http_path_no_restriction() { + trace_init(); + let (rx, addr) = source( + Encoding::Ndjson, + vec![], + vec![], + "vector_http_path", + "/event", + false, + ) + .await; + + assert_eq!( + 200, + send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await + ); + assert_eq!( + 200, + send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await + ); + + let mut events = collect_n(rx, 2).await; + { + let event = events.remove(0); + let log = event.as_log(); + assert_eq!(log["key1"], "value1".into()); + assert_eq!(log["vector_http_path"], "/event/path1".into()); + assert!(log.get(log_schema().timestamp_key()).is_some()); + assert_eq!(log[log_schema().source_type_key()], "http".into()); + } + { + let event = events.remove(0); + let log = event.as_log(); + assert_eq!(log["key2"], "value2".into()); + assert_eq!(log["vector_http_path"], "/event/path2".into()); + assert!(log.get(log_schema().timestamp_key()).is_some()); + assert_eq!(log[log_schema().source_type_key()], "http".into()); + } + } + + #[tokio::test] + async fn http_wrong_path() { + trace_init(); + let (_rx, addr) = source( + Encoding::Ndjson, + vec![], + vec![], + "vector_http_path", + "/", + true, + ) + .await; + + assert_eq!( + 404, + send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await + ); + } } diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 8b89619933d82..5c40b523fb680 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -54,7 +54,7 @@ impl SourceConfig for PrometheusRemoteWriteConfig { out: Pipeline, ) -> crate::Result { let source = RemoteWriteSource; - source.run(self.address, "", &self.tls, &self.auth, out, shutdown) + source.run(self.address, "", true, &self.tls, &self.auth, out, shutdown) } fn output_type(&self) -> crate::config::DataType { @@ -95,6 +95,7 @@ impl HttpSource for RemoteWriteSource { mut body: Bytes, header_map: HeaderMap, _query_parameters: HashMap, + _full_path: &str, ) -> Result, ErrorMessage> { // If `Content-Encoding` header isn't `snappy` HttpSource won't decode it for us // se we need to. diff --git a/src/sources/util/http.rs b/src/sources/util/http.rs index 2e2926f37c421..5648dbee535c6 100644 --- a/src/sources/util/http.rs +++ b/src/sources/util/http.rs @@ -15,7 +15,7 @@ use snap::raw::Decoder as SnappyDecoder; use std::{collections::HashMap, convert::TryFrom, error::Error, fmt, io::Read, net::SocketAddr}; use tracing_futures::Instrument; use warp::{ - filters::BoxedFilter, + filters::{path::FullPath, path::Tail, BoxedFilter}, http::{HeaderMap, StatusCode}, reject::Rejection, Filter, @@ -180,12 +180,14 @@ pub trait HttpSource: Clone + Send + Sync + 'static { body: Bytes, header_map: HeaderMap, query_parameters: HashMap, + path: &str, ) -> Result, ErrorMessage>; fn run( self, address: SocketAddr, - path: &'static str, + path: &str, + strict_path: bool, tls: &Option, auth: &Option, out: Pipeline, @@ -193,24 +195,35 @@ pub trait HttpSource: Clone + Send + Sync + 'static { ) -> crate::Result { let tls = MaybeTlsSettings::from_config(tls, true)?; let auth = HttpSourceAuth::try_from(auth.as_ref())?; + let path = path.to_owned(); Ok(Box::pin(async move { let span = crate::trace::current_span(); - let mut filter: BoxedFilter<()> = warp::post().boxed(); - if !path.is_empty() && path != "/" { - for s in path.split('/') { - filter = filter.and(warp::path(s)).boxed(); - } + for s in path.split('/').filter(|&x| !x.is_empty()) { + filter = filter.and(warp::path(s.to_string())).boxed() } let svc = filter - .and(warp::path::end()) + .and(warp::path::tail()) + .and_then(move |tail: Tail| async move { + if !strict_path || tail.as_str().is_empty() { + Ok(()) + } else { + debug!(message = "Path rejected."); + Err(warp::reject::custom( + ErrorMessage::new(StatusCode::NOT_FOUND, + "Not found".to_string()) + )) + } + }).untuple_one() + .and(warp::path::full()) .and(warp::header::optional::("authorization")) .and(warp::header::optional::("content-encoding")) .and(warp::header::headers_cloned()) .and(warp::body::bytes()) .and(warp::query::>()) .and_then( - move |auth_header, + move |path: FullPath, + auth_header, encoding_header, headers: HeaderMap, body: Bytes, @@ -219,13 +232,12 @@ pub trait HttpSource: Clone + Send + Sync + 'static { debug!(message = "Handling HTTP request.", headers = ?headers); let mut out = out.clone(); - let events = auth .is_valid(&auth_header) .and_then(|()| decode(&encoding_header, body)) .and_then(|body| { let body_len=body.len(); - self.build_event(body, headers, query_parameters) + self.build_event(body, headers, query_parameters, path.as_str()) .map(|events| (events, body_len)) });