From ab83399cda23f32c1f0994795d9a1d6da285b6e0 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 19 Dec 2024 15:02:53 +0800 Subject: [PATCH] refactor: per review --- src/flow/src/adapter.rs | 2 +- src/flow/src/compute/render/reduce.rs | 1 - src/flow/src/expr.rs | 7 ++----- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index ab2b6e3b8d64..962d75201ab5 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -533,7 +533,7 @@ impl FlowWorkerManager { let default_interval = Duration::from_secs(1); let mut avg_spd = 0; // rows/sec let mut since_last_run = tokio::time::Instant::now(); - let run_per_trace = 5; + let run_per_trace = 10; let mut run_cnt = 0; loop { // TODO(discord9): only run when new inputs arrive or scheduled to diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 5376c466fbbe..1d0689c4032f 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -47,7 +47,6 @@ impl Context<'_, '_> { reduce_plan: &ReducePlan, output_type: &RelationType, ) -> Result, Error> { - common_telemetry::debug!("render reduce batch"); let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan { if !accum_plan.distinct_aggrs.is_empty() { NotImplementedSnafu { diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 95b99c9e208c..5dde62b43a69 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -240,11 +240,8 @@ impl Batch { dts.push(datatypes::prelude::ConcreteDataType::null_datatype()) } } - if self.batch.is_empty() { - other.batch.iter().map(|v| v.data_type()).collect_vec() - } else { - self.batch.iter().map(|v| v.data_type()).collect_vec() - } + + dts }; let batch_builders = dts