Skip to content

Commit

Permalink
Merge pull request #2222 from taozhi8833998/feat-filter-func-athena
Browse files Browse the repository at this point in the history
feat: support filter function in athena
  • Loading branch information
taozhi8833998 authored Nov 20, 2024
2 parents d886e5e + 09741bd commit 125b7c9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
17 changes: 16 additions & 1 deletion pegjs/athena.pegjs
Original file line number Diff line number Diff line change
Expand Up @@ -2039,6 +2039,21 @@ count_arg
star_expr
= "*" { return { type: 'star', value: '*' }; }

arrow_func
= v:ident_without_kw_type __ '->' __ e:expr {
return createBinaryExpr('->', v, e)
}

filter_func
= 'filter'i __ LPAREN __ ar:expr __ COMMA __ af:arrow_func __ RPAREN {
return {
type: 'function',
name: { name: [{ type: 'origin', value: 'filter' }] },
args: { type: 'expr_list', value: [ar, af] },
...getLocationObject(),
};
}

func_call
= name:scalar_func __ LPAREN __ l:expr_list? __ RPAREN __ bc:over_partition? {
return {
Expand All @@ -2049,7 +2064,7 @@ func_call
...getLocationObject(),
};
}
/ extract_func
/ extract_func / filter_func
/ f:scalar_time_func __ up:on_update_current_timestamp? {
return {
type: 'function',
Expand Down
12 changes: 12 additions & 0 deletions test/athena.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,16 @@ describe('athena', () => {
AND lower(u.username) NOT LIKE '%admin%'`
expect(getParsedSql(sql)).to.be.equal("WITH `user_logins` AS (SELECT `user_id`, `event`, `dttm`, `dashboard_id`, `slice_id` FROM (SELECT `l`.`user_id`, 'login' AS `event`, `l`.`dttm`, CAST(NULL AS BIGINT) AS `dashboard_id`, CAST(NULL AS BIGINT) AS `slice_id`, LAG(`l`.`dttm`) OVER (PARTITION BY `l`.`user_id` ORDER BY `l`.`dttm` ASC) AS `previous_dttm` FROM `bronze_prod`.`superset_logs` AS `l` WHERE `l`.`action` = 'welcome') WHERE `previous_dttm` IS NULL OR `dttm` > `previous_dttm` + INTERVAL '1' HOUR ORDER BY `user_id` ASC, `dttm` ASC), `user_events` AS (SELECT `l`.`user_id`, json_extract_scalar(`l`.`json`, '$.event_name') AS `event`, `l`.`dttm`, NULLIF(COALESCE(CAST(json_extract_scalar(`l`.`json`, '$.source_id') AS BIGINT), `l`.`dashboard_id`), 0) AS `dashboard_id`, NULLIF(COALESCE(CAST(json_extract_scalar(`l`.`json`, '$.slice_id') AS BIGINT), CAST(json_extract_scalar(`l`.`json`, '$.chartId') AS BIGINT), `l`.`slice_id`), 0) AS `slice_id` FROM `bronze_prod`.`superset_logs` AS `l` WHERE json_extract_scalar(\"json\", '$.event_name') IN ('spa_navigation', 'mount_dashboard', 'export_csv_dashboard_chart', 'chart_download_as_image', 'export_xlsx_dashboard_chart', 'change_dashboard_filter')), `export_dashboard_logs` AS (SELECT `user_id`, `event`, `dttm`, CAST(json_extract_scalar(`json_array_element`, '$.value') AS BIGINT) AS `dashboard_id`, CAST(NULL AS BIGINT) AS `slice_id` FROM (SELECT `user_id`, `event`, `dttm`, `json_array_element` FROM (SELECT `l`.`user_id`, 'export_dashboard' AS `event`, `l`.`dttm`, json_extract(`l`.`json`, '$.rison.filters') AS `filters_array` FROM `bronze_prod`.`superset_logs` AS `l` WHERE `action` = 'ReportScheduleRestApi.get_list') CROSS JOIN UNNEST(CAST(`filters_array` AS ARRAY<JSON>)) AS t(`json_array_element`) WHERE json_extract_scalar(`json_array_element`, '$.col') = 'dashboard_id')), `relevant_logs` AS (SELECT *, ROW_NUMBER() OVER (PARTITION BY `user_id`, `dttm` ORDER BY `dashboard_id` ASC) AS `RN` FROM (SELECT `user_id`, `dttm`, `event`, MAX(`dashboard_id`) AS `dashboard_id`, MAX(`slice_id`) AS `slice_id` FROM (SELECT * FROM `user_logins` UNION ALL SELECT * FROM `user_events` UNION ALL SELECT * FROM `export_dashboard_logs`) GROUP BY `user_id`, `dttm`, `event`)), `organizational_domains` AS (SELECT lower(split_part(split_part(`therapist_mail`, '@', 2), '.', 1)) AS `organization_domain`, MAX(`therapist_organization_name`) AS `organization` FROM `silver_prod`.`eleos_full_therapist_info` GROUP BY 1) SELECT `l`.`user_id`, `l`.`dttm`, `l`.`event`, `l`.`dashboard_id`, `l`.`slice_id`, `u`.`last_name`, `u`.`email`, `o`.`organization`, `d`.`dashboard_title`, `s`.`slice_name`, 'Client Facing' AS `superset_instance` FROM `relevant_logs` AS `l` INNER JOIN `bronze_prod`.`superset_ab_user` AS `u` ON `l`.`user_id` = `u`.`id` LEFT JOIN `bronze_prod`.`superset_dashboards` AS `d` ON `l`.`dashboard_id` = `d`.`id` LEFT JOIN `bronze_prod`.`superset_slices` AS `s` ON `l`.`slice_id` = `s`.`id` LEFT JOIN `organizational_domains` AS `o` ON lower(split_part(split_part(`u`.`email`, '@', 2), '.', 1)) = `o`.`organization_domain` WHERE `RN` = 1 AND lower(`u`.`email`) NOT LIKE '%eleos%' AND lower(`u`.`email`) NOT LIKE '%test%' AND lower(`u`.`username`) NOT LIKE '%eleos%' AND lower(`u`.`username`) NOT LIKE '%test%' AND lower(`u`.`username`) NOT LIKE '%admin%'")
})
it('should support filter function', () => {
const sql = `SELECT
id,
CAST(CURRENT_TIMESTAMP AS TIMESTAMP(6)) AS dbt_insert_time
FROM
some_table
WHERE
cardinality(
filter(map_values(note), VALUE -> VALUE IS NOT NULL)
) = 0;`
expect(getParsedSql(sql)).to.be.equal('SELECT `id`, CAST(CURRENT_TIMESTAMP AS TIMESTAMP(6)) AS `dbt_insert_time` FROM `some_table` WHERE cardinality(FILTER(map_values(`note`), VALUE -> `VALUE` IS NOT NULL)) = 0')
})
})

0 comments on commit 125b7c9

Please sign in to comment.