[PRODUCTION] Update production Airflow environment #426

Merged 1 commit on Jul 12, 2024

@@ -1,4 +1,4 @@
-name: CI
+name: CI-CD-DEV

 on:
   pull_request:

@@ -1,4 +1,4 @@
-name: release
+name: CI-CD-PROD

 on:
   pull_request:

README.md (39 changes: 20 additions & 19 deletions)
@@ -449,25 +449,26 @@ The `airflow_variables_*.txt` files provide a set of default values for variable

### **DBT Variables**

| Variable name | Description | Should be changed? |
| --- | --- | --- |
| dbt_full_refresh_models | JSON object. Each key should be a DBT model, and the value is a boolean controlling if the model should be run with `--full-refresh` | Yes, if desired for models that need to be full-refreshed |
| dbt_image_name | Name of the `stellar-dbt` image to use | No, unless you need a specific image version |
| dbt_job_execution_timeout_seconds | Timeout for dbt tasks in seconds | No, unless you want a different timeout |
| dbt_job_retries | Number of times dbt jobs will retry | No, unless you want a different retry limit |
| dbt_mart_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) for DBT marts | Yes. Change to your dataset name |
| dbt_maximum_bytes_billed | The maximum number of BigQuery bytes that can be billed when running DBT | No, unless you want a different limit |
| dbt_project | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name |
| dbt_target | The `target` that will be used to run dbt | No, unless you want a different target |
| dbt_threads | The number of threads that dbt will spawn to build a model | No, unless you want a different thread count |
| dbt_tables | Names of dbt tables to copy to sandbox | No |
| dbt_internal_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. |
| dbt_internal_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. |
| dbt_public_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. |
| dbt_public_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. |
| dbt_slack_elementary_channel | Name of the Slack channel to send Elementary alerts to | Yes. Change to your Slack channel name. |
| dbt_elementary_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. |
| dbt_elementary_secret | Necessary argument for the Elementary task | No |
| dbt_transient_errors_patterns | Dictionary mapping the name of a known dbt transient error to a list of string patterns that identify that error in the logs | Yes, for every known error added |
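
The new `dbt_transient_errors_patterns` variable drives the retry-skipping logic added in this PR. The sketch below is illustrative and not part of the PR: the helper `is_known_transient_error` simply restates the matching rule used by `check_dbt_transient_errors` later in this diff, namely that a log line counts as a known transient error only when it contains every string listed under that error's name.

```python
# Illustrative only: restates the matching rule implied by the retry logic.
patterns = {
    "elementary_concurrent_access": [
        "Could not serialize access to table",
        "due to concurrent update",
    ]
}


def is_known_transient_error(line: str) -> bool:
    # A line matches only if it contains every sentence listed for some error name.
    return any(
        all(sentence in line for sentence in sentences)
        for sentences in patterns.values()
    )


print(is_known_transient_error(
    "Database Error: Could not serialize access to table elementary due to concurrent update"
))  # prints True
```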

### **Kubernetes-Specific Variables**

airflow_variables_dev.json (6 changes: 6 additions & 0 deletions)
@@ -146,6 +146,12 @@
  },
  "dbt_target": "test",
  "dbt_threads": 12,
  "dbt_transient_errors_patterns": {
    "elementary_concurrent_access": [
      "Could not serialize access to table",
      "due to concurrent update"
    ]
  },
  "gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket",
  "gcs_exported_object_prefix": "dag-exported",
  "image_name": "stellar/stellar-etl:98bea9a",

airflow_variables_prod.json (6 changes: 6 additions & 0 deletions)
@@ -147,6 +147,12 @@
  },
  "dbt_target": "prod",
  "dbt_threads": 12,
  "dbt_transient_errors_patterns": {
    "elementary_concurrent_access": [
      "Could not serialize access to table",
      "due to concurrent update"
    ]
  },
  "gcs_exported_data_bucket_name": "us-central1-hubble-14c4ca64-bucket",
  "gcs_exported_object_prefix": "dag-exported",
  "image_name": "stellar/stellar-etl:98bea9a",
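
Both environment files add the same default value. For reference, the variable is read back in Airflow code with `Variable.get(..., deserialize_json=True)`, exactly as `utils.py` later in this diff does; the commented second entry is a purely hypothetical illustration of how another known error would be registered in these JSON files.

```python
from airflow.models import Variable

# Returns a dict of {error_name: [substrings identifying that error]},
# mirroring the JSON added above.
patterns = Variable.get("dbt_transient_errors_patterns", deserialize_json=True)

# A hypothetical second entry would follow the same shape in both JSON files
# (the name and strings below are invented for illustration only):
#   "example_rate_limit": ["Exceeded rate limits", "try again later"]
```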

dags/stellar_etl_airflow/build_dbt_task.py (3 changes: 3 additions & 0 deletions)
@@ -8,6 +8,7 @@
)
from kubernetes.client import models as k8s
from stellar_etl_airflow.default import alert_after_max_retries
from stellar_etl_airflow.utils import skip_retry_dbt_errors


def create_dbt_profile(project="prod"):
@@ -142,6 +143,7 @@ def dbt_task(
        config_file=config_file_location,
        container_resources=container_resources,
        on_failure_callback=alert_after_max_retries,
        on_retry_callback=skip_retry_dbt_errors,
        image_pull_policy="IfNotPresent",
        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
        sla=timedelta(
@@ -223,6 +225,7 @@ def build_dbt_task(
        config_file=config_file_location,
        container_resources=resources_requests,
        on_failure_callback=alert_after_max_retries,
        on_retry_callback=skip_retry_dbt_errors,
        image_pull_policy="IfNotPresent",
        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
        sla=timedelta(
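
Both call sites above now pass `on_retry_callback=skip_retry_dbt_errors` to the dbt task operators. As a general Airflow pattern, the sketch below (assuming Airflow 2.x; the DAG, `BashOperator`, and identifiers are placeholders, not from this repository) shows where such a callback hooks in: Airflow invokes it with the task context each time a failed try is about to be retried, which is what lets `skip_retry_dbt_errors` inspect the failed attempt's log and cancel the retry.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from stellar_etl_airflow.utils import skip_retry_dbt_errors

# Placeholder DAG: any Airflow operator accepts on_retry_callback, and the
# scheduler calls it with the task context whenever a failed try is marked
# up-for-retry, letting the callback inspect logs and skip the retry.
with DAG(
    dag_id="retry_callback_example",
    start_date=datetime(2024, 7, 1),
    schedule_interval=None,
) as dag:
    flaky = BashOperator(
        task_id="flaky_task",
        bash_command="exit 1",  # always fails, so the retry path is exercised
        retries=2,
        on_retry_callback=skip_retry_dbt_errors,
    )
```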

dags/stellar_etl_airflow/utils.py (102 changes: 102 additions & 0 deletions)
@@ -0,0 +1,102 @@
import logging
import re
import time

from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.state import TaskInstanceState

base_log_folder = conf.get("logging", "base_log_folder")


def get_log_file_name(context):
    ti = context["ti"]
    log_params = [
        base_log_folder,
        f"dag_id={ti.dag_id}",
        f"run_id={ti.run_id}",
        f"task_id={ti.task_id}",
        f"attempt={ti.try_number - 1}.log",
    ]
    return "/".join(log_params)


def check_dbt_transient_errors(context):
    """
    Searches through the logs to find failure messages
    and returns True if the errors found are transient.
    """
    log_file_path = get_log_file_name(context)
    log_contents = read_log_file(log_file_path)

    dbt_transient_error_patterns = Variable.get(
        "dbt_transient_errors_patterns", deserialize_json=True
    )

    dbt_summary_line = None
    for line in log_contents:
        # Check for transient errors message patterns
        for transient_error, patterns in dbt_transient_error_patterns.items():
            if all(sentence in line for sentence in patterns):
                logging.info(
                    f"Found {transient_error} dbt transient error, proceeding to retry"
                )
                return True
            elif "Done. PASS=" in line:
                dbt_summary_line = line
                break
    # Check if dbt summary has been logged
    if dbt_summary_line:
        match = re.search(r"ERROR=(\d+)", dbt_summary_line)
        if match:
            dbt_errors = int(match.group(1))
            # Check if dbt pipeline returned errors
            if dbt_errors > 0:
                logging.info("Could not find dbt transient errors, skipping retry")
                return False
            else:
                logging.info(
                    "dbt pipeline finished without errors, task failed but will not retry"
                )
                return False
    # Logic could not identify the error and assumes it is transient
    logging.info("Task failed due to unforeseen error, proceeding to retry")
    return True


def read_log_file(log_file_path):
    max_retries = 3
    retry_delay = 30
    log_contents = []

    for attempt in range(max_retries):
        try:
            with open(log_file_path, "r") as log_file:
                log_contents = log_file.readlines()
            break
        except FileNotFoundError as file_error:
            if attempt < max_retries - 1:
                logging.warning(
                    f"Log file {log_file_path} not found, retrying in {retry_delay} seconds..."
                )
                time.sleep(retry_delay)
            else:
                logging.error(file_error)
                raise
    return log_contents


def skip_retry_dbt_errors(context) -> None:
    """
    Set task state to SKIPPED in case errors found in dbt are not transient.
    """
    if not check_dbt_transient_errors(context):
        ti = context["ti"]
        logging.info(
            f"Set task instance {ti} state to \
            {TaskInstanceState.SKIPPED} to skip retrying"
        )
        ti.set_state(TaskInstanceState.SKIPPED)
        return
    else:
        return
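
When no configured transient pattern matches, `check_dbt_transient_errors` falls back to dbt's run summary line. The snippet below is illustrative only: the sample string mimics the `Done. PASS=... ERROR=...` summary dbt writes to the log (the counts are invented) and shows what the `ERROR=(\d+)` regex extracts.

```python
import re

# Invented sample of a dbt run summary line as it would appear in the task log.
sample = "Done. PASS=41 WARN=0 ERROR=2 SKIP=1 TOTAL=44"

match = re.search(r"ERROR=(\d+)", sample)
if match and int(match.group(1)) > 0:
    # A non-zero ERROR count with no known transient pattern means
    # skip_retry_dbt_errors sets the task to SKIPPED instead of retrying it.
    print("non-transient dbt failure: retry would be skipped")
```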