Skip to content

Commit

Permalink
Feat/cities v1 (#940)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Raphaël Courivaud <[email protected]>

* add ff and lovac in 2020

Signed-off-by: Raphaël Courivaud <[email protected]>

* add cities data and aggregations

Signed-off-by: Raphaël Courivaud <[email protected]>

* add macros and common cities

Signed-off-by: Raphaël Courivaud <[email protected]>

* remove notion key

Signed-off-by: Raphaël Courivaud <[email protected]>

---------

Signed-off-by: Raphaël Courivaud <[email protected]>
  • Loading branch information
rcourivaud authored Oct 7, 2024
1 parent ef8a64c commit ec97904
Show file tree
Hide file tree
Showing 156 changed files with 79,805 additions and 469 deletions.
32 changes: 32 additions & 0 deletions analytics/dagster/.dlt/.sources
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
engine_version: 1
sources:
notion:
is_dirty: false
last_commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
last_commit_timestamp: '2024-08-27T15:40:56+02:00'
files:
notion/__init__.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 6a6771a4106829c1077a86c78830fd80737857f6
sha3_256: 271c4eafc21dd74be2b89d0c48cf7499f182569ff8f690ea77b870adf901127c
notion/README.md:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 6739772d216548cd6ddb38fabb9f570138211dc3
sha3_256: 9f4438b8f98ff4b46f064cdacb241f08fabc594abf4404cba85b2e9b51f198ef
notion/settings.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: fe4ecbb1ee2ba1907246f141db7af0b214ebfb5f
sha3_256: e96550d8ae5cf81df9b2d1465fa66832ac4e00f357df2b419b32de0dfbfb96bf
notion/helpers/database.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 29212850ca3cde0450c8bdcc247b08671d2f721e
sha3_256: 78d592328f054bf52d6c059365eaee08fcef0696b2186e1f76cc43642d07e96d
notion/helpers/client.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 72a9fbbb60ef4432c5f047834ae36080e325bb81
sha3_256: df5803242f35735fa8e4dd0bd7ca625ec55cb8066420736410da17c09f0a0997
notion/helpers/__init__.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
sha3_256: a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a
dlt_version_constraint: '>=0.3.5'
2 changes: 2 additions & 0 deletions analytics/dagster/.dlt/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[runtime]
dlthub_telemetry = true
10 changes: 10 additions & 0 deletions analytics/dagster/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ignore secrets, virtual environments and typical python compilation artifacts
secrets.toml
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal
32 changes: 18 additions & 14 deletions analytics/dagster/dagster.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
storage:
postgres:
postgres_db:
username:
env: DAGSTER_PG_USERNAME
password:
env: DAGSTER_PG_PASSWORD
hostname:
env: DAGSTER_PG_HOST
db_name:
env: DAGSTER_PG_DB
port:
env: DAGSTER_PG_PORT
# storage:
# postgres:
# postgres_db:
# username:
# env: DAGSTER_PG_USERNAME
# password:
# env: DAGSTER_PG_PASSWORD
# hostname:
# env: DAGSTER_PG_HOST
# db_name:
# env: DAGSTER_PG_DB
# port:
# env: DAGSTER_PG_PORT

telemetry:
enabled: false
enabled: false

python_logs:
managed_python_loggers:
- my_logger
1 change: 0 additions & 1 deletion analytics/dagster/data/most_frequent_words.json

This file was deleted.

101 changes: 0 additions & 101 deletions analytics/dagster/data/topstories.csv

This file was deleted.

1 change: 0 additions & 1 deletion analytics/dagster/data/topstory_ids.json

This file was deleted.

2 changes: 2 additions & 0 deletions analytics/dagster/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dagster<w
dagster-dbt
dagster-embedded-elt
dbt-duckdb
duckdb
matplotlib
Expand Down
28 changes: 28 additions & 0 deletions analytics/dagster/src/.dlt/.sources
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
engine_version: 1
sources:
sql_database:
is_dirty: false
last_commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
last_commit_timestamp: '2024-08-27T15:40:56+02:00'
files:
sql_database/arrow_helpers.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 25d6eb7268cb048af1f96fcc803405b8e05b45a7
sha3_256: d53530001111b759a5b9602dfd89e09532a2f0101673e834097508a3cd59786a
sql_database/__init__.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 729fd387122400d0961a5a5629a9ae0578bed8b4
sha3_256: 11765bb12b6f4a45236093c7891dcf0d89f427cc64e0dc7ff030a888c6484252
sql_database/README.md:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: dfa4b5e161f059b8399db203801ed0d638b7734c
sha3_256: a21100e31c5f4d514b38ba6229735d3c3a9a7f364ef54d2e9d6068c75c8d8e2b
sql_database/schema_types.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 4c281d222daa902fef91241ee13ad6579f6161cf
sha3_256: d30b68acd8e6a3f9cc052dc2d994562b0f5eb4934fc67e6f6726bb6f643b74d5
sql_database/helpers.py:
commit_sha: b304784c92f44582f0214b863232b7c3d28c5962
git_sha: 2c79a59a5775cf38d1f11d78107eae407ccb9ace
sha3_256: ad44ea1ce01dab0d9fa1942c0d2f6e6d8a1fffaee9771302d52970c3821ce179
dlt_version_constraint: '>=0.4.11'
5 changes: 5 additions & 0 deletions analytics/dagster/src/.dlt/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[sources.sql_database]
table = "table" # please set me up!

[runtime]
dlthub_telemetry = true
10 changes: 10 additions & 0 deletions analytics/dagster/src/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ignore secrets, virtual environments and typical python compilation artifacts
secrets.toml
# ignore basic python artifacts
.env
**/__pycache__/
**/*.py[cod]
**/*$py.class
# ignore duckdb
*.duckdb
*.wal
2 changes: 1 addition & 1 deletion analytics/dagster/src/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .definitions import defs as defs
from .definitions import defs as defs
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .production_dlt import dagster_production_assets

__all__ = ["dagster_production_assets"]
98 changes: 0 additions & 98 deletions analytics/dagster/src/assets/hackernews.py

This file was deleted.

19 changes: 19 additions & 0 deletions analytics/dagster/src/assets/notion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster import AssetExecutionContext
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dlt import pipeline

from ..dlt_sources.notion import notion_databases

@dlt_assets(
dlt_source=notion_databases(database_ids=[{"id": "a57fc47a6e3b4ebd835cf0d7a5460e29"}]),
dlt_pipeline=pipeline(
pipeline_name="notion",
dataset_name="notion_dataset",
destination="duckdb",
progress="log",
),
name="notion_asset",
group_name="notion",
)
def dagster_notion_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
yield from dlt.run(context=context)
23 changes: 23 additions & 0 deletions analytics/dagster/src/assets/production_dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator
from ..project import dbt_project


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
def get_asset_key(self, dbt_resource_props):
type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
if type == "source":
return AssetKey(f"raw_{name}")
else:
return super().get_asset_key(dbt_resource_props)




@dbt_assets(
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def dbt_production(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["run"], context=context).stream()
30 changes: 30 additions & 0 deletions analytics/dagster/src/assets/production_dlt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Iterable
from dagster import AssetExecutionContext, AssetKey
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets, DagsterDltTranslator
from dlt import pipeline
from dlt.extract.resource import DltResource

from ..dlt_sources.sources import get_production_source

class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_asset_key(self, resource: DagsterDltResource) -> AssetKey:
"""Overrides asset key to be the dlt resource name."""
return AssetKey(f"raw_{resource.name}")

def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
return AssetKey(f"raw_{resource.name}")

@dlt_assets(
dlt_source=get_production_source(),
dlt_pipeline=pipeline(
pipeline_name="production",
dataset_name="production_dataset",
destination="duckdb",
progress="log",
),
name="production_asset",
group_name="production",
dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def dagster_production_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
yield from dlt.run(context=context)
57 changes: 52 additions & 5 deletions analytics/dagster/src/definitions.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,63 @@
from pathlib import Path
from dagster import (
AssetSelection,
Definitions,
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
load_assets_from_modules,
)

from . import assets
from .assets import dagster_production_assets
from dagster_embedded_elt.dlt import DagsterDltResource
from dagster_dbt import DbtCliResource

import warnings
import dagster

from .project import dbt_project
from .assets import production_dbt
from .assets.notion import dagster_notion_assets

warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning)

dbt_analytics_assets = load_assets_from_modules(modules=[production_dbt]) # Load the assets from the file

# Initialize DLT resource
dlt_resource = DagsterDltResource(
name="dlt",
group_name="production",
)

dbt_resource = DbtCliResource(
project_dir=dbt_project,
)

# Define job for running all assets
daily_refresh_job = define_asset_job(
name="production_job",
#selection=[""]
)

# Schedule the job to run daily at midnight
daily_refresh_schedule = ScheduleDefinition(
job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
job=daily_refresh_job,
cron_schedule="0 0 * * *"
)

# Load definitions with assets, resources, and schedule
defs = Definitions(
assets=load_assets_from_package_module(assets), schedules=[daily_refresh_schedule]
)
assets=[
dagster_production_assets,
dagster_notion_assets,
# dagster_notion_assets,
*dbt_analytics_assets
],
resources={
"dlt": dlt_resource,
"dbt": dbt_resource

},
schedules=[
daily_refresh_schedule,
],
)
Loading

0 comments on commit ec97904

Please sign in to comment.