Sung/debug #14

Open · wants to merge 28 commits into base: main
2 changes: 2 additions & 0 deletions .astro/config.yaml
@@ -0,0 +1,2 @@
project:
  name: tobiko-cloud-demo
1 change: 1 addition & 0 deletions .astro/dag_integrity_exceptions.txt
@@ -0,0 +1 @@
# Add dag files to exempt from parse test below. ex: dags/<test-file>
141 changes: 141 additions & 0 deletions .astro/test_dag_integrity_default.py
@@ -0,0 +1,141 @@
"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**"""

from contextlib import contextmanager
import logging
import os

import pytest

from airflow.models import DagBag, Variable, Connection
from airflow.hooks.base import BaseHook
from airflow.utils.db import initdb

# init airflow database
initdb()

# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables


# =========== MONKEYPATCH BaseHook.get_connection() ===========
def basehook_get_connection_monkeypatch(key: str, *args, **kwargs):
print(
f"Attempted to fetch connection during parse returning an empty Connection object for {key}"
)
return Connection(key)


BaseHook.get_connection = basehook_get_connection_monkeypatch
# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() ===========


# =========== MONKEYPATCH OS.GETENV() ===========
def os_getenv_monkeypatch(key: str, *args, **kwargs):
default = None
if args:
default = args[0] # os.getenv should get at most 1 arg after the key
if kwargs:
default = kwargs.get(
"default", None
) # and sometimes kwarg if people are using the sig

env_value = os.environ.get(key, None)

if env_value:
return env_value # if the env_value is set, return it
if (
key == "JENKINS_HOME" and default is None
): # fix https://github.com/astronomer/astro-cli/issues/601
return None
if default:
return default # otherwise return whatever default has been passed
return f"MOCKED_{key.upper()}_VALUE" # if absolutely nothing has been passed - return the mocked value


os.getenv = os_getenv_monkeypatch
# # =========== /MONKEYPATCH OS.GETENV() ===========

# =========== MONKEYPATCH VARIABLE.GET() ===========


class magic_dict(dict):
def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)

def __getitem__(self, key):
return {}.get(key, "MOCKED_KEY_VALUE")


_no_default = object() # allow falsey defaults


def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False):
print(
f"Attempted to get Variable value during parse, returning a mocked value for {key}"
)

if default_var is not _no_default:
return default_var
if deserialize_json:
return magic_dict()
return "NON_DEFAULT_MOCKED_VARIABLE_VALUE"


Variable.get = variable_get_monkeypatch
# # =========== /MONKEYPATCH VARIABLE.GET() ===========


@contextmanager
def suppress_logging(namespace):
"""
Suppress logging within a specific namespace to keep tests "clean" during build
"""
logger = logging.getLogger(namespace)
old_value = logger.disabled
logger.disabled = True
try:
yield
finally:
logger.disabled = old_value


def get_import_errors():
"""
Generate a tuple for import errors in the dag bag, and include DAGs without errors.
"""
with suppress_logging("airflow"):
dag_bag = DagBag(include_examples=False)

def strip_path_prefix(path):
return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))

# Initialize an empty list to store the tuples
result = []

# Iterate over the items in import_errors
for k, v in dag_bag.import_errors.items():
result.append((strip_path_prefix(k), v.strip()))

# Check if there are DAGs without errors
for file_path in dag_bag.dags:
# Check if the file_path is not in import_errors, meaning no errors
if file_path not in dag_bag.import_errors:
result.append((strip_path_prefix(file_path), "No import errors"))

return result


@pytest.mark.parametrize(
"rel_path, rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
)
def test_file_imports(rel_path, rv):
"""Test for import errors on a file"""
if os.path.exists(".astro/dag_integrity_exceptions.txt"):
with open(".astro/dag_integrity_exceptions.txt", "r") as f:
exceptions = f.readlines()
print(f"Exceptions: {exceptions}")
if (rv != "No import errors") and rel_path not in exceptions:
# If rv is not "No import errors," consider it a failed test
raise Exception(f"{rel_path} failed to import with message \n {rv}")
else:
# If rv is "No import errors," consider it a passed test
print(f"{rel_path} passed the import test")
8 changes: 8 additions & 0 deletions .dockerignore
@@ -0,0 +1,8 @@
astro
.git
.env
airflow_settings.yaml
logs/
.venv
airflow.db
airflow.cfg
16 changes: 5 additions & 11 deletions .github/workflows/tcloud_cicd.yml
@@ -7,9 +7,9 @@ on:
       - synchronize
       - opened
   # Required if using comments to issue commands to the bot
-  # issue_comment:
-  #   types:
-  #     - created
+  issue_comment:
+    types:
+      - created
   # Required if using required approvers to automate deployments
   pull_request_review:
     types:
@@ -45,20 +45,14 @@ jobs:
         with:
           ref: refs/pull/${{ github.event.issue.pull_request && github.event.issue.number || github.event.pull_request.number }}/merge

-      - name: Authenticate to Google Cloud
-        uses: 'google-github-actions/auth@v2'
-        with:
-          credentials_json: '${{ secrets.GOOGLE_SQLMESH_CREDENTIALS }}'
-
       - name: Install tcloud CLI + Dependencies
         run: |
-          pip install poetry
-          poetry install --no-root
+          pip install -r requirements.txt
         shell: bash

       - name: Run tcloud CI/CD Bot
         run: |
-          poetry run tcloud sqlmesh_cicd -p ${{ github.workspace }} github --token ${{ secrets.GITHUB_TOKEN }} run-all
+          tcloud sqlmesh_cicd -p ${{ github.workspace }} github --token ${{ secrets.GITHUB_TOKEN }} run-all
         env: # TODO: update your GitHub secrets to include TCLOUD_TOKEN, GOOGLE_SQLMESH_CREDENTIALS
           TCLOUD_TOKEN: ${{ secrets.TCLOUD_TOKEN }}
           GOOGLE_SQLMESH_CREDENTIALS: ${{ secrets.GOOGLE_SQLMESH_CREDENTIALS }}
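
Reviewer context for the workflow changes: uncommenting `issue_comment` is what lets the bot react to commands issued as PR comments (such as the deploy command enabled in `config.yaml` below), and the `ref` expression already handles both trigger types: `github.event.issue.number` when a comment fires the run, `github.event.pull_request.number` otherwise. Dropping the separate `google-github-actions/auth` step appears intentional, since `GOOGLE_SQLMESH_CREDENTIALS` is now passed straight to the bot as an environment variable.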
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,9 @@
repos: # List of repository sources for pre-commit hooks
  - repo: local # Specifies that these hooks are defined locally (not from external repos)
    hooks: # List of hooks to run
      - id: sqlmesh-format # Unique identifier for this hook
        name: SQLMesh Format # Human-readable name for the hook
        entry: tcloud sqlmesh format # The command to run when the hook is triggered
        language: system # Specifies that this hook runs using system commands
        types: [sql] # File types this hook should run on (only SQL files)
        pass_filenames: false # Don't pass the filenames to the command
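
If this hook is adopted, contributors would typically enable it once with `pre-commit install`; after that, `tcloud sqlmesh format` runs on any commit that touches SQL files. Because `pass_filenames: false` is set, the formatter scans the project itself rather than receiving the staged file paths.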
1 change: 1 addition & 0 deletions Dockerfile
@@ -0,0 +1 @@
FROM quay.io/astronomer/astro-runtime:12.5.0
25 changes: 25 additions & 0 deletions airflow_settings.yaml
@@ -0,0 +1,25 @@
# This file allows you to configure Airflow Connections, Pools, and Variables in a single place for local development only.
# NOTE: json dicts can be added to the conn_extra field as yaml key value pairs. See the example below.

# For more information, refer to our docs: https://www.astronomer.io/docs/astro/cli/develop-project#configure-airflow_settingsyaml-local-development-only
# For questions, reach out to: https://support.astronomer.io
# For issues create an issue ticket here: https://github.com/astronomer/astro-cli/issues

airflow:
  connections:
    - conn_id:
      conn_type:
      conn_host:
      conn_schema:
      conn_login:
      conn_password:
      conn_port:
      conn_extra:
        example_extra_field: example-value
  pools:
    - pool_name:
      pool_slot:
      pool_description:
  variables:
    - variable_name:
      variable_value:
7 changes: 4 additions & 3 deletions audits/assert_positive_order_ids.sql
@@ -1,8 +1,9 @@
 AUDIT (
-  name assert_positive_order_ids,
+  name assert_positive_order_ids
 );

-SELECT *
+SELECT
+  *
 FROM @this_model
 WHERE
-  item_id < 0
+  item_id < 0
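
For reviewers unfamiliar with SQLMesh audits: an audit fails when its query returns any rows, so this one flags any row of the audited model (`@this_model`) with a negative `item_id`. The change itself is cosmetic: a trailing comma removed and the SELECT reflowed, presumably by the `sqlmesh format` hook added above (the final `item_id < 0` pair is a whitespace-only change).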
12 changes: 12 additions & 0 deletions config.yaml
@@ -9,18 +9,30 @@ gateways:
      register_comments: true
      keyfile_json: {{ env_var('GOOGLE_SQLMESH_CREDENTIALS') }}
      project: sqlmesh-public-demo
  postgres:
    connection:
      type: postgres
      host: {{ env_var('SQLMESH_STATE_HOST') }}
      port: 5432
      user: {{ env_var('SQLMESH_STATE_USERNAME') }}
      password: {{ env_var('SQLMESH_STATE_PASSWORD') }}
      database: sqlmesh_state_demo

default_gateway: tobiko_cloud
project: repo_a


model_defaults:
  dialect: bigquery
  start: 2024-12-01
  validate_query: true

# enables synchronized deployments to prod when a PR is merged
cicd_bot:
  type: github
  merge_method: squash
  enable_deploy_command: true
  skip_pr_backfill: false
  auto_categorize_changes:
    external: full
    python: full
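
Note on the new `postgres` gateway: it looks like dedicated state storage for SQLMesh, separate from the `tobiko_cloud` default. The `{{ env_var(...) }}` references resolve when the config is loaded, so `SQLMESH_STATE_HOST`, `SQLMESH_STATE_USERNAME`, and `SQLMESH_STATE_PASSWORD` must be set in any environment (local or CI) that runs SQLMesh against this project.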
Empty file added dags/.airflowignore
100 changes: 100 additions & 0 deletions dags/exampledag.py
@@ -0,0 +1,100 @@
"""
## Astronaut ETL example DAG

This DAG queries the list of astronauts currently in space from the
Open Notify API and prints each astronaut's name and flying craft.

There are two tasks, one to get the data from the API and save the results,
and another to print the results. Both tasks are written in Python using
Airflow's TaskFlow API, which allows you to easily turn Python functions into
Airflow tasks, and automatically infer dependencies and pass data.

The second task uses dynamic task mapping to create a copy of the task for
each Astronaut in the list retrieved from the API. This list will change
depending on how many Astronauts are in space, and the DAG will adjust
accordingly each time it runs.

For more explanation and getting started instructions, see our Write your
first DAG tutorial: https://www.astronomer.io/docs/learn/get-started-with-airflow

![Picture of the ISS](https://www.esa.int/var/esa/storage/images/esa_multimedia/images/2010/02/space_station_over_earth/10293696-3-eng-GB/Space_Station_over_Earth_card_full.jpg)
"""

from airflow import Dataset
from airflow.decorators import dag, task
from pendulum import datetime
import requests


# Define the basic parameters of the DAG, like schedule and start_date
@dag(
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
doc_md=__doc__,
default_args={"owner": "Astro", "retries": 3},
tags=["example"],
)
def example_astronauts():
# Define tasks
@task(
# Define a dataset outlet for the task. This can be used to schedule downstream DAGs when this task has run.
outlets=[Dataset("current_astronauts")]
) # Define that this task updates the `current_astronauts` Dataset
def get_astronauts(**context) -> list[dict]:
"""
This task uses the requests library to retrieve a list of Astronauts
currently in space. The results are pushed to XCom with a specific key
so they can be used in a downstream pipeline. The task returns a list
of Astronauts to be used in the next task.
"""
try:
r = requests.get("http://api.open-notify.org/astros.json")
r.raise_for_status()
number_of_people_in_space = r.json()["number"]
list_of_people_in_space = r.json()["people"]
except:
print("API currently not available, using hardcoded data instead.")
number_of_people_in_space = 12
list_of_people_in_space = [
{"craft": "ISS", "name": "Oleg Kononenko"},
{"craft": "ISS", "name": "Nikolai Chub"},
{"craft": "ISS", "name": "Tracy Caldwell Dyson"},
{"craft": "ISS", "name": "Matthew Dominick"},
{"craft": "ISS", "name": "Michael Barratt"},
{"craft": "ISS", "name": "Jeanette Epps"},
{"craft": "ISS", "name": "Alexander Grebenkin"},
{"craft": "ISS", "name": "Butch Wilmore"},
{"craft": "ISS", "name": "Sunita Williams"},
{"craft": "Tiangong", "name": "Li Guangsu"},
{"craft": "Tiangong", "name": "Li Cong"},
{"craft": "Tiangong", "name": "Ye Guangfu"},
]

context["ti"].xcom_push(
key="number_of_people_in_space", value=number_of_people_in_space
)
return list_of_people_in_space

@task
def print_astronaut_craft(greeting: str, person_in_space: dict) -> None:
"""
This task creates a print statement with the name of an
Astronaut in space and the craft they are flying on from
the API request results of the previous task, along with a
greeting which is hard-coded in this example.
"""
craft = person_in_space["craft"]
name = person_in_space["name"]

print(f"{name} is currently in space flying on the {craft}! {greeting}")

# Use dynamic task mapping to run the print_astronaut_craft task for each
# Astronaut in space
print_astronaut_craft.partial(greeting="Hello! :)").expand(
person_in_space=get_astronauts() # Define dependencies using TaskFlow API syntax
)


# Instantiate the DAG
example_astronauts()
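
The docstring above leans on dynamic task mapping, which is worth seeing in isolation. A minimal sketch, assuming a recent Airflow 2.x (DAG and task names here are illustrative, not part of the PR):

```python
# Minimal dynamic task mapping sketch: one mapped task instance is created
# per element of the list returned at runtime. Illustrative names only.
from airflow.decorators import dag, task
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def mapping_sketch():
    @task
    def make_items() -> list[int]:
        return [1, 2, 3]  # length only known at runtime in real pipelines

    @task
    def report(prefix: str, item: int) -> None:
        print(f"{prefix}{item}")

    # partial() pins arguments shared by every instance; expand() maps the rest
    report.partial(prefix="item=").expand(item=make_items())


mapping_sketch()
```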
5 changes: 5 additions & 0 deletions dags/tcloud_dag.py
@@ -0,0 +1,5 @@
from tobikodata.scheduler_facades.airflow import SQLMeshEnterpriseAirflow

tobiko_cloud = SQLMeshEnterpriseAirflow(conn_id="tobiko_cloud")

first_task, last_task, dag = tobiko_cloud.create_cadence_dag(environment="prod")
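
Since `create_cadence_dag` hands back `first_task` and `last_task`, other work can be chained around the generated DAG. A hedged sketch, assuming the returned handles behave like ordinary Airflow tasks (the `EmptyOperator` placeholders are illustrative, not part of the PR):

```python
# Illustrative only: bracket the generated cadence DAG with placeholder tasks.
from airflow.operators.empty import EmptyOperator

pre_checks = EmptyOperator(task_id="pre_checks", dag=dag)
post_checks = EmptyOperator(task_id="post_checks", dag=dag)

pre_checks >> first_task   # runs before the SQLMesh cadence work
last_task >> post_checks   # runs after it completes
```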
5 changes: 5 additions & 0 deletions dagster-quickstart/data/processed_data.csv
@@ -0,0 +1,5 @@
id,name,age,city,age_group
1,Alice,28,New York,Young
2,Bob,35,San Francisco,Middle
3,Charlie,42,Chicago,Senior
4,Diana,31,Los Angeles,Middle
5 changes: 5 additions & 0 deletions dagster-quickstart/data/sample_data.csv
@@ -0,0 +1,5 @@
id,name,age,city
1,Alice,28,New York
2,Bob,35,San Francisco
3,Charlie,42,Chicago
4,Diana,31,Los Angeles