diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 6346efdab035..9ae782c7d4ab 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -25,7 +25,7 @@ use servers::error::{ }; use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::PipelineHandler; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use snafu::ResultExt; use table::Table; @@ -88,12 +88,13 @@ impl PipelineHandler for Instance { async fn get_table( &self, - catalog: &str, - schema: &str, table: &str, + query_ctx: &QueryContext, ) -> std::result::Result>, catalog::error::Error> { + let catalog = query_ctx.current_catalog(); + let schema = query_ctx.current_schema(); self.catalog_manager - .table(catalog, schema, table, None) + .table(catalog, &schema, table, None) .await } } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 7e8a0e6e2981..5069db51975d 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -612,10 +612,8 @@ async fn ingest_logs_inner( let mut results = Vec::with_capacity(pipeline_data.len()); let transformed_data: Rows; if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - let catalog = query_ctx.current_catalog(); - let schema = query_ctx.current_schema(); let table = state - .get_table(catalog, &schema, &table_name) + .get_table(&table_name, &query_ctx) .await .context(CatalogSnafu)?; let rows = pipeline::identity_pipeline(pipeline_data, table) diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 8e841fa8af27..96a01593a8f1 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -39,7 +39,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay}; use serde_json::Value; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use crate::error::Result; use crate::influxdb::InfluxdbRequest; @@ -167,8 +167,7 @@ pub trait PipelineHandler { async fn get_table( &self, - catalog: &str, - schema: &str, table: &str, + query_ctx: &QueryContext, ) -> std::result::Result>, catalog::error::Error>; }