Add DAG for exporting retool data #550

Merged · 19 commits · Dec 16, 2024
4 changes: 4 additions & 0 deletions airflow_variables_dev.json
@@ -315,6 +315,7 @@
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/6190849",
"sentry_environment": "development",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
Contributor Author

This is temporary. Once we have the build setup for stellar-etl, we will switch to the stellar workspace.

"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -329,6 +330,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -349,9 +351,11 @@
"create_sandbox": 2400,
"current_state": 720,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
"export_retool_data": 720,
"fee_stats": 840,
"history_assets": 720,
"liquidity_pool_trade_volume": 1140,
4 changes: 4 additions & 0 deletions airflow_variables_prod.json
@@ -313,6 +313,7 @@
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/5806618",
"sentry_environment": "production",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7",
"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -327,6 +328,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -347,9 +349,11 @@
"create_sandbox": 1020,
"current_state": 1200,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 2100,
"elementary_generate_report": 1200,
"enriched_history_operations": 1800,
"export_retool_data": 720,
"fee_stats": 360,
"history_assets": 360,
"liquidity_pool_trade_volume": 1200,
123 changes: 123 additions & 0 deletions dags/external_data_dag.py
@@ -0,0 +1,123 @@
"""
The external_data_dag DAG exports data from external sources.
It is scheduled to export information to BigQuery at regular intervals.
"""

from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
    build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_del_ins_operator import create_del_ins_task
from stellar_etl_airflow.build_internal_export_task import (
    build_export_task,
    get_airflow_metadata,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.utils import access_secret

init_sentry()

EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True)
EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project")
EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset")
RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"]
RETOOL_EXPORT_TASK_ID = "export_retool_data"

# Initialize the DAG
dag = DAG(
"external_data_dag",
default_args=get_default_dag_args(),
start_date=datetime(2024, 12, 5, 14, 30),
Contributor

I recommend updating this start date. Since this DAG is scheduled to run at "0 22 * * *", you generally don't want to set the hour/minute part of the start_date datetime.

Contributor Author

Done in 9a4e60f
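A minimal sketch of the suggested change (illustrative only; the actual fix landed in 9a4e60f):

    # Drop the hour/minute from start_date; the cron schedule "0 22 * * *"
    # already controls when the DAG runs each day.
    start_date=datetime(2024, 12, 5),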

    description="This DAG exports data from external sources such as retool.",
    schedule_interval="0 22 * * *",
    params={
        "alias": "external",
    },
    render_template_as_native_obj=True,
    user_defined_macros={
        "subtract_data_interval": macros.subtract_data_interval,
        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
    },
    user_defined_filters={
        "fromjson": lambda s: loads(s),
        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
        "literal_eval": lambda e: literal_eval(e),
    },
)


retool_export_task = build_export_task(
    dag,
    RETOOL_EXPORT_TASK_ID,
    command="export-retool",
    cmd_args=[
        "--start-time",
        "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}",
        "--end-time",
        "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
    ],
    use_gcs=True,
    env_vars={
        "RETOOL_API_KEY": access_secret("retool-api-key", "default"),
    },
)


def get_insert_to_bq_task(
Contributor

Is this function different from https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_bq_insert_job_task.py or anything else in ./dags/stellar_etl_airflow/build_*?

If it's different, it might be worthwhile to separate it out into utils.py or another *.py file.

Contributor Author

I think it is different, since it is a combination of delete and insert, whereas the one in build_bq_insert_job_task is insert only.

Contributor

Ah, that's fine then. A nit: move it to a separate helper file. I'd also recommend renaming the function so it describes the delete as well as the insert, instead of just get_insert_to_bq_task. Maybe just add "delete" to the function name.

Contributor Author

I moved it to dags/stellar_etl_airflow/build_del_ins_operator.py in 9a4e60f.

I also renamed the method to create_export_del_insert_operator. Essentially, create_export_del_insert_operator is a specific type of create_del_ins_task.

(A general sketch of the delete-then-insert pattern follows this file's diff.)

    table_name: str,
    project: str,
    dataset: str,
    export_task_id: str,
    source_object_suffix: str,
    partition: bool,
    cluster: bool,
    table_id: str,
):
    metadata = get_airflow_metadata()
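    # Pull the GCS object path that the export task pushed to XCom (its "output"
    # key) and use it as the source object for the delete+insert load into BigQuery.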
    source_objects = [
        "{{ task_instance.xcom_pull(task_ids='"
        + export_task_id
        + '\')["output"] }}'
        + source_object_suffix
    ]
    task_vars = {
        "task_id": f"del_ins_{table_name}_task",
        "project": project,
        "dataset": dataset,
        "table_name": table_name,
        "export_task_id": export_task_id,
        "source_object_suffix": source_object_suffix,
        "partition": partition,
        "cluster": cluster,
        "batch_id": metadata["batch_id"],
        "batch_date": metadata["batch_date"],
        "source_objects": source_objects,
        "table_id": table_id,
    }
    insert_to_bq_task = create_del_ins_task(
        dag, task_vars, build_del_ins_from_gcs_to_bq_task
    )
    return insert_to_bq_task


retool_insert_to_bq_task = get_insert_to_bq_task(
    table_name=RETOOL_TABLE_NAME,
    project=EXTERNAL_DATA_PROJECT_NAME,
    dataset=EXTERNAL_DATA_DATASET_NAME,
    export_task_id=RETOOL_EXPORT_TASK_ID,
    source_object_suffix="",
    partition=False,
    cluster=False,
    table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}",
)

retool_export_task >> retool_insert_to_bq_task
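As discussed in the review thread above, this helper combines a delete with an insert rather than doing a plain insert. The repo's real implementation lives in build_del_ins_from_gcs_to_bq_task and build_del_ins_operator; the sketch below only shows the general shape of a delete-then-insert load using standard Google provider operators, and assumes an illustrative batch_run_date column plus placeholder project, dataset, and bucket names.

# Minimal sketch of a delete-then-insert load; not the repo's actual helper.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

delete_old_rows = BigQueryInsertJobOperator(
    task_id="delete_retool_rows",
    configuration={
        "query": {
            # Assumes the table has a batch_run_date column to scope the delete to one batch.
            "query": (
                "DELETE FROM `project.dataset.retool_entity_data` "
                "WHERE batch_run_date = '{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}'"
            ),
            "useLegacySql": False,
        }
    },
    dag=dag,
)

insert_new_rows = GCSToBigQueryOperator(
    task_id="insert_retool_rows",
    bucket="example-export-bucket",  # placeholder bucket name
    source_objects=[
        "{{ task_instance.xcom_pull(task_ids='export_retool_data')['output'] }}"
    ],
    destination_project_dataset_table="project.dataset.retool_entity_data",
    source_format="NEWLINE_DELIMITED_JSON",
    write_disposition="WRITE_APPEND",
    dag=dag,
)

delete_old_rows >> insert_new_rows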
12 changes: 1 addition & 11 deletions dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
@@ -1,22 +1,12 @@
import base64
import logging
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models as k8s
from stellar_etl_airflow.default import alert_after_max_retries


def access_secret(secret_name, namespace):
    config.load_kube_config()
    v1 = client.CoreV1Api()
    secret_data = v1.read_namespaced_secret(secret_name, namespace)
    secret = secret_data.data
    secret = base64.b64decode(secret["token"]).decode("utf-8")
    return secret
from stellar_etl_airflow.utils import access_secret


def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"):
108 changes: 108 additions & 0 deletions dags/stellar_etl_airflow/build_internal_export_task.py
@@ -0,0 +1,108 @@
"""
This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions.
"""

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.default import alert_after_max_retries


def get_airflow_metadata():
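    """Return batch metadata for exports; batch_date and run_id are Jinja templates rendered at runtime."""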
    return {
        "batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
        "batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
        "batch_id": macros.get_batch_id(),
        "run_id": "{{ run_id }}",
    }


def build_export_task(
    dag,
    task_name,
    command,
    cmd_args=[],
    env_vars={},
    use_gcs=False,
    resource_cfg="default",
):
    namespace = conf.get("kubernetes", "NAMESPACE")

    if namespace == "default":
        config_file_location = Variable.get("kube_config_location")
        in_cluster = False
    else:
        config_file_location = None
        in_cluster = True

    requests = {
        "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
        "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
    }
    container_resources = k8s.V1ResourceRequirements(requests=requests)

    image = "{{ var.value.stellar_etl_internal_image_name }}"

    output_filepath = ""
    if use_gcs:
Contributor

nit: technically the GCS if-case could be split out into a separate function or interface so that it's easy to add support for other cloud storage systems. BUT I don't think we'll ever need support outside of GCS for this, so it's fine to leave this as is.

Contributor Author

Agree. I think in general we could try to write the operators as classes, which would help with overriding. Right now, it is more functional.

Contributor

Honestly, one day we'll probably refactor all of stellar-etl-airflow.

Contributor Author

🤞

(A rough sketch of splitting out the GCS handling follows this file's diff.)

        metadata = get_airflow_metadata()
        batch_insert_ts = metadata["batch_insert_ts"]
        batch_date = metadata["batch_date"]
        batch_id = metadata["batch_id"]
        run_id = metadata["run_id"]

        output_filepath = os.path.join(
            Variable.get("gcs_exported_object_prefix"),
            run_id,
            f"{task_name}-exported-entity.txt",
        )

        cmd_args = cmd_args + [
            "--cloud-storage-bucket",
            Variable.get("gcs_exported_data_bucket_name"),
            "--cloud-provider",
            "gcp",
            "--output",
            output_filepath,
            "-u",
            f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'",
        ]
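    # Join the CLI args and append a JSON line with the exported file path to
    # /airflow/xcom/return.json so the pod (do_xcom_push=True) exposes it via XCom.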
    etl_cmd_string = " ".join(cmd_args)
    arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json"""
    env_vars.update(
        {
            "EXECUTION_DATE": "{{ ds }}",
            "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}",
        }
    )

    return KubernetesPodOperator(
        task_id=task_name,
        name=task_name,
        namespace=Variable.get("k8s_namespace"),
        service_account_name=Variable.get("k8s_service_account"),
        env_vars=env_vars,
        image=image,
        cmds=["bash", "-c"],
        arguments=[arguments],
        do_xcom_push=True,
        dag=dag,
        is_delete_operator_pod=True,
        startup_timeout_seconds=720,
        in_cluster=in_cluster,
        config_file=config_file_location,
        container_resources=container_resources,
        on_failure_callback=alert_after_max_retries,
        image_pull_policy="IfNotPresent",
        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
        sla=timedelta(
            seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
        ),
        trigger_rule="all_done",
    )
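Picking up the reviewer's nit above, a rough sketch of pulling the GCS-specific argument handling into its own helper might look like the following. This is a hypothetical function, not code in this PR; names are illustrative.

import os


def build_gcs_output_args(task_name, run_id, bucket, object_prefix, batch_metadata):
    """Hypothetical helper: build the cloud-storage CLI args for a GCS export.

    A different provider could be added later as a sibling helper returning its
    own --cloud-provider arguments, leaving build_export_task untouched.
    """
    output_filepath = os.path.join(
        object_prefix, run_id, f"{task_name}-exported-entity.txt"
    )
    extra_args = [
        "--cloud-storage-bucket",
        bucket,
        "--cloud-provider",
        "gcp",
        "--output",
        output_filepath,
        "-u",
        batch_metadata,
    ]
    return extra_args, output_filepath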
11 changes: 11 additions & 0 deletions dags/stellar_etl_airflow/utils.py
@@ -1,10 +1,12 @@
import base64
import logging
import re
import time

from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.state import TaskInstanceState
from kubernetes import client, config

base_log_folder = conf.get("logging", "base_log_folder")

@@ -100,3 +102,12 @@ def skip_retry_dbt_errors(context) -> None:
return
else:
return


def access_secret(secret_name, namespace):
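    """Read the named Kubernetes secret from the given namespace and return its decoded "token" value."""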
    config.load_kube_config()
    v1 = client.CoreV1Api()
    secret_data = v1.read_namespaced_secret(secret_name, namespace)
    secret = secret_data.data
    secret = base64.b64decode(secret["token"]).decode("utf-8")
    return secret