Add DAG for exporting retool data (#550)
* Add DAG for exporting retool data

* update image

update image

update image

update image

update image

* Pass api key

* send cloud flags

* Add task to write to bq

Add alias

* rename

* Add batch_run_date

update xcom value

update

set xcom input

simplify

rename

* update suffix

* update image

* Add metadata fields

* Change schedule and update start/end time

* Add macro

* Refactor into methods

* moar refactor

update image

* update env_vars early on

* Extract secret from gcs

update

move

update

update

* Remove extraneous vars

* Move generic method to utils and update date
amishas157 authored Dec 16, 2024
1 parent 042908c commit afc5bb1
Showing 8 changed files with 414 additions and 11 deletions.
4 changes: 4 additions & 0 deletions airflow_variables_dev.json
@@ -315,6 +315,7 @@
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/6190849",
"sentry_environment": "development",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -329,6 +330,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -349,9 +351,11 @@
"create_sandbox": 2400,
"current_state": 720,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 1620,
"elementary_generate_report": 1200,
"enriched_history_operations": 780,
"export_retool_data": 720,
"fee_stats": 840,
"history_assets": 720,
"liquidity_pool_trade_volume": 1140,
4 changes: 4 additions & 0 deletions airflow_variables_prod.json
@@ -313,6 +313,7 @@
"schema_filepath": "/home/airflow/gcs/dags/schemas/",
"sentry_dsn": "https://[email protected]/5806618",
"sentry_environment": "production",
"stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7",
"table_ids": {
"accounts": "accounts",
"assets": "history_assets",
@@ -327,6 +328,7 @@
"liquidity_pools": "liquidity_pools",
"offers": "offers",
"operations": "history_operations",
"retool_entity_data": "retool_entity_data",
"signers": "account_signers",
"trades": "history_trades",
"transactions": "history_transactions",
@@ -347,9 +349,11 @@
"create_sandbox": 1020,
"current_state": 1200,
"default": 60,
"del_ins_retool_entity_data_task": 720,
"elementary_dbt_data_quality": 2100,
"elementary_generate_report": 1200,
"enriched_history_operations": 1800,
"export_retool_data": 720,
"fee_stats": 360,
"history_assets": 360,
"liquidity_pool_trade_volume": 1200,
81 changes: 81 additions & 0 deletions dags/external_data_dag.py
@@ -0,0 +1,81 @@
"""
The external_data_dag DAG exports data from external sources.
It is scheduled to export information to BigQuery at regular intervals.
"""

from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_operator import create_export_del_insert_operator
from stellar_etl_airflow.build_internal_export_task import build_export_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.utils import access_secret

init_sentry()

EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True)
EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project")
EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset")
RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"]
RETOOL_EXPORT_TASK_ID = "export_retool_data"

# Initialize the DAG
dag = DAG(
"external_data_dag",
default_args=get_default_dag_args(),
start_date=datetime(2024, 12, 16, 0, 0),
description="This DAG exports data from external sources such as retool.",
schedule_interval="0 22 * * *",
params={
"alias": "external",
},
render_template_as_native_obj=True,
user_defined_macros={
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
user_defined_filters={
"fromjson": lambda s: loads(s),
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
"literal_eval": lambda e: literal_eval(e),
},
)


retool_export_task = build_export_task(
dag,
RETOOL_EXPORT_TASK_ID,
command="export-retool",
cmd_args=[
"--start-time",
"{{ subtract_data_interval(dag, data_interval_start).isoformat() }}",
"--end-time",
"{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
],
use_gcs=True,
env_vars={
"RETOOL_API_KEY": access_secret("retool-api-key", "default"),
},
)


retool_insert_to_bq_task = create_export_del_insert_operator(
dag,
table_name=RETOOL_TABLE_NAME,
project=EXTERNAL_DATA_PROJECT_NAME,
dataset=EXTERNAL_DATA_DATASET_NAME,
export_task_id=RETOOL_EXPORT_TASK_ID,
source_object_suffix="",
partition=False,
cluster=False,
table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}",
)

retool_export_task >> retool_insert_to_bq_task
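
One way to sanity-check the new DAG is a small DagBag test; this is a sketch only (no such test is part of this commit), and the dags/ folder path and test name are assumptions, while the DAG and task IDs come from the file above.

from airflow.models import DagBag


def test_external_data_dag_parses():
    # Parse only the project DAGs, skipping Airflow's bundled examples
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}

    dag = dag_bag.get_dag("external_data_dag")
    assert dag is not None
    # The export pod feeds the BigQuery delete/insert task via XCom
    assert "export_retool_data" in dag.task_ids
    assert "del_ins_retool_entity_data_task" in dag.task_ids
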
42 changes: 42 additions & 0 deletions dags/stellar_etl_airflow/build_del_ins_operator.py
@@ -1,4 +1,8 @@
from airflow.operators.python import PythonOperator
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_internal_export_task import get_airflow_metadata
from stellar_etl_airflow.default import alert_after_max_retries


@@ -74,3 +78,41 @@ def create_del_ins_task(dag, task_vars, del_ins_callable):
on_failure_callback=alert_after_max_retries,
dag=dag,
)


def create_export_del_insert_operator(
dag,
table_name: str,
project: str,
dataset: str,
export_task_id: str,
source_object_suffix: str,
partition: bool,
cluster: bool,
table_id: str,
):
metadata = get_airflow_metadata()
source_objects = [
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
+ source_object_suffix
]
task_vars = {
"task_id": f"del_ins_{table_name}_task",
"project": project,
"dataset": dataset,
"table_name": table_name,
"export_task_id": export_task_id,
"source_object_suffix": source_object_suffix,
"partition": partition,
"cluster": cluster,
"batch_id": metadata["batch_id"],
"batch_date": metadata["batch_date"],
"source_objects": source_objects,
"table_id": table_id,
}
insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)
return insert_to_bq_task
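
To illustrate what create_export_del_insert_operator builds for the retool wiring, here is a standalone reproduction (illustrative only) of the source_objects template string; the rendered GCS path in the final comment is an example, since the real value is whatever the export pod pushes to XCom.

# Values mirror the retool call in external_data_dag.py
export_task_id = "export_retool_data"
source_object_suffix = ""

source_objects = [
    "{{ task_instance.xcom_pull(task_ids='"
    + export_task_id
    + '\')["output"] }}'
    + source_object_suffix
]

print(source_objects[0])
# {{ task_instance.xcom_pull(task_ids='export_retool_data')["output"] }}
# At render time Airflow replaces the xcom_pull with the "output" value the
# export pod wrote to /airflow/xcom/return.json, e.g. a GCS object path like
# <gcs_exported_object_prefix>/<run_id>/export_retool_data-exported-entity.txt
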
12 changes: 1 addition & 11 deletions dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
@@ -1,22 +1,12 @@
import base64
import logging
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models as k8s
from stellar_etl_airflow.default import alert_after_max_retries


def access_secret(secret_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api()
secret_data = v1.read_namespaced_secret(secret_name, namespace)
secret = secret_data.data
secret = base64.b64decode(secret["token"]).decode("utf-8")
return secret
from stellar_etl_airflow.utils import access_secret


def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"):
108 changes: 108 additions & 0 deletions dags/stellar_etl_airflow/build_internal_export_task.py
@@ -0,0 +1,108 @@
"""
This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions.
"""

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.default import alert_after_max_retries


def get_airflow_metadata():
return {
"batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
"batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
"batch_id": macros.get_batch_id(),
"run_id": "{{ run_id }}",
}


def build_export_task(
dag,
task_name,
command,
cmd_args=[],
env_vars={},
use_gcs=False,
resource_cfg="default",
):
namespace = conf.get("kubernetes", "NAMESPACE")

if namespace == "default":
config_file_location = Variable.get("kube_config_location")
in_cluster = False
else:
config_file_location = None
in_cluster = True

requests = {
"cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
"memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
}
container_resources = k8s.V1ResourceRequirements(requests=requests)

image = "{{ var.value.stellar_etl_internal_image_name }}"

output_filepath = ""
if use_gcs:
metadata = get_airflow_metadata()
batch_insert_ts = metadata["batch_insert_ts"]
batch_date = metadata["batch_date"]
batch_id = metadata["batch_id"]
run_id = metadata["run_id"]

output_filepath = os.path.join(
Variable.get("gcs_exported_object_prefix"),
run_id,
f"{task_name}-exported-entity.txt",
)

cmd_args = cmd_args + [
"--cloud-storage-bucket",
Variable.get("gcs_exported_data_bucket_name"),
"--cloud-provider",
"gcp",
"--output",
output_filepath,
"-u",
f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'",
]
etl_cmd_string = " ".join(cmd_args)
arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json"""
env_vars.update(
{
"EXECUTION_DATE": "{{ ds }}",
"AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}",
}
)

return KubernetesPodOperator(
task_id=task_name,
name=task_name,
namespace=Variable.get("k8s_namespace"),
service_account_name=Variable.get("k8s_service_account"),
env_vars=env_vars,
image=image,
cmds=["bash", "-c"],
arguments=[arguments],
do_xcom_push=True,
dag=dag,
is_delete_operator_pod=True,
startup_timeout_seconds=720,
in_cluster=in_cluster,
config_file=config_file_location,
container_resources=container_resources,
on_failure_callback=alert_after_max_retries,
image_pull_policy="IfNotPresent",
image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
),
trigger_rule="all_done",
)
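
For a concrete feel of the bash argument string assembled above, here is an illustrative rendering for the retool export with hypothetical values standing in for the Airflow Variables, macros, and run metadata (the bucket name, object prefix, run_id, and batch fields are all made up):

command = "export-retool"
output_filepath = "dag-exported/scheduled__2024-12-16/export_retool_data-exported-entity.txt"
cmd_args = [
    "--start-time", "2024-12-15T22:00:00+00:00",
    "--end-time", "2024-12-16T22:00:00+00:00",
    "--cloud-storage-bucket", "example-exported-data-bucket",
    "--cloud-provider", "gcp",
    "--output", output_filepath,
    "-u", "'batch_id=scheduled__2024-12-16,batch_run_date=2024-12-15T22:00:00,batch_insert_ts=2024-12-16T22:05:00Z'",
]
etl_cmd_string = " ".join(cmd_args)
arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json"""
print(arguments)

The trailing echo writes {"output": "<output_filepath>"} into /airflow/xcom/return.json, which do_xcom_push publishes as the task's XCom and the downstream delete/insert task then reads as its GCS source object.
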
11 changes: 11 additions & 0 deletions dags/stellar_etl_airflow/utils.py
@@ -1,10 +1,12 @@
import base64
import logging
import re
import time

from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.state import TaskInstanceState
from kubernetes import client, config

base_log_folder = conf.get("logging", "base_log_folder")

@@ -100,3 +102,12 @@ def skip_retry_dbt_errors(context) -> None:
return
else:
return


def access_secret(secret_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api()
secret_data = v1.read_namespaced_secret(secret_name, namespace)
secret = secret_data.data
secret = base64.b64decode(secret["token"]).decode("utf-8")
return secret
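
A brief usage sketch (not from this commit) of the relocated helper, mirroring how external_data_dag.py calls it with the retool-api-key secret in the default namespace; note the secret's data must carry the value under a "token" key, since that is the entry access_secret decodes.

from stellar_etl_airflow.utils import access_secret

# Reads the named Kubernetes secret and base64-decodes its "token" entry
retool_api_key = access_secret("retool-api-key", "default")
env_vars = {"RETOOL_API_KEY": retool_api_key}
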
