diff --git a/.astro/config.yaml b/.astro/config.yaml new file mode 100644 index 0000000..d1edb2d --- /dev/null +++ b/.astro/config.yaml @@ -0,0 +1,2 @@ +project: + name: tobiko-cloud-demo diff --git a/.astro/dag_integrity_exceptions.txt b/.astro/dag_integrity_exceptions.txt new file mode 100644 index 0000000..c9a2a63 --- /dev/null +++ b/.astro/dag_integrity_exceptions.txt @@ -0,0 +1 @@ +# Add dag files to exempt from parse test below. ex: dags/ \ No newline at end of file diff --git a/.astro/test_dag_integrity_default.py b/.astro/test_dag_integrity_default.py new file mode 100644 index 0000000..e433703 --- /dev/null +++ b/.astro/test_dag_integrity_default.py @@ -0,0 +1,141 @@ +"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**""" + +from contextlib import contextmanager +import logging +import os + +import pytest + +from airflow.models import DagBag, Variable, Connection +from airflow.hooks.base import BaseHook +from airflow.utils.db import initdb + +# init airflow database +initdb() + +# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables + + +# =========== MONKEYPATCH BaseHook.get_connection() =========== +def basehook_get_connection_monkeypatch(key: str, *args, **kwargs): + print( + f"Attempted to fetch connection during parse returning an empty Connection object for {key}" + ) + return Connection(key) + + +BaseHook.get_connection = basehook_get_connection_monkeypatch +# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() =========== + + +# =========== MONKEYPATCH OS.GETENV() =========== +def os_getenv_monkeypatch(key: str, *args, **kwargs): + default = None + if args: + default = args[0] # os.getenv should get at most 1 arg after the key + if kwargs: + default = kwargs.get( + "default", None + ) # and sometimes kwarg if people are using the sig + + env_value = os.environ.get(key, None) + + if env_value: + return env_value # if the env_value is set, return it + if ( + key == "JENKINS_HOME" and default is None + ): # fix https://github.com/astronomer/astro-cli/issues/601 + return None + if default: + return default # otherwise return whatever default has been passed + return f"MOCKED_{key.upper()}_VALUE" # if absolutely nothing has been passed - return the mocked value + + +os.getenv = os_getenv_monkeypatch +# # =========== /MONKEYPATCH OS.GETENV() =========== + +# =========== MONKEYPATCH VARIABLE.GET() =========== + + +class magic_dict(dict): + def __init__(self, *args, **kwargs): + self.update(*args, **kwargs) + + def __getitem__(self, key): + return {}.get(key, "MOCKED_KEY_VALUE") + + +_no_default = object() # allow falsey defaults + + +def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False): + print( + f"Attempted to get Variable value during parse, returning a mocked value for {key}" + ) + + if default_var is not _no_default: + return default_var + if deserialize_json: + return magic_dict() + return "NON_DEFAULT_MOCKED_VARIABLE_VALUE" + + +Variable.get = variable_get_monkeypatch +# # =========== /MONKEYPATCH VARIABLE.GET() =========== + + +@contextmanager +def suppress_logging(namespace): + """ + Suppress logging within a specific namespace to keep tests "clean" during build + """ + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag, and include DAGs without errors. 
+ """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + # Initialize an empty list to store the tuples + result = [] + + # Iterate over the items in import_errors + for k, v in dag_bag.import_errors.items(): + result.append((strip_path_prefix(k), v.strip())) + + # Check if there are DAGs without errors + for file_path in dag_bag.dags: + # Check if the file_path is not in import_errors, meaning no errors + if file_path not in dag_bag.import_errors: + result.append((strip_path_prefix(file_path), "No import errors")) + + return result + + +@pytest.mark.parametrize( + "rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()] +) +def test_file_imports(rel_path, rv): + """Test for import errors on a file""" + if os.path.exists(".astro/dag_integrity_exceptions.txt"): + with open(".astro/dag_integrity_exceptions.txt", "r") as f: + exceptions = f.readlines() + print(f"Exceptions: {exceptions}") + if (rv != "No import errors") and rel_path not in exceptions: + # If rv is not "No import errors," consider it a failed test + raise Exception(f"{rel_path} failed to import with message \n {rv}") + else: + # If rv is "No import errors," consider it a passed test + print(f"{rel_path} passed the import test") diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..a334663 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +astro +.git +.env +airflow_settings.yaml +logs/ +.venv +airflow.db +airflow.cfg diff --git a/.github/workflows/tcloud_cicd.yml b/.github/workflows/tcloud_cicd.yml index 6f81530..80bee9f 100644 --- a/.github/workflows/tcloud_cicd.yml +++ b/.github/workflows/tcloud_cicd.yml @@ -7,9 +7,9 @@ on: - synchronize - opened # Required if using comments to issue commands to the bot - # issue_comment: - # types: - # - created + issue_comment: + types: + - created # Required if using required approvers to automate deployments pull_request_review: types: @@ -45,20 +45,14 @@ jobs: with: ref: refs/pull/${{ github.event.issue.pull_request && github.event.issue.number || github.event.pull_request.number }}/merge - - name: Authenticate to Google Cloud - uses: 'google-github-actions/auth@v2' - with: - credentials_json: '${{ secrets.GOOGLE_SQLMESH_CREDENTIALS }}' - - name: Install tcloud CLI + Dependencies run: | - pip install poetry - poetry install --no-root + pip install -r requirements.txt shell: bash - name: Run tcloud CI/CD Bot run: | - poetry run tcloud sqlmesh_cicd -p ${{ github.workspace }} github --token ${{ secrets.GITHUB_TOKEN }} run-all + tcloud sqlmesh_cicd -p ${{ github.workspace }} github --token ${{ secrets.GITHUB_TOKEN }} run-all env: # TODO: update your GitHub secrets to include TCLOUD_TOKEN, GOOGLE_SQLMESH_CREDENTIALS TCLOUD_TOKEN: ${{ secrets.TCLOUD_TOKEN }} GOOGLE_SQLMESH_CREDENTIALS: ${{ secrets.GOOGLE_SQLMESH_CREDENTIALS }} \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..08f9525 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,9 @@ +repos: # List of repository sources for pre-commit hooks +- repo: local # Specifies that these hooks are defined locally (not from external repos) + hooks: # List of hooks to run + - id: sqlmesh-format # Unique identifier for this hook + name: SQLMesh Format # Human-readable name for the hook + entry: tcloud sqlmesh format # The command to run when the hook is triggered + language: system # Specifies that 
this hook runs using system commands + types: [sql] # File types this hook should run on (only SQL files) + pass_filenames: false # Don't pass the filenames to the command \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a2efe83 --- /dev/null +++ b/Dockerfile @@ -0,0 +1 @@ +FROM quay.io/astronomer/astro-runtime:12.5.0 \ No newline at end of file diff --git a/airflow_settings.yaml b/airflow_settings.yaml new file mode 100644 index 0000000..1c16dc0 --- /dev/null +++ b/airflow_settings.yaml @@ -0,0 +1,25 @@ +# This file allows you to configure Airflow Connections, Pools, and Variables in a single place for local development only. +# NOTE: json dicts can be added to the conn_extra field as yaml key value pairs. See the example below. + +# For more information, refer to our docs: https://www.astronomer.io/docs/astro/cli/develop-project#configure-airflow_settingsyaml-local-development-only +# For questions, reach out to: https://support.astronomer.io +# For issues create an issue ticket here: https://github.com/astronomer/astro-cli/issues + +airflow: + connections: + - conn_id: + conn_type: + conn_host: + conn_schema: + conn_login: + conn_password: + conn_port: + conn_extra: + example_extra_field: example-value + pools: + - pool_name: + pool_slot: + pool_description: + variables: + - variable_name: + variable_value: diff --git a/audits/assert_positive_order_ids.sql b/audits/assert_positive_order_ids.sql index 6e5feca..99f6ae1 100644 --- a/audits/assert_positive_order_ids.sql +++ b/audits/assert_positive_order_ids.sql @@ -1,8 +1,9 @@ AUDIT ( - name assert_positive_order_ids, + name assert_positive_order_ids ); -SELECT * +SELECT + * FROM @this_model WHERE - item_id < 0 + item_id < 0 \ No newline at end of file diff --git a/config.yaml b/config.yaml index cc1946d..5db2c1a 100644 --- a/config.yaml +++ b/config.yaml @@ -9,18 +9,30 @@ gateways: register_comments: true keyfile_json: {{ env_var('GOOGLE_SQLMESH_CREDENTIALS') }} project: sqlmesh-public-demo + postgres: + connection: + type: postgres + host: {{ env_var('SQLMESH_STATE_HOST') }} + port: 5432 + user: {{ env_var('SQLMESH_STATE_USERNAME') }} + password: {{ env_var('SQLMESH_STATE_PASSWORD') }} + database: sqlmesh_state_demo default_gateway: tobiko_cloud +project: repo_a + model_defaults: dialect: bigquery start: 2024-12-01 + validate_query: true # enables synchronized deployments to prod when a PR is merged cicd_bot: type: github merge_method: squash enable_deploy_command: true + skip_pr_backfill: false auto_categorize_changes: external: full python: full diff --git a/dags/.airflowignore b/dags/.airflowignore new file mode 100644 index 0000000..e69de29 diff --git a/dags/exampledag.py b/dags/exampledag.py new file mode 100644 index 0000000..8b08b7b --- /dev/null +++ b/dags/exampledag.py @@ -0,0 +1,100 @@ +""" +## Astronaut ETL example DAG + +This DAG queries the list of astronauts currently in space from the +Open Notify API and prints each astronaut's name and flying craft. + +There are two tasks, one to get the data from the API and save the results, +and another to print the results. Both tasks are written in Python using +Airflow's TaskFlow API, which allows you to easily turn Python functions into +Airflow tasks, and automatically infer dependencies and pass data. + +The second task uses dynamic task mapping to create a copy of the task for +each Astronaut in the list retrieved from the API. 
This list will change +depending on how many Astronauts are in space, and the DAG will adjust +accordingly each time it runs. + +For more explanation and getting started instructions, see our Write your +first DAG tutorial: https://www.astronomer.io/docs/learn/get-started-with-airflow + +![Picture of the ISS](https://www.esa.int/var/esa/storage/images/esa_multimedia/images/2010/02/space_station_over_earth/10293696-3-eng-GB/Space_Station_over_Earth_card_full.jpg) +""" + +from airflow import Dataset +from airflow.decorators import dag, task +from pendulum import datetime +import requests + + +# Define the basic parameters of the DAG, like schedule and start_date +@dag( + start_date=datetime(2024, 1, 1), + schedule="@daily", + catchup=False, + doc_md=__doc__, + default_args={"owner": "Astro", "retries": 3}, + tags=["example"], +) +def example_astronauts(): + # Define tasks + @task( + # Define a dataset outlet for the task. This can be used to schedule downstream DAGs when this task has run. + outlets=[Dataset("current_astronauts")] + ) # Define that this task updates the `current_astronauts` Dataset + def get_astronauts(**context) -> list[dict]: + """ + This task uses the requests library to retrieve a list of Astronauts + currently in space. The results are pushed to XCom with a specific key + so they can be used in a downstream pipeline. The task returns a list + of Astronauts to be used in the next task. + """ + try: + r = requests.get("http://api.open-notify.org/astros.json") + r.raise_for_status() + number_of_people_in_space = r.json()["number"] + list_of_people_in_space = r.json()["people"] + except: + print("API currently not available, using hardcoded data instead.") + number_of_people_in_space = 12 + list_of_people_in_space = [ + {"craft": "ISS", "name": "Oleg Kononenko"}, + {"craft": "ISS", "name": "Nikolai Chub"}, + {"craft": "ISS", "name": "Tracy Caldwell Dyson"}, + {"craft": "ISS", "name": "Matthew Dominick"}, + {"craft": "ISS", "name": "Michael Barratt"}, + {"craft": "ISS", "name": "Jeanette Epps"}, + {"craft": "ISS", "name": "Alexander Grebenkin"}, + {"craft": "ISS", "name": "Butch Wilmore"}, + {"craft": "ISS", "name": "Sunita Williams"}, + {"craft": "Tiangong", "name": "Li Guangsu"}, + {"craft": "Tiangong", "name": "Li Cong"}, + {"craft": "Tiangong", "name": "Ye Guangfu"}, + ] + + context["ti"].xcom_push( + key="number_of_people_in_space", value=number_of_people_in_space + ) + return list_of_people_in_space + + @task + def print_astronaut_craft(greeting: str, person_in_space: dict) -> None: + """ + This task creates a print statement with the name of an + Astronaut in space and the craft they are flying on from + the API request results of the previous task, along with a + greeting which is hard-coded in this example. + """ + craft = person_in_space["craft"] + name = person_in_space["name"] + + print(f"{name} is currently in space flying on the {craft}! {greeting}") + + # Use dynamic task mapping to run the print_astronaut_craft task for each + # Astronaut in space + print_astronaut_craft.partial(greeting="Hello! 
:)").expand( + person_in_space=get_astronauts() # Define dependencies using TaskFlow API syntax + ) + + +# Instantiate the DAG +example_astronauts() diff --git a/dags/tcloud_dag.py b/dags/tcloud_dag.py new file mode 100644 index 0000000..25c30da --- /dev/null +++ b/dags/tcloud_dag.py @@ -0,0 +1,5 @@ +from tobikodata.scheduler_facades.airflow import SQLMeshEnterpriseAirflow + +tobiko_cloud = SQLMeshEnterpriseAirflow(conn_id="tobiko_cloud") + +first_task, last_task, dag = tobiko_cloud.create_cadence_dag(environment="prod") \ No newline at end of file diff --git a/dagster-quickstart/data/processed_data.csv b/dagster-quickstart/data/processed_data.csv new file mode 100644 index 0000000..3e8639c --- /dev/null +++ b/dagster-quickstart/data/processed_data.csv @@ -0,0 +1,5 @@ +id,name,age,city,age_group +1,Alice,28,New York,Young +2,Bob,35,San Francisco,Middle +3,Charlie,42,Chicago,Senior +4,Diana,31,Los Angeles,Middle diff --git a/dagster-quickstart/data/sample_data.csv b/dagster-quickstart/data/sample_data.csv new file mode 100644 index 0000000..457cc32 --- /dev/null +++ b/dagster-quickstart/data/sample_data.csv @@ -0,0 +1,5 @@ +id,name,age,city +1,Alice,28,New York +2,Bob,35,San Francisco +3,Charlie,42,Chicago +4,Diana,31,Los Angeles diff --git a/dagster-quickstart/quickstart/__init__.py b/dagster-quickstart/quickstart/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dagster-quickstart/quickstart/assets.py b/dagster-quickstart/quickstart/assets.py new file mode 100644 index 0000000..ffe984a --- /dev/null +++ b/dagster-quickstart/quickstart/assets.py @@ -0,0 +1,24 @@ +import pandas as pd + +import dagster as dg + + +@dg.asset +def processed_data(): + ## Read data from the CSV + df = pd.read_csv("data/sample_data.csv") + + ## Add an age_group column based on the value of age + df["age_group"] = pd.cut( + df["age"], bins=[0, 30, 40, 100], labels=["Young", "Middle", "Senior"] + ) + + ## Save processed data + df.to_csv("data/processed_data.csv", index=False) + return "Data loaded successfully" + + +## Tell Dagster about the assets that make up the pipeline by +## passing it to the Definitions object +## This allows Dagster to manage the assets' execution and dependencies +defs = dg.Definitions(assets=[processed_data]) diff --git a/dagster-quickstart/quickstart/definitions.py b/dagster-quickstart/quickstart/definitions.py new file mode 100644 index 0000000..a399ccd --- /dev/null +++ b/dagster-quickstart/quickstart/definitions.py @@ -0,0 +1,45 @@ +from tobikodata.scheduler_facades.dagster import SQLMeshEnterpriseDagster +import dagster as dg + +# create and configure SQLMeshEnterpriseDagster instance named `sqlmesh` +sqlmesh = SQLMeshEnterpriseDagster( + url=dg.EnvVar( + "TOBIKO_CLOUD_BASE_URL" + ).get_value(), # get the base url from the environment variable + token=dg.EnvVar( + "TOBIKO_CLOUD_TOKEN" + ).get_value(), # get the token from the environment variable + dagster_graphql_host="localhost", # Example GraphQL host (could be passed in an environment variable instead) + dagster_graphql_port=3000, # Example GraphQL port (could be passed in an environment variable instead) +) + + +# define a job to run when the asset is updated +@dg.op +def log_customers_updated(context): + context.log.info("customers updated") + + +@dg.job +def internal_customers_pipeline(): + log_customers_updated() + + +@dg.asset_sensor( + asset_key=dg.AssetKey( + ["sqlmesh-public-demo", "tcloud_demo", "incremental_events_allow_partials"] + ), # Asset key found in Dagster Asset Catalog + 
job=internal_customers_pipeline, +) +def on_customers_updated( + context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry +): + yield dg.RunRequest() + + +# merge existing and sqlmesh definitions +defs = dg.Definitions( + jobs=[internal_customers_pipeline], sensors=[on_customers_updated] +) + +defs = dg.Definitions.merge(defs, sqlmesh.create_definitions(environment="prod")) diff --git a/dagster-quickstart/tmpm1lvbx3m/history/runs.db b/dagster-quickstart/tmpm1lvbx3m/history/runs.db new file mode 100644 index 0000000..3ec87da Binary files /dev/null and b/dagster-quickstart/tmpm1lvbx3m/history/runs.db differ diff --git a/dagster-quickstart/tmpm1lvbx3m/history/runs/951ffaa5-2143-4322-a65e-d5e97aa45043.db b/dagster-quickstart/tmpm1lvbx3m/history/runs/951ffaa5-2143-4322-a65e-d5e97aa45043.db new file mode 100644 index 0000000..752e562 Binary files /dev/null and b/dagster-quickstart/tmpm1lvbx3m/history/runs/951ffaa5-2143-4322-a65e-d5e97aa45043.db differ diff --git a/dagster-quickstart/tmpm1lvbx3m/history/runs/index.db b/dagster-quickstart/tmpm1lvbx3m/history/runs/index.db new file mode 100644 index 0000000..8d89d87 Binary files /dev/null and b/dagster-quickstart/tmpm1lvbx3m/history/runs/index.db differ diff --git a/dagster-quickstart/tmpm1lvbx3m/schedules/schedules.db b/dagster-quickstart/tmpm1lvbx3m/schedules/schedules.db new file mode 100644 index 0000000..1726f56 Binary files /dev/null and b/dagster-quickstart/tmpm1lvbx3m/schedules/schedules.db differ diff --git a/dagster-quickstart/tmpm1lvbx3m/storage/__repository__/tobiko_cloud_track_prod/1.err b/dagster-quickstart/tmpm1lvbx3m/storage/__repository__/tobiko_cloud_track_prod/1.err new file mode 100644 index 0000000..0828ac1 --- /dev/null +++ b/dagster-quickstart/tmpm1lvbx3m/storage/__repository__/tobiko_cloud_track_prod/1.err @@ -0,0 +1 @@ +{"args": [], "asctime": "2025-02-10 14:11:47 -0800", "created": 1739225507.092316, "dagster_meta": {"name": "tobiko_cloud_track_prod", "orig_message": "This is the first run; triggering initial model sync from Tobiko Cloud", "repository_name": "__repository__"}, "exc_info": null, "exc_text": null, "filename": "dagster.py", "funcName": "_sensor", "levelname": "INFO", "levelno": 20, "lineno": 407, "message": "__repository__ - tobiko_cloud_track_prod - This is the first run; triggering initial model sync from Tobiko Cloud", "module": "dagster", "msecs": 92.0, "msg": "__repository__ - tobiko_cloud_track_prod - This is the first run; triggering initial model sync from Tobiko Cloud", "name": "dagster", "pathname": "/Users/sung/Desktop/git_repos/tobiko-cloud-demo/.venv/lib/python3.12/site-packages/tobikodata/scheduler_facades/dagster.py", "process": 56249, "processName": "MainProcess", "relativeCreated": 1595.978021621704, "stack_info": null, "taskName": null, "thread": 6399225856, "threadName": "ThreadPoolExecutor-0_0"} diff --git a/models/customers.sql b/models/customers.sql index 623c47d..ef9b2bc 100644 --- a/models/customers.sql +++ b/models/customers.sql @@ -2,11 +2,14 @@ MODEL ( name tcloud_demo.customers, cron '@daily', grain customer_id, - audits (UNIQUE_VALUES(columns = ( - customer_id - )), NOT_NULL(columns = ( - customer_id - ))) + audits ( + UNIQUE_VALUES(columns = ( + customer_id + )), + NOT_NULL(columns = ( + customer_id + )) + ) ); WITH customers AS ( @@ -59,9 +62,4 @@ WITH customers AS ( ) SELECT * -FROM final - --- create a unit test from this SQL model --- sqlmesh create_test tcloud_demo.customers --query tcloud_demo.stg_customers "select * from tcloud_demo.stg_customers limit 5" \ --- 
--query tcloud_demo.stg_orders "select * from tcloud_demo.stg_orders limit 5" \ --- --query tcloud_demo.stg_payments "select * from tcloud_demo.stg_payments limit 5" \ No newline at end of file +FROM final \ No newline at end of file diff --git a/models/examples/dummy_model copy.sql b/models/examples/dummy_model copy.sql new file mode 100644 index 0000000..2f643a7 --- /dev/null +++ b/models/examples/dummy_model copy.sql @@ -0,0 +1,20 @@ +MODEL ( + name tcloud_demo.dummy_model_v2, + kind VIEW, + cron '@daily', + grains item_id, + audits ( + UNIQUE_VALUES( + columns = ( + item_id + ) /* data audit tests only run for the evaluated intervals */ + ), + NOT_NULL(columns = ( + item_id + )) + ) +); + +/* command to generate unit test code in a yaml file */ /* sqlmesh create_test tcloud_demo.incremental_model --query tcloud_demo.seed_model "select * from tcloud_demo.seed_model limit 5" */ +SELECT + 1 AS item_id \ No newline at end of file diff --git a/models/examples/dummy_model.sql b/models/examples/dummy_model.sql new file mode 100644 index 0000000..2865f10 --- /dev/null +++ b/models/examples/dummy_model.sql @@ -0,0 +1,20 @@ +MODEL ( + name tcloud_demo.dummy_model, + kind VIEW, + cron '@daily', + grain item_id, + audits ( + UNIQUE_VALUES( + columns = ( + item_id + ) /* data audit tests only run for the evaluated intervals */ + ), + NOT_NULL(columns = ( + item_id + )) + ) +); + +/* command to generate unit test code in a yaml file */ /* sqlmesh create_test tcloud_demo.incremental_model --query tcloud_demo.seed_model "select * from tcloud_demo.seed_model limit 5" */ +SELECT + 1 AS item_id \ No newline at end of file diff --git a/models/examples/full_model.sql b/models/examples/full_model.sql index 37562f1..a7933c4 100644 --- a/models/examples/full_model.sql +++ b/models/examples/full_model.sql @@ -8,9 +8,7 @@ MODEL ( ) ); --- command to generate unit test code in a yaml file --- sqlmesh create_test tcloud_demo.incremental_model --query tcloud_demo.seed_model "select * from tcloud_demo.seed_model limit 5" - +/* command to generate unit test code in a yaml file */ /* sqlmesh create_test tcloud_demo.incremental_model --query tcloud_demo.seed_model "select * from tcloud_demo.seed_model limit 5" */ SELECT item_id, COUNT(DISTINCT id) AS num_orders diff --git a/models/examples/incremental_model.sql b/models/examples/incremental_model.sql index da25cbe..a51e7dd 100644 --- a/models/examples/incremental_model.sql +++ b/models/examples/incremental_model.sql @@ -10,7 +10,7 @@ MODEL ( ); SELECT - id, + id::STRING AS id, item_id, event_date FROM tcloud_demo.seed_model diff --git a/models/examples/model_1.sql b/models/examples/model_1.sql new file mode 100644 index 0000000..721a2dd --- /dev/null +++ b/models/examples/model_1.sql @@ -0,0 +1,23 @@ +MODEL ( + name tcloud_demo.model_1, + kind VIEW, + cron '@daily', + grain item_id, + audits ( + UNIQUE_VALUES( + columns = ( + item_id + ) /* data audit tests only run for the evaluated intervals */ + ), + NOT_NULL(columns = ( + item_id + )) + ) +); + +/* run this command to synchronize multiple repos: tcloud sqlmesh -p . 
-p repo_b/ plan */ +SELECT + 1 AS item_id, /* breaking change, will backfill repo_b.model_2 but fail */ + 3 AS new_column, + 3 AS hello, /* non breaking change */ + CURRENT_DATE AS event_date \ No newline at end of file diff --git a/models/examples/model_a.sql b/models/examples/model_a.sql new file mode 100644 index 0000000..b770fc6 --- /dev/null +++ b/models/examples/model_a.sql @@ -0,0 +1,22 @@ +MODEL ( + name tcloud_demo.model_a, + kind VIEW, + cron '@daily', + grain item_id, + audits ( + UNIQUE_VALUES( + columns = ( + item_id + ) /* data audit tests only run for the evaluated intervals */ + ), + NOT_NULL(columns = ( + item_id + )) + ) +); + +/* command to generate unit test code in a yaml file */ /* sqlmesh create_test tcloud_demo.incremental_model --query tcloud_demo.seed_model "select * from tcloud_demo.seed_model limit 5" */ +SELECT + 1 AS item_id, + 3 AS new_column, + CURRENT_DATE AS event_date \ No newline at end of file diff --git a/models/examples/model_b.sql b/models/examples/model_b.sql new file mode 100644 index 0000000..a3a57fa --- /dev/null +++ b/models/examples/model_b.sql @@ -0,0 +1,20 @@ +MODEL ( + name tcloud_demo.model_b, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date) +); + +SELECT + item_id, + event_date, + new_column, + 'new_column_2' AS new_column_2, + 'new_column_3' AS new_column_3, + 'new_column_4' AS new_column_4 +FROM tcloud_demo.model_a +WHERE + event_date BETWEEN @start_date AND @end_date \ No newline at end of file diff --git a/models/examples/snapshot_by_time_model.sql b/models/examples/snapshot_by_time_model.sql index 700fb80..d212e5b 100644 --- a/models/examples/snapshot_by_time_model.sql +++ b/models/examples/snapshot_by_time_model.sql @@ -3,7 +3,8 @@ MODEL ( kind SCD_TYPE_2_BY_TIME ( unique_key id, updated_at_name event_date - ) + ), + enabled false ); SELECT diff --git a/models/incremental_events.sql b/models/incremental_events.sql index 30ff0a3..03a45e9 100644 --- a/models/incremental_events.sql +++ b/models/incremental_events.sql @@ -1,64 +1,59 @@ MODEL ( name tcloud_demo.incremental_events, kind INCREMENTAL_BY_TIME_RANGE ( - time_column (event_timestamp, '%Y-%m-%d'), -- DELETE by time range, then INSERT - lookback 2, -- handle late arriving events for the past 2 (2*1) days based on cron interval - forward_only true -- All changes will be forward only - ), + time_column (event_timestamp, '%Y-%m-%d'), /* DELETE by time range, then INSERT */ + lookback 2 /* handle late arriving events for the past 2 (2*1) days based on cron interval */ + ) /* forward_only true -- All changes will be forward only */, start '2024-06-17', cron '@daily', grain event_id, - stamp 'demo-sung', --should be unique every time - audits (UNIQUE_VALUES(columns = ( -- data audit tests only run for the evaluated intervals - event_id - )), NOT_NULL(columns = ( - event_id - ))) + stamp 'demo-sung', /* should be unique every time */ + audits ( + UNIQUE_VALUES( + columns = ( + event_id + ) /* data audit tests only run for the evaluated intervals */ + ), + NOT_NULL(columns = ( + event_id + )) + ) ); --- How to work with incremental forward only models --- step 1: `sqlmesh plan dev` to create this model for the first time and backfill for all of history --- step 2: change the user_intent_level conditional value --- step 3: pick a start date to backfill like: '2024-06-18' --- step 4: validate only a portion of rows were backfilled: sqlmesh fetchdf "select * from tcloud_demo__dev.incremental_events" --- step 5: 
`sqlmesh plan` to promote to prod with a virtual update, note: the dev backfill preview won't be reused for promotion and is only for dev purposes --- step 6: sqlmesh plan --restate-model "tcloud_demo.incremental_events", to invoke a backfill to mirror dev's data preview --- step 7: pick the same backfill start date for prod as dev's above: '2024-06-18' --- step 8: validate changes to prod: sqlmesh fetchdf "select * from tcloud_demo.incremental_events" --- Note: by default, only complete intervals are processed, so if today was 2024-06-21 and the day isn't over, it would NOT backfill the day's interval of data because it's not complete - +/* How to work with incremental forward only models */ /* step 1: `sqlmesh plan dev` to create this model for the first time and backfill for all of history */ /* step 2: change the user_intent_level conditional value */ /* step 3: pick a start date to backfill like: '2024-06-18' */ /* step 4: validate only a portion of rows were backfilled: sqlmesh fetchdf "select * from tcloud_demo__dev.incremental_events" */ /* step 5: `sqlmesh plan` to promote to prod with a virtual update, note: the dev backfill preview won't be reused for promotion and is only for dev purposes */ /* step 6: sqlmesh plan --restate-model "tcloud_demo.incremental_events", to invoke a backfill to mirror dev's data preview */ /* step 7: pick the same backfill start date for prod as dev's above: '2024-06-18' */ /* step 8: validate changes to prod: sqlmesh fetchdf "select * from tcloud_demo.incremental_events" */ /* Note: by default, only complete intervals are processed, so if today was 2024-06-21 and the day isn't over, it would NOT backfill the day's interval of data because it's not complete */ SELECT event_id, + 5 AS dummy_id, event_name, - event_timestamp::date as event_timestamp, + event_timestamp::DATE AS event_timestamp, user_id, - IF(event_name = 'blog_view', 'high', 'low') AS user_intent_level, -FROM sqlmesh-public-demo.tcloud_raw_data.raw_events --external model, automatically generate yaml using command: `sqlmesh create_external_models` + IF(event_name = 'blog_view', 'high', 'low') AS user_intent_level +FROM sqlmesh-public-demo.tcloud_raw_data.raw_events WHERE - event_timestamp BETWEEN @start_ds AND @end_ds; -- use the correct time format: https://sqlmesh.readthedocs.io/en/stable/concepts/macros/macro_variables/#temporal-variables + event_timestamp BETWEEN @start_ds AND @end_ds; - --- track observer metrics with plain SQL @measure( SELECT - event_timestamp::date AS ts, -- Custom measure time column `ts` - COUNT(*) AS daily_row_count, -- Daily row count - COUNT(DISTINCT event_name) AS unique_event_name_count, -- Count unique event_name values + event_timestamp::DATE AS ts, /* Custom measure time column `ts` */ + COUNT(*) AS daily_row_count, /* Daily row count */ + COUNT(DISTINCT event_name) AS unique_event_name_count /* Count unique event_name values */ FROM tcloud_demo.incremental_events - WHERE event_timestamp BETWEEN @start_ds AND @end_ds -- Filter measure on time - GROUP BY event_timestamp -- Group measure by time -); + WHERE + event_timestamp BETWEEN @start_ds AND @end_ds + GROUP BY + event_timestamp /* Group measure by time */ +) /* track observer metrics with plain SQL */; + +/* you can use macros to dynamically track metrics you care about */ +@DEF(event_names, ['page_view', 'product_view', 'ad_view', 'video_view', 'blog_view']); --- you can use macros to dynamically track metrics you care about -@DEF(event_names, ["page_view", "product_view", "ad_view", "video_view", 
"blog_view"]); @measure( SELECT - event_timestamp::date as ts, - @EACH( - @event_names, - x -> COUNT(CASE WHEN event_name = x THEN 1 END) AS @{x}_count - ), + event_timestamp::DATE AS ts, + @EACH(@event_names, x -> COUNT(CASE WHEN event_name = x THEN 1 END) AS @{x}_count) FROM tcloud_demo.incremental_events - WHERE event_timestamp::date BETWEEN @start_ds AND @end_ds - group by event_timestamp::date -); + WHERE + event_timestamp::DATE BETWEEN @start_ds AND @end_ds + GROUP BY + event_timestamp::DATE +) \ No newline at end of file diff --git a/models/incremental_events_allow_partials.sql b/models/incremental_events_allow_partials.sql index e52a3f1..169b1fd 100644 --- a/models/incremental_events_allow_partials.sql +++ b/models/incremental_events_allow_partials.sql @@ -1,37 +1,32 @@ MODEL ( name tcloud_demo.incremental_events_allow_partials, kind INCREMENTAL_BY_TIME_RANGE ( - time_column (event_timestamp, '%Y-%m-%d'), -- DELETE by time range, then INSERT - forward_only true, -- All changes will be forward only + time_column (event_timestamp, '%Y-%m-%d'), /* DELETE by time range, then INSERT */ + forward_only TRUE /* All changes will be forward only */ ), start '2024-06-17', cron '@daily', grain event_id, - allow_partials true, --ignores cron - audits (UNIQUE_VALUES(columns = ( -- data audit tests only run for the evaluated intervals - event_id - )), NOT_NULL(columns = ( - event_id - ))) + allow_partials TRUE, /* ignores cron */ + audits ( + UNIQUE_VALUES( + columns = ( + event_id + ) /* data audit tests only run for the evaluated intervals */ + ), + NOT_NULL(columns = ( + event_id + )) + ) ); --- How to work with incremental forward only models --- step 1: `sqlmesh plan dev` to create this model for the first time and backfill for all of history --- step 2: change the user_intent_level conditional value --- step 3: pick a start date to backfill like: '2024-06-18' --- step 4: validate only a portion of rows were backfilled: sqlmesh fetchdf "select * from tcloud_demo__dev.incremental_events" --- step 5: `sqlmesh plan` to promote to prod with a virtual update, note: the dev backfill preview won't be reused for promotion and is only for dev purposes --- step 6: sqlmesh plan --restate-model "tcloud_demo.incremental_events", to invoke a backfill to mirror dev's data preview --- step 7: pick the same backfill start date for prod as dev's above: '2024-06-18' --- step 8: validate changes to prod: sqlmesh fetchdf "select * from tcloud_demo.incremental_events" --- Note: by default, only complete intervals are processed, so if today was 2024-06-21 and the day isn't over, it would NOT backfill the day's interval of data because it's not complete - +/* How to work with incremental forward only models */ /* step 1: `sqlmesh plan dev` to create this model for the first time and backfill for all of history */ /* step 2: change the user_intent_level conditional value */ /* step 3: pick a start date to backfill like: '2024-06-18' */ /* step 4: validate only a portion of rows were backfilled: sqlmesh fetchdf "select * from tcloud_demo__dev.incremental_events" */ /* step 5: `sqlmesh plan` to promote to prod with a virtual update, note: the dev backfill preview won't be reused for promotion and is only for dev purposes */ /* step 6: sqlmesh plan --restate-model "tcloud_demo.incremental_events", to invoke a backfill to mirror dev's data preview */ /* step 7: pick the same backfill start date for prod as dev's above: '2024-06-18' */ /* step 8: validate changes to prod: sqlmesh fetchdf "select * from 
tcloud_demo.incremental_events" */ /* Note: by default, only complete intervals are processed, so if today was 2024-06-21 and the day isn't over, it would NOT backfill the day's interval of data because it's not complete */ SELECT event_id, event_name, - event_timestamp::date as event_timestamp, + event_timestamp::DATE AS event_timestamp, user_id, - IF(event_name = 'blog_view', 'high', 'low') AS user_intent_level, -FROM sqlmesh-public-demo.tcloud_raw_data.raw_events --external model, automatically generate yaml using command: `sqlmesh create_external_models` + IF(event_name = 'blog_view', 'high', 'low') AS user_intent_level +FROM sqlmesh-public-demo.tcloud_raw_data.raw_events WHERE - event_timestamp BETWEEN @start_ds AND @end_ds -- use the correct time format: https://sqlmesh.readthedocs.io/en/stable/concepts/macros/macro_variables/#temporal-variables + event_timestamp BETWEEN @start_ds AND @end_ds \ No newline at end of file diff --git a/models/orders.sql b/models/orders.sql index ce0ec4d..b40ca10 100644 --- a/models/orders.sql +++ b/models/orders.sql @@ -2,11 +2,14 @@ MODEL ( name tcloud_demo.orders, cron '@daily', grain order_id, - audits (UNIQUE_VALUES(columns = ( - surrogate_key - )), NOT_NULL(columns = ( - surrogate_key - ))) + audits ( + UNIQUE_VALUES(columns = ( + surrogate_key + )), + NOT_NULL(columns = ( + surrogate_key + )) + ) ); @DEF(payment_methods, ['credit_card', 'coupon', 'bank_transfer', 'gift_card']); @@ -20,7 +23,8 @@ WITH orders AS ( payment_id, order_id, payment_method, - amount + amount, + advanced_cll_column FROM tcloud_demo.stg_payments ), order_payments AS ( SELECT diff --git a/models/staging/stg_customers.sql b/models/staging/stg_customers.sql index f007a38..6ba5f07 100644 --- a/models/staging/stg_customers.sql +++ b/models/staging/stg_customers.sql @@ -2,11 +2,14 @@ MODEL ( name tcloud_demo.stg_customers, cron '@daily', grain org_id, - audits (UNIQUE_VALUES(columns = ( - customer_id - )), NOT_NULL(columns = ( - customer_id - ))) + audits ( + UNIQUE_VALUES(columns = ( + customer_id + )), + NOT_NULL(columns = ( + customer_id + )) + ) ); SELECT diff --git a/models/staging/stg_orders.sql b/models/staging/stg_orders.sql index 3e2ddf9..4ad426b 100644 --- a/models/staging/stg_orders.sql +++ b/models/staging/stg_orders.sql @@ -3,9 +3,9 @@ MODEL ( cron '@daily', grain order_id, audits (UNIQUE_VALUES(columns = ( - order_id - )), NOT_NULL(columns = ( - order_id + order_id + )), NOT_NULL(columns = ( + order_id ))) ); diff --git a/models/staging/stg_payments.sql b/models/staging/stg_payments.sql index a56b0f4..04ae12c 100644 --- a/models/staging/stg_payments.sql +++ b/models/staging/stg_payments.sql @@ -1,11 +1,11 @@ MODEL ( name tcloud_demo.stg_payments, cron '@daily', - grain payment_id, + grains payment_id, audits (UNIQUE_VALUES(columns = ( - payment_id - )), NOT_NULL(columns = ( - payment_id + payment_id + )), NOT_NULL(columns = ( + payment_id ))) ); @@ -13,10 +13,11 @@ SELECT id AS payment_id, order_id, payment_method, + 'advanced_cll_column' AS advanced_cll_column, amount / 100 AS amount, /* `amount` is currently stored in cents, so we convert it to dollars */ - -- 'new_column' AS new_column, /* non-breaking change example */ -FROM tcloud_demo.seed_raw_payments - --- how to generate unit test code without manually writing yaml by hand --- this will generate a file in the tests/ folder: test_stg_payments.yaml --- sqlmesh create_test tcloud_demo.stg_payments --query tcloud_demo.seed_raw_payments "select * from tcloud_demo.seed_raw_payments limit 5" \ No newline at end of 
file + 'new_column' AS new_column_V3, /* non-breaking change example */ + 'new_column' AS new_column_v2, /* non-breaking change example */ + 'new_column' AS new_column_v4, /* non-breaking change example */ + 'new_column' AS new_column_v5 /* non-breaking change example */ +/* 'new_column' AS new_column_v6 /* non-breaking change example */ */ +FROM tcloud_demo.seed_raw_payments \ No newline at end of file diff --git a/packages.txt b/packages.txt new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml index 827e90c..57a7158 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.12" -tcloud = "2.0.0" +tcloud = "2.5.0" typer="^0.12.3" google-auth="2.34.0" google-cloud-bigquery="3.25.0" diff --git a/repo_b/config.yaml b/repo_b/config.yaml new file mode 100644 index 0000000..c904c0c --- /dev/null +++ b/repo_b/config.yaml @@ -0,0 +1,54 @@ +gateways: + tobiko_cloud: # this will use the config in tcloud.yaml for state_connection + scheduler: # TODO: add the connection info below into the Tobiko Cloud Connections Page with the service account json in plain text for this scheduler + type: cloud + connection: # This connection is used for automatic unit test generation + type: bigquery + method: service-account-json + concurrent_tasks: 5 + register_comments: true + keyfile_json: {{ env_var('GOOGLE_SQLMESH_CREDENTIALS') }} + project: sqlmesh-public-demo + postgres: + connection: + type: postgres + host: {{ env_var('SQLMESH_STATE_HOST') }} + port: 5432 + user: {{ env_var('SQLMESH_STATE_USERNAME') }} + password: {{ env_var('SQLMESH_STATE_PASSWORD') }} + database: sqlmesh_state_demo + +default_gateway: tobiko_cloud +project: repo_b + + +model_defaults: + dialect: bigquery + start: 2024-12-01 + validate_query: true + +# enables synchronized deployments to prod when a PR is merged +cicd_bot: + type: github + merge_method: squash + enable_deploy_command: true + skip_pr_backfill: false + auto_categorize_changes: + external: full + python: full + sql: full + seed: full + +plan: + enable_preview: true + +# list of users that are allowed to approve PRs for synchronized deployments +users: +- username: sung_sqlmesh_demo + github_username: sungchun12 + roles: + - required_approver +- username: afzal_sqlmesh_demo + github_username: afzaljasani + roles: + - required_approver \ No newline at end of file diff --git a/repo_b/models/model_2.sql b/repo_b/models/model_2.sql new file mode 100644 index 0000000..6eb0646 --- /dev/null +++ b/repo_b/models/model_2.sql @@ -0,0 +1,20 @@ +MODEL ( + name repo_b.model_2, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date) +); + +SELECT + item_id, + event_date, + new_column, + 'new_column_2' AS new_column_2, + 'new_column_3' AS new_column_3, + 'new_column_4' AS new_column_4 -- repo b only breaking change +FROM tcloud_demo.model_1 +WHERE + event_date BETWEEN @start_date AND @end_date \ No newline at end of file diff --git a/repo_b/tcloud.yaml b/repo_b/tcloud.yaml new file mode 100644 index 0000000..0b2b777 --- /dev/null +++ b/repo_b/tcloud.yaml @@ -0,0 +1,6 @@ +# projects: +# public-demo: +# url: https://cloud.tobikodata.com/sqlmesh/tobiko/public-demo/ +# gateway: tobiko_cloud +# extras: bigquery,web,github,postgres +# default_project: public-demo \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..528f658 --- /dev/null +++ b/requirements.txt @@ 
-0,0 +1,5 @@ +tcloud +tobiko-cloud-scheduler-facade[airflow,dagster]==1.0.2 +pre-commit +dagster +dagster-webserver \ No newline at end of file diff --git a/sqlmesh-enterprise b/sqlmesh-enterprise deleted file mode 160000 index 62b11ab..0000000 --- a/sqlmesh-enterprise +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 62b11ab76008fdadce8b92b19c7f6c36ec07ff81 diff --git a/tcloud.yaml b/tcloud.yaml index a3aab31..5edd553 100644 --- a/tcloud.yaml +++ b/tcloud.yaml @@ -2,7 +2,7 @@ projects: public-demo: url: https://cloud.tobikodata.com/sqlmesh/tobiko/public-demo/ gateway: tobiko_cloud - extras: bigquery,web,github + extras: bigquery,web,github,postgres default_project: public-demo # export TCLOUD_TOKEN= in your shell and as a secret in github actions # you can alias the tcloud cli in your shell: alias sqlmesh='tcloud sqlmesh' \ No newline at end of file diff --git a/tests/dags/test_dag_example.py b/tests/dags/test_dag_example.py new file mode 100644 index 0000000..6ff3552 --- /dev/null +++ b/tests/dags/test_dag_example.py @@ -0,0 +1,83 @@ +"""Example DAGs test. This test ensures that all Dags have tags, retries set to two, and no import errors. This is an example pytest and may not be fit the context of your DAGs. Feel free to add and remove tests.""" + +import os +import logging +from contextlib import contextmanager +import pytest +from airflow.models import DagBag + + +@contextmanager +def suppress_logging(namespace): + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + # prepend "(None,None)" to ensure that a test object is always created even if it's a no op. + return [(None, None)] + [ + (strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items() + ] + + +def get_dags(): + """ + Generate a tuple of dag_id, in the DagBag + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()] + + +@pytest.mark.parametrize( + "rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()] +) +def test_file_imports(rel_path, rv): + """Test for import errors on a file""" + if rel_path and rv: + raise Exception(f"{rel_path} failed to import with message \n {rv}") + + +APPROVED_TAGS = {} + + +@pytest.mark.parametrize( + "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()] +) +def test_dag_tags(dag_id, dag, fileloc): + """ + test if a DAG is tagged and if those TAGs are in the approved list + """ + assert dag.tags, f"{dag_id} in {fileloc} has no tags" + if APPROVED_TAGS: + assert not set(dag.tags) - APPROVED_TAGS + + +@pytest.mark.parametrize( + "dag_id,dag, fileloc", get_dags(), ids=[x[2] for x in get_dags()] +) +def test_dag_retries(dag_id, dag, fileloc): + """ + test if a DAG has retries set + """ + assert ( + dag.default_args.get("retries", None) >= 2 + ), f"{dag_id} in {fileloc} must have task retries >= 2." 
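The new Astro scaffolding adds two layers of DAG checks: the parse-time integrity test in .astro/test_dag_integrity_default.py (used by the dev parse command, per its own docstring) and the pytest suite in tests/dags/test_dag_example.py. A minimal local sanity check before pushing, assuming the Astro CLI is installed and the project can be started locally, looks roughly like this:

    astro dev parse     # parse-only DAG integrity check (drives .astro/test_dag_integrity_default.py)
    astro dev pytest    # run the pytest suite under tests/ inside the Airflow runtime container

This is a sketch of the standard Astro CLI workflow rather than anything wired into the tcloud CI job in this PR.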
diff --git a/tests/test_full_model.yaml b/tests/test_full_model.yaml deleted file mode 100644 index d6db9a7..0000000 --- a/tests/test_full_model.yaml +++ /dev/null @@ -1,21 +0,0 @@ -test_example_full_model: - model: tcloud_demo.full_model - inputs: - tcloud_demo.incremental_model: - rows: - - id: 1 - item_id: 1 - event_date: '2020-01-01' - - id: 2 - item_id: 1 - event_date: '2020-01-02' - - id: 3 - item_id: 2 - event_date: '2020-01-03' - outputs: - query: - rows: - - item_id: 1 - num_orders: 2 - - item_id: 2 - num_orders: 1 diff --git a/utils/config.py b/utils/config.py new file mode 100644 index 0000000..dda43b1 --- /dev/null +++ b/utils/config.py @@ -0,0 +1,17 @@ +import os +from tobikodata.sqlmesh_enterprise.config.scheduler import CloudSchedulerConfig +from sqlmesh.core.config import ( + Config, + ModelDefaultsConfig, + GatewayConfig, +) + +config = Config( + model_defaults=ModelDefaultsConfig(dialect="bigquery"), + default_gateway="tobiko_cloud", + gateways={ + "tobiko_cloud": GatewayConfig( + scheduler = CloudSchedulerConfig(), + ) + } +) diff --git a/utils/their_config.py b/utils/their_config.py new file mode 100644 index 0000000..768dff9 --- /dev/null +++ b/utils/their_config.py @@ -0,0 +1,41 @@ +import os + +from hopper.sqlmesh.plus.loader import SqlMeshPlusLoader +from tobikodata.sqlmesh_enterprise.config.scheduler import RemoteCloudSchedulerConfig +from sqlmesh.core.config import Config, GatewayConfig +from sqlmesh.core.config.categorizer import AutoCategorizationMode +from sqlmesh.core.config.categorizer import CategorizerConfig +from sqlmesh.core.config.common import EnvironmentSuffixTarget +from sqlmesh.core.config.model import ModelDefaultsConfig +from sqlmesh.integrations.github.cicd.config import GithubCICDBotConfig +from sqlmesh.integrations.github.cicd.config import MergeMethod + +config = Config( + model_defaults=ModelDefaultsConfig( + dialect="bigquery", + cron="@hourly", + ), + default_gateway="development", + gateways={ + "development": GatewayConfig( + scheduler = RemoteCloudSchedulerConfig(), + ) + }, + default_target_environment=os.getenv("USER", "prod"), + environment_suffix_target=EnvironmentSuffixTarget.TABLE, + cicd_bot=GithubCICDBotConfig( + enable_deploy_command=True, + merge_method=MergeMethod.SQUASH, + auto_categorize_changes=CategorizerConfig( + external=AutoCategorizationMode.FULL, + python=AutoCategorizationMode.OFF, + sql=AutoCategorizationMode.FULL, + seed=AutoCategorizationMode.FULL, + ), + pr_include_unmodified=False, + run_on_deploy_to_prod=False, + skip_pr_backfill=False, + default_pr_start="1 day ago", + ), + loader=SqlMeshPlusLoader, +) diff --git a/utils/xdb_diff_demo.sh b/utils/xdb_diff_demo.sh new file mode 100644 index 0000000..55a1e16 --- /dev/null +++ b/utils/xdb_diff_demo.sh @@ -0,0 +1,3 @@ +# tcloud sqlmesh table_diff 'tobiko_cloud|tcloud_demo__dev_sung.incremental_model:postgres|sqlmesh_state_demo.sqlmesh._intervals' --on id --show-sample +tcloud sqlmesh table_diff 'postgres|sqlmesh_state_demo.sqlmesh._intervals:tobiko_cloud|tcloud_demo__dev_sung.incremental_model' --on id --show-sample +
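Taken together, the changes above assume a local workflow roughly like the following sketch (commands taken from requirements.txt, .pre-commit-config.yaml, tcloud.yaml, and the multi-repo comment in models/examples/model_1.sql; the token export mirrors the TCLOUD_TOKEN secret used by the GitHub Actions job):

    pip install -r requirements.txt      # tcloud CLI, pre-commit, dagster + the Tobiko scheduler facade
    export TCLOUD_TOKEN=<your token>     # same secret the tcloud_cicd.yml workflow expects
    pre-commit install                   # enables the local `tcloud sqlmesh format` hook on SQL files
    tcloud sqlmesh format                # the command the hook runs; can also be invoked on demand
    tcloud sqlmesh -p . -p repo_b/ plan  # multi-repo plan across repo_a and repo_b

The BigQuery and Postgres state credentials referenced in config.yaml (GOOGLE_SQLMESH_CREDENTIALS, SQLMESH_STATE_HOST, SQLMESH_STATE_USERNAME, SQLMESH_STATE_PASSWORD) also need to be exported before planning against the tobiko_cloud gateway.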