Skip to content

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
  • Loading branch information
NiallRees committed Feb 4, 2021
0 parents commit 6cecd4d
Show file tree
Hide file tree
Showing 13 changed files with 448 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

target/
dbt_modules/
logs/
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Tails.com's dbt Artifacts Package

This package builds `fct_dbt_model_executions` and `fct_dbt_run_results` tables from dbt artifacts loaded into a table. It is compatible with Snowflake only. The models are based off of the v1 schema introduced in dbt 0.19.0: https://docs.getdbt.com/reference/artifacts/dbt-artifacts/#notes

## Generating the source table

This package requires that the source data already exists in a table in Snowflake. How you achieve that will depend on your implementation.

The author recommends generating the source table using the following query to copy from an external stage (in a snowpipe):

```
copy into ${snowflake_table.dbt_artifacts.database}.${snowflake_table.dbt_artifacts.schema}.${snowflake_table.dbt_artifacts.name}
from (
select
$1 as data,
$1:metadata:generated_at::timestamp_tz as generated_at,
metadata$filename as path,
regexp_substr(metadata$filename, '([a-z_]+.json)$') as artifact_type
from @${snowflake_stage.dbt_artifacts.database}.${snowflake_stage.dbt_artifacts.schema}.${snowflake_stage.dbt_artifacts.name}
)
file_format = (type = 'JSON')
```

Where the external stage's prefix is a destination for all dbt artifacts.

## Usage

Add the package to your `packages.yml` following the instructions at https://docs.getdbt.com/docs/building-a-dbt-project/package-management/

Configure the required variables in your `dbt_project.yml`:

```
vars:
dbt_artifacts:
dbt_artifacts_database: your_db
dbt_artifacts_schema: your_schema
dbt_artifacts_table: your_table
models:
...
dbt_artifacts:
+schema: your_destination_schema
+materialized: table
staging:
+schema: your_destination_schema
+materialized: view # The staging tables cannot be ephemeral
```

Run `dbt deps` and then run the package specifically to test with `dbt run -m dbt_artifacts`.

The two fct_ tables are both [incremental](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models/).

## Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](http://slack.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
3 changes: 3 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name: 'dbt_artifacts'
version: '0.1.0'
config-version: 2
62 changes: 62 additions & 0 deletions models/fct_dbt__model_executions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{{ config( materialized='incremental', unique_key='model_execution_id' ) }}

with models as (

select *
from {{ ref('stg_dbt__models') }}

),

model_executions as (

select *
from {{ ref('stg_dbt__model_executions') }}

),

model_executions_incremental as (

select *
from model_executions

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

model_executions_with_materialization as (

select
model_executions_incremental.*,
models.model_materialization,
models.model_schema,
models.name
from model_executions_incremental
left join models on (
model_executions_incremental.command_invocation_id = models.command_invocation_id
and model_executions_incremental.node_id = models.node_id
)

),

fields as (

select
model_execution_id,
command_invocation_id,
artifact_generated_at,
was_full_refresh,
node_id,
status,
execution_time,
rows_affected,
model_materialization,
model_schema,
name
from model_executions_with_materialization

)

select * from fields
43 changes: 43 additions & 0 deletions models/fct_dbt__run_results.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{{ config( materialized='incremental', unique_key='command_invocation_id' ) }}

{% set env_keys = dbt_utils.get_column_values(table=ref('stg_dbt__run_results_env_keys'), column='key') %}

with run_results as (

select *
from {{ ref('stg_dbt__run_results') }}

),

incremental_run_results as (

select *
from run_results

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
artifact_generated_at,
command_invocation_id,
dbt_version,
elapsed_time,
execution_command,
selected_models,
target,
was_full_refresh

{% for key in env_keys %}
,env:{{ key }} as env_{{ key }}
{% endfor %}
from incremental_run_results

)

select * from fields
57 changes: 57 additions & 0 deletions models/schemas.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
version: 2

models:

- name: fct_dbt__model_executions
description: All historic dbt model executions.
columns:
- name: model_execution_id
description: Primary key.
tests:
- unique
- not_null
- name: name
description: The name of the model.
- name: model_schema
description: The schema containing the model.
- name: was_full_refresh
description: Was this model executed with a --full-refresh flag?
- name: model_materialization
description: The configured materialization of the model.
- name: execution_time
description: How long did the model take to run?
- name: status
description: success/fail status of the model's execution.
- name: command_invocation_id
description: Foreign key to fct_dbt_run_results. The id of the command which resulted in the source artifact's generation.
- name: artifact_generated_at
description: Timestamp of when the source artifact was generated.
- name: node_id
description: Unique id for the node, in the form of model.[package_name].[model_name]
- name: rows_affected
description: The number of rows affected by the model's execution. Always 1 for non-incremental executions.

- name: fct_dbt__run_results
description: Metadata for dbt run commands.
columns:
- name: command_invocation_id
description: The id of the command which resulted in the source artifact's generation.
tests:
- unique
- not_null
- name: artifact_generated_at
description: Timestamp of when the source artifact was generated.
- name: dbt_version
description: The version of dbt used to generate the source artifact.
- name: elapsed_time
description: The total run time of the command.
- name: execution_command
description: The actual command used.
- name: selected_models
description: A list of model selectors used in the command.
- name: target
description: The configured target for the command.
- name: was_full_refresh
description: Was the run executed with a --full-refresh flag?
- name: env_*
description: Columns for the environment variables set when the command was executed.
20 changes: 20 additions & 0 deletions models/staging/sources.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version: 2

sources:
- name: dbt_artifacts
database: "{{ var('dbt_artifacts_database') }}"
schema: "{{ var('dbt_artifacts_schema') }}"
tables:
- name: artifacts
identifier: "{{ var('dbt_artifacts_table') }}"
description: |
The source table containing loaded dbt artifacts. All of the artifacts must be loaded into this table. See the README for more info.
columns:
- name: data
description: A variant type object containing all the artifact's data.
- name: generated_at
description: Timestamp for when the artifact was generated.
- name: path
description: The path of the artifact in the external stage.
- name: artifact_type
description: The type of the artifact, e.g. manifest.json
19 changes: 19 additions & 0 deletions models/staging/stg_dbt__artifacts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
with base as (

select *
from {{ source('dbt_artifacts', 'artifacts') }}

),

fields as (

select
data,
generated_at,
path,
artifact_type
from base

)

select * from fields
54 changes: 54 additions & 0 deletions models/staging/stg_dbt__model_executions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
with base as (

select *
from {{ ref('stg_dbt__artifacts') }}

),

run_results as (

select *
from base
where artifact_type = 'run_results.json'

),

dbt_run as (

select *
from run_results
where data:args:which = 'run'

),

fields as (

select
data:metadata:invocation_id::string as command_invocation_id,
generated_at as artifact_generated_at,
coalesce(data:args:full_refresh, 'false')::boolean as was_full_refresh,
result.value:unique_id::string as node_id,
result.value:status::string as status,
result.value:execution_time::float as execution_time,
result.value:adapter_response:rows_affected::int as rows_affected
from dbt_run,
lateral flatten(input => data:results) as result

),

surrogate_key as (

select
{{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as model_execution_id,
command_invocation_id,
artifact_generated_at,
was_full_refresh,
node_id,
status,
execution_time,
rows_affected
from fields

)

select * from surrogate_key
51 changes: 51 additions & 0 deletions models/staging/stg_dbt__models.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
with base as (

select *
from {{ ref('stg_dbt__artifacts') }}

),

manifests as (

select *
from base
where artifact_type = 'manifest.json'

),

flatten as (

select
data:metadata:invocation_id::string as command_invocation_id,
generated_at as artifact_generated_at,
node.key as node_id,
node.value:name::string as name,
node.value:schema::string as model_schema,
node.value:package_name::string as package_name,
node.value:path::string as model_path,
node.value:checksum.checksum::string as checksum,
node.value:config.materialized::string as model_materialization
from manifests,
lateral flatten(input => data:nodes) as node
where node.value:resource_type = 'model'

),

surrogate_key as (

select
{{ dbt_utils.surrogate_key(['command_invocation_id', 'checksum']) }} as manifest_model_id,
command_invocation_id,
artifact_generated_at,
node_id,
name,
model_schema,
package_name,
model_path,
checksum,
model_materialization
from flatten

)

select * from surrogate_key
Loading

0 comments on commit 6cecd4d

Please sign in to comment.