diff --git a/Cargo.lock b/Cargo.lock index 387706a3fbcb6..6f18a4607ad36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2724,9 +2724,9 @@ dependencies = [ [[package]] name = "const-str" -version = "0.5.6" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" +checksum = "671927c085eb5827d30b95df08f6c6a2301eafe2274c368bb2c16f42e03547eb" [[package]] name = "constant_time_eq" diff --git a/README.md b/README.md index 96bc0f35c7ddf..8717e7725e920 100644 --- a/README.md +++ b/README.md @@ -9,25 +9,18 @@
-### 🌊 Reimagine real-time data engineering. +### 🌊 Ride the Wave of Real-Time Data.
- -

   📚  - Documentation   🚀  - - Slack Community - +

+ Docs | Benchmarks | Demos

+ +

+

Slack @@ -46,11 +39,11 @@
-RisingWave is a Postgres-compatible SQL database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data. +RisingWave is the world's most advanced streaming database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data. It provides both a Postgres-compatible [SQL interface](https://docs.risingwave.com/sql/overview) and a DataFrame-style [Python interface](https://docs.risingwave.com/python-sdk/intro). -RisingWave can ingest millions of events per second, continuously join and analyze live data streams with historical tables, serve ad-hoc queries in real-time, and deliver fresh, consistent results wherever needed. +RisingWave can ingest millions of events per second, continuously join and analyze live data streams with historical tables, serve ad-hoc queries at low latency, and deliver fresh, consistent results wherever needed. 
-![RisingWave](./docs/dev/src/images/architecture_20240908.png) +![RisingWave](./docs/dev/src/images/architecture_20250127.png) ## Try it out in 60 seconds diff --git a/docs/dev/src/images/architecture_20250127.png b/docs/dev/src/images/architecture_20250127.png new file mode 100644 index 0000000000000..703a38e83f356 Binary files /dev/null and b/docs/dev/src/images/architecture_20250127.png differ diff --git a/e2e_test/batch/join/asof_join.slt b/e2e_test/batch/join/asof_join.slt new file mode 100644 index 0000000000000..bf905b661e107 --- /dev/null +++ b/e2e_test/batch/join/asof_join.slt @@ -0,0 +1,43 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t1 (v1 int, v2 int, v3 int primary key); + +statement ok +create table t2 (v1 int, v2 int, v3 int primary key); + +statement ok +insert into t1 values (1, 2, 3), (2, 3, 4), (1, 2, 9); + +statement ok +insert into t2 values (1, NULL, 8), (1, 3, 4), (1, 2, 5), (1, 2, 6); + +# asof inner join +query IIIIII +SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 order by t1.v1, t1.v3; +---- +1 2 3 1 3 4 +1 2 9 1 3 4 + +# asof left join +query IIIIII +SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 order by t1.v1, t1.v3; +---- +1 2 3 1 3 4 +1 2 9 1 3 4 +2 3 4 NULL NULL NULL + +# asof left join +query IIIIII +SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 > t2.v2 order by t1.v1, t1.v3; +---- +1 2 3 NULL NULL NULL +1 2 9 NULL NULL NULL +2 3 4 NULL NULL NULL + +statement ok +drop table t1; + +statement ok +drop table t2; \ No newline at end of file diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 7ffdf94e3c30a..6e07ceae4d5d4 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -289,6 +289,7 @@ 
message HashJoinNode { // Null safe means it treats `null = null` as true. // Each key pair can be null safe independently. (left_key, right_key, null_safe) repeated bool null_safe = 6; + optional plan_common.AsOfJoinDesc asof_desc = 7; } message SortMergeJoinNode { diff --git a/src/batch/executors/benches/hash_join.rs b/src/batch/executors/benches/hash_join.rs index 330fc299594d0..6d64461dd1c2d 100644 --- a/src/batch/executors/benches/hash_join.rs +++ b/src/batch/executors/benches/hash_join.rs @@ -76,6 +76,7 @@ fn create_hash_join_executor( "HashJoinExecutor".into(), CHUNK_SIZE, None, + None, BatchSpillMetrics::for_test(), ShutdownToken::empty(), MemoryContext::none(), diff --git a/src/batch/executors/src/executor/join/hash_join.rs b/src/batch/executors/src/executor/join/hash_join.rs index af89b3d1503a9..44518e5155496 100644 --- a/src/batch/executors/src/executor/join/hash_join.rs +++ b/src/batch/executors/src/executor/join/hash_join.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::cmp::Ordering; use std::iter; use std::iter::empty; use std::marker::PhantomData; @@ -25,8 +26,8 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap}; use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher}; use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc}; -use risingwave_common::row::{repeat_n, RowExt}; -use risingwave_common::types::{DataType, Datum}; +use risingwave_common::row::{repeat_n, Row, RowExt}; +use risingwave_common::types::{DataType, Datum, DefaultOrd}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common_estimate_size::EstimateSize; @@ -35,7 +36,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::data::DataChunk as PbDataChunk; use risingwave_pb::Message; -use super::{ChunkedData, JoinType, RowId}; +use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId}; use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, @@ -83,6 +84,8 @@ pub struct HashJoinExecutor { null_matched: Vec, identity: String, chunk_size: usize, + /// Whether the join is an as-of join + asof_desc: Option, spill_backend: Option, spill_metrics: Arc, @@ -179,6 +182,7 @@ pub struct EquiJoinParams { next_build_row_with_same_key: ChunkedData>, chunk_size: usize, shutdown_rx: ShutdownToken, + asof_desc: Option, } impl EquiJoinParams { @@ -194,6 +198,7 @@ impl EquiJoinParams { next_build_row_with_same_key: ChunkedData>, chunk_size: usize, shutdown_rx: ShutdownToken, + asof_desc: Option, ) -> Self { Self { probe_side, @@ -206,6 +211,7 @@ impl EquiJoinParams { next_build_row_with_same_key, chunk_size, shutdown_rx, + asof_desc, } } } @@ -648,6 +654,7 @@ impl HashJoinExecutor { self.cond.clone(), format!("{}-sub{}", self.identity.clone(), i), self.chunk_size, + 
self.asof_desc.clone(), self.spill_backend.clone(), self.spill_metrics.clone(), Some(partition_size), @@ -683,9 +690,12 @@ impl HashJoinExecutor { next_build_row_with_same_key, self.chunk_size, self.shutdown_rx.clone(), + self.asof_desc, ); - if let Some(cond) = self.cond.as_ref() { + if let Some(cond) = self.cond.as_ref() + && params.asof_desc.is_none() + { let stream = match self.join_type { JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond), JoinType::LeftOuter => { @@ -709,6 +719,9 @@ impl HashJoinExecutor { JoinType::FullOuter => { Self::do_full_outer_join_with_non_equi_condition(params, cond) } + JoinType::AsOfInner | JoinType::AsOfLeftOuter => { + unreachable!("AsOf join should not reach here") + } }; // For non-equi join, we need an output chunk builder to align the output chunks. let mut output_chunk_builder = @@ -726,8 +739,10 @@ impl HashJoinExecutor { } } else { let stream = match self.join_type { - JoinType::Inner => Self::do_inner_join(params), - JoinType::LeftOuter => Self::do_left_outer_join(params), + JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params), + JoinType::LeftOuter | JoinType::AsOfLeftOuter => { + Self::do_left_outer_join(params) + } JoinType::LeftSemi => Self::do_left_semi_anti_join::(params), JoinType::LeftAnti => Self::do_left_semi_anti_join::(params), JoinType::RightOuter => Self::do_right_outer_join(params), @@ -754,6 +769,7 @@ impl HashJoinExecutor { next_build_row_with_same_key, chunk_size, shutdown_rx, + asof_desc, .. 
}: EquiJoinParams, ) { @@ -767,19 +783,39 @@ impl HashJoinExecutor { .enumerate() .filter_by_bitmap(probe_chunk.visibility()) { - for build_row_id in - next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()) - { - shutdown_rx.check()?; - let build_chunk = &build_side[build_row_id.chunk_id()]; - if let Some(spilled) = Self::append_one_row( - &mut chunk_builder, - &probe_chunk, - probe_row_id, - build_chunk, - build_row_id.row_id(), + let build_side_row_iter = + next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied()); + if let Some(asof_desc) = &asof_desc { + if let Some(build_row_id) = Self::find_asof_matched_rows( + probe_chunk.row_at_unchecked_vis(probe_row_id), + &build_side, + build_side_row_iter, + asof_desc, ) { - yield spilled + shutdown_rx.check()?; + if let Some(spilled) = Self::append_one_row( + &mut chunk_builder, + &probe_chunk, + probe_row_id, + &build_side[build_row_id.chunk_id()], + build_row_id.row_id(), + ) { + yield spilled + } + } + } else { + for build_row_id in build_side_row_iter { + shutdown_rx.check()?; + let build_chunk = &build_side[build_row_id.chunk_id()]; + if let Some(spilled) = Self::append_one_row( + &mut chunk_builder, + &probe_chunk, + probe_row_id, + build_chunk, + build_row_id.row_id(), + ) { + yield spilled + } } } } @@ -814,6 +850,7 @@ impl HashJoinExecutor { next_build_row_with_same_key, chunk_size, shutdown_rx, + asof_desc, .. 
}: EquiJoinParams, ) { @@ -828,19 +865,49 @@ impl HashJoinExecutor { .filter_by_bitmap(probe_chunk.visibility()) { if let Some(first_matched_build_row_id) = hash_map.get(probe_key) { - for build_row_id in - next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)) - { - shutdown_rx.check()?; - let build_chunk = &build_side[build_row_id.chunk_id()]; - if let Some(spilled) = Self::append_one_row( - &mut chunk_builder, - &probe_chunk, - probe_row_id, - build_chunk, - build_row_id.row_id(), + let build_side_row_iter = + next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id)); + if let Some(asof_desc) = &asof_desc { + if let Some(build_row_id) = Self::find_asof_matched_rows( + probe_chunk.row_at_unchecked_vis(probe_row_id), + &build_side, + build_side_row_iter, + asof_desc, ) { - yield spilled + shutdown_rx.check()?; + if let Some(spilled) = Self::append_one_row( + &mut chunk_builder, + &probe_chunk, + probe_row_id, + &build_side[build_row_id.chunk_id()], + build_row_id.row_id(), + ) { + yield spilled + } + } else { + shutdown_rx.check()?; + let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id); + if let Some(spilled) = Self::append_one_row_with_null_build_side( + &mut chunk_builder, + probe_row, + build_data_types.len(), + ) { + yield spilled + } + } + } else { + for build_row_id in build_side_row_iter { + shutdown_rx.check()?; + let build_chunk = &build_side[build_row_id.chunk_id()]; + if let Some(spilled) = Self::append_one_row( + &mut chunk_builder, + &probe_chunk, + probe_row_id, + build_chunk, + build_row_id.row_id(), + ) { + yield spilled + } } } } else { @@ -1916,6 +1983,72 @@ impl HashJoinExecutor { ) -> Option { chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref)) } + + /// Picks the build-side row that an `AsOf` join pairs with `probe_row_ref`. + /// + /// `build_side_row_iter` yields the candidate build rows for the probe row's join key. Of the + /// candidates satisfying the inequality in `asof_join_condition`, the one whose inequality + /// column is nearest to the probe value is returned. Candidates with a NULL inequality column + /// are skipped; `None` is returned if the probe value is NULL or no candidate qualifies. + fn find_asof_matched_rows( + probe_row_ref: RowRef<'_>, + build_side: &[DataChunk], + build_side_row_iter: RowIdIter<'_>, + asof_join_condition: &AsOfDesc, + ) -> Option<RowId> { + let probe_inequality_value = 
probe_row_ref.datum_at(asof_join_condition.left_idx); + if let Some(probe_inequality_scalar) = probe_inequality_value { + let mut result_row_id: Option<RowId> = None; + let mut build_row_ref; + + for build_row_id in build_side_row_iter { + build_row_ref = + build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id()); + let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx); + if let Some(build_inequality_scalar) = build_inequality_value { + let mut pick_result = |compare: fn(Ordering) -> bool| { + if let Some(result_row_id_inner) = result_row_id { + let result_row_ref = build_side[result_row_id_inner.chunk_id()] + .row_at_unchecked_vis(result_row_id_inner.row_id()); + let result_inequality_scalar = result_row_ref + .datum_at(asof_join_condition.right_idx) + .unwrap(); + // A later candidate replaces the current pick only if it lies between the probe + // value and that pick under the same comparison, i.e. it is a nearer match. + if compare( + probe_inequality_scalar.default_cmp(&build_inequality_scalar), + ) && compare( + build_inequality_scalar.default_cmp(&result_inequality_scalar), + ) { + result_row_id = Some(build_row_id); + } + } else if compare( + probe_inequality_scalar.default_cmp(&build_inequality_scalar), + ) { + result_row_id = Some(build_row_id); + } + }; + match asof_join_condition.inequality_type { + AsOfInequalityType::Lt => { + pick_result(Ordering::is_lt); + } + AsOfInequalityType::Le => { + pick_result(Ordering::is_le); + } + AsOfInequalityType::Gt => { + pick_result(Ordering::is_gt); + } + AsOfInequalityType::Ge => { + pick_result(Ordering::is_ge); + } + } + } + } + result_row_id + } else { + None + } + } } /// `DataChunkMutator` transforms the given data chunk for non-equi join. 
@@ -2195,6 +2320,11 @@ impl BoxedExecutorBuilder for HashJoinExecutor<()> { let identity = context.plan_node().get_identity().clone(); + let asof_desc = hash_join_node + .asof_desc + .map(|desc| AsOfDesc::from_protobuf(&desc)) + .transpose()?; + Ok(HashJoinExecutorArgs { join_type, output_indices, @@ -2207,6 +2337,7 @@ impl BoxedExecutorBuilder for HashJoinExecutor<()> { identity: identity.clone(), right_key_types, chunk_size: context.context().get_config().developer.chunk_size, + asof_desc, spill_backend: if context.context().get_config().enable_spill { Some(Disk) } else { @@ -2232,6 +2363,7 @@ struct HashJoinExecutorArgs { identity: String, right_key_types: Vec, chunk_size: usize, + asof_desc: Option, spill_backend: Option, spill_metrics: Arc, shutdown_rx: ShutdownToken, @@ -2253,6 +2385,7 @@ impl HashKeyDispatcher for HashJoinExecutorArgs { self.cond.map(Arc::new), self.identity, self.chunk_size, + self.asof_desc, self.spill_backend, self.spill_metrics, self.shutdown_rx, @@ -2278,6 +2411,7 @@ impl HashJoinExecutor { cond: Option>, identity: String, chunk_size: usize, + asof_desc: Option, spill_backend: Option, spill_metrics: Arc, shutdown_rx: ShutdownToken, @@ -2294,6 +2428,7 @@ impl HashJoinExecutor { cond, identity, chunk_size, + asof_desc, spill_backend, spill_metrics, None, @@ -2314,6 +2449,7 @@ impl HashJoinExecutor { cond: Option>, identity: String, chunk_size: usize, + asof_desc: Option, spill_backend: Option, spill_metrics: Arc, memory_upper_bound: Option, @@ -2352,6 +2488,7 @@ impl HashJoinExecutor { cond, identity, chunk_size, + asof_desc, shutdown_rx, spill_backend, spill_metrics, @@ -2632,6 +2769,7 @@ mod tests { cond, "HashJoinExecutor".to_owned(), chunk_size, + None, if test_spill { Some(SpillBackend::Memory) } else { diff --git a/src/batch/executors/src/executor/join/lookup_join_base.rs b/src/batch/executors/src/executor/join/lookup_join_base.rs index d3f806e01c547..3a8466cbad26e 100644 --- 
a/src/batch/executors/src/executor/join/lookup_join_base.rs +++ b/src/batch/executors/src/executor/join/lookup_join_base.rs @@ -178,6 +178,7 @@ impl LookupJoinBase { next_build_row_with_same_key, self.chunk_size, self.shutdown_rx.clone(), + None, ); if let Some(cond) = self.condition.as_ref() { @@ -197,7 +198,9 @@ impl LookupJoinBase { JoinType::RightOuter | JoinType::RightSemi | JoinType::RightAnti - | JoinType::FullOuter => unimplemented!(), + | JoinType::FullOuter + | JoinType::AsOfInner + | JoinType::AsOfLeftOuter => unimplemented!(), }; // For non-equi join, we need an output chunk builder to align the output chunks. let mut output_chunk_builder = @@ -222,7 +225,9 @@ impl LookupJoinBase { JoinType::RightOuter | JoinType::RightSemi | JoinType::RightAnti - | JoinType::FullOuter => unimplemented!(), + | JoinType::FullOuter + | JoinType::AsOfInner + | JoinType::AsOfLeftOuter => unimplemented!(), }; #[for_await] for chunk in stream { diff --git a/src/batch/executors/src/executor/join/mod.rs b/src/batch/executors/src/executor/join/mod.rs index 6c404a7cf5b6b..571babc4ca1fb 100644 --- a/src/batch/executors/src/executor/join/mod.rs +++ b/src/batch/executors/src/executor/join/mod.rs @@ -30,7 +30,7 @@ use risingwave_common::array::{DataChunk, RowRef}; use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_pb::plan_common::JoinType as PbJoinType; +use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinInequalityType, JoinType as PbJoinType}; use crate::error::Result; @@ -49,6 +49,8 @@ pub enum JoinType { /// Anti join when build side should output when matched RightAnti, FullOuter, + AsOfInner, + AsOfLeftOuter, } impl JoinType { @@ -62,7 +64,9 @@ impl JoinType { PbJoinType::RightSemi => JoinType::RightSemi, PbJoinType::RightAnti => JoinType::RightAnti, PbJoinType::FullOuter => JoinType::FullOuter, - PbJoinType::AsofInner | PbJoinType::AsofLeftOuter | PbJoinType::Unspecified => 
{ + PbJoinType::AsofInner => JoinType::AsOfInner, + PbJoinType::AsofLeftOuter => JoinType::AsOfLeftOuter, + PbJoinType::Unspecified => { unreachable!() } } @@ -73,28 +77,6 @@ impl JoinType { impl JoinType { #![allow(dead_code)] - #[inline(always)] - pub(super) fn need_join_remaining(self) -> bool { - matches!( - self, - JoinType::RightOuter | JoinType::RightAnti | JoinType::FullOuter - ) - } - - fn need_build(self) -> bool { - match self { - JoinType::RightSemi => true, - other => other.need_join_remaining(), - } - } - - fn need_probe(self) -> bool { - matches!( - self, - JoinType::FullOuter | JoinType::LeftOuter | JoinType::LeftAnti | JoinType::LeftSemi - ) - } - fn keep_all(self) -> bool { matches!( self, @@ -111,6 +93,40 @@ impl JoinType { } } +#[derive(Clone, Debug)] +pub enum AsOfInequalityType { + Le, + Lt, + Ge, + Gt, +} + +#[derive(Clone, Debug)] +pub struct AsOfDesc { + pub left_idx: usize, + pub right_idx: usize, + pub inequality_type: AsOfInequalityType, +} + +impl AsOfDesc { + pub fn from_protobuf(desc_proto: &AsOfJoinDesc) -> crate::error::Result { + let typ = match desc_proto.inequality_type() { + AsOfJoinInequalityType::AsOfInequalityTypeLt => AsOfInequalityType::Lt, + AsOfJoinInequalityType::AsOfInequalityTypeLe => AsOfInequalityType::Le, + AsOfJoinInequalityType::AsOfInequalityTypeGt => AsOfInequalityType::Gt, + AsOfJoinInequalityType::AsOfInequalityTypeGe => AsOfInequalityType::Ge, + AsOfJoinInequalityType::AsOfInequalityTypeUnspecified => { + bail!("unspecified AsOf join inequality type") + } + }; + Ok(Self { + left_idx: desc_proto.left_idx as usize, + right_idx: desc_proto.right_idx as usize, + inequality_type: typ, + }) + } +} + /// The layout be like: /// /// [ `left` chunk | `right` chunk ] diff --git a/src/batch/executors/src/executor/join/nested_loop_join.rs b/src/batch/executors/src/executor/join/nested_loop_join.rs index 14917646e84c1..19bc46baccf3f 100644 --- a/src/batch/executors/src/executor/join/nested_loop_join.rs +++ 
b/src/batch/executors/src/executor/join/nested_loop_join.rs @@ -116,6 +116,9 @@ impl NestedLoopJoinExecutor { JoinType::RightSemi => Self::do_right_semi_anti_join::, JoinType::RightAnti => Self::do_right_semi_anti_join::, JoinType::FullOuter => Self::do_full_outer_join, + JoinType::AsOfInner | JoinType::AsOfLeftOuter => { + unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor") + } }; #[for_await] diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index 698ee898109bd..8a0080f59f612 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -27,7 +27,7 @@ ignored = ["workspace-hack", "workspace-config", "task_stats_alloc", "tikv-jemal [dependencies] clap = { workspace = true } console = "0.15" -const-str = "0.5" +const-str = "0.6" home = "0.5" risingwave_batch_executors = { workspace = true } risingwave_cmd = { workspace = true } diff --git a/src/frontend/planner_test/tests/testdata/input/asof_join.yaml b/src/frontend/planner_test/tests/testdata/input/asof_join.yaml index f6ca65716c2ea..5697a8f2c111d 100644 --- a/src/frontend/planner_test/tests/testdata/input/asof_join.yaml +++ b/src/frontend/planner_test/tests/testdata/input/asof_join.yaml @@ -4,14 +4,15 @@ SELECT * FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1; expected_outputs: - stream_error + - batch_error - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 || 'a' and t1.v2 > t2.v2; expected_outputs: - - batch_error - stream_plan + - batch_plan - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); @@ -19,6 +20,7 @@ SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 *2 < t2.v2; expected_outputs: - stream_plan + - batch_plan - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); @@ -26,6 +28,7 @@ SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and 
t1.v2 < t2.v2 and t1.v3 < t2.v3; expected_outputs: - stream_error + - batch_error - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); @@ -33,3 +36,4 @@ SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v2 < t2.v2; expected_outputs: - stream_error + - batch_error diff --git a/src/frontend/planner_test/tests/testdata/output/asof_join.yaml b/src/frontend/planner_test/tests/testdata/output/asof_join.yaml index 508c9de04f18d..74f8464a74bd7 100644 --- a/src/frontend/planner_test/tests/testdata/output/asof_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/asof_join.yaml @@ -1,7 +1,16 @@ # This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT * FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1; + batch_error: 'Invalid input syntax: AsOf join requires exactly 1 ineuquality condition' stream_error: 'Invalid input syntax: AsOf join requires exactly 1 ineuquality condition' - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 || 'a' and t1.v2 > t2.v2; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: AsofInner, predicate: t1.v1 = $expr1 AND (t1.v2 > t2.v2), output: [t1.v1, t1.v2, t2.v1, t2.v2] } + ├─BatchExchange { order: [], dist: HashShard(t1.v1) } + │ └─BatchScan { table: t1, columns: [t1.v1, t1.v2], distribution: SomeShard } + └─BatchExchange { order: [], dist: HashShard($expr1) } + └─BatchProject { exprs: [t2.v1, t2.v2, ConcatOp(t2.v1, 'a':Varchar) as $expr1] } + └─BatchScan { table: t2, columns: [t2.v1, t2.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_v1], pk_columns: 
[t1._row_id, t2._row_id, t1_v1], pk_conflict: NoCheck } └─StreamAsOfJoin { type: AsofInner, predicate: t1.v1 = $expr1 AND (t1.v2 > t2.v2), output: [t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] } @@ -10,10 +19,15 @@ └─StreamExchange { dist: HashShard($expr1) } └─StreamProject { exprs: [t2.v1, t2.v2, ConcatOp(t2.v1, 'a':Varchar) as $expr1, t2._row_id] } └─StreamTableScan { table: t2, columns: [t2.v1, t2.v2, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } - batch_error: |- - Not supported: AsOf join in batch query - HINT: AsOf join is only supported in streaming query - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 *2 < t2.v2; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: AsofLeftOuter, predicate: t1.v1 = t2.v1 AND ($expr1 < t2.v2), output: [t1.v1, t1.v2, t2.v1, t2.v2] } + ├─BatchExchange { order: [], dist: HashShard(t1.v1) } + │ └─BatchProject { exprs: [t1.v1, t1.v2, (t1.v2 * 2:Int32) as $expr1] } + │ └─BatchScan { table: t1, columns: [t1.v1, t1.v2], distribution: SomeShard } + └─BatchExchange { order: [], dist: HashShard(t2.v1) } + └─BatchScan { table: t2, columns: [t2.v1, t2.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_v1], pk_columns: [t1._row_id, t2._row_id, t1_v1], pk_conflict: NoCheck } └─StreamAsOfJoin { type: AsofLeftOuter, predicate: t1.v1 = t2.v1 AND ($expr1 < t2.v2), output: [t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] } @@ -23,6 +37,8 @@ └─StreamExchange { dist: HashShard(t2.v1) } └─StreamTableScan { table: t2, columns: [t2.v1, t2.v2, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: 
[_row_id], dist: UpstreamHashShard(t2._row_id) } - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 and t1.v3 < t2.v3; + batch_error: 'Invalid input syntax: AsOf join requires exactly 1 ineuquality condition' stream_error: 'Invalid input syntax: AsOf join requires exactly 1 ineuquality condition' - sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v2 < t2.v2; + batch_error: 'Invalid input syntax: AsOf join requires at least 1 equal condition' stream_error: 'Invalid input syntax: AsOf join requires at least 1 equal condition' diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs index 9256131f569ad..a902160066844 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -15,13 +15,13 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::HashJoinNode; -use risingwave_pb::plan_common::JoinType; +use risingwave_pb::plan_common::{AsOfJoinDesc, JoinType}; use super::batch::prelude::*; use super::utils::{childless_record, Distill}; use super::{ - generic, EqJoinPredicate, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb, - ToDistributedBatch, + generic, EqJoinPredicate, ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, + ToBatchPb, ToDistributedBatch, }; use crate::error::Result; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; @@ -38,14 +38,19 @@ use crate::utils::ColIndexMappingRewriteExt; pub struct BatchHashJoin { pub base: PlanBase, core: generic::Join, - /// The join condition must be equivalent to `logical.on`, but separated into 
equal and /// non-equal parts to facilitate execution later eq_join_predicate: EqJoinPredicate, + /// `AsOf` desc + asof_desc: Option, } impl BatchHashJoin { - pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { + pub fn new( + core: generic::Join, + eq_join_predicate: EqJoinPredicate, + asof_desc: Option, + ) -> Self { let dist = Self::derive_dist(core.left.distribution(), core.right.distribution(), &core); let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); @@ -53,6 +58,7 @@ impl BatchHashJoin { base, core, eq_join_predicate, + asof_desc, } } @@ -66,11 +72,16 @@ impl BatchHashJoin { // we can not derive the hash distribution from the side where outer join can generate a // NULL row (Distribution::HashShard(_), Distribution::HashShard(_)) => match join.join_type { - JoinType::AsofInner | JoinType::AsofLeftOuter | JoinType::Unspecified => { + JoinType::Unspecified => { unreachable!() } JoinType::FullOuter => Distribution::SomeShard, - JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => { + JoinType::Inner + | JoinType::LeftOuter + | JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::AsofInner + | JoinType::AsofLeftOuter => { let l2o = join.l2i_col_mapping().composite(&join.i2o_col_mapping()); l2o.rewrite_provided_distribution(left) } @@ -127,7 +138,7 @@ impl PlanTreeNodeBinary for BatchHashJoin { let mut core = self.core.clone(); core.left = left; core.right = right; - Self::new(core, self.eq_join_predicate.clone()) + Self::new(core, self.eq_join_predicate.clone(), self.asof_desc) } } @@ -215,6 +226,7 @@ impl ToBatchPb for BatchHashJoin { .as_expr_unless_true() .map(|x| x.to_expr_proto()), output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), + asof_desc: self.asof_desc, }) } } @@ -238,7 +250,15 @@ impl ExprRewritable for BatchHashJoin { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); - 
Self::new(core, self.eq_join_predicate.rewrite_exprs(r)).into() + let eq_join_predicate = self.eq_join_predicate.rewrite_exprs(r); + let desc = self.asof_desc.map(|_| { + LogicalJoin::get_inequality_desc_from_predicate( + eq_join_predicate.other_cond().clone(), + core.left.schema().len(), + ) + .unwrap() + }); + Self::new(core, eq_join_predicate, desc).into() } } diff --git a/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs b/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs index 449e26434d34b..071e63dfc6281 100644 --- a/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs +++ b/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs @@ -162,6 +162,16 @@ impl EqJoinPredicate { &mut self.other_cond } + /// Get the equal predicate + pub fn eq_predicate(&self) -> Self { + Self { + other_cond: Condition::true_cond(), + eq_keys: self.eq_keys.clone(), + left_cols_num: self.left_cols_num, + right_cols_num: self.right_cols_num, + } + } + /// Get a reference to the join predicate's eq keys. 
/// /// Note: `right_col_index` starts from `left_cols_num` diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 208e947ef40d5..180989a2b0467 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -17,7 +17,9 @@ use std::collections::HashMap; use fixedbitset::FixedBitSet; use itertools::{EitherOrBoth, Itertools}; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_pb::plan_common::JoinType; +use risingwave_expr::bail; +use risingwave_pb::expr::expr_node::PbType; +use risingwave_pb::plan_common::{AsOfJoinDesc, JoinType, PbAsOfJoinInequalityType}; use risingwave_pb::stream_plan::StreamScanType; use risingwave_sqlparser::ast::AsOf; @@ -1379,7 +1381,7 @@ impl LogicalJoin { let logical_join = self.clone_with_left_right(left, right); let inequality_desc = - StreamAsOfJoin::get_inequality_desc_from_predicate(predicate.clone(), left_len)?; + Self::get_inequality_desc_from_predicate(predicate.other_cond().clone(), left_len)?; Ok(StreamAsOfJoin::new( logical_join.core.clone(), @@ -1387,17 +1389,71 @@ impl LogicalJoin { inequality_desc, )) } -} -impl ToBatch for LogicalJoin { - fn to_batch(&self) -> Result { - if JoinType::AsofInner == self.join_type() || JoinType::AsofLeftOuter == self.join_type() { - return Err(ErrorCode::NotSupported( - "AsOf join in batch query".to_owned(), - "AsOf join is only supported in streaming query".to_owned(), + /// Convert the logical `AsOf` join to a Hash join + a Group top 1. 
+ fn to_batch_asof_join( + &self, + logical_join: generic::Join, + predicate: EqJoinPredicate, + ) -> Result { + use super::batch::prelude::*; + + if predicate.eq_keys().is_empty() { + return Err(ErrorCode::InvalidInputSyntax( + "AsOf join requires at least 1 equal condition".to_owned(), ) .into()); } + + let left_schema_len = logical_join.left.schema().len(); + let asof_desc = + Self::get_inequality_desc_from_predicate(predicate.non_eq_cond(), left_schema_len)?; + + let batch_join = BatchHashJoin::new(logical_join, predicate, Some(asof_desc)); + Ok(batch_join.into()) + } + + pub fn get_inequality_desc_from_predicate( + predicate: Condition, + left_input_len: usize, + ) -> Result { + let expr: ExprImpl = predicate.into(); + if let Some((left_input_ref, expr_type, right_input_ref)) = expr.as_comparison_cond() { + if left_input_ref.index() < left_input_len && right_input_ref.index() >= left_input_len + { + Ok(AsOfJoinDesc { + left_idx: left_input_ref.index() as u32, + right_idx: (right_input_ref.index() - left_input_len) as u32, + inequality_type: Self::expr_type_to_comparison_type(expr_type)?.into(), + }) + } else { + bail!("inequal condition from the same side should be push down in optimizer"); + } + } else { + Err(ErrorCode::InvalidInputSyntax( + "AsOf join requires exactly 1 ineuquality condition".to_owned(), + ) + .into()) + } + } + + fn expr_type_to_comparison_type(expr_type: PbType) -> Result { + match expr_type { + PbType::LessThan => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeLt), + PbType::LessThanOrEqual => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeLe), + PbType::GreaterThan => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeGt), + PbType::GreaterThanOrEqual => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeGe), + _ => Err(ErrorCode::InvalidInputSyntax(format!( + "Invalid comparison type: {}", + expr_type.as_str_name() + )) + .into()), + } + } +} + +impl ToBatch for LogicalJoin { + fn to_batch(&self) -> Result { let predicate = 
EqJoinPredicate::create( self.left().schema().len(), self.right().schema().len(), @@ -1411,7 +1467,9 @@ impl ToBatch for LogicalJoin { let ctx = self.base.ctx(); let config = ctx.session_ctx().config(); - if predicate.has_eq() { + if self.join_type() == JoinType::AsofInner || self.join_type() == JoinType::AsofLeftOuter { + self.to_batch_asof_join(logical_join, predicate) + } else if predicate.has_eq() { if !predicate.eq_keys_are_type_aligned() { return Err(ErrorCode::InternalError(format!( "Join eq keys are not aligned for predicate: {predicate:?}" @@ -1427,7 +1485,7 @@ impl ToBatch for LogicalJoin { } } - Ok(BatchHashJoin::new(logical_join, predicate).into()) + Ok(BatchHashJoin::new(logical_join, predicate, None).into()) } else { // Convert to Nested-loop Join for non-equal joins Ok(BatchNestedLoopJoin::new(logical_join).into()) diff --git a/src/frontend/src/optimizer/plan_node/stream_asof_join.rs b/src/frontend/src/optimizer/plan_node/stream_asof_join.rs index 49d6e9c43aabc..b0c417e749cd4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_asof_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_asof_join.rs @@ -15,9 +15,7 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::util::sort_util::OrderType; -use risingwave_expr::bail; -use risingwave_pb::expr::expr_node::PbType; -use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType, PbAsOfJoinInequalityType}; +use risingwave_pb::plan_common::{AsOfJoinDesc, AsOfJoinType, JoinType}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::AsOfJoinNode; @@ -26,10 +24,10 @@ use super::utils::{ childless_record, plan_node_name, watermark_pretty, Distill, TableCatalogBuilder, }; use super::{ - generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamJoinCommon, StreamNode, + generic, ExprRewritable, LogicalJoin, PlanBase, PlanRef, PlanTreeNodeBinary, StreamJoinCommon, + StreamNode, }; -use crate::error::{ErrorCode, 
Result}; -use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor}; +use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; @@ -97,44 +95,6 @@ impl StreamAsOfJoin { } } - pub fn get_inequality_desc_from_predicate( - predicate: EqJoinPredicate, - left_input_len: usize, - ) -> Result { - let expr: ExprImpl = predicate.other_cond().clone().into(); - if let Some((left_input_ref, expr_type, right_input_ref)) = expr.as_comparison_cond() { - if left_input_ref.index() < left_input_len && right_input_ref.index() >= left_input_len - { - Ok(AsOfJoinDesc { - left_idx: left_input_ref.index() as u32, - right_idx: (right_input_ref.index() - left_input_len) as u32, - inequality_type: Self::expr_type_to_comparison_type(expr_type)?.into(), - }) - } else { - bail!("inequal condition from the same side should be push down in optimizer"); - } - } else { - Err(ErrorCode::InvalidInputSyntax( - "AsOf join requires exactly 1 ineuquality condition".to_owned(), - ) - .into()) - } - } - - fn expr_type_to_comparison_type(expr_type: PbType) -> Result { - match expr_type { - PbType::LessThan => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeLt), - PbType::LessThanOrEqual => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeLe), - PbType::GreaterThan => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeGt), - PbType::GreaterThanOrEqual => Ok(PbAsOfJoinInequalityType::AsOfInequalityTypeGe), - _ => Err(ErrorCode::InvalidInputSyntax(format!( - "Invalid comparison type: {}", - expr_type.as_str_name() - )) - .into()), - } - } - /// Get join type pub fn join_type(&self) -> JoinType { self.core.join_type @@ -332,8 +292,8 @@ impl ExprRewritable for StreamAsOfJoin { let mut core = self.core.clone(); core.rewrite_exprs(r); let eq_join_predicate = self.eq_join_predicate.rewrite_exprs(r); - let desc = 
Self::get_inequality_desc_from_predicate( - eq_join_predicate.clone(), + let desc = LogicalJoin::get_inequality_desc_from_predicate( + eq_join_predicate.other_cond().clone(), core.left.schema().len(), ) .unwrap(); diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index cf6319c894a0e..f1915dc7e5748 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -657,7 +657,7 @@ pub(crate) mod tests { let eq_join_predicate = EqJoinPredicate::new(Condition::true_cond(), vec![eq_key_1, eq_key_2], 2, 2); let hash_join_node: PlanRef = - BatchHashJoin::new(logical_join_node, eq_join_predicate).into(); + BatchHashJoin::new(logical_join_node, eq_join_predicate, None).into(); let batch_exchange_node: PlanRef = BatchExchange::new( hash_join_node.clone(), Order::default(),