fix: default UDWFImpl::expressions returns all expressions #13169

Merged


Michael-J-Ward

Which issue does this PR close?

Closes #13168.

Rationale for this change

Same as in #13168.

What changes are included in this PR?

WindowUDFImpl::expressions is changed to return all the input expressions by default instead of only the 1st one.
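As a rough illustration of the change (a simplified, dependency-free model, not the actual DataFusion source), the sketch below uses a hypothetical `Expr` alias in place of `Arc<dyn PhysicalExpr>` and two hypothetical free functions, `old_default` and `new_default`, to contrast the behavior before and after this PR:

```rust
use std::sync::Arc;

// Hypothetical stand-in for `Arc<dyn PhysicalExpr>`; an assumption for illustration only.
type Expr = Arc<str>;

// Models the previous default: only the first input expression is returned.
fn old_default(input_exprs: &[Expr]) -> Vec<Expr> {
    input_exprs.first().map_or(vec![], |e| vec![Arc::clone(e)])
}

// Models the fixed default: every input expression is returned, in order.
fn new_default(input_exprs: &[Expr]) -> Vec<Expr> {
    input_exprs.to_vec()
}

fn main() {
    let inputs: Vec<Expr> = vec![Arc::from("a"), Arc::from("b"), Arc::from("c")];
    // The old behavior silently drops "b" and "c".
    assert_eq!(old_default(&inputs).len(), 1);
    // The new behavior passes all inputs through unchanged.
    assert_eq!(new_default(&inputs), inputs);
}
```

A window function that takes more than one argument would only ever see its first argument under the old model, which is the regression described in #13168.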

Are these changes tested?

Covered by existing tests.

Are there any user-facing changes?

Not from a released version.

@github-actions github-actions bot added the logical-expr Logical plan and expressions label Oct 29, 2024

@timsaucer timsaucer left a comment

Thank you! I tested this same code on datafusion-python. We do have a unit test there that caught the issue. I wonder if we should add one here. Not a blocking suggestion, because I do think this is a regression.

alamb commented Oct 29, 2024

> Thank you! I tested this same code on datafusion-python. We do have a unit test there that caught the issue. I wonder if we should add one here. Not a blocking suggestion, because I do think this is a regression.

Thank you @Michael-J-Ward and @timsaucer

The reason we introduced a regression is likely due to the lack of a test, and thus I think a test with the fix would be super helpful. Any thoughts about how to trigger it? I can potentially help write such a test.

@timsaucer

> The reason we introduced a regression is likely due to the lack of a test, and thus I think a test with the fix would be super helpful. Any thoughts about how to trigger it? I can potentially help write such a test.

I'll write it up.

jcsherin commented Oct 29, 2024

This is a possible way to unit test the default implementation of expressions().

But putting this test in datafusion/expr/src/udwf.rs is not possible because of the dependency on datafusion_physical_expr::expressions::lit. So maybe we can add it to the functions-window crate.

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::Result;
    use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl};
    use datafusion_functions_window_common::expr::ExpressionArgs;
    use datafusion_functions_window_common::field::WindowUDFFieldArgs;
    use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
    use std::any::Any;
    use datafusion_physical_expr::expressions::lit;

    #[derive(Debug)]
    struct ThreeArgWindowUDF {
        signature: Signature,
    }

    impl ThreeArgWindowUDF {
        fn new() -> Self {
            Self {
                signature: Signature::uniform(
                    3,
                    vec![DataType::Int32, DataType::Boolean, DataType::Float32],
                    Volatility::Immutable,
                ),
            }
        }
    }

    impl WindowUDFImpl for ThreeArgWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn name(&self) -> &str {
            "three_arg_window_udf"
        }

        fn signature(&self) -> &Signature {
            &self.signature
        }

        fn partition_evaluator(&self, _: PartitionEvaluatorArgs) -> Result<Box<dyn PartitionEvaluator>> {
            todo!()
        }

        fn field(&self, _: WindowUDFFieldArgs) -> Result<Field> {
            todo!()
        }
    }

    #[test]
    fn test_input_expressions() -> Result<()> {
        let udwf = WindowUDF::from(ThreeArgWindowUDF::new());

        let input_exprs = vec![lit(1), lit(false), lit(0.5_f32)]; // Vec<Arc<dyn PhysicalExpr>>
        let input_types = [DataType::Int32, DataType::Boolean, DataType::Float32]; // [DataType; 3]
        let actual = udwf.expressions(ExpressionArgs::new(&input_exprs, &input_types));

        assert_eq!(actual.len(), 3);

        assert_eq!(
            format!("{:?}", actual.first().unwrap()),
            format!("{:?}", input_exprs.first().unwrap()),
        );
        assert_eq!(
            format!("{:?}", actual.get(1).unwrap()),
            format!("{:?}", input_exprs.get(1).unwrap())
        );
        assert_eq!(
            format!("{:?}", actual.get(2).unwrap()),
            format!("{:?}", input_exprs.get(2).unwrap())
        );

        Ok(())
    }
}

@timsaucer

Thank you for writing that up! I added it and verified that it fails without @Michael-J-Ward's fix and passes with the correction. @Michael-J-Ward do you mind if I push it to your branch?

@jcsherin

Since the testing approach made sense, I rewrote it to be a bit more exhaustive than the initial iteration.

I added more test cases to cover 0, 1, 2, and 3 input expressions. The earlier version only checked the 3-argument case, which I think is insufficient to guard against future regressions.

On main, it passes when there are 0 or 1 arguments and fails when 2 or more are provided. The failing test cases only pass with @Michael-J-Ward's fix.

$ cargo test -p datafusion-functions-window test_default_expressions

running 1 test
test tests::test_default_expressions ... FAILED

failures:

---- tests::test_default_expressions stdout ----
thread 'tests::test_default_expressions' panicked at datafusion/functions-window/src/lib.rs:192:13:
assertion `left == right` failed:
Input expressions: [Column { name: "a", index: 0 }, Column { name: "b", index: 1 }]
Returned expressions: [Column { name: "a", index: 0 }]
  left: 2
 right: 1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    tests::test_default_expressions

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 10 filtered out; finished in 0.00s
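
The pattern in this output (0 or 1 arguments pass, 2 or more fail) follows from the old default keeping only the first expression. A minimal sketch under that assumption, using plain strings instead of physical expressions and a hypothetical `failing_arities` helper that is not part of DataFusion:

```rust
// Models the pre-fix default: keep at most the first input (simplified assumption).
fn old_default(inputs: &[&str]) -> Vec<String> {
    inputs.iter().take(1).map(|s| s.to_string()).collect()
}

// Returns the argument counts for which the number of returned
// expressions differs from the number of input expressions.
fn failing_arities(max_args: usize) -> Vec<usize> {
    let columns = ["a", "b", "c", "d"];
    (0..=max_args.min(columns.len()))
        .filter(|&n| old_default(&columns[..n]).len() != n)
        .collect()
}

fn main() {
    // Arities 0 and 1 round-trip; 2 and 3 lose expressions.
    assert_eq!(failing_arities(3), vec![2, 3]);
    println!("{:?}", failing_arities(3));
}
```

This is why a test that only exercises zero- or one-argument window functions would not have caught the regression.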

The same limitation as earlier applies. It is not possible to place this unit test in datafusion/expr/src/udwf.rs because of this dependency: datafusion_physical_expr::expressions::lit.

I think we can add this to the functions-window crate.


@timsaucer Please feel free to make any improvements you feel would be beneficial.

Code:

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_expr::{
        PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF,
        WindowUDFImpl,
    };
    use datafusion_functions_window_common::expr::ExpressionArgs;
    use datafusion_functions_window_common::field::WindowUDFFieldArgs;
    use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
    use datafusion_physical_expr::expressions::{col, lit};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use std::any::Any;

    #[derive(Debug)]
    struct VariadicWindowUDF {
        signature: Signature,
    }

    impl VariadicWindowUDF {
        fn new() -> Self {
            Self {
                signature: Signature::one_of(
                    vec![
                        TypeSignature::Any(0),
                        TypeSignature::Any(1),
                        TypeSignature::Any(2),
                        TypeSignature::Any(3),
                    ],
                    Volatility::Immutable,
                ),
            }
        }
    }

    impl WindowUDFImpl for VariadicWindowUDF {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn name(&self) -> &str {
            "variadic_window_udf"
        }

        fn signature(&self) -> &Signature {
            &self.signature
        }

        fn partition_evaluator(
            &self,
            _: PartitionEvaluatorArgs,
        ) -> Result<Box<dyn PartitionEvaluator>> {
            unimplemented!("unnecessary for testing");
        }

        fn field(&self, _: WindowUDFFieldArgs) -> Result<Field> {
            unimplemented!("unnecessary for testing");
        }
    }

    #[test]
    // Fixes: default implementation of `WindowUDFImpl::expressions`
    // returns all input expressions to the user-defined window
    // function unmodified.
    //
    // See: https://github.com/apache/datafusion/pull/13169
    fn test_default_expressions() -> Result<()> {
        let udwf = WindowUDF::from(VariadicWindowUDF::new());

        let field_a = Field::new("a", DataType::Int32, false);
        let field_b = Field::new("b", DataType::Float32, false);
        let field_c = Field::new("c", DataType::Boolean, false);
        let schema = Schema::new(vec![field_a, field_b, field_c]);

        let test_cases = vec![
            //
            // Zero arguments
            //
            vec![],
            //
            // Single argument
            //
            vec![col("a", &schema)?],
            vec![lit(1)],
            //
            // Two arguments
            //
            vec![col("a", &schema)?, col("b", &schema)?],
            vec![col("a", &schema)?, lit(2)],
            vec![lit(false), col("a", &schema)?],
            //
            // Three arguments
            //
            vec![col("a", &schema)?, col("b", &schema)?, col("c", &schema)?],
            vec![col("a", &schema)?, col("b", &schema)?, lit(false)],
            vec![col("a", &schema)?, lit(0.5), col("c", &schema)?],
            vec![lit(3), col("b", &schema)?, col("c", &schema)?],
        ];

        for input_exprs in &test_cases {
            let input_types = input_exprs
                .iter()
                .map(|expr: &std::sync::Arc<dyn PhysicalExpr>| {
                    expr.data_type(&schema).unwrap()
                })
                .collect::<Vec<_>>();
            let expr_args = ExpressionArgs::new(input_exprs, &input_types);

            let ret_exprs = udwf.expressions(expr_args);

            // Verify same number of input expressions are returned
            assert_eq!(
                input_exprs.len(),
                ret_exprs.len(),
                "\nInput expressions: {:?}\nReturned expressions: {:?}",
                input_exprs,
                ret_exprs
            );

            // Compares each returned expression with original input expressions
            for (expected, actual) in input_exprs.iter().zip(&ret_exprs) {
                assert_eq!(
                    format!("{expected:?}"),
                    format!("{actual:?}"),
                    "\nInput expressions: {:?}\nReturned expressions: {:?}",
                    input_exprs,
                    ret_exprs
                );
            }
        }
        Ok(())
    }
}

alamb commented Oct 30, 2024

> The same limitation as earlier applies. It is not possible to place this unit test in datafusion/expr/src/udwf.rs because of this dependency: datafusion_physical_expr::expressions::lit.

We can always put the tests in https://github.com/apache/datafusion/blob/main/datafusion/core/tests/core_integration.rs which has access to all the functionality

@github-actions github-actions bot added the core Core DataFusion crate label Oct 30, 2024
@timsaucer

Thank you @jcsherin. I appreciate you taking point on writing up the unit test. I've tested and pushed it. Does anyone else want to review?

@alamb alamb left a comment

Looks awesome -- thank you everyone 🙏

@alamb alamb merged commit 68c042d into apache:main Oct 31, 2024
24 checks passed
Labels
core Core DataFusion crate logical-expr Logical plan and expressions
Development

Successfully merging this pull request may close these issues.

default impl for WindowUDFImpl::expressions should use all input expressions
4 participants