Skip to content

Commit

Permalink
blop
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Nov 5, 2024
1 parent 94b575a commit 42f4501
Show file tree
Hide file tree
Showing 19 changed files with 209 additions and 28 deletions.
34 changes: 34 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ tikv-jemalloc-ctl = "0.5"
tikv-jemallocator = "0.5"
time = { version = "0.3", features = ["std", "formatting", "macros"] }
tokio = { version = "1.40", features = ["full"] }
tokio-inherit-task-local = "0.2"
tokio-metrics = { version = "0.3.1", features = ["rt"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.7", features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-authorize/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tower = { workspace = true}
biscuit-auth = { workspace = true, optional=true }
futures = { workspace = true }
http = { workspace = true }
tokio-inherit-task-local = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;
use std::task::{Context, Poll};

Expand All @@ -7,6 +26,7 @@ use tower::{Layer, Service};

use crate::AuthorizationError;

#[derive(Clone, Copy, Debug)]
pub struct AuthorizationLayer;

impl<S: Clone> Layer<S> for AuthorizationLayer {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::task::{Context, Poll};

use futures::future::Either;
use http::Request;
use tokio::task::futures::TaskLocalFuture;
use tokio_inherit_task_local::TaskLocalInheritableTable;
use tower::{Layer, Service};

use super::AuthorizationToken;

#[derive(Clone, Copy, Debug)]
pub struct AuthorizationTokenExtractionLayer;

impl<S: Clone> Layer<S> for AuthorizationTokenExtractionLayer {
type Service = AuthorizationTokenExtractionService<S>;

fn layer(&self, service: S) -> Self::Service {
AuthorizationTokenExtractionService { service }
}
}

#[derive(Clone)]
pub struct AuthorizationTokenExtractionService<S> {
service: S,
}

fn get_authorization_token_opt(headers: &http::HeaderMap) -> Option<AuthorizationToken> {
let authorization_header_value = headers.get("Authorization")?;
let authorization_header_str = authorization_header_value.to_str().ok()?;
crate::get_auth_token_from_str(authorization_header_str).ok()
}

impl<B, S> Service<Request<B>> for AuthorizationTokenExtractionService<S>
where S: Service<Request<B>>
{
type Response = S::Response;
type Error = S::Error;
type Future = Either<S::Future, TaskLocalFuture<TaskLocalInheritableTable, S::Future>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: Request<B>) -> Self::Future {
let authorization_token_opt = get_authorization_token_opt(request.headers());
let fut = self.service.call(request);
if let Some(authorization_token) = authorization_token_opt {
Either::Right(crate::execute_with_authorization(authorization_token, fut))
} else {
Either::Left(fut)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@
// components are licensed under the original license provided by the owner of the
// applicable component.

mod authorization_layer;
mod authorization_token_extraction_layer;

use std::future::Future;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};

pub use authorization_layer::AuthorizationLayer;
pub use authorization_token_extraction_layer::AuthorizationTokenExtractionLayer;
use biscuit_auth::macros::authorizer;
use biscuit_auth::{Authorizer, Biscuit, RootKeyProvider};
use tokio::task::futures::TaskLocalFuture;
use tokio_inherit_task_local::TaskLocalInheritableTable;

use crate::AuthorizationError;

Expand Down Expand Up @@ -79,7 +86,7 @@ impl FromStr for AuthorizationToken {
}
}

tokio::task_local! {
tokio_inherit_task_local::inheritable_task_local! {
pub static AUTHORIZATION_TOKEN: AuthorizationToken;
}

Expand Down Expand Up @@ -146,6 +153,16 @@ impl From<biscuit_auth::error::Token> for AuthorizationError {
}
}

pub fn get_auth_token_from_str(
authorization_header_value: &str,
) -> Result<AuthorizationToken, AuthorizationError> {
let authorization_token_str: &str = authorization_header_value
.strip_prefix(AUTHORIZATION_VALUE_PREFIX)
.ok_or(AuthorizationError::InvalidToken)?;
let biscuit: Biscuit = Biscuit::from_base64(authorization_token_str, get_root_key_provider())?;
Ok(AuthorizationToken(biscuit))
}

pub fn get_auth_token(
req_metadata: &tonic::metadata::MetadataMap,
) -> Result<AuthorizationToken, AuthorizationError> {
Expand All @@ -154,11 +171,7 @@ pub fn get_auth_token(
.ok_or(AuthorizationError::AuthorizationTokenMissing)?
.to_str()
.map_err(|_| AuthorizationError::InvalidToken)?;
let authorization_token_str: &str = authorization_header_value
.strip_prefix(AUTHORIZATION_VALUE_PREFIX)
.ok_or(AuthorizationError::InvalidToken)?;
let biscuit: Biscuit = Biscuit::from_base64(authorization_token_str, get_root_key_provider())?;
Ok(AuthorizationToken(biscuit))
get_auth_token_from_str(authorization_header_value)
}

pub fn set_auth_token(
Expand Down Expand Up @@ -224,7 +237,7 @@ pub fn authorize_request<R: Authorization>(req: &R) -> Result<(), AuthorizationE
pub fn execute_with_authorization<F, O>(
token: AuthorizationToken,
f: F,
) -> impl Future<Output = O>
) -> TaskLocalFuture<TaskLocalInheritableTable, F>
where
F: Future<Output = O>,
{
Expand Down
6 changes: 2 additions & 4 deletions quickwit/quickwit-authorize/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod authorization_layer;

#[cfg(not(feature = "enterprise"))]
#[path = "community.rs"]
#[path = "community/mod.rs"]
mod implementation;

#[cfg(feature = "enterprise")]
#[path = "enterprise.rs"]
#[path = "enterprise/mod.rs"]
mod implementation;

pub use implementation::*;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }

[features]
enterprise = ["quickwit-config/enterprise", "quickwit-ingest/enterprise", "quickwit-proto/enterprise"]
enterprise = ["quickwit-config/enterprise", "quickwit-ingest/enterprise", "quickwit-proto/enterprise", "quickwit-serve/enterprise"]
jemalloc = ["dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator"]
ci-test = []
pprof = ["quickwit-serve/pprof"]
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-codegen/example/src/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

use quickwit_authorize::{Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization};
use quickwit_authorize::{
Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization,
};

use crate::{GoodbyeRequest, HelloRequest, PingRequest};

Expand All @@ -38,9 +40,7 @@ impl Authorization for GoodbyeRequest {
}

impl StreamAuthorization for PingRequest {
fn attenuate(
auth_token: AuthorizationToken,
) -> Result<AuthorizationToken, AuthorizationError> {
fn attenuate(auth_token: AuthorizationToken) -> Result<AuthorizationToken, AuthorizationError> {
Ok(auth_token)
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ thiserror = { workspace = true }
tokio = { workspace = true }
tokio-metrics = { workspace = true }
tokio-stream = { workspace = true }
tokio-inherit-task-local = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ pub fn num_cpus() -> usize {
}
}

pub fn spawn_inherit_task_local<F>(future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
use tokio_inherit_task_local::FutureInheritTaskLocal;
tokio::task::spawn(future.inherit_task_local())
}

// The following are helpers to build named tasks.
//
// Named tasks require the tokio feature `tracing` to be enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where
fn call(&mut self, request: Request) -> Self::Future {
let request_name: &'static str = Request::rpc_name();
let future = self.service.call(request);
let join_handle = tokio::spawn(future);
let join_handle = crate::spawn_inherit_task_local(future);
UnwrapOrElseFuture {
request_name,
join_handle,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

use quickwit_authorize::::{Authorization, AuthorizationError, AuthorizationToken};
use quickwit_authorize::{Authorization, AuthorizationError, AuthorizationToken};

use crate::{FetchRequest, IngestRequest, TailRequest};

Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
biscuit-auth = { workspace = true, optional = true }
bytes = { workspace = true }
bytesize = { workspace = true }
bytestring = { workspace = true }
Expand Down Expand Up @@ -53,4 +54,4 @@ quickwit-codegen = { workspace = true }
[features]
postgres = ["sea-query", "sqlx"]
testsuite = ["mockall", "futures"]
enterprise = [ "quickwit-authorize/enterprise"]
enterprise = [ "quickwit-authorize/enterprise", "dep:biscuit-auth"]
9 changes: 6 additions & 3 deletions quickwit/quickwit-proto/src/authorization.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::time::{Duration, SystemTime};

use biscuit_auth::builder_ext::BuilderExt;
use biscuit_auth::macros::*;
use quickwit_authorize::::{Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization};
pub use biscuit_auth;
pub use biscuit_auth::builder_ext::BuilderExt;
pub use biscuit_auth::macros::*;
use quickwit_authorize::{
Authorization, AuthorizationError, AuthorizationToken, StreamAuthorization,
};

use crate::cluster::FetchClusterStateRequest;
use crate::control_plane::{AdviseResetShardsRequest, GetOrCreateOpenShardsRequest};
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ warp = { workspace = true }
zstd = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-authorize = { workspace = true, features = ["enterprise"], optional = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
Expand Down Expand Up @@ -97,4 +98,5 @@ quickwit-storage = { workspace = true, features = ["testsuite"] }
pprof = [
"dep:pprof"
]
enterprise = ["dep:quickwit-authorize"]
testsuite = []
Loading

0 comments on commit 42f4501

Please sign in to comment.