Skip to content

Commit

Permalink
bugfix/hubspot-duplicates (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
fivetran-joemarkiewicz authored Oct 22, 2024
1 parent fd581a0 commit 956b0d6
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 9 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Include the following unified_rag package version in your `packages.yml
```yml
packages:
- package: fivetran/unified_rag
version: [">=0.1.0", "<0.2.0"] # we recommend using ranges to capture non-breaking changes automatically
version: 0.1.0-a1
```
### Step 3: Define database and schema variables
Expand Down
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ engagement_emails as (
engagement_email.email_to_email,
engagement_email.email_cc_email,
engagement_email.email_from_email as commenter_email,
contacts.contact_name as commenter_name
{{ fivetran_utils.string_agg(field_to_agg="contacts.contact_name", delimiter="','") }} as commenter_name
from {{ ref('stg_rag_hubspot__engagement_email') }} engagement_email
left join engagement_contacts
on engagement_email.engagement_id = engagement_contacts.engagement_id
and engagement_email.source_relation = engagement_contacts.source_relation
left join contacts
on engagement_contacts.contact_id = contacts.contact_id
and engagement_contacts.source_relation = contacts.source_relation
{{ dbt_utils.group_by(12)}}
),

engagement_notes as (
Expand Down Expand Up @@ -177,4 +178,5 @@ truncated_comments as (
)

select *
from truncated_comments
from truncated_comments
where comment_markdown is not null
17 changes: 16 additions & 1 deletion models/intermediate/hubspot/int_rag_hubspot__deal_document.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ engagement_deals as (
from {{ ref('stg_rag_hubspot__engagement_deal') }}
),

engagement_details as (
engagement_detail_prep as (

select
deals.deal_id,
Expand Down Expand Up @@ -84,6 +84,21 @@ engagement_details as (
and engagement_deals.source_relation = engagement_notes.source_relation
),

engagement_details as (

    -- Reduce engagement_detail_prep to one row per distinct combination of the
    -- five deal-level columns listed in the group by. Every other attribute is
    -- handed to the string_agg macro with the distinct keyword and a ', '
    -- delimiter, so repeated values arising from the upstream joins are folded
    -- into a single delimited string per attribute.
    select
        deal_id,
        deal_name,
        url_reference,
        created_on,
        source_relation,
        {{ fivetran_utils.string_agg(field_to_agg="distinct engagement_type", delimiter="', '") }} as engagement_type,
        {{ fivetran_utils.string_agg(field_to_agg="distinct contact_name", delimiter="', '") }} as contact_name,
        {{ fivetran_utils.string_agg(field_to_agg="distinct created_by", delimiter="', '") }} as created_by,
        {{ fivetran_utils.string_agg(field_to_agg="distinct company_name", delimiter="', '") }} as company_name
    from engagement_detail_prep
    -- Grouping columns are spelled out by name; they are the first five select
    -- columns, in the same order.
    group by
        deal_id,
        deal_name,
        url_reference,
        created_on,
        source_relation
),

engagement_markdown as (

select
Expand Down
7 changes: 4 additions & 3 deletions models/rag__unified_document.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{{
config(
materialized='table' if unified_rag.is_databricks_sql_warehouse() else 'incremental',
partition_by = {'field': 'most_recent_chunk_update', 'data_type': 'date', 'granularity': 'month'}
if target.type not in ['spark', 'databricks'] else ['most_recent_chunk_update'],
partition_by = {'field': 'update_date', 'data_type': 'date'}
if target.type not in ['spark', 'databricks'] else ['update_date'],
cluster_by = ['unique_id'],
unique_key='unique_id',
incremental_strategy = 'insert_overwrite' if target.type in ('bigquery', 'databricks', 'spark') else 'delete+insert',
Expand All @@ -26,14 +26,15 @@
" platform, \n" ~
" source_relation, \n" ~
" most_recent_chunk_update, \n" ~
" cast(most_recent_chunk_update as date) as update_date, \n" ~
" chunk_index, \n" ~
" chunk_tokens_approximate, \n" ~
" chunk \n" ~
"from " ~ ref('rag_' ~ platform_name ~ '__document')) %}

{% if is_incremental() %}
{% set select_statement = select_statement ~
"\n where most_recent_chunk_update >= (select max(most_recent_chunk_update) from " ~ this ~ ")" %}
"\n where cast(most_recent_chunk_update as date) >= (select max(update_date) from " ~ this ~ ")" %}
{% endif %}

{% do queries.append(select_statement) -%}
Expand Down
5 changes: 5 additions & 0 deletions models/unified_rag.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ models:
columns:
- name: unique_id
description: Unique identifier of the table represented as a combination of document_id, platform, and source_relation fields.
tests:
- unique
- not_null
- name: document_id
description: Identifier of the base object with which the unstructured data is associated (i.e., Zendesk ticket_id, Jira issue_id, and HubSpot deal_id).
- name: url_reference
Expand All @@ -14,6 +17,8 @@ models:
description: Record identifying the respective upstream connector type (i.e., HubSpot, Jira, Zendesk).
- name: most_recent_chunk_update
description: Timestamp indicating the most recent update to the overall chunk.
- name: update_date
description: Truncated date of the most_recent_chunk_update field used for incremental and partition logic.
- name: chunk_index
description: The index of the chunk associated with the `document_id`.
- name: chunk_tokens_approximate
Expand Down

0 comments on commit 956b0d6

Please sign in to comment.