diff --git a/Cargo.lock b/Cargo.lock index 77112f61ada5..76c62d1f3430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6179,7 +6179,7 @@ dependencies = [ [[package]] name = "meter-core" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=049171eb16cb4249d8099751a0c46750d1fe88e7#049171eb16cb4249d8099751a0c46750d1fe88e7" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd#80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" dependencies = [ "anymap", "once_cell", @@ -6189,7 +6189,7 @@ dependencies = [ [[package]] name = "meter-macros" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=049171eb16cb4249d8099751a0c46750d1fe88e7#049171eb16cb4249d8099751a0c46750d1fe88e7" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd#80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" dependencies = [ "meter-core", ] diff --git a/Cargo.toml b/Cargo.toml index 32a65dac6e01..d2fb1632cdbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" -meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "049171eb16cb4249d8099751a0c46750d1fe88e7" } +meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" } mockall = "0.11.4" moka = "0.12" notify = "6.1" @@ -243,7 +243,7 @@ table = { path = "src/table" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" -rev = "049171eb16cb4249d8099751a0c46750d1fe88e7" +rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" [profile.release] debug = 1 diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 9381c1b7d13b..cb7a9d3f72f4 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -241,10 +241,14 @@ pub async fn log_ingester( TypedHeader(content_type): TypedHeader, payload: String, ) -> Result { - if let Some(log_validator) = log_state.log_validator { - if let Some(response) = log_validator.validate(query_params.source.clone(), &payload) { - return response; - } + // validate source and payload + let source = query_params.source.as_deref(); + let response = match &log_state.log_validator { + Some(validator) => validator.validate(source, &payload).await, + None => None, + }; + if let Some(response) = response { + return response; } let handler = log_state.log_handler; @@ -367,13 +371,14 @@ async fn ingest_logs_inner( Ok(response) } -pub trait LogValidator { +#[async_trait] +pub trait LogValidator: Send + Sync { /// validate payload by source before processing /// Return a `Some` result to indicate validation failure. - fn validate(&self, source: Option, payload: &str) -> Option>; + async fn validate(&self, source: Option<&str>, payload: &str) -> Option>; } -pub type LogValidatorRef = Arc; +pub type LogValidatorRef = Arc; /// axum state struct to hold log handler and validator #[derive(Clone)]