Skip to content

Commit

Permalink
chore: change pipeline get_table function signature
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Dec 12, 2024
1 parent 074079f commit 58a9a15
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 10 deletions.
9 changes: 5 additions & 4 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use servers::error::{
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use table::Table;

Expand Down Expand Up @@ -88,12 +88,13 @@ impl PipelineHandler for Instance {

async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
self.catalog_manager
.table(catalog, schema, table, None)
.table(catalog, &schema, table, None)
.await
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,10 +612,8 @@ async fn ingest_logs_inner(
let mut results = Vec::with_capacity(pipeline_data.len());
let transformed_data: Rows;
if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
let table = state
.get_table(catalog, &schema, &table_name)
.get_table(&table_name, &query_ctx)
.await
.context(CatalogSnafu)?;
let rows = pipeline::identity_pipeline(pipeline_data, table)
Expand Down
5 changes: 2 additions & 3 deletions src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
use serde_json::Value;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};

use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
Expand Down Expand Up @@ -167,8 +167,7 @@ pub trait PipelineHandler {

async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
}

0 comments on commit 58a9a15

Please sign in to comment.