Skip to content

Commit

Permalink
feat: extract hints from http header (#5128)
Browse files Browse the repository at this point in the history
* feat: extract hints from http header

* Update src/servers/src/http/hints.rs

Co-authored-by: shuiyisong <[email protected]>

* chore: by comment

* refactor: get instead of loop

---------

Co-authored-by: shuiyisong <[email protected]>
  • Loading branch information
fengjiachun and shuiyisong authored Dec 18, 2024
1 parent c6b7caa commit 9b4e855
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 55 deletions.
57 changes: 3 additions & 54 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{cancellation, TonicResult};

pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint-";
use crate::hint_headers;

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
Expand All @@ -45,7 +43,7 @@ impl GreptimeDatabase for DatabaseService {
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let remote_addr = request.remote_addr();
let hints = extract_hints(request.metadata());
let hints = hint_headers::extract_hints(request.metadata());
debug!(
"GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
remote_addr, hints
Expand Down Expand Up @@ -91,7 +89,7 @@ impl GreptimeDatabase for DatabaseService {
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let remote_addr = request.remote_addr();
let hints = extract_hints(request.metadata());
let hints = hint_headers::extract_hints(request.metadata());
debug!(
"GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
remote_addr, hints
Expand Down Expand Up @@ -142,52 +140,3 @@ impl GreptimeDatabase for DatabaseService {
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}
}

fn extract_hints(metadata: &MetadataMap) -> Vec<(String, String)> {
metadata
.iter()
.filter_map(|kv| {
let KeyAndValueRef::Ascii(key, value) = kv else {
return None;
};
let key = key.as_str();
let new_key = key.strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX)?;
let Ok(value) = value.to_str() else {
// Simply return None for non-string values.
return None;
};
Some((new_key.to_string(), value.trim().to_string()))
})
.collect()
}

#[cfg(test)]
mod tests {
use tonic::metadata::MetadataValue;

use super::*;

#[test]
fn test_extract_hints() {
let mut metadata = MetadataMap::new();
let prev = metadata.insert(
"x-greptime-hint-append_mode",
MetadataValue::from_static("true"),
);
metadata.insert("test-key", MetadataValue::from_static("test-value"));
assert!(prev.is_none());
let hints = extract_hints(&metadata);
assert_eq!(hints, vec![("append_mode".to_string(), "true".to_string())]);
}

#[test]
fn extract_hints_ignores_non_ascii_metadata() {
let mut metadata = MetadataMap::new();
metadata.insert_bin(
"x-greptime-hint-merge_mode-bin",
MetadataValue::from_bytes(b"last_non_null"),
);
let hints = extract_hints(&metadata);
assert!(hints.is_empty());
}
}
170 changes: 170 additions & 0 deletions src/servers/src/hint_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use http::HeaderMap;
use tonic::metadata::MetadataMap;

pub const HINT_KEYS: [&str; 5] = [
"x-greptime-hint-auto_create_table",
"x-greptime-hint-ttl",
"x-greptime-hint-append_mode",
"x-greptime-hint-merge_mode",
"x-greptime-hint-physical_table",
];

pub(crate) fn extract_hints<T: ToHeaderMap>(headers: &T) -> Vec<(String, String)> {
let mut hints = Vec::new();
for key in HINT_KEYS.iter() {
if let Some(value) = headers.get(key) {
let new_key = key.replace("x-greptime-hint-", "");
hints.push((new_key, value.trim().to_string()));
}
}
hints
}

pub(crate) trait ToHeaderMap {
fn get(&self, key: &str) -> Option<&str>;
}

impl ToHeaderMap for MetadataMap {
fn get(&self, key: &str) -> Option<&str> {
self.get(key).and_then(|v| v.to_str().ok())
}
}

impl ToHeaderMap for HeaderMap {
fn get(&self, key: &str) -> Option<&str> {
self.get(key).and_then(|v| v.to_str().ok())
}
}
#[cfg(test)]
mod tests {
use http::header::{HeaderMap, HeaderValue};
use tonic::metadata::{MetadataMap, MetadataValue};

use super::*;

#[test]
fn test_extract_hints_with_full_header_map() {
let mut headers = HeaderMap::new();
headers.insert(
"x-greptime-hint-auto_create_table",
HeaderValue::from_static("true"),
);
headers.insert("x-greptime-hint-ttl", HeaderValue::from_static("3600d"));
headers.insert(
"x-greptime-hint-append_mode",
HeaderValue::from_static("true"),
);
headers.insert(
"x-greptime-hint-merge_mode",
HeaderValue::from_static("false"),
);
headers.insert(
"x-greptime-hint-physical_table",
HeaderValue::from_static("table1"),
);

let hints = extract_hints(&headers);

assert_eq!(hints.len(), 5);
assert_eq!(
hints[0],
("auto_create_table".to_string(), "true".to_string())
);
assert_eq!(hints[1], ("ttl".to_string(), "3600d".to_string()));
assert_eq!(hints[2], ("append_mode".to_string(), "true".to_string()));
assert_eq!(hints[3], ("merge_mode".to_string(), "false".to_string()));
assert_eq!(
hints[4],
("physical_table".to_string(), "table1".to_string())
);
}

#[test]
fn test_extract_hints_with_missing_keys() {
let mut headers = HeaderMap::new();
headers.insert(
"x-greptime-hint-auto_create_table",
HeaderValue::from_static("true"),
);
headers.insert("x-greptime-hint-ttl", HeaderValue::from_static("3600d"));

let hints = extract_hints(&headers);

assert_eq!(hints.len(), 2);
assert_eq!(
hints[0],
("auto_create_table".to_string(), "true".to_string())
);
assert_eq!(hints[1], ("ttl".to_string(), "3600d".to_string()));
}

#[test]
fn test_extract_hints_with_metadata_map() {
let mut metadata = MetadataMap::new();
metadata.insert(
"x-greptime-hint-auto_create_table",
MetadataValue::from_static("true"),
);
metadata.insert("x-greptime-hint-ttl", MetadataValue::from_static("3600d"));
metadata.insert(
"x-greptime-hint-append_mode",
MetadataValue::from_static("true"),
);
metadata.insert(
"x-greptime-hint-merge_mode",
MetadataValue::from_static("false"),
);
metadata.insert(
"x-greptime-hint-physical_table",
MetadataValue::from_static("table1"),
);

let hints = extract_hints(&metadata);

assert_eq!(hints.len(), 5);
assert_eq!(
hints[0],
("auto_create_table".to_string(), "true".to_string())
);
assert_eq!(hints[1], ("ttl".to_string(), "3600d".to_string()));
assert_eq!(hints[2], ("append_mode".to_string(), "true".to_string()));
assert_eq!(hints[3], ("merge_mode".to_string(), "false".to_string()));
assert_eq!(
hints[4],
("physical_table".to_string(), "table1".to_string())
);
}

#[test]
fn test_extract_hints_with_partial_metadata_map() {
let mut metadata = MetadataMap::new();
metadata.insert(
"x-greptime-hint-auto_create_table",
MetadataValue::from_static("true"),
);
metadata.insert("x-greptime-hint-ttl", MetadataValue::from_static("3600d"));

let hints = extract_hints(&metadata);

assert_eq!(hints.len(), 2);
assert_eq!(
hints[0],
("auto_create_table".to_string(), "true".to_string())
);
assert_eq!(hints[1], ("ttl".to_string(), "3600d".to_string()));
}
}
4 changes: 3 additions & 1 deletion src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ mod timeout;

pub(crate) use timeout::DynamicTimeoutLayer;

mod hints;
#[cfg(any(test, feature = "testing"))]
pub mod test_helpers;

Expand Down Expand Up @@ -703,7 +704,8 @@ impl HttpServer {
.layer(middleware::from_fn_with_state(
AuthState::new(self.user_provider.clone()),
authorize::check_http_auth,
)),
))
.layer(middleware::from_fn(hints::extract_hints)),
)
// Handlers for debug, we don't expect a timeout.
.nest(
Expand Down
30 changes: 30 additions & 0 deletions src/servers/src/http/hints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use axum::http::Request;
use axum::middleware::Next;
use axum::response::Response;
use session::context::QueryContext;

use crate::hint_headers;

pub async fn extract_hints<B>(mut request: Request<B>, next: Next<B>) -> Response {
let hints = hint_headers::extract_hints(request.headers());
if let Some(query_ctx) = request.extensions_mut().get_mut::<QueryContext>() {
for (key, value) in hints {
query_ctx.set_extension(key, value);
}
}
next.run(request).await
}
1 change: 1 addition & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod error;
pub mod export_metrics;
pub mod grpc;
pub mod heartbeat_options;
mod hint_headers;
pub mod http;
pub mod influxdb;
pub mod interceptor;
Expand Down

0 comments on commit 9b4e855

Please sign in to comment.