Add DAG for exporting retool data #550
Changes from all commits: 30ee494, ef7e871, 47ca1d2, 3e56ccb, 1102681, 718238e, c632660, fe6647d, 6378f51, a2ec946, e15108f, 52ca776, 3d3fc6a, 7ee9e0d, ab8be7a, 06884ed, 5920807, 9497ebb, 9a4e60f
Airflow variables, development environment:

@@ -315,6 +315,7 @@
     "schema_filepath": "/home/airflow/gcs/dags/schemas/",
     "sentry_dsn": "https://[email protected]/6190849",
     "sentry_environment": "development",
+    "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:cd53bcf70",
     "table_ids": {
         "accounts": "accounts",
         "assets": "history_assets",
@@ -329,6 +330,7 @@
         "liquidity_pools": "liquidity_pools",
         "offers": "offers",
         "operations": "history_operations",
+        "retool_entity_data": "retool_entity_data",
         "signers": "account_signers",
         "trades": "history_trades",
         "transactions": "history_transactions",
@@ -349,9 +351,11 @@
         "create_sandbox": 2400,
         "current_state": 720,
         "default": 60,
+        "del_ins_retool_entity_data_task": 720,
         "elementary_dbt_data_quality": 1620,
         "elementary_generate_report": 1200,
         "enriched_history_operations": 780,
+        "export_retool_data": 720,
         "fee_stats": 840,
         "history_assets": 720,
         "liquidity_pool_trade_volume": 1140,
Airflow variables, production environment:

@@ -313,6 +313,7 @@
     "schema_filepath": "/home/airflow/gcs/dags/schemas/",
     "sentry_dsn": "https://[email protected]/5806618",
     "sentry_environment": "production",
+    "stellar_etl_internal_image_name": "amishastellar/stellar-etl-internal:e3b9a2ea7",
     "table_ids": {
         "accounts": "accounts",
         "assets": "history_assets",
@@ -327,6 +328,7 @@
         "liquidity_pools": "liquidity_pools",
         "offers": "offers",
         "operations": "history_operations",
+        "retool_entity_data": "retool_entity_data",
         "signers": "account_signers",
         "trades": "history_trades",
         "transactions": "history_transactions",
@@ -347,9 +349,11 @@
         "create_sandbox": 1020,
         "current_state": 1200,
         "default": 60,
+        "del_ins_retool_entity_data_task": 720,
         "elementary_dbt_data_quality": 2100,
         "elementary_generate_report": 1200,
         "enriched_history_operations": 1800,
+        "export_retool_data": 720,
         "fee_stats": 360,
         "history_assets": 360,
         "liquidity_pool_trade_volume": 1200,
New file: the external_data_dag definition.

@@ -0,0 +1,81 @@
"""
The external_data_dag DAG exports data from external sources.
It is scheduled to export information to BigQuery at regular intervals.
"""

from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_operator import create_export_del_insert_operator
from stellar_etl_airflow.build_internal_export_task import build_export_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.utils import access_secret

init_sentry()

EXTERNAL_DATA_TABLE_NAMES = Variable.get("table_ids", deserialize_json=True)
EXTERNAL_DATA_PROJECT_NAME = Variable.get("bq_project")
EXTERNAL_DATA_DATASET_NAME = Variable.get("bq_dataset")
RETOOL_TABLE_NAME = EXTERNAL_DATA_TABLE_NAMES["retool_entity_data"]
RETOOL_EXPORT_TASK_ID = "export_retool_data"

# Initialize the DAG
dag = DAG(
    "external_data_dag",
    default_args=get_default_dag_args(),
    start_date=datetime(2024, 12, 16, 0, 0),
    description="This DAG exports data from external sources such as retool.",
    schedule_interval="0 22 * * *",
    params={
        "alias": "external",
    },
    render_template_as_native_obj=True,
    user_defined_macros={
        "subtract_data_interval": macros.subtract_data_interval,
        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
    },
    user_defined_filters={
        "fromjson": lambda s: loads(s),
        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
        "literal_eval": lambda e: literal_eval(e),
    },
)

retool_export_task = build_export_task(
    dag,
    RETOOL_EXPORT_TASK_ID,
    command="export-retool",
    cmd_args=[
        "--start-time",
        "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}",
        "--end-time",
        "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
    ],
    use_gcs=True,
    env_vars={
        "RETOOL_API_KEY": access_secret("retool-api-key", "default"),
    },
)

retool_insert_to_bq_task = create_export_del_insert_operator(
    dag,
    table_name=RETOOL_TABLE_NAME,
    project=EXTERNAL_DATA_PROJECT_NAME,
    dataset=EXTERNAL_DATA_DATASET_NAME,
    export_task_id=RETOOL_EXPORT_TASK_ID,
    source_object_suffix="",
    partition=False,
    cluster=False,
    table_id=f"{EXTERNAL_DATA_PROJECT_NAME}.{EXTERNAL_DATA_DATASET_NAME}.{RETOOL_TABLE_NAME}",
)

retool_export_task >> retool_insert_to_bq_task
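The DAG reads the Retool API key with access_secret, whose implementation is not part of this diff. As a rough sketch of what such a helper might do with the Kubernetes Python client (the secret name and namespace come from the call above; the in-cluster config and the "token" data key are assumptions):

import base64

from kubernetes import client, config

def access_secret(secret_name, namespace):
    # Assumption: running in-cluster, with the API key stored under a
    # "token" key; Kubernetes returns secret data base64-encoded.
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    secret = v1.read_namespaced_secret(secret_name, namespace)
    return base64.b64decode(secret.data["token"]).decode("utf-8")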
New file: build_internal_export_task (imported above as stellar_etl_airflow.build_internal_export_task).

@@ -0,0 +1,108 @@
"""
This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions.
"""

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.models.variable import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.default import alert_after_max_retries


def get_airflow_metadata():
    return {
        "batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
        "batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
        "batch_id": macros.get_batch_id(),
        "run_id": "{{ run_id }}",
    }


def build_export_task(
    dag,
    task_name,
    command,
    cmd_args=[],
    env_vars={},
    use_gcs=False,
    resource_cfg="default",
):
    namespace = conf.get("kubernetes", "NAMESPACE")

    if namespace == "default":
        config_file_location = Variable.get("kube_config_location")
        in_cluster = False
    else:
        config_file_location = None
        in_cluster = True

    requests = {
        "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}",
        "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}",
    }
    container_resources = k8s.V1ResourceRequirements(requests=requests)

    image = "{{ var.value.stellar_etl_internal_image_name }}"

    output_filepath = ""
    if use_gcs:
        # Resolved review thread on this block:
        #   "nit: technically the …"
        #   "Agree. I think in general we could try to write the operators as
        #    classes, which can help with overriding. Right now, it is more
        #    functional."
        #   "Honestly one day we'll probably refactor all of stellar-etl-airflow"
        #   "🤞"
        metadata = get_airflow_metadata()
        batch_insert_ts = metadata["batch_insert_ts"]
        batch_date = metadata["batch_date"]
        batch_id = metadata["batch_id"]
        run_id = metadata["run_id"]

        output_filepath = os.path.join(
            Variable.get("gcs_exported_object_prefix"),
            run_id,
            f"{task_name}-exported-entity.txt",
        )

        cmd_args = cmd_args + [
            "--cloud-storage-bucket",
            Variable.get("gcs_exported_data_bucket_name"),
            "--cloud-provider",
            "gcp",
            "--output",
            output_filepath,
            "-u",
            f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'",
        ]

    etl_cmd_string = " ".join(cmd_args)
    arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json"""
    env_vars.update(
        {
            "EXECUTION_DATE": "{{ ds }}",
            "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}",
        }
    )

    return KubernetesPodOperator(
        task_id=task_name,
        name=task_name,
        namespace=Variable.get("k8s_namespace"),
        service_account_name=Variable.get("k8s_service_account"),
        env_vars=env_vars,
        image=image,
        cmds=["bash", "-c"],
        arguments=[arguments],
        do_xcom_push=True,
        dag=dag,
        is_delete_operator_pod=True,
        startup_timeout_seconds=720,
        in_cluster=in_cluster,
        config_file=config_file_location,
        container_resources=container_resources,
        on_failure_callback=alert_after_max_retries,
        image_pull_policy="IfNotPresent",
        image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")],
        sla=timedelta(
            seconds=Variable.get("task_sla", deserialize_json=True)[task_name]
        ),
        trigger_rule="all_done",
    )
Review comment on the amishastellar/stellar-etl-internal image name: This is temporary. Once we have build setup for stellar-etl, will switch to stellar workspace.