From 7d8b256942f54890bf010bc0ce7a296ddfd715d5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 4 Dec 2024 19:48:55 +0800 Subject: [PATCH] refactor: replace LogHandler with PipelineHandler (#5096) * refactor: replace LogHandler with PipelineHandler Signed-off-by: Ruihang Xia * change method name Signed-off-by: Ruihang Xia * rename transform to insert Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/frontend/src/instance.rs | 6 +++--- src/frontend/src/instance/log_handler.rs | 10 +++------- src/servers/src/http.rs | 6 +++--- src/servers/src/http/event.rs | 10 +++++----- src/servers/src/query_handler.rs | 18 ++++++++++-------- 5 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 6ffab3c1f619..ad387cc5dd96 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -64,8 +64,8 @@ use servers::prometheus_handler::PrometheusHandler; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ - InfluxdbLineProtocolHandler, LogHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, - PromStoreProtocolHandler, ScriptHandler, + InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, + PipelineHandler, PromStoreProtocolHandler, ScriptHandler, }; use servers::server::ServerHandlers; use session::context::QueryContextRef; @@ -98,7 +98,7 @@ pub trait FrontendInstance: + OpenTelemetryProtocolHandler + ScriptHandler + PrometheusHandler - + LogHandler + + PipelineHandler + Send + Sync + 'static diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 441501b242c1..c3422066a387 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -24,19 +24,15 @@ use servers::error::{ AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult, }; use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; -use servers::query_handler::LogHandler; +use servers::query_handler::PipelineHandler; use session::context::QueryContextRef; use snafu::ResultExt; use crate::instance::Instance; #[async_trait] -impl LogHandler for Instance { - async fn insert_logs( - &self, - log: RowInsertRequests, - ctx: QueryContextRef, - ) -> ServerResult { +impl PipelineHandler for Instance { + async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult { self.plugins .get::() .as_ref() diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index e2d5fbce4754..5b64fbd283e1 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -66,8 +66,8 @@ use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ - InfluxdbLineProtocolHandlerRef, LogHandlerRef, OpenTelemetryProtocolHandlerRef, - OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef, + InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, + PipelineHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef, }; use crate::server::Server; @@ -576,7 +576,7 @@ impl HttpServerBuilder { pub fn with_log_ingest_handler( self, - handler: LogHandlerRef, + handler: PipelineHandlerRef, validator: Option, ingest_interceptor: Option>, ) -> Self { diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 2f318d14a059..69498c209ab4 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -61,7 +61,7 @@ use crate::metrics::{ METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE, }; use crate::prom_store; -use crate::query_handler::LogHandlerRef; +use crate::query_handler::PipelineHandlerRef; const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_"; const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; @@ -502,7 +502,7 @@ pub async fn loki_ingest( }; let handler = log_state.log_handler; - let output = handler.insert_logs(ins_reqs, ctx).await; + let output = handler.insert(ins_reqs, ctx).await; if let Ok(Output { data: OutputData::AffectedRows(rows), @@ -599,7 +599,7 @@ fn extract_pipeline_value_by_content_type( } async fn ingest_logs_inner( - state: LogHandlerRef, + state: PipelineHandlerRef, pipeline_name: String, version: PipelineVersion, table_name: String, @@ -664,7 +664,7 @@ async fn ingest_logs_inner( let insert_requests = RowInsertRequests { inserts: vec![insert_request], }; - let output = state.insert_logs(insert_requests, query_ctx).await; + let output = state.insert(insert_requests, query_ctx).await; if let Ok(Output { data: OutputData::AffectedRows(rows), @@ -701,7 +701,7 @@ pub type LogValidatorRef = Arc; /// axum state struct to hold log handler and validator #[derive(Clone)] pub struct LogState { - pub log_handler: LogHandlerRef, + pub log_handler: PipelineHandlerRef, pub log_validator: Option, pub ingest_interceptor: Option>, } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index cee866c61b93..58812e9350bc 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -51,7 +51,7 @@ pub type InfluxdbLineProtocolHandlerRef = Arc; pub type OpenTelemetryProtocolHandlerRef = Arc; pub type ScriptHandlerRef = Arc; -pub type LogHandlerRef = Arc; +pub type PipelineHandlerRef = Arc; #[async_trait] pub trait ScriptHandler { @@ -107,7 +107,7 @@ pub trait PromStoreProtocolHandler { } #[async_trait] -pub trait OpenTelemetryProtocolHandler: LogHandler { +pub trait OpenTelemetryProtocolHandler: PipelineHandler { /// Handling opentelemetry metrics request async fn metrics( &self, @@ -132,14 +132,16 @@ pub trait OpenTelemetryProtocolHandler: LogHandler { ) -> Result; } -/// LogHandler is responsible for handling log related requests. +/// PipelineHandler is responsible for handling pipeline related requests. /// -/// It should be able to insert logs and manage pipelines. -/// The pipeline is a series of transformations that can be applied to logs. -/// The pipeline is stored in the database and can be retrieved by name. +/// The "Pipeline" is a series of transformations that can be applied to unstructured +/// data like logs. This handler is responsible to manage pipelines and accept data for +/// processing. +/// +/// The pipeline is stored in the database and can be retrieved by its name. #[async_trait] -pub trait LogHandler { - async fn insert_logs(&self, log: RowInsertRequests, ctx: QueryContextRef) -> Result; +pub trait PipelineHandler { + async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result; async fn get_pipeline( &self,