diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 905e999110a..3307d0e6fca 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -1684,6 +1684,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -5959,6 +5979,7 @@ dependencies = [ "serde", "thiserror", "tokio", + "tokio-inherit-task-local", "tonic", "tower", "tracing", @@ -6134,6 +6155,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-inherit-task-local", "tokio-metrics", "tokio-stream", "tonic", @@ -6621,6 +6643,7 @@ version = "0.8.0" dependencies = [ "anyhow", "async-trait", + "biscuit-auth", "bytes", "bytesize", "bytestring", @@ -6772,6 +6795,7 @@ dependencies = [ "prost 0.11.9", "prost-types 0.11.9", "quickwit-actors", + "quickwit-authorize", "quickwit-cluster", "quickwit-common", "quickwit-config", @@ -8888,6 +8912,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-inherit-task-local" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d42db185acdff44279cff7f8765608129ae4a01a2f955008a4f96054c75e77ac" +dependencies = [ + "const-random", + "tokio", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index c8878d3c515..f2578232b1e 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -240,6 +240,7 @@ tikv-jemalloc-ctl = "0.5" tikv-jemallocator = "0.5" time = { version = "0.3", features = ["std", "formatting", "macros"] } tokio = { version = "1.40", features = ["full"] } +tokio-inherit-task-local = "0.2" tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["full"] } diff --git a/quickwit/quickwit-authorize/Cargo.toml b/quickwit/quickwit-authorize/Cargo.toml index 4c3985fcff3..2f6fef2c244 100644 --- a/quickwit/quickwit-authorize/Cargo.toml +++ b/quickwit/quickwit-authorize/Cargo.toml @@ -13,6 +13,7 @@ tower = { workspace = true} biscuit-auth = { workspace = true, optional=true } futures = { workspace = true } http = { workspace = true } +tokio-inherit-task-local = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } tonic = { workspace = true } diff --git a/quickwit/quickwit-authorize/src/community.rs b/quickwit/quickwit-authorize/src/community/mod.rs similarity index 100% rename from quickwit/quickwit-authorize/src/community.rs rename to quickwit/quickwit-authorize/src/community/mod.rs diff --git a/quickwit/quickwit-authorize/src/authorization_layer.rs b/quickwit/quickwit-authorize/src/enterprise/authorization_layer.rs similarity index 63% rename from quickwit/quickwit-authorize/src/authorization_layer.rs rename to quickwit/quickwit-authorize/src/enterprise/authorization_layer.rs index 3131bef4715..ae29555ee02 100644 --- a/quickwit/quickwit-authorize/src/authorization_layer.rs +++ b/quickwit/quickwit-authorize/src/enterprise/authorization_layer.rs @@ -1,3 +1,22 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + use std::fmt; use std::task::{Context, Poll}; @@ -7,6 +26,7 @@ use tower::{Layer, Service}; use crate::AuthorizationError; +#[derive(Clone, Copy, Debug)] pub struct AuthorizationLayer; impl Layer for AuthorizationLayer { diff --git a/quickwit/quickwit-authorize/src/enterprise/authorization_token_extraction_layer.rs b/quickwit/quickwit-authorize/src/enterprise/authorization_token_extraction_layer.rs new file mode 100644 index 00000000000..2fc4b65923c --- /dev/null +++ b/quickwit/quickwit-authorize/src/enterprise/authorization_token_extraction_layer.rs @@ -0,0 +1,72 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::task::{Context, Poll}; + +use futures::future::Either; +use http::Request; +use tokio::task::futures::TaskLocalFuture; +use tokio_inherit_task_local::TaskLocalInheritableTable; +use tower::{Layer, Service}; + +use super::AuthorizationToken; + +#[derive(Clone, Copy, Debug)] +pub struct AuthorizationTokenExtractionLayer; + +impl Layer for AuthorizationTokenExtractionLayer { + type Service = AuthorizationTokenExtractionService; + + fn layer(&self, service: S) -> Self::Service { + AuthorizationTokenExtractionService { service } + } +} + +#[derive(Clone)] +pub struct AuthorizationTokenExtractionService { + service: S, +} + +fn get_authorization_token_opt(headers: &http::HeaderMap) -> Option { + let authorization_header_value = headers.get("Authorization")?; + let authorization_header_str = authorization_header_value.to_str().ok()?; + crate::get_auth_token_from_str(authorization_header_str).ok() +} + +impl Service> for AuthorizationTokenExtractionService +where S: Service> +{ + type Response = S::Response; + type Error = S::Error; + type Future = Either>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let authorization_token_opt = get_authorization_token_opt(request.headers()); + let fut = self.service.call(request); + if let Some(authorization_token) = authorization_token_opt { + Either::Right(crate::execute_with_authorization(authorization_token, fut)) + } else { + Either::Left(fut) + } + } +} diff --git a/quickwit/quickwit-authorize/src/enterprise.rs b/quickwit/quickwit-authorize/src/enterprise/mod.rs similarity index 93% rename from quickwit/quickwit-authorize/src/enterprise.rs rename to quickwit/quickwit-authorize/src/enterprise/mod.rs index e1aa02e4436..24989412b3a 100644 --- a/quickwit/quickwit-authorize/src/enterprise.rs +++ b/quickwit/quickwit-authorize/src/enterprise/mod.rs @@ -19,12 +19,19 @@ // components are licensed under the original license provided by the owner of the // applicable component. +mod authorization_layer; +mod authorization_token_extraction_layer; + use std::future::Future; use std::str::FromStr; use std::sync::{Arc, OnceLock}; +pub use authorization_layer::AuthorizationLayer; +pub use authorization_token_extraction_layer::AuthorizationTokenExtractionLayer; use biscuit_auth::macros::authorizer; use biscuit_auth::{Authorizer, Biscuit, RootKeyProvider}; +use tokio::task::futures::TaskLocalFuture; +use tokio_inherit_task_local::TaskLocalInheritableTable; use crate::AuthorizationError; @@ -79,7 +86,7 @@ impl FromStr for AuthorizationToken { } } -tokio::task_local! { +tokio_inherit_task_local::inheritable_task_local! { pub static AUTHORIZATION_TOKEN: AuthorizationToken; } @@ -146,6 +153,16 @@ impl From for AuthorizationError { } } +pub fn get_auth_token_from_str( + authorization_header_value: &str, +) -> Result { + let authorization_token_str: &str = authorization_header_value + .strip_prefix(AUTHORIZATION_VALUE_PREFIX) + .ok_or(AuthorizationError::InvalidToken)?; + let biscuit: Biscuit = Biscuit::from_base64(authorization_token_str, get_root_key_provider())?; + Ok(AuthorizationToken(biscuit)) +} + pub fn get_auth_token( req_metadata: &tonic::metadata::MetadataMap, ) -> Result { @@ -154,11 +171,7 @@ pub fn get_auth_token( .ok_or(AuthorizationError::AuthorizationTokenMissing)? .to_str() .map_err(|_| AuthorizationError::InvalidToken)?; - let authorization_token_str: &str = authorization_header_value - .strip_prefix(AUTHORIZATION_VALUE_PREFIX) - .ok_or(AuthorizationError::InvalidToken)?; - let biscuit: Biscuit = Biscuit::from_base64(authorization_token_str, get_root_key_provider())?; - Ok(AuthorizationToken(biscuit)) + get_auth_token_from_str(authorization_header_value) } pub fn set_auth_token( @@ -224,7 +237,7 @@ pub fn authorize_request(req: &R) -> Result<(), AuthorizationE pub fn execute_with_authorization( token: AuthorizationToken, f: F, -) -> impl Future +) -> TaskLocalFuture where F: Future, { diff --git a/quickwit/quickwit-authorize/src/lib.rs b/quickwit/quickwit-authorize/src/lib.rs index 23206c0b434..3e0a7bb5ca4 100644 --- a/quickwit/quickwit-authorize/src/lib.rs +++ b/quickwit/quickwit-authorize/src/lib.rs @@ -17,14 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -mod authorization_layer; - #[cfg(not(feature = "enterprise"))] -#[path = "community.rs"] +#[path = "community/mod.rs"] mod implementation; #[cfg(feature = "enterprise")] -#[path = "enterprise.rs"] +#[path = "enterprise/mod.rs"] mod implementation; pub use implementation::*; diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 64bf88dabda..ec528e64ea9 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -79,7 +79,7 @@ quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } [features] -enterprise = ["quickwit-config/enterprise", "quickwit-ingest/enterprise", "quickwit-proto/enterprise"] +enterprise = ["quickwit-config/enterprise", "quickwit-ingest/enterprise", "quickwit-proto/enterprise", "quickwit-serve/enterprise"] jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"] ci-test = [] pprof = ["quickwit-serve/pprof"] diff --git a/quickwit/quickwit-codegen/example/src/authorization.rs b/quickwit/quickwit-codegen/example/src/authorization.rs index 509fed82f0f..1d0a000066a 100644 --- a/quickwit/quickwit-codegen/example/src/authorization.rs +++ b/quickwit/quickwit-codegen/example/src/authorization.rs @@ -15,7 +15,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -use quickwit_authorize::{Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization}; +use quickwit_authorize::{ + Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization, +}; use crate::{GoodbyeRequest, HelloRequest, PingRequest}; @@ -38,9 +40,7 @@ impl Authorization for GoodbyeRequest { } impl StreamAuthorization for PingRequest { - fn attenuate( - auth_token: AuthorizationToken, - ) -> Result { + fn attenuate(auth_token: AuthorizationToken) -> Result { Ok(auth_token) } } diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 83170a8ec56..fc2d579e0f4 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -39,6 +39,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tokio-metrics = { workspace = true } tokio-stream = { workspace = true } +tokio-inherit-task-local = { workspace = true } tonic = { workspace = true } tower = { workspace = true } tracing = { workspace = true } diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index dff26829584..a1712a4105e 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -213,6 +213,15 @@ pub fn num_cpus() -> usize { } } +pub fn spawn_inherit_task_local(future: F) -> tokio::task::JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + use tokio_inherit_task_local::FutureInheritTaskLocal; + tokio::task::spawn(future.inherit_task_local()) +} + // The following are helpers to build named tasks. // // Named tasks require the tokio feature `tracing` to be enabled. diff --git a/quickwit/quickwit-common/src/tower/one_task_per_call_layer.rs b/quickwit/quickwit-common/src/tower/one_task_per_call_layer.rs index caf7ca3cdec..a4bfa33825c 100644 --- a/quickwit/quickwit-common/src/tower/one_task_per_call_layer.rs +++ b/quickwit/quickwit-common/src/tower/one_task_per_call_layer.rs @@ -77,7 +77,7 @@ where fn call(&mut self, request: Request) -> Self::Future { let request_name: &'static str = Request::rpc_name(); let future = self.service.call(request); - let join_handle = tokio::spawn(future); + let join_handle = crate::spawn_inherit_task_local(future); UnwrapOrElseFuture { request_name, join_handle, diff --git a/quickwit/quickwit-ingest/src/authorize.rs b/quickwit/quickwit-ingest/src/authorize.rs index 57ad079867a..5e4470b9ee1 100644 --- a/quickwit/quickwit-ingest/src/authorize.rs +++ b/quickwit/quickwit-ingest/src/authorize.rs @@ -15,7 +15,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -use quickwit_authorize::::{Authorization, AuthorizationError, AuthorizationToken}; +use quickwit_authorize::{Authorization, AuthorizationError, AuthorizationToken}; use crate::{FetchRequest, IngestRequest, TailRequest}; diff --git a/quickwit/quickwit-proto/Cargo.toml b/quickwit/quickwit-proto/Cargo.toml index e76a7a539af..e6035ac7d55 100644 --- a/quickwit/quickwit-proto/Cargo.toml +++ b/quickwit/quickwit-proto/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +biscuit-auth = { workspace = true, optional = true } bytes = { workspace = true } bytesize = { workspace = true } bytestring = { workspace = true } @@ -53,4 +54,4 @@ quickwit-codegen = { workspace = true } [features] postgres = ["sea-query", "sqlx"] testsuite = ["mockall", "futures"] -enterprise = [ "quickwit-authorize/enterprise"] +enterprise = [ "quickwit-authorize/enterprise", "dep:biscuit-auth"] diff --git a/quickwit/quickwit-proto/src/authorization.rs b/quickwit/quickwit-proto/src/authorization.rs index 54882b7cfbc..edf0ac68b7b 100644 --- a/quickwit/quickwit-proto/src/authorization.rs +++ b/quickwit/quickwit-proto/src/authorization.rs @@ -1,8 +1,11 @@ use std::time::{Duration, SystemTime}; -use biscuit_auth::builder_ext::BuilderExt; -use biscuit_auth::macros::*; -use quickwit_authorize::::{Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization}; +pub use biscuit_auth; +pub use biscuit_auth::builder_ext::BuilderExt; +pub use biscuit_auth::macros::*; +use quickwit_authorize::{ + Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization, +}; use crate::cluster::FetchClusterStateRequest; use crate::control_plane::{AdviseResetShardsRequest, GetOrCreateOpenShardsRequest}; diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index b82db775761..f86ade32293 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -50,6 +50,7 @@ warp = { workspace = true } zstd = { workspace = true } quickwit-actors = { workspace = true } +quickwit-authorize = { workspace = true, features = ["enterprise"], optional = true } quickwit-cluster = { workspace = true } quickwit-common = { workspace = true } quickwit-config = { workspace = true } @@ -97,4 +98,5 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } pprof = [ "dep:pprof" ] +enterprise = ["dep:quickwit-authorize"] testsuite = [] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 6a7a252a0cd..62a89fee7ab 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -429,10 +429,23 @@ pub async fn serve_quickwit( 100 }; // These layers apply to all the RPCs of the metastore. - let shared_layer = ServiceBuilder::new() + let shared_layer_builder = ServiceBuilder::new() .layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) - .layer(LoadShedLayer::new(max_in_flight_requests)) - .into_inner(); + .layer(LoadShedLayer::new(max_in_flight_requests)); + + let shared_layer; + + #[cfg(feature = "enterprise")] + { + use quickwit_authorize::AuthorizationLayer; + shared_layer = shared_layer_builder.layer(AuthorizationLayer).into_inner(); + } + + #[cfg(not(feature = "enterprise"))] + { + shared_layer = shared_layer_builder.into_inner(); + } + let broker_layer = EventListenerLayer::new(event_broker.clone()); let metastore = MetastoreServiceClient::tower() .stack_layer(shared_layer) diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 3c83c2d84f1..2c79513e0f5 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -198,7 +198,7 @@ pub(crate) async fn start_rest_server( let compression_predicate = CompressionPredicate::from_env().and(NotForContentType::IMAGES); let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); - let service = ServiceBuilder::new() + let service_builder = ServiceBuilder::new() .layer( CompressionLayer::new() .zstd(true) @@ -206,8 +206,21 @@ pub(crate) async fn start_rest_server( .quality(tower_http::CompressionLevel::Fastest) .compress_when(compression_predicate), ) - .layer(cors) - .service(warp_service); + .layer(cors); + + let service; + + #[cfg(feature = "enterprise")] + { + service = service_builder + .layer(quickwit_authorize::AuthorizationTokenExtractionLayer) + .service(warp_service); + } + + #[cfg(not(feature = "enterprise"))] + { + service = service_builder.service(warp_service); + } let rest_listen_addr = tcp_listener.local_addr()?; info!(