diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 52c7ffcc63c9..6d70377cf2aa 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -565,7 +565,7 @@ impl FlowWorkerManager { let default_interval = Duration::from_secs(1); let mut avg_spd = 0; // rows/sec let mut since_last_run = tokio::time::Instant::now(); - let run_per_trace = 5; + let run_per_trace = 10; let mut run_cnt = 0; loop { // TODO(discord9): only run when new inputs arrive or scheduled to diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 5376c466fbbe..1d0689c4032f 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -47,7 +47,6 @@ impl Context<'_, '_> { reduce_plan: &ReducePlan, output_type: &RelationType, ) -> Result, Error> { - common_telemetry::debug!("render reduce batch"); let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan { if !accum_plan.distinct_aggrs.is_empty() { NotImplementedSnafu { diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 95b99c9e208c..5dde62b43a69 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -240,11 +240,8 @@ impl Batch { dts.push(datatypes::prelude::ConcreteDataType::null_datatype()) } } - if self.batch.is_empty() { - other.batch.iter().map(|v| v.data_type()).collect_vec() - } else { - self.batch.iter().map(|v| v.data_type()).collect_vec() - } + + dts }; let batch_builders = dts