Skip to content

Commit

Permalink
Move generic method to utils and update date
Browse files Browse the repository at this point in the history
  • Loading branch information
amishas157 committed Dec 16, 2024
1 parent 9497ebb commit 9a4e60f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 47 deletions.
52 changes: 5 additions & 47 deletions dags/external_data_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_del_ins_operator import create_del_ins_task
from stellar_etl_airflow.build_internal_export_task import (
build_export_task,
get_airflow_metadata,
)
from stellar_etl_airflow.build_del_ins_operator import create_export_del_insert_operator
from stellar_etl_airflow.build_internal_export_task import build_export_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.utils import access_secret

Expand All @@ -36,7 +30,7 @@
dag = DAG(
"external_data_dag",
default_args=get_default_dag_args(),
start_date=datetime(2024, 12, 5, 14, 30),
start_date=datetime(2024, 12, 16, 0, 0),
description="This DAG exports data from external sources such as retool.",
schedule_interval="0 22 * * *",
params={
Expand Down Expand Up @@ -72,44 +66,8 @@
)


def get_insert_to_bq_task(
table_name: str,
project: str,
dataset: str,
export_task_id: str,
source_object_suffix: str,
partition: bool,
cluster: bool,
table_id: str,
):
metadata = get_airflow_metadata()
source_objects = [
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
+ source_object_suffix
]
task_vars = {
"task_id": f"del_ins_{table_name}_task",
"project": project,
"dataset": dataset,
"table_name": table_name,
"export_task_id": export_task_id,
"source_object_suffix": source_object_suffix,
"partition": partition,
"cluster": cluster,
"batch_id": metadata["batch_id"],
"batch_date": metadata["batch_date"],
"source_objects": source_objects,
"table_id": table_id,
}
insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)
return insert_to_bq_task


retool_insert_to_bq_task = get_insert_to_bq_task(
retool_insert_to_bq_task = create_export_del_insert_operator(
dag,
table_name=RETOOL_TABLE_NAME,
project=EXTERNAL_DATA_PROJECT_NAME,
dataset=EXTERNAL_DATA_DATASET_NAME,
Expand Down
42 changes: 42 additions & 0 deletions dags/stellar_etl_airflow/build_del_ins_operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from airflow.operators.python import PythonOperator
from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import (
build_del_ins_from_gcs_to_bq_task,
)
from stellar_etl_airflow.build_internal_export_task import get_airflow_metadata
from stellar_etl_airflow.default import alert_after_max_retries


Expand Down Expand Up @@ -74,3 +78,41 @@ def create_del_ins_task(dag, task_vars, del_ins_callable):
on_failure_callback=alert_after_max_retries,
dag=dag,
)


def create_export_del_insert_operator(
dag,
table_name: str,
project: str,
dataset: str,
export_task_id: str,
source_object_suffix: str,
partition: bool,
cluster: bool,
table_id: str,
):
metadata = get_airflow_metadata()
source_objects = [
"{{ task_instance.xcom_pull(task_ids='"
+ export_task_id
+ '\')["output"] }}'
+ source_object_suffix
]
task_vars = {
"task_id": f"del_ins_{table_name}_task",
"project": project,
"dataset": dataset,
"table_name": table_name,
"export_task_id": export_task_id,
"source_object_suffix": source_object_suffix,
"partition": partition,
"cluster": cluster,
"batch_id": metadata["batch_id"],
"batch_date": metadata["batch_date"],
"source_objects": source_objects,
"table_id": table_id,
}
insert_to_bq_task = create_del_ins_task(
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)
return insert_to_bq_task

0 comments on commit 9a4e60f

Please sign in to comment.