Feature/performance enhancement #41
Changes from 24 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
database_key: mixpanel_database | ||
schema_key: mixpanel_schema | ||
|
||
dbt_versions: ">=1.3.0 <2.0.0" | ||
|
||
destination_configurations: | ||
databricks: | ||
dispatch: | ||
- macro_namespace: dbt_utils | ||
search_order: [ 'spark_utils', 'dbt_utils' ] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,17 @@ | ||
# dbt_mixpanel v0.9.0 | ||
[PR #41](https://github.com/fivetran/dbt_mixpanel/pull/41) includes the following updates: | ||
|
||
## 🚨 Breaking Changes 🚨 | ||
>Note: This update is marked as breaking because it alters the materialization of existing models. While these changes do not require a `--full-refresh`, one may be beneficial if you run into issues with this update. | ||
- Updated models with the following performance improvements: | ||
- Updated the incremental strategy for all models to `insert_overwrite` for BigQuery and Databricks and `delete+insert` for all other warehouses. | ||
- Removed `stg_mixpanel__event_tmp` in favor of `stg_mixpanel__event`, which is now an incremental model. While this will increase storage, this change was made to improve compute. | ||
|
||
## Feature Updates | ||
- Added `cluster_by` columns to the configs for incremental models. This will benefit Snowflake and BigQuery users. | ||
- Added column `dbt_run_date` to incremental models to improve accuracy and optimize downstream models. This date captures the date a record was added or updated by this package. | ||
Review comment: thank you! it's always helpful to have these timestamp columns.
||
- Added a 7-day look-back to incremental models to accommodate late arriving events. | ||
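For illustration, here is a condensed sketch of how the new `dbt_run_date` column and 7-day look-back appear inside an incremental model body; the exact configs, including the new `cluster_by` keys, are in the model diffs further down in this PR, and `some_upstream_model` is only a placeholder:

```sql
select
    date_day,
    event_type,
    number_of_events,
    -- audit column: the date this record was added or updated by the package
    {{ mixpanel.date_today('dbt_run_date') }}
from {{ ref('some_upstream_model') }}  -- placeholder ref

{% if is_incremental() %}
-- look back 7 days from the latest loaded date to pick up late-arriving events
where date_day >= {{ mixpanel.lookback(from_date="max(date_day)", interval=7) }}
{% endif %}
```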
|
||
fivetran-joemarkiewicz marked this conversation as resolved.
|
||
# dbt_mixpanel v0.8.0 | ||
>Note: If you run into issues with this update, we suggest trying a **full refresh**. | ||
## 🎉 Feature Updates 🎉 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,11 +56,12 @@ dispatch: | |
``` | ||
|
||
### Database Incremental Strategies | ||
Some end models in this package are materialized incrementally. We currently use the `merge` strategy as the default strategy for BigQuery, Snowflake, and Databricks databases. For Redshift and Postgres databases, we use `delete+insert` as the default strategy. | ||
Some of the end models in this package are materialized incrementally. We have chosen `insert_overwrite` as the default strategy for **BigQuery** and **Databricks** databases, as it is only available for these dbt adapters. For **Snowflake**, **Redshift**, and **Postgres** databases, we have chosen `delete+insert` as the default strategy. | ||
|
||
We recognize there are some limitations with these strategies, particularly around updated records in the past which cause duplicates, and are assessing using a different strategy in the future. | ||
`insert_overwrite` is our preferred incremental strategy because it properly handles updates to records that fall outside the immediate incremental window. That is, because it leverages partitions, `insert_overwrite` will update existing rows that have changed upstream instead of inserting duplicates of them, all without requiring a full table scan. | ||
Review comment: can you define
Reply: Thanks for the doc comments. I am going to rework this tomorrow!
||
|
||
> For either of these strategies, we highly recommend that users periodically run a `--full-refresh` to ensure a high level of data quality. | ||
`delete+insert` is our second choice, as it resembles `insert_overwrite` but lacks partitions. This strategy works most of the time and appropriately handles incremental loads that do not contain changes to past records. However, if a past record has been updated and is outside of the incremental window, `delete+insert` will insert a duplicate record. 😱 | ||
Review comment: it might be worth qualifying https://docs.getdbt.com/docs/build/incremental-models#about-incremental_strategy
Review comment: I agree with @jasongroob and this can be a bit misleading. @fivetran-catfritz can you reword this to be more direct as we discussed earlier.
||
> Because of this, we highly recommend that **Snowflake**, **Redshift**, and **Postgres** users periodically run a `--full-refresh` to ensure a high level of data quality and remove any possible duplicates. | ||
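As implemented in this PR's model configs, the strategy selection described above boils down to a `target.type` check (condensed here; see the model diffs in this PR for the full configs):

```sql
{{
    config(
        materialized='incremental',
        unique_key='unique_key',
        -- insert_overwrite where available (BigQuery, Databricks/Spark); delete+insert elsewhere
        incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
        -- BigQuery expects a partition_by dict; Databricks/Spark expects a list of columns
        partition_by={'field': 'date_day', 'data_type': 'date'} if target.type not in ('spark', 'databricks') else ['date_day']
    )
}}
```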
|
||
## Step 2: Install the package | ||
Include the following mixpanel package version in your `packages.yml` file: | ||
|
@@ -69,7 +70,7 @@ Include the following mixpanel package version in your `packages.yml` file: | |
```yaml | ||
packages: | ||
- package: fivetran/mixpanel | ||
version: [">=0.8.0", "<0.9.0"] # we recommend using ranges to capture non-breaking changes automatically | ||
version: [">=0.9.0", "<0.10.0"] # we recommend using ranges to capture non-breaking changes automatically | ||
fivetran-joemarkiewicz marked this conversation as resolved.
|
||
``` | ||
|
||
## Step 3: Define database and schema variables | ||
|
@@ -82,7 +83,6 @@ vars: | |
``` | ||
|
||
## (Optional) Step 4: Additional configurations | ||
<details><summary>Expand for configurations</summary> | ||
|
||
## Macros | ||
### analyze_funnel [(source)](https://github.com/fivetran/dbt_mixpanel/blob/master/macros/analyze_funnel.sql) | ||
|
@@ -98,7 +98,7 @@ The macro takes the following as arguments: | |
- `event_funnel`: List of event types (not case sensitive). | ||
- Example: `['play_song', 'stop_song', 'exit']` | ||
- `group_by_column`: (Optional) A column by which you want to segment the funnel (this macro pulls data from the `mixpanel__event` model). The default value is `None`. | ||
- Examaple: `group_by_column = 'country_code'`. | ||
- Example: `group_by_column = 'country_code'`. | ||
fivetran-joemarkiewicz marked this conversation as resolved.
|
||
- `conversion_criteria`: (Optional) A `WHERE` clause that will be applied when selecting from `mixpanel__event`. | ||
- Example: To limit all events in the funnel to the United States, you'd provide `conversion_criteria = 'country_code = "US"'`. To limit only the song play events to the US, you'd input `conversion_criteria = 'country_code = "US" OR event_type != "play_song"'`. | ||
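For example, a call with the arguments above might look like the following (a sketch only; adjust the events and criteria to your own data, and place the call in whatever model or analysis file you use for funnel reporting):

```sql
{{ mixpanel.analyze_funnel(
    event_funnel=['play_song', 'stop_song', 'exit'],
    group_by_column='country_code',
    conversion_criteria='country_code = "US"'
) }}
```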
|
||
|
@@ -224,7 +224,7 @@ models: | |
### Change the source table references | ||
If an individual source table has a different name than the package expects, add the table name as it appears in your destination to the respective variable: | ||
|
||
> IMPORTANT: See this project's [`dbt_project.yml`](https://github.com/fivetran/dbt_mixpanel_source/blob/main/dbt_project.yml) variable declarations to see the expected names. | ||
> IMPORTANT: See this project's [`dbt_project.yml`](https://github.com/fivetran/dbt_mixpanel/blob/main/dbt_project.yml) variable declarations to see the expected names. | ||
|
||
```yml | ||
vars: | ||
|
@@ -241,8 +241,6 @@ Events are considered duplicates and consolidated by the package if they contain | |
|
||
This is performed in line with Mixpanel's internal de-duplication process, in which events are de-duped at the end of each day. This means that if an event was triggered during an offline session at 11:59 PM and _resent_ when the user came online at 12:01 AM, these records would _not_ be de-duplicated. This is the case in both Mixpanel and the Mixpanel dbt package. | ||
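As a rough sketch of that day-bounded de-duplication (the partition columns and the staging ref below are illustrative, not the package's exact list), duplicates can be collapsed with a window function:

```sql
with ranked as (

    select
        *,
        row_number() over (
            -- illustrative duplicate-identifying columns; the package defines its own list
            partition by people_id, event_type, occurred_at
            order by occurred_at
        ) as duplicate_rank
    from {{ ref('some_event_staging_model') }}  -- placeholder ref

)

-- keep one row per set of duplicate-identifying values;
-- de-dup eligibility is bounded to a calendar day by the incremental window
select *
from ranked
where duplicate_rank = 1
```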
|
||
</details> | ||
|
||
## (Optional) Step 5: Orchestrate your models with Fivetran Transformations for dbt Core™ | ||
<details><summary>Expand for details</summary> | ||
<br> | ||
|
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{% macro date_today(col_name) %} | ||
|
||
cast( {{ dbt.date_trunc('day', dbt.current_timestamp_backcompat()) }} as date) as {{ col_name }} | ||
{# cast( '2024-02-06' as date) as {{ col_name }} -- for testing #} | ||
|
||
{% endmacro %} |
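As used in the model diffs below, this macro is called in a model's select list to stamp each record with the run date; it compiles to roughly `cast(date_trunc('day', current_timestamp) as date) as <col_name>`:

```sql
select
    date_day,
    event_type,
    {{ mixpanel.date_today('dbt_run_date') }}
from {{ ref('mixpanel__event') }}
```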
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{% macro lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %} | ||
Review comment: having '2010-01-01' be a project variable could help if someone wants to have their own custom start date. 14 years is a LOT of data to include by default.
Reply: I realize calling this value
||
|
||
coalesce( | ||
(select {{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp=from_date) }} | ||
from {{ this }}), | ||
{{ "'" ~ default_start_date ~ "'" }} | ||
) | ||
|
||
{% endmacro %} |
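As used in the model diffs below, this macro produces the lower bound for an incremental window, e.g.:

```sql
{% if is_incremental() %}
-- look back 27 days from the latest loaded date so window functions compute correctly
where date_day >= {{ mixpanel.lookback(from_date="max(date_day)", interval=27) }}
{% endif %}
```

Regarding the review suggestion above, the hard-coded `'2010-01-01'` fallback could be made configurable by passing a project variable as `default_start_date` (the variable name below is hypothetical):

```sql
where date_day >= {{ mixpanel.lookback(
    from_date="max(date_day)",
    default_start_date=var('mixpanel_lookback_start_date', '2010-01-01')
) }}
```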
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,15 @@ | |
config( | ||
materialized='incremental', | ||
unique_key='unique_key', | ||
partition_by={'field': 'date_day', 'data_type': 'date'} if target.type not in ('spark','databricks') else ['date_day'], | ||
incremental_strategy = 'merge' if target.type not in ('postgres', 'redshift') else 'delete+insert', | ||
file_format = 'delta' | ||
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert', | ||
partition_by={ | ||
"field": "date_day", | ||
"data_type": "date" | ||
} if target.type not in ('spark','databricks') | ||
else ['date_day'], | ||
cluster_by=['date_day', 'event_type'], | ||
file_format='parquet', | ||
on_schema_change='append_new_columns' | ||
) | ||
}} | ||
|
||
|
@@ -20,13 +26,8 @@ with events as ( | |
from {{ ref('mixpanel__event') }} | ||
|
||
{% if is_incremental() %} | ||
|
||
-- we look at the most recent 28 days for this model's window functions to compute properly | ||
where date_day >= coalesce( ( select {{ dbt.dateadd(datepart='day', interval=-27, from_date_or_timestamp="max(date_day)") }} | ||
from {{ this }} ), '2010-01-01') | ||
|
||
where date_day >= {{ mixpanel.lookback(from_date="max(date_day)", interval=27) }} | ||
{% endif %} | ||
|
||
), | ||
|
||
|
||
|
@@ -36,13 +37,8 @@ date_spine as ( | |
from {{ ref('stg_mixpanel__user_event_date_spine') }} | ||
|
||
{% if is_incremental() %} | ||
|
||
-- look backward for the last 28 days | ||
where date_day >= coalesce((select {{ dbt.dateadd(datepart='day', interval=-27, from_date_or_timestamp="max(date_day)") }} | ||
from {{ this }} ), '2010-01-01') | ||
|
||
where date_day >= {{ mixpanel.lookback(from_date="max(date_day)", interval=27) }} | ||
{% endif %} | ||
|
||
), | ||
|
||
agg_user_events as ( | ||
|
@@ -55,7 +51,6 @@ agg_user_events as ( | |
|
||
from events | ||
group by 1,2,3 | ||
|
||
), | ||
|
||
-- join the spine with event metrics | ||
|
@@ -74,7 +69,6 @@ spine_joined as ( | |
on agg_user_events.date_day = date_spine.date_day | ||
and agg_user_events.people_id = date_spine.people_id | ||
and agg_user_events.event_type = date_spine.event_type | ||
|
||
), | ||
|
||
trailing_events as ( | ||
|
@@ -89,7 +83,6 @@ trailing_events as ( | |
and number_of_events > 0 as is_repeat_user | ||
|
||
from spine_joined | ||
|
||
), | ||
|
||
agg_event_days as ( | ||
|
@@ -109,7 +102,6 @@ agg_event_days as ( | |
|
||
from trailing_events | ||
group by 1,2 | ||
|
||
), | ||
|
||
final as ( | ||
|
@@ -127,18 +119,14 @@ final as ( | |
number_of_users - number_of_new_users - number_of_repeat_users as number_of_return_users, | ||
trailing_users_28d, | ||
trailing_users_7d, | ||
event_type || '-' || date_day as unique_key | ||
{{ dbt_utils.generate_surrogate_key(['event_type', 'date_day']) }} as unique_key, | ||
Review comment: Do we want to generate a surrogate key here? The surrogate key will create a hash, whereas the previous value was a concatenation of the two columns, so we lose some decipherable information if we leverage the surrogate key, although I am not sure if this change was made to work better with the incremental updates. If we do end up changing this field we will need to update the docs and also call this out as part of a breaking change, as it will drastically change the previous results. What are your thoughts?
fivetran-catfritz marked this conversation as resolved.
||
{{ mixpanel.date_today('dbt_run_date') }} | ||
|
||
from agg_event_days | ||
|
||
{% if is_incremental() %} | ||
|
||
-- only return the most recent day of data | ||
where date_day >= coalesce( (select max(date_day) from {{ this }} ), '2010-01-01') | ||
|
||
where date_day >= {{ mixpanel.lookback(from_date="max(dbt_run_date)") }} | ||
Review comment: won't this potentially exclude some late arriving data that occurred prior to the max(dbt_run_date)?
Reply: Yes, thank you for catching. I ended up scrapping using
Review comment: awesome! nice improvement.
||
{% endif %} | ||
|
||
order by date_day desc, event_type | ||
) | ||
|
||
select * | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,29 +2,31 @@ | |
config( | ||
materialized='incremental', | ||
unique_key='unique_event_id', | ||
partition_by={'field': 'date_day', 'data_type': 'date'} if target.type not in ('spark','databricks') else ['date_day'], | ||
incremental_strategy = 'merge' if target.type not in ('postgres', 'redshift') else 'delete+insert', | ||
file_format = 'delta' | ||
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert', | ||
partition_by={ | ||
"field": "date_day", | ||
"data_type": "date" | ||
} if target.type not in ('spark','databricks') | ||
else ['date_day'], | ||
cluster_by=['date_day', 'event_type', 'people_id'], | ||
file_format='parquet', | ||
on_schema_change='append_new_columns' | ||
) | ||
}} | ||
|
||
with stg_event as ( | ||
|
||
select * | ||
|
||
from {{ ref('stg_mixpanel__event') }} | ||
|
||
where | ||
{% if is_incremental() %} | ||
|
||
-- events are only eligible for de-duping if they occurred on the same calendar day | ||
occurred_at >= coalesce((select cast( max(date_day) as {{ dbt.type_timestamp() }} ) from {{ this }} ), '2010-01-01') | ||
{% if is_incremental() %} | ||
dbt_run_date >= {{ mixpanel.lookback(from_date="max(dbt_run_date)", interval=1) }} | ||
Review comment: maybe move the default interval period to a project variable? i assume one day will be enough but maybe there's scenarios where more is needed.
Reply: Thanks @jasongroob that is a great idea. I have created a variable
Review comment: yeah, i think that should be fine. i don't have a good sense of how late new data can arrive.
||
|
||
{% else %} | ||
|
||
-- limit date range on the first run / refresh | ||
occurred_at >= {{ "'" ~ var('date_range_start', '2010-01-01') ~ "'" }} | ||
|
||
{% endif %} | ||
), | ||
|
||
|
@@ -51,8 +53,8 @@ pivot_properties as ( | |
|
||
select | ||
* | ||
{% if var('event_properties_to_pivot') %}, | ||
{{ fivetran_utils.pivot_json_extract(string = 'event_properties', list_of_properties = var('event_properties_to_pivot')) }} | ||
{% if var('event_properties_to_pivot') %} | ||
, {{ fivetran_utils.pivot_json_extract(string = 'event_properties', list_of_properties = var('event_properties_to_pivot')) }} | ||
{% endif %} | ||
|
||
from dedupe | ||
|
Review comment: looks like the second table name is wrong. did you mean:
Reply: Oops, yes. 😄