From 70cded6e3d95036d4150d4c77b7e57caa90d7a22 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 30 Sep 2023 00:30:26 +0800 Subject: [PATCH] fix: substrait limit when fetch is None (#7669) * fix: substrait limit when fetch is None Signed-off-by: Ruihang Xia * Add comments --------- Signed-off-by: Ruihang Xia Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/limit.rs | 2 +- datafusion/substrait/src/logical_plan/consumer.rs | 9 +++++++-- datafusion/substrait/src/logical_plan/producer.rs | 3 ++- .../substrait/tests/cases/roundtrip_logical_plan.rs | 5 +++++ 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 922c3db0efc8..31ed08399c2e 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -442,7 +442,7 @@ impl LimitStream { match &poll { Poll::Ready(Some(Ok(batch))) => { - if batch.num_rows() > 0 && self.skip == 0 { + if batch.num_rows() > 0 { break poll; } else { // continue to poll input stream diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 32b8f8ea547f..e1dde39427a5 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -227,8 +227,13 @@ pub async fn from_substrait_rel( from_substrait_rel(ctx, input, extensions).await?, ); let offset = fetch.offset as usize; - let count = fetch.count as usize; - input.limit(offset, Some(count))?.build() + // Since protobuf can't directly distinguish `None` vs `0` `None` is encoded as `MAX` + let count = if fetch.count as usize == usize::MAX { + None + } else { + Some(fetch.count as usize) + }; + input.limit(offset, count)?.build() } else { not_impl_err!("Fetch without an input is not valid") } diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index e17b022f3b53..1124ea53a557 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -193,7 +193,8 @@ pub fn to_substrait_rel( } LogicalPlan::Limit(limit) => { let input = to_substrait_rel(limit.input.as_ref(), ctx, extension_info)?; - let limit_fetch = limit.fetch.unwrap_or(0); + // Since protobuf can't directly distinguish `None` vs `0` encode `None` as `MAX` + let limit_fetch = limit.fetch.unwrap_or(usize::MAX); Ok(Box::new(Rel { rel_type: Some(RelType::Fetch(Box::new(FetchRel { common: None, diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index f4d74ae42681..2554d0667e48 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -188,6 +188,11 @@ async fn select_with_limit() -> Result<()> { roundtrip_fill_na("SELECT * FROM data LIMIT 100").await } +#[tokio::test] +async fn select_without_limit() -> Result<()> { + roundtrip_fill_na("SELECT * FROM data OFFSET 10").await +} + #[tokio::test] async fn select_with_limit_offset() -> Result<()> { roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await