Skip to content

Commit

Permalink
feat: remove some redundent clone/conversion on constructing MergeSca…
Browse files Browse the repository at this point in the history
…n stream (#4632)

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Aug 28, 2024
1 parent 7449469 commit 64ae32d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
30 changes: 12 additions & 18 deletions src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use datafusion_physical_expr::EquivalenceProperties;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use greptime_proto::v1::QueryContext;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -185,24 +184,25 @@ impl MergeScanExec {
context: Arc<TaskContext>,
partition: usize,
) -> Result<SendableRecordBatchStream> {
// prepare states to move
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let schema = Self::arrow_schema_to_schema(self.schema())?;

let schema = self.schema.clone();
let query_ctx = self.query_ctx.clone();
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
let target_partition = self.target_partition;
let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let current_catalog = self.query_ctx.current_catalog().to_string();
let current_schema = self.query_ctx.current_schema().to_string();
let current_channel = self.query_ctx.channel();
let timezone = self.query_ctx.timezone().to_string();
let extensions = self.query_ctx.extensions();
let target_partition = self.target_partition;

let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
let stream = Box::pin(stream!({
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
// only report metrics once for each MergeScan
if partition == 0 {
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
}

let _finish_timer = metric.finish_time().timer();
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
Expand All @@ -217,13 +217,7 @@ impl MergeScanExec {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.to_w3c(),
dbname: dbname.clone(),
query_context: Some(QueryContext {
current_catalog: current_catalog.clone(),
current_schema: current_schema.clone(),
timezone: timezone.clone(),
extensions: extensions.clone(),
channel: current_channel as u32,
}),
query_context: Some(query_ctx.as_ref().into()),
}),
region_id,
plan: plan.clone(),
Expand Down
6 changes: 6 additions & 0 deletions src/session/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ impl From<QueryContext> for api::v1::QueryContext {
}
}

impl From<&QueryContext> for api::v1::QueryContext {
fn from(ctx: &QueryContext) -> Self {
ctx.clone().into()
}
}

impl QueryContext {
pub fn arc() -> QueryContextRef {
Arc::new(QueryContextBuilder::default().build())
Expand Down

0 comments on commit 64ae32d

Please sign in to comment.