Add DAG for exporting retool data #550

Merged
merged 19 commits on Dec 16, 2024
4 changes: 4 additions & 0 deletions airflow_variables_dev.json
@@ -315,6 +315,7 @@
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/6190849",
"sentry_environment": "development",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
Comment (Contributor Author):
This is temporary. Once we have the build set up for stellar-etl, we will switch to the stellar workspace.

"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -329,6 +330,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -349,9 +351,11 @@
"create_sandbox": 2400,
"current_state": 720,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
"export_retool_data": 720,
"fee_stats": 840,
"history_assets": 720,
"liquidity_pool_trade_volume": 1140,
4 changes: 4 additions & 0 deletions airflow_variables_prod.json
@@ -313,6 +313,7 @@
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/5806618",
"sentry_environment": "production",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7",
"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -327,6 +328,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -347,9 +349,11 @@
"create_sandbox": 1020,
"current_state": 1200,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 2100,
"elementary_generate_report": 1200,
"enriched_history_operations": 1800,
"export_retool_data": 720,
"fee_stats": 360,
"history_assets": 360,
"liquidity_pool_trade_volume": 1200,
81 changes: 81 additions & 0 deletions dags/external_data_dag.py
@@ -0,0 +1,81 @@
"""
The external_data_dag DAG exports data from external sources.
It is scheduled to export information to BigQuery at regular intervals.
"""

from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_operator import create_export_del_insert_operator
from stellar_etl_airflow.build_internal_export_task import build_export_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.utils import access_secret

init_sentry()

EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True)
EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project")
EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset")
RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"]
RETOOL_EXPORT_TASK_ID = "export_retool_data"

# Initialize the DAG
dag = DAG(
"external_data_dag",
default_args=get_default_dag_args(),
start_date=datetime(2024, 12, 16, 0, 0),
description="This DAG exports data from external sources such as retool.",
schedule_interval="0 22 * * *",
params={
"alias": "external",
},
render_template_as_native_obj=True,
user_defined_macros={
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
user_defined_filters={
"fromjson": lambda s: loads(s),
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
"literal_eval": lambda e: literal_eval(e),
},
)


retool_export_task = build_export_task(
dag,
RETOOL_EXPORT_TASK_ID,
command="export-retool",
cmd_args=[
"--start-time",
"{{ subtract_data_interval(dag, data_interval_start).isoformat() }}",
"--end-time",
"{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
],
use_gcs=True,
env_vars={
"RETOOL_API_KEY": access_secret("retool-api-key", "default"),
},
)


retool_insert_to_bq_task = create_export_del_insert_operator(
dag,
table_name=RETOOL_TABLE_NAME,
project=EXTERNAL_DATA_PROJECT_NAME,
dataset=EXTERNAL_DATA_DATASET_NAME,
export_task_id=RETOOL_EXPORT_TASK_ID,
source_object_suffix="",
partition=False,
cluster=False,
table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}",
)

retool_export_task >> retool_insert_to_bq_task
42 changes: 42 additions & 0 deletions dags/stellar_etl_airflow/build_del_ins_operator.py
@@ -1,4 +1,8 @@
from airflow.operators.python import PythonOperator
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_internal_export_task import get_airflow_metadata
from stellar_etl_airflow.default import alert_after_max_retries


@@ -74,3 +78,41 @@ def create_del_ins_task(dag, task_vars, del_ins_callable):
on_failure_callback=alert_after_max_retries,
dag=dag,
)


def create_export_del_insert_operator(
dag,
table_name: str,
project: str,
dataset: str,
export_task_id: str,
source_object_suffix: str,
partition: bool,
cluster: bool,
table_id: str,
):
metadata = get_airflow_metadata()
source_objects = [
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
+ source_object_suffix
]
task_vars = {
"task_id": f"del_ins_{table_name}_task",
"project": project,
"dataset": dataset,
"table_name": table_name,
"export_task_id": export_task_id,
"source_object_suffix": source_object_suffix,
"partition": partition,
"cluster": cluster,
"batch_id": metadata["batch_id"],
"batch_date": metadata["batch_date"],
"source_objects": source_objects,
"table_id": table_id,
}
insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)
return insert_to_bq_task
12 changes: 1 addition & 11 deletions dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
@@ -1,22 +1,12 @@
import base64
import logging
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models as k8s
from stellar_etl_airflow.default import alert_after_max_retries


def access_secret(secret_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api()
secret_data = v1.read_namespaced_secret(secret_name, namespace)
secret = secret_data.data
secret = base64.b64decode(secret["token"]).decode("utf-8")
return secret
from stellar_etl_airflow.utils import access_secret


def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"):
108 changes: 108 additions & 0 deletions dags/stellar_etl_airflow/build_internal_export_task.py
@@ -0,0 +1,108 @@
"""
This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions.
"""

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.default import alert_after_max_retries


def get_airflow_metadata():
return {
"batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
"batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
"batch_id": macros.get_batch_id(),
"run_id": "{{ run_id }}",
}


def build_export_task(
dag,
task_name,
command,
cmd_args=[],
env_vars={},
use_gcs=False,
resource_cfg="default",
):
namespace = conf.get("kubernetes", "NAMESPACE")

if namespace == "default":
config_file_location = Variable.get("kube_config_location")
in_cluster = False
else:
config_file_location = None
in_cluster = True

requests = {
"cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
"memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
}
container_resources = k8s.V1ResourceRequirements(requests=requests)

image = "{{ var.value.stellar_etl_internal_image_name }}"

output_filepath = ""
if use_gcs:
Comment (Contributor):
nit: Technically, the GCS if/case could be split out into a separate function or interface so that it is easy to add support for other cloud storage systems. But I don't think we'll ever need support outside of GCS for this, so it's fine to leave it as is.

Comment (Contributor Author):
Agree. I think in general we could try to write the operators as classes, which can help with overriding. Right now, it is more functional.

Comment (Contributor):
Honestly, one day we'll probably refactor all of stellar-etl-airflow.

Comment (Contributor Author):
🤞

metadata = get_airflow_metadata()
batch_insert_ts = metadata["batch_insert_ts"]
batch_date = metadata["batch_date"]
batch_id = metadata["batch_id"]
run_id = metadata["run_id"]

output_filepath = os.path.join(
Variable.get("gcs_exported_object_prefix"),
run_id,
f"{task_name}-exported-entity.txt",
)

cmd_args = cmd_args + [
"--cloud-storage-bucket",
Variable.get("gcs_exported_data_bucket_name"),
"--cloud-provider",
"gcp",
"--output",
output_filepath,
"-u",
f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'",
]
etl_cmd_string = " ".join(cmd_args)
arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json"""
env_vars.update(
{
"EXECUTION_DATE": "{{ ds }}",
"AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}",
}
)

return KubernetesPodOperator(
task_id=task_name,
name=task_name,
namespace=Variable.get("k8s_namespace"),
service_account_name=Variable.get("k8s_service_account"),
env_vars=env_vars,
image=image,
cmds=["bash", "-c"],
arguments=[arguments],
do_xcom_push=True,
dag=dag,
is_delete_operator_pod=True,
startup_timeout_seconds=720,
in_cluster=in_cluster,
config_file=config_file_location,
container_resources=container_resources,
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
),
trigger_rule="all_done",
)
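
A minimal sketch of the refactor suggested in the review thread above: pulling the GCS-specific output handling in build_export_task into its own helper so another storage backend could be slotted in later. The helper name build_gcs_output_args, its parameters, and the call site shown in the trailing comment are hypothetical and are not part of this PR.

```python
import os


def build_gcs_output_args(task_name, run_id, bucket_name, object_prefix, metadata):
    """Return (output_filepath, extra_cmd_args) for an export that writes to GCS."""
    # Build the object path the exporter writes to inside the bucket.
    output_filepath = os.path.join(
        object_prefix, run_id, f"{task_name}-exported-entity.txt"
    )
    # GCS-specific CLI flags, including the batch metadata passed via -u.
    extra_cmd_args = [
        "--cloud-storage-bucket",
        bucket_name,
        "--cloud-provider",
        "gcp",
        "--output",
        output_filepath,
        "-u",
        f"'batch_id={metadata['batch_id']},"
        f"batch_run_date={metadata['batch_date']},"
        f"batch_insert_ts={metadata['batch_insert_ts']}'",
    ]
    return output_filepath, extra_cmd_args


# Inside build_export_task, the GCS branch could then collapse to something like:
#
#     if use_gcs:
#         output_filepath, gcs_args = build_gcs_output_args(
#             task_name,
#             "{{ run_id }}",
#             Variable.get("gcs_exported_data_bucket_name"),
#             Variable.get("gcs_exported_object_prefix"),
#             get_airflow_metadata(),
#         )
#         cmd_args = cmd_args + gcs_args
```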
11 changes: 11 additions & 0 deletions dags/stellar_etl_airflow/utils.py
@@ -1,10 +1,12 @@
import base64
import logging
import re
import time

from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.state import TaskInstanceState
from kubernetes import client, config

base_log_folder = conf.get("logging", "base_log_folder")

@@ -100,3 +102,12 @@ def skip_retry_dbt_errors(context) -> None:
return
else:
return


def access_secret(secret_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api()
secret_data = v1.read_namespaced_secret(secret_name, namespace)
secret = secret_data.data
secret = base64.b64decode(secret["token"]).decode("utf-8")
return secret