From a27ed7b04ca062f81420d2c99a62ebd4df015fb0 Mon Sep 17 00:00:00 2001
From: Jon Mease
Date: Thu, 16 Nov 2023 14:50:54 -0500
Subject: [PATCH] Add to_utc_timestamp support in more dialects (#420)

* Add support for to_utc_timestamp in more dialects
* Use setup-pixi action
* Manual pixi installation
* again
---
 .github/workflows/build_test.yml             |  76 +++++++------
 .github/workflows/java.yml                   |  29 +++--
 vegafusion-sql/Cargo.toml                    |   2 +-
 vegafusion-sql/src/compile/scalar.rs         |  51 +++++----
 vegafusion-sql/src/dialect/mod.rs            |  40 ++++++-
 .../transforms/epoch_ms_to_utc_timestamp.rs  |   7 +-
 .../dialect/transforms/to_utc_timestamp.rs   | 107 +++++++++++++++++-
 vegafusion-sql/tests/expected/select.toml    |  37 +++++-
 vegafusion-sql/tests/test_select.rs          |  75 ++++++++++++
 9 files changed, 349 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/build_test.yml b/.github/workflows/build_test.yml
index 4480cc827..aeb12e127 100644
--- a/.github/workflows/build_test.yml
+++ b/.github/workflows/build_test.yml
@@ -8,8 +8,10 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
+          cache: true
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -37,14 +39,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi (linux / osx)
-        if: ${{ runner.os != 'Windows' }}
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
-      - name: Install pixi (windows)
-        if: ${{ runner.os == 'Windows' }}
-        run: |
-          iwr -useb https://pixi.sh/install.ps1 | iex
-          echo "${HOME}/AppData/Local/pixi/bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -71,8 +68,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi (linux / osx)
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -158,10 +156,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: |
-          iwr -useb https://pixi.sh/install.ps1 | iex
-          echo "${HOME}/AppData/Local/pixi/bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -186,8 +183,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -212,8 +210,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -239,8 +238,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi (linux / osx)
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -265,8 +265,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi (linux / osx)
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -401,8 +402,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi on linux
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -452,8 +454,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -560,10 +563,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: |
-          iwr -useb https://pixi.sh/install.ps1 | iex
-          echo "${HOME}/AppData/Local/pixi/bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -593,8 +595,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -623,8 +626,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index 6ef018d92..c142402c0 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -8,8 +8,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -36,8 +37,9 @@ jobs:
     steps:
      - name: Check out repository code
        uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -64,8 +66,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -94,10 +97,17 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
+#      # Using pixi action here results in error on cleanup:
+#      #   EBUSY: resource busy or locked, unlink 'D:\a\vegafusion\vegafusion\.pixi\env\Library\lib\jvm\lib\modules'
+#      - uses: prefix-dev/setup-pixi@v0.4.1
+#        with:
+#          pixi-version: v0.7.0
+#      # So use manual install for now
       - name: Install pixi
         run: |
           iwr -useb https://pixi.sh/install.ps1 | iex
-          echo "${HOME}/AppData/Local/pixi/bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
+          echo "${HOME}\.pixi\bin"
+          echo "${HOME}\.pixi\bin" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
       - name: Cache
         uses: actions/cache@v3
         with:
@@ -129,8 +139,9 @@ jobs:
     steps:
       - name: Check out repository code
         uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # pin@v3.5.2
-      - name: Install pixi
-        run: curl -fsSL https://pixi.sh/install.sh | bash && echo "${HOME}/.pixi/bin" >> $GITHUB_PATH
+      - uses: prefix-dev/setup-pixi@v0.4.1
+        with:
+          pixi-version: v0.7.0
       - name: Cache
         uses: actions/cache@v3
         with:
diff --git a/vegafusion-sql/Cargo.toml b/vegafusion-sql/Cargo.toml
index 6a2689cf4..65140aa60 100644
--- a/vegafusion-sql/Cargo.toml
+++ b/vegafusion-sql/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 description = "VegaFusion SQL dialect generation and connection implementations"
 
 [features]
-datafusion-conn = [ "datafusion", "tempfile", "reqwest", "reqwest-retry", "reqwest-middleware", "vegafusion-datafusion-udfs", "object_store", "url"]
+datafusion-conn = [ "datafusion", "tempfile", "reqwest", "reqwest-retry", "reqwest-middleware", "vegafusion-datafusion-udfs", "object_store", "url", "vegafusion-common/object_store",]
 
 [dependencies]
 async-trait = "0.1.73"
diff --git a/vegafusion-sql/src/compile/scalar.rs b/vegafusion-sql/src/compile/scalar.rs
index 5c9f8a14b..eefe47d1d 100644
--- a/vegafusion-sql/src/compile/scalar.rs
+++ b/vegafusion-sql/src/compile/scalar.rs
@@ -1,12 +1,19 @@
 use crate::compile::data_type::ToSqlDataType;
+use crate::compile::expr::ToSqlExpr;
 use crate::dialect::Dialect;
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, TimeUnit};
 use datafusion_common::scalar::ScalarValue;
+use datafusion_common::DFSchema;
+use datafusion_expr::{
+    expr, lit, ColumnarValue, Expr, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF,
+    Signature, Volatility,
+};
 use sqlparser::ast::{
     Expr as SqlExpr, Function as SqlFunction, FunctionArg as SqlFunctionArg, FunctionArgExpr,
     Ident, ObjectName as SqlObjectName, Value as SqlValue,
 };
 use std::ops::Add;
+use std::sync::Arc;
 use vegafusion_common::error::{Result, VegaFusionError};
 
 pub trait ToSqlScalar {
@@ -167,28 +174,28 @@ impl ToSqlScalar for ScalarValue {
             )),
             ScalarValue::TimestampSecond(v, _) => {
                 if let Some(v) = v {
-                    Ok(ms_to_timestamp(v * 1000))
+                    Ok(ms_to_timestamp(v * 1000, dialect)?)
                 } else {
                     Ok(SqlExpr::Value(SqlValue::Null))
                 }
             }
             ScalarValue::TimestampMillisecond(v, _) => {
                 if let Some(v) = v {
-                    Ok(ms_to_timestamp(*v))
+                    Ok(ms_to_timestamp(*v, dialect)?)
                 } else {
                     Ok(SqlExpr::Value(SqlValue::Null))
                 }
             }
             ScalarValue::TimestampMicrosecond(v, _) => {
                 if let Some(v) = v {
-                    Ok(ms_to_timestamp(v / 1000))
+                    Ok(ms_to_timestamp(v / 1000, dialect)?)
                 } else {
                     Ok(SqlExpr::Value(SqlValue::Null))
                 }
             }
             ScalarValue::TimestampNanosecond(v, _) => {
                 if let Some(v) = v {
-                    Ok(ms_to_timestamp(v / 1000000))
+                    Ok(ms_to_timestamp(v / 1000000, dialect)?)
                 } else {
                     Ok(SqlExpr::Value(SqlValue::Null))
                 }
@@ -245,24 +252,26 @@ impl ToSqlScalar for ScalarValue {
     }
 }
 
-fn ms_to_timestamp(v: i64) -> SqlExpr {
-    let function_ident = Ident {
-        value: "epoch_ms_to_utc_timestamp".to_string(),
-        quote_style: None,
-    };
-
-    let v_ms_expr = SqlExpr::Value(SqlValue::Number(v.to_string(), false));
-
-    let args = vec![SqlFunctionArg::Unnamed(FunctionArgExpr::Expr(v_ms_expr))];
+fn ms_to_timestamp(v: i64, dialect: &Dialect) -> Result<SqlExpr> {
+    // Hack to recursively transform the epoch_ms_to_utc_timestamp
+    let return_type: ReturnTypeFunction =
+        Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None))));
+    let signature: Signature = Signature::exact(vec![DataType::Int64], Volatility::Immutable);
+    let scalar_fn: ScalarFunctionImplementation = Arc::new(move |_args: &[ColumnarValue]| {
+        panic!("Placeholder UDF implementation should not be called")
+    });
 
-    SqlExpr::Function(SqlFunction {
-        name: SqlObjectName(vec![function_ident]),
-        args,
-        over: None,
-        distinct: false,
-        special: false,
-        order_by: Default::default(),
+    let udf = ScalarUDF::new(
+        "epoch_ms_to_utc_timestamp",
+        &signature,
+        &return_type,
+        &scalar_fn,
+    );
+    Expr::ScalarUDF(expr::ScalarUDF {
+        fun: Arc::new(udf),
+        args: vec![lit(v)],
     })
+    .to_sql(dialect, &DFSchema::empty())
 }
 
 fn date32_to_date(days: &Option<i32>, dialect: &Dialect) -> Result<SqlExpr> {
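Note on the scalar.rs change above: `ms_to_timestamp` used to emit a literal `epoch_ms_to_utc_timestamp(<ms>)` call straight into the SQL AST, bypassing the per-dialect function transforms. It now builds a placeholder DataFusion `ScalarUDF` with that name (its implementation panics because it is only ever pretty-printed, never executed) and immediately compiles it back to SQL with `to_sql`, so timestamp literals go through the same `epoch_ms_to_utc_timestamp` transformer as column expressions. The surrounding match arms first normalize every timestamp unit to epoch milliseconds. A minimal, self-contained sketch of that normalization (toy types for illustration, not the crate's `ScalarValue` API):

```rust
/// Minimal sketch of the unit normalization performed in the
/// ScalarValue::Timestamp* match arms above (illustrative only; the real code
/// matches on datafusion_common::ScalarValue, not this toy enum).
#[derive(Debug, Clone, Copy)]
enum TimestampScalar {
    Second(i64),
    Millisecond(i64),
    Microsecond(i64),
    Nanosecond(i64),
}

/// Convert any timestamp unit to epoch milliseconds, the unit expected by the
/// dialect's `epoch_ms_to_utc_timestamp` transform.
fn to_epoch_ms(ts: TimestampScalar) -> i64 {
    match ts {
        TimestampScalar::Second(v) => v * 1000,
        TimestampScalar::Millisecond(v) => v,
        TimestampScalar::Microsecond(v) => v / 1000,
        TimestampScalar::Nanosecond(v) => v / 1_000_000,
    }
}

fn main() {
    // 2022-01-01T17:34:56.123Z expressed in different units
    assert_eq!(to_epoch_ms(TimestampScalar::Second(1641058496)), 1641058496000);
    assert_eq!(to_epoch_ms(TimestampScalar::Nanosecond(1641058496123000000)), 1641058496123);
}
```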
"ln", + "log10", + "log2", + "pow", + "round", + "sin", + "sqrt", + "tan", + "random", + "substr", + "concat", + "upper", + "lower", + "to_utc_timestamp", ] .iter() .map(|s| s.to_string()) @@ -1613,6 +1641,10 @@ impl Dialect { "date_to_utc_timestamp", DateToUtcTimestampSnowflakeTransform::new_dyn(), ), + ( + "to_utc_timestamp", + ToUtcTimestampSnowflakeTransform::new_dyn(), + ), ] .into_iter() .map(|(name, v)| (name.to_string(), v)) diff --git a/vegafusion-sql/src/dialect/transforms/epoch_ms_to_utc_timestamp.rs b/vegafusion-sql/src/dialect/transforms/epoch_ms_to_utc_timestamp.rs index 9468617cf..a32b04905 100644 --- a/vegafusion-sql/src/dialect/transforms/epoch_ms_to_utc_timestamp.rs +++ b/vegafusion-sql/src/dialect/transforms/epoch_ms_to_utc_timestamp.rs @@ -213,6 +213,11 @@ impl FunctionTransformer for EpochMsToUtcTimestampPostgresTransformer { order_by: Default::default(), }); + let to_timestamp_at_utc_expr = SqlExpr::AtTimeZone { + timestamp: Box::new(to_timestamp_expr), + time_zone: "UTC".to_string(), + }; + let interval_expr = SqlExpr::Interval(sqlparser::ast::Interval { value: Box::new(SqlExpr::Value(SqlValue::SingleQuotedString( "1 millisecond".to_string(), @@ -230,7 +235,7 @@ impl FunctionTransformer for EpochMsToUtcTimestampPostgresTransformer { }; let interval_addition_expr = SqlExpr::BinaryOp { - left: Box::new(to_timestamp_expr), + left: Box::new(to_timestamp_at_utc_expr), op: SqlBinaryOperator::Plus, right: Box::new(interval_mult_expr), }; diff --git a/vegafusion-sql/src/dialect/transforms/to_utc_timestamp.rs b/vegafusion-sql/src/dialect/transforms/to_utc_timestamp.rs index 6ad5c9aa9..dc573974f 100644 --- a/vegafusion-sql/src/dialect/transforms/to_utc_timestamp.rs +++ b/vegafusion-sql/src/dialect/transforms/to_utc_timestamp.rs @@ -3,7 +3,11 @@ use crate::dialect::{Dialect, FunctionTransformer}; use arrow::datatypes::DataType; use datafusion_common::DFSchema; use datafusion_expr::{Expr, ExprSchemable}; -use sqlparser::ast::{Expr as SqlExpr, Value as SqlValue}; +use sqlparser::ast::{ + Expr as SqlExpr, Function as SqlFunction, FunctionArg as SqlFunctionArg, + FunctionArgExpr as SqlFunctionArgExpr, Ident as SqlIdent, ObjectName as SqlObjectName, + Value as SqlValue, +}; use std::sync::Arc; use vegafusion_common::error::{Result, VegaFusionError}; @@ -77,3 +81,104 @@ impl FunctionTransformer for ToUtcTimestampWithAtTimeZoneTransformer { Ok(utc_expr) } } + +/// Convert to_utc_timestamp(ts, tz) -> +/// CONVERT_TIMEZONE(tz, 'UTC', ts) +/// or if tz = 'UTC' +/// ts +#[derive(Clone, Debug)] +pub struct ToUtcTimestampSnowflakeTransform; + +impl ToUtcTimestampSnowflakeTransform { + pub fn new_dyn() -> Arc { + Arc::new(Self) + } +} + +impl FunctionTransformer for ToUtcTimestampSnowflakeTransform { + fn transform(&self, args: &[Expr], dialect: &Dialect, schema: &DFSchema) -> Result { + let (ts_arg, time_zone) = process_to_utc_timestamp_args(args, dialect, schema)?; + + if time_zone == "UTC" { + // No conversion needed + Ok(ts_arg) + } else { + let convert_tz_expr = SqlExpr::Function(SqlFunction { + name: SqlObjectName(vec![SqlIdent { + value: "convert_timezone".to_string(), + quote_style: None, + }]), + args: vec![ + SqlFunctionArg::Unnamed(SqlFunctionArgExpr::Expr(SqlExpr::Value( + SqlValue::SingleQuotedString(time_zone), + ))), + SqlFunctionArg::Unnamed(SqlFunctionArgExpr::Expr(SqlExpr::Value( + SqlValue::SingleQuotedString("UTC".to_string()), + ))), + SqlFunctionArg::Unnamed(SqlFunctionArgExpr::Expr(ts_arg)), + ], + over: None, + distinct: false, + special: false, + order_by: 
Default::default(), + }); + + Ok(convert_tz_expr) + } + } +} + +/// Convert to_utc_timestamp(ts, tz) -> +/// timestamp(CAST(ts as DATETIME), tz) +/// or if tz = 'UTC' +/// ts +#[derive(Clone, Debug)] +pub struct ToUtcTimestampBigQueryTransform; + +impl ToUtcTimestampBigQueryTransform { + pub fn new_dyn() -> Arc { + Arc::new(Self) + } +} + +impl FunctionTransformer for ToUtcTimestampBigQueryTransform { + fn transform(&self, args: &[Expr], dialect: &Dialect, schema: &DFSchema) -> Result { + let (ts_arg, time_zone) = process_to_utc_timestamp_args(args, dialect, schema)?; + + if time_zone == "UTC" { + // No conversion needed + Ok(ts_arg) + } else { + let datetime_expr = SqlExpr::Function(SqlFunction { + name: SqlObjectName(vec![SqlIdent { + value: "datetime".to_string(), + quote_style: None, + }]), + args: vec![SqlFunctionArg::Unnamed(SqlFunctionArgExpr::Expr(ts_arg))], + over: None, + distinct: false, + special: false, + order_by: Default::default(), + }); + + let convert_tz_expr = SqlExpr::Function(SqlFunction { + name: SqlObjectName(vec![SqlIdent { + value: "timestamp".to_string(), + quote_style: None, + }]), + args: vec![ + SqlFunctionArg::Unnamed(SqlFunctionArgExpr::Expr(datetime_expr)), + SqlFunctionArg::Unnamed(SqlFunctionArgExpr::Expr(SqlExpr::Value( + SqlValue::SingleQuotedString(time_zone), + ))), + ], + over: None, + distinct: false, + special: false, + order_by: Default::default(), + }); + + Ok(convert_tz_expr) + } + } +} diff --git a/vegafusion-sql/tests/expected/select.toml b/vegafusion-sql/tests/expected/select.toml index 49619c924..3db7689e9 100644 --- a/vegafusion-sql/tests/expected/select.toml +++ b/vegafusion-sql/tests/expected/select.toml @@ -595,7 +595,7 @@ WITH values0 AS (SELECT * FROM (VALUES (1, 1641058496123), (2, 1641108601321), ( """ mysql = "UNSUPPORTED" postgres = """ -WITH values0 AS (SELECT * FROM (VALUES (1, 1641058496123), (2, 1641108601321), (3, 1641192141999), (4, NULL)) AS "_values" ("a", "t")), values1 AS (SELECT "a", "t", to_timestamp(FLOOR("t" / 1000)) + INTERVAL '1 millisecond' * ("t" % 1000) AS "t_utc" FROM values0) SELECT * FROM values1 ORDER BY "a" ASC NULLS FIRST +WITH values0 AS (SELECT * FROM (VALUES (1, 1641058496123), (2, 1641108601321), (3, 1641192141999), (4, NULL)) AS "_values" ("a", "t")), values1 AS (SELECT "a", "t", to_timestamp(FLOOR("t" / 1000)) AT TIME ZONE 'UTC' + INTERVAL '1 millisecond' * ("t" % 1000) AS "t_utc" FROM values0) SELECT * FROM values1 ORDER BY "a" ASC NULLS FIRST """ redshift = "UNSUPPORTED" snowflake = """ @@ -629,7 +629,7 @@ WITH values0 AS (SELECT * FROM (VALUES (1, 1641058496123), (2, 1641108601321), ( """ mysql = "UNSUPPORTED" postgres = """ -WITH values0 AS (SELECT * FROM (VALUES (1, 1641058496123), (2, 1641108601321), (3, 1641192141999), (4, NULL)) AS "_values" ("a", "t")), values1 AS (SELECT "a", "t", to_timestamp(FLOOR("t" / 1000)) + INTERVAL '1 millisecond' * ("t" % 1000) AS "t_utc" FROM values0), values2 AS (SELECT "a", "t", "t_utc", FLOOR(EXTRACT(EPOCH FROM "t_utc") * 1000) AS "epoch_millis" FROM values1) SELECT * FROM values2 ORDER BY "a" ASC NULLS FIRST +WITH values0 AS (SELECT * FROM (VALUES (1, 1641058496123), (2, 1641108601321), (3, 1641192141999), (4, NULL)) AS "_values" ("a", "t")), values1 AS (SELECT "a", "t", to_timestamp(FLOOR("t" / 1000)) AT TIME ZONE 'UTC' + INTERVAL '1 millisecond' * ("t" % 1000) AS "t_utc" FROM values0), values2 AS (SELECT "a", "t", "t_utc", FLOOR(EXTRACT(EPOCH FROM "t_utc") * 1000) AS "epoch_millis" FROM values1) SELECT * FROM values2 ORDER BY "a" ASC NULLS FIRST """ redshift = 
"UNSUPPORTED" snowflake = """ @@ -758,6 +758,39 @@ result = ''' +---+------------+---------------------+ ''' +[to_utc_timestamp] +athena = "UNSUPPORTED" +bigquery = """ +WITH values0 AS (SELECT 0 AS `a`, timestamp_millis(1641058496123) AS `b` UNION ALL SELECT 1 AS `a`, timestamp_millis(1641108601321) AS `b` UNION ALL SELECT 2 AS `a`, timestamp_millis(1641192141999) AS `b`), values1 AS (SELECT `a`, `b`, timestamp(datetime(`b`), 'America/New_York') AS `b_utc` FROM values0) SELECT * FROM values1 ORDER BY `a` ASC NULLS FIRST +""" +clickhouse = "UNSUPPORTED" +databricks = """ +WITH values0 AS (SELECT * FROM (VALUES (0, dateadd(millisecond, 1641058496123 % 1000, from_unixtime(floor(1641058496123 / 1000)))), (1, dateadd(millisecond, 1641108601321 % 1000, from_unixtime(floor(1641108601321 / 1000)))), (2, dateadd(millisecond, 1641192141999 % 1000, from_unixtime(floor(1641192141999 / 1000))))) AS `_values` (`a`, `b`)), values1 AS (SELECT `a`, `b`, to_utc_timestamp(`b`, 'America/New_York') AS `b_utc` FROM values0) SELECT * FROM values1 ORDER BY `a` ASC NULLS FIRST +""" +datafusion = """ +WITH values0 AS (SELECT * FROM (VALUES (0, epoch_ms_to_utc_timestamp(1641058496123)), (1, epoch_ms_to_utc_timestamp(1641108601321)), (2, epoch_ms_to_utc_timestamp(1641192141999))) AS "_values" ("a", "b")), values1 AS (SELECT "a", "b", to_utc_timestamp("b", 'America/New_York') AS "b_utc" FROM values0) SELECT * FROM values1 ORDER BY "a" ASC NULLS FIRST +""" +duckdb = """ +WITH values0 AS (SELECT * FROM (VALUES (0, epoch_ms(1641058496123)), (1, epoch_ms(1641108601321)), (2, epoch_ms(1641192141999))) AS "_values" ("a", "b")), values1 AS (SELECT "a", "b", "b" AT TIME ZONE 'America/New_York' AT TIME ZONE 'UTC' AS "b_utc" FROM values0) SELECT * FROM values1 ORDER BY "a" ASC NULLS FIRST +""" +mysql = "UNSUPPORTED" +postgres = """ +WITH values0 AS (SELECT * FROM (VALUES (0, to_timestamp(floor(1641058496123 / 1000)) AT TIME ZONE 'UTC' + INTERVAL '1 millisecond' * (1641058496123 % 1000)), (1, to_timestamp(floor(1641108601321 / 1000)) AT TIME ZONE 'UTC' + INTERVAL '1 millisecond' * (1641108601321 % 1000)), (2, to_timestamp(floor(1641192141999 / 1000)) AT TIME ZONE 'UTC' + INTERVAL '1 millisecond' * (1641192141999 % 1000))) AS "_values" ("a", "b")), values1 AS (SELECT "a", "b", "b" AT TIME ZONE 'America/New_York' AT TIME ZONE 'UTC' AS "b_utc" FROM values0) SELECT * FROM values1 ORDER BY "a" ASC NULLS FIRST +""" +redshift = "UNSUPPORTED" +snowflake = """ +WITH values0 AS (SELECT "COLUMN1" AS "a", "COLUMN2" AS "b" FROM (VALUES (0, to_timestamp_ntz(1641058496123, 3)), (1, to_timestamp_ntz(1641108601321, 3)), (2, to_timestamp_ntz(1641192141999, 3)))), values1 AS (SELECT "a", "b", convert_timezone('America/New_York', 'UTC', "b") AS "b_utc" FROM values0) SELECT * FROM values1 ORDER BY "a" ASC NULLS FIRST +""" +result = ''' ++---+-------------------------+-------------------------+ +| a | b | b_utc | ++---+-------------------------+-------------------------+ +| 0 | 2022-01-01T17:34:56.123 | 2022-01-01T22:34:56.123 | +| 1 | 2022-01-02T07:30:01.321 | 2022-01-02T12:30:01.321 | +| 2 | 2022-01-03T06:42:21.999 | 2022-01-03T11:42:21.999 | ++---+-------------------------+-------------------------+ +''' + [test_string_ops] athena = """ WITH values0 AS (SELECT * FROM (VALUES (0, '1234', 'efGH'), (1, 'abCD', '5678'), (3, NULL, NULL)) AS "_values" ("a", "b", "c")), values1 AS (SELECT "a", "b", "c", substr("b", 2, 2) AS "b_substr", concat("b", ' ', "c") AS "bc_concat", upper("b") AS "b_upper", lower("b") AS "b_lower" FROM values0) SELECT * FROM 
diff --git a/vegafusion-sql/tests/test_select.rs b/vegafusion-sql/tests/test_select.rs
index 60b51b91b..c1c52ff70 100644
--- a/vegafusion-sql/tests/test_select.rs
+++ b/vegafusion-sql/tests/test_select.rs
@@ -1367,6 +1367,81 @@ mod test_date_to_utc_timestamp {
     fn test_marker() {} // Help IDE detect test module
 }
 
+#[cfg(test)]
+mod test_timestamp_to_utc_timestamp {
+    use crate::*;
+    use arrow::array::{ArrayRef, Int32Array, TimestampMillisecondArray};
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+    use arrow::record_batch::RecordBatch;
+    use datafusion_expr::{expr, lit, Expr};
+    use std::sync::Arc;
+    use vegafusion_common::column::flat_col;
+    use vegafusion_datafusion_udfs::udfs::datetime::to_utc_timestamp::TO_UTC_TIMESTAMP_UDF;
+
+    #[apply(dialect_names)]
+    async fn test(dialect_name: &str) {
+        println!("{dialect_name}");
+        let (conn, evaluable) = TOKIO_RUNTIME.block_on(make_connection(dialect_name));
+
+        let schema_ref: SchemaRef = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Timestamp(TimeUnit::Millisecond, None), true),
+        ]));
+        let columns = vec![
+            Arc::new(Int32Array::from(vec![0, 1, 2])) as ArrayRef,
+            Arc::new(TimestampMillisecondArray::from(vec![
+                1641058496123i64, // 2022-01-01 05:34:56.123
+                1641108601321i64, // 2022-01-02 07:30:01.321
+                1641192141999i64, // 2022-01-03 06:42:21.999
+            ])) as ArrayRef,
+        ];
+
+        let batch = RecordBatch::try_new(schema_ref.clone(), columns).unwrap();
+        let table = VegaFusionTable::try_new(schema_ref, vec![batch]).unwrap();
+        let df_result = SqlDataFrame::from_values(&table, conn, Default::default());
+
+        let df_result = if let Ok(df) = df_result {
+            df.select(vec![
+                flat_col("a"),
+                flat_col("b"),
+                Expr::ScalarUDF(expr::ScalarUDF {
+                    fun: Arc::new(TO_UTC_TIMESTAMP_UDF.clone()),
+                    args: vec![flat_col("b"), lit("America/New_York")],
+                })
+                .alias("b_utc"),
+            ])
+            .await
+        } else {
+            df_result
+        };
+
+        let df_result = if let Ok(df) = df_result {
+            df.sort(
+                vec![Expr::Sort(expr::Sort {
+                    expr: Box::new(flat_col("a")),
+                    asc: true,
+                    nulls_first: true,
+                })],
+                None,
+            )
+            .await
+        } else {
+            df_result
+        };
+
+        check_dataframe_query(
+            df_result,
+            "select",
+            "to_utc_timestamp",
+            dialect_name,
+            evaluable,
+        );
+    }
+
+    #[test]
+    fn test_marker() {} // Help IDE detect test module
+}
+
 #[cfg(test)]
 mod test_string_ops {
     use crate::*;
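The new test module follows the pattern of the existing per-dialect modules: it builds a small in-memory table, applies TO_UTC_TIMESTAMP_UDF with the America/New_York zone, sorts by "a", and hands the result to check_dataframe_query, which appears to compare the generated SQL against the [to_utc_timestamp] entry in tests/expected/select.toml and, for evaluable connections, the result table. For reference, the per-dialect renderings of the b_utc expression recorded in that entry (transcribed below for convenience; None marks dialects listed as UNSUPPORTED):

```rust
/// Per-dialect rendering of to_utc_timestamp("b", 'America/New_York') as
/// recorded in the [to_utc_timestamp] expectations above (transcribed for
/// reference only; None means the dialect is marked UNSUPPORTED).
fn expected_b_utc_expr(dialect: &str) -> Option<&'static str> {
    match dialect {
        "databricks" => Some("to_utc_timestamp(`b`, 'America/New_York')"),
        "datafusion" => Some(r#"to_utc_timestamp("b", 'America/New_York')"#),
        "duckdb" | "postgres" => {
            Some(r#""b" AT TIME ZONE 'America/New_York' AT TIME ZONE 'UTC'"#)
        }
        "bigquery" => Some("timestamp(datetime(`b`), 'America/New_York')"),
        "snowflake" => Some(r#"convert_timezone('America/New_York', 'UTC', "b")"#),
        // athena, clickhouse, mysql, redshift
        _ => None,
    }
}

fn main() {
    for dialect in ["databricks", "duckdb", "bigquery", "snowflake", "athena"] {
        println!("{dialect}: {:?}", expected_b_utc_expr(dialect));
    }
}
```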