diff --git a/.github/workflows/ci.yml b/.github/workflows/ci-cd-dev.yml similarity index 99% rename from .github/workflows/ci.yml rename to .github/workflows/ci-cd-dev.yml index 2495befe..06007560 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci-cd-dev.yml @@ -1,4 +1,4 @@ -name: CI +name: CI-CD-DEV on: pull_request: diff --git a/.github/workflows/release.yml b/.github/workflows/ci-cd-prod.yml similarity index 99% rename from .github/workflows/release.yml rename to .github/workflows/ci-cd-prod.yml index 90270fc2..22e37b58 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/ci-cd-prod.yml @@ -1,4 +1,4 @@ -name: release +name: CI-CD-PROD on: pull_request: diff --git a/README.md b/README.md index c125e85d..2964f333 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ This repository contains the Airflow DAGs for the [Stellar ETL](https://github.c - [build_time_task](#build_time_task) - [build_export_task](#build_export_task) - [build_gcs_to_bq_task](#build_gcs_to_bq_task) + - [build_del_ins_from_gcs_to_bq_task](#build_del_ins_from_gcs_to_bq_task) - [build_apply_gcs_changes_to_bq_task](#build_apply_gcs_changes_to_bq_task) - [build_batch_stats](#build_batch_stats) - [bq_insert_job_task](#bq_insert_job_task) @@ -449,25 +450,26 @@ The `airflow_variables_*.txt` files provide a set of default values for variable ### **DBT Variables** -| Variable name | Description | Should be changed? | -| --------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------- | -| dbt_full_refresh_models | JSON object. Each key should be a DBT model, and the value is a boolean controlling if the model should be run with `--full-refresh` | Yes, if desired for models that need to be full-refreshed | -| dbt_image_name | name of the `stellar-dbt` image to use | No, unless you need a specific image version | -| dbt_job_execution_timeout_seconds | timeout for dbt tasks in seconds | No, unless you want a different timeout | -| dbt_job_retries | number of times dbt_jobs will retry | No, unless you want a different retry limit | -| dbt_mart_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) for DBT marts | Yes. Change to your dataset name | -| dbt_maximum_bytes_billed | the max number of BigQuery bytes that can be billed when running DBT | No, unless you want a different limit | -| dbt_project | name of the Biquery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name | -| dbt_target | the `target` that will used to run dbt | No, unless you want a different target | -| dbt_threads | the number of threads that dbt will spawn to build a model | No, unless you want a different thread count | -| dbt_tables | name of dbt tables to copy to sandbox | No | -| dbt_internal_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. | -| dbt_internal_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. | -| dbt_public_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. 
| -| dbt_public_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. | -| dbt_slack_elementary_channel | Name of slack channel to send elementary alerts | Yes. Change to your slack channel name. | -| dbt_elementary_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. | -| dbt_elementary_secret | Necessary argument for elementary task | No | +| Variable name | Description | Should be changed? | +| --------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------- | +| dbt_full_refresh_models | JSON object. Each key should be a DBT model, and the value is a boolean controlling if the model should be run with `--full-refresh` | Yes, if desired for models that need to be full-refreshed | +| dbt_image_name | name of the `stellar-dbt` image to use | No, unless you need a specific image version | +| dbt_job_execution_timeout_seconds | timeout for dbt tasks in seconds | No, unless you want a different timeout | +| dbt_job_retries | number of times dbt_jobs will retry | No, unless you want a different retry limit | +| dbt_mart_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) for DBT marts | Yes. Change to your dataset name | +| dbt_maximum_bytes_billed | the max number of BigQuery bytes that can be billed when running DBT | No, unless you want a different limit | +| dbt_project | name of the Biquery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name | +| dbt_target | the `target` that will used to run dbt | No, unless you want a different target | +| dbt_threads | the number of threads that dbt will spawn to build a model | No, unless you want a different thread count | +| dbt_tables | name of dbt tables to copy to sandbox | No | +| dbt_internal_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. | +| dbt_internal_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. | +| dbt_public_source_db | Name of the BigQuery [project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#console) | Yes. Change to your project name. | +| dbt_public_source_schema | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. | +| dbt_slack_elementary_channel | Name of slack channel to send elementary alerts | Yes. Change to your slack channel name. | +| dbt_elementary_dataset | Name of the BigQuery [dataset](https://cloud.google.com/bigquery/docs/datasets) | Yes. Change to your dataset name. | +| dbt_elementary_secret | Necessary argument for elementary task | No | +| dbt_transient_errors_patterns | Dictionary containing a name of a known dbt transient error as key and a list of string sentences to identify the error pattern as value | Yes, for every known error added | ### **Kubernetes-Specific Variables** @@ -542,6 +544,7 @@ This section contains information about the Airflow setup. 
It includes our DAG d
   - [build_export_task](#build_export_task)
   - [build_gcs_to_bq_task](#build_gcs_to_bq_task)
   - [build_apply_gcs_changes_to_bq_task](#build_apply_gcs_changes_to_bq_task)
+  - [build_del_ins_from_gcs_to_bq_task](#build_del_ins_from_gcs_to_bq_task)
   - [build_batch_stats](#build_batch_stats)
   - [bq_insert_job_task](#bq_insert_job_task)
   - [cross_dependency_task](#cross_dependency_task)
@@ -668,6 +671,10 @@ This section contains information about the Airflow setup. It includes our DAG d

 [This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_gcs_to_bq_task.py) contains methods for creating tasks that appends information from a Google Cloud Storage file to a BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.

+### **build_del_ins_from_gcs_to_bq_task**
+
+[This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py) contains methods for deleting data from a specified BigQuery table according to the batch interval and then importing data from GCS into the corresponding BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history and state data structures, as Stellar wants to keep a complete record of the ledger's entire history.
+
 ### **build_apply_gcs_changes_to_bq_task**

 [This file](https://github.com/stellar/stellar-etl-airflow/blob/master/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py) contains methods for creating apply tasks. Apply tasks are used to merge a file from Google Cloud Storage into a BigQuery table. Apply tasks differ from the other task that appends in that they apply changes. This means that they update, delete, and insert rows. These tasks are used for accounts, offers, and trustlines, as the BigQuery table represents the point in time state of these data structures. This means that, for example, a merge task could alter the account balance field in the table if a user performed a transaction, delete a row in the table if a user deleted their account, or add a new row if a new account was created.
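To make the new helper concrete, here is a minimal sketch of the delete-then-insert pattern it documents, reduced to the two underlying Google provider operators. The project, dataset, table, bucket, and batch values are illustrative placeholders rather than values from this PR, and in the PR itself both steps run back-to-back inside a single `PythonOperator` (see `build_del_ins_operator.py`) rather than as two chained tasks; the real helper also handles schemas, partitioning, clustering, and Sentry alerting.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Illustrative placeholders only.
PROJECT, DATASET, TABLE = "my-project", "my_dataset", "history_operations"
BATCH_ID, BATCH_DATE = "batch-0001", "2024-07-10T14:30:00"

with DAG("del_ins_sketch", start_date=datetime(2024, 7, 10), schedule_interval=None) as dag:
    # Step 1: delete any rows previously loaded for this batch so a reload is idempotent.
    delete_old_batch = BigQueryInsertJobOperator(
        task_id=f"delete_old_partition_{TABLE}_bq",
        configuration={
            "query": {
                "query": (
                    f"DELETE FROM {DATASET}.{TABLE} "
                    f"WHERE batch_run_date = '{BATCH_DATE}' "
                    f"AND batch_id = '{BATCH_ID}';"
                ),
                "useLegacySql": False,
            }
        },
    )

    # Step 2: append the exported GCS file into the same table.
    insert_batch = GCSToBigQueryOperator(
        task_id=f"send_{TABLE}_to_bq",
        bucket="my-exported-data-bucket",
        source_objects=[f"dag-exported/{BATCH_ID}/*-operations.txt"],
        source_format="NEWLINE_DELIMITED_JSON",
        destination_project_dataset_table=f"{PROJECT}.{DATASET}.{TABLE}",
        write_disposition="WRITE_APPEND",
        create_disposition="CREATE_IF_NEEDED",
    )

    delete_old_batch >> insert_batch
```

Because the delete is keyed on `batch_run_date` and `batch_id`, re-running a batch (or Airflow retrying the task) cannot double-insert rows, which is the duplication problem this PR is addressing.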
diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index a3777bd5..496e1e3a 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -146,6 +146,12 @@ }, "dbt_target": "test", "dbt_threads": 12, + "dbt_transient_errors_patterns": { + "elementary_concurrent_access": [ + "Could not serialize access to table", + "due to concurrent update" + ] + }, "gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket", "gcs_exported_object_prefix": "dag-exported", "image_name": "stellar/stellar-etl:98bea9a", @@ -334,6 +340,7 @@ "asset_stats": 720, "build_batch_stats": 840, "build_bq_insert_job": 1080, + "build_del_ins_from_gcs_to_bq_task": 2000, "build_delete_data_task": 1020, "build_export_task": 840, "build_gcs_to_bq_task": 960, @@ -367,6 +374,7 @@ "build_bq_insert_job": 180, "build_copy_table": 180, "build_dbt_task": 960, + "build_del_ins_from_gcs_to_bq_task": 400, "build_delete_data_task": 180, "build_export_task": 420, "build_gcs_to_bq_task": 300, diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 03fa07fa..b05d2b70 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -147,6 +147,12 @@ }, "dbt_target": "prod", "dbt_threads": 12, + "dbt_transient_errors_patterns": { + "elementary_concurrent_access": [ + "Could not serialize access to table", + "due to concurrent update" + ] + }, "gcs_exported_data_bucket_name": "us-central1-hubble-14c4ca64-bucket", "gcs_exported_object_prefix": "dag-exported", "image_name": "stellar/stellar-etl:98bea9a", @@ -332,6 +338,7 @@ "asset_stats": 420, "build_batch_stats": 600, "build_bq_insert_job": 840, + "build_del_ins_from_gcs_to_bq_task": 2000, "build_delete_data_task": 780, "build_export_task": 600, "build_gcs_to_bq_task": 660, @@ -365,6 +372,7 @@ "build_bq_insert_job": 180, "build_copy_table": 180, "build_dbt_task": 1800, + "build_del_ins_from_gcs_to_bq_task": 400, "build_delete_data_task": 180, "build_export_task": 300, "build_gcs_to_bq_task": 300, diff --git a/dags/history_tables_dag.py b/dags/history_tables_dag.py index a6368c86..68b7cbbd 100644 --- a/dags/history_tables_dag.py +++ b/dags/history_tables_dag.py @@ -1,5 +1,7 @@ """ -The history_archive_export DAG exports operations and trades from the history archives. +The history_table_export DAG exports trades, assets, ledgers, operations, transactions, effects, and contract events +from the history archives and loads the data into the corresponding BigQuery tables. It also performs a Delete operation +based on the batch interval value to perform clean up before the Insert to avoid any potential duplication issues. It is scheduled to export information to BigQuery at regular intervals. 
""" @@ -14,11 +16,19 @@ from stellar_etl_airflow.build_batch_stats import build_batch_stats from stellar_etl_airflow.build_bq_insert_job_task import build_bq_insert_job from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps +from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import ( + build_del_ins_from_gcs_to_bq_task, +) +from stellar_etl_airflow.build_del_ins_operator import ( + create_del_ins_task, + initialize_task_vars, +) from stellar_etl_airflow.build_delete_data_task import build_delete_data_task from stellar_etl_airflow.build_export_task import build_export_task from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task from stellar_etl_airflow.build_time_task import build_time_task from stellar_etl_airflow.default import ( + alert_after_max_retries, alert_sla_miss, get_default_dag_args, init_sentry, @@ -26,13 +36,13 @@ init_sentry() - +# Initialize the DAG dag = DAG( "history_table_export", default_args=get_default_dag_args(), - start_date=datetime(2024, 6, 11, 17, 30), + start_date=datetime(2024, 7, 10, 14, 30), catchup=True, - description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.", + description="This DAG exports information for the trades, assets, ledgers, operations, transactions, effects and contract events history tables.", schedule_interval="*/10 * * * *", params={ "alias": "cc", @@ -50,7 +60,11 @@ # sla_miss_callback=alert_sla_miss, ) +# Initialize batch metadata variables +batch_id = macros.get_batch_id() +batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +# Fetch necessary variables table_names = Variable.get("table_ids", deserialize_json=True) public_project = "{{ var.value.public_project }}" public_dataset = "{{ var.value.public_dataset }}" @@ -59,16 +73,14 @@ use_captive_core = literal_eval(Variable.get("use_captive_core")) txmeta_datastore_path = "{{ var.value.txmeta_datastore_path }}" - """ The time task reads in the execution time of the current run, as well as the next execution time. It converts these two times into ledger ranges. """ time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) - """ -The write batch stats task will take a snapshot of the DAG run_id, execution date, +The build batch stats task will take a snapshot of the DAG run_id, execution date, start and end ledgers so that reconciliation and data validation are easier. The record is written to an internal dataset for data eng use only. """ @@ -85,11 +97,11 @@ The results of the command are stored in a file. There is one task for each of the data types that can be exported from the history archives. - The DAG sleeps for 30 seconds after the export_task writes to the file to give the poststart.sh script time to copy the file over to the correct directory. If there is no sleep, the load task starts prematurely and will not load data. 
""" + op_export_task = build_export_task( dag, "archive", @@ -101,6 +113,7 @@ use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, ) + trade_export_task = build_export_task( dag, "archive", @@ -112,6 +125,7 @@ use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, ) + effects_export_task = build_export_task( dag, "archive", @@ -123,6 +137,7 @@ use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, ) + tx_export_task = build_export_task( dag, "archive", @@ -134,6 +149,7 @@ use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, ) + ledger_export_task = build_export_task( dag, "archive", @@ -145,6 +161,7 @@ use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, ) + asset_export_task = build_export_task( dag, "archive", @@ -156,6 +173,7 @@ use_captive_core=use_captive_core, txmeta_datastore_path=txmeta_datastore_path, ) + contract_events_export_task = build_export_task( dag, "archive", @@ -168,22 +186,45 @@ txmeta_datastore_path=txmeta_datastore_path, ) - """ -The delete partition task checks to see if the given partition/batch id exists in +The delete part of the task checks to see if the given partition/batch id exists in Bigquery. If it does, the records are deleted prior to reinserting the batch. -""" - - -delete_old_op_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["operations"], "pub" -) +The Insert part of the task receive the location of the file in Google Cloud storage through Airflow's XCOM system. +Then, the task merges the unique entries in the file into the corresponding table in BigQuery. -delete_old_trade_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["trades"], "pub" -) +""" +del_ins_tasks = {} + +export_tasks = { + "operations": op_export_task, + "trades": trade_export_task, + "effects": effects_export_task, + "transactions": tx_export_task, + "ledgers": ledger_export_task, + "assets": asset_export_task, + "contract_events": contract_events_export_task, +} + + +for table_id, export_task in export_tasks.items(): + table_name = table_names[table_id] + task_vars = initialize_task_vars( + table_id, + table_name, + export_task.task_id, + batch_id, + batch_date, + public_project, + public_dataset, + ) + del_ins_tasks[table_id] = create_del_ins_task( + dag, task_vars, build_del_ins_from_gcs_to_bq_task + ) +""" +Delete and Insert operations for the Enriched History Operations table +""" delete_enrich_op_pub_task = build_delete_data_task( dag, @@ -193,201 +234,64 @@ "pub", ) - -delete_old_effects_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["effects"], "pub" -) - - -delete_old_tx_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["transactions"], "pub" -) - - -delete_old_ledger_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["ledgers"], "pub" -) - -delete_old_asset_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["assets"], "pub" -) - -delete_contract_events_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["contract_events"], "pub" -) - - -""" -The send tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. -Then, the task merges the unique entries in the file into the corresponding table in BigQuery. 
-""" - -""" -Load final public dataset, crypto-stellar -""" -send_ops_to_pub_task = build_gcs_to_bq_task( +insert_enrich_op_pub_task = build_bq_insert_job( dag, - op_export_task.task_id, public_project, public_dataset, - table_names["operations"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) -send_trades_to_pub_task = build_gcs_to_bq_task( - dag, - trade_export_task.task_id, - public_project, - public_dataset, - table_names["trades"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) -send_effects_to_pub_task = build_gcs_to_bq_task( - dag, - effects_export_task.task_id, - public_project, - public_dataset, - table_names["effects"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) -send_txs_to_pub_task = build_gcs_to_bq_task( - dag, - tx_export_task.task_id, - public_project, - public_dataset, - table_names["transactions"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) -send_ledgers_to_pub_task = build_gcs_to_bq_task( - dag, - ledger_export_task.task_id, - public_project, - public_dataset, - table_names["ledgers"], - "", + "enriched_history_operations", partition=True, cluster=True, dataset_type="pub", ) - -send_assets_to_pub_task = build_gcs_to_bq_task( +dedup_assets_pub_task = build_bq_insert_job( dag, - asset_export_task.task_id, public_project, public_dataset, table_names["assets"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) - - -send_contract_events_to_pub_task = build_gcs_to_bq_task( - dag, - contract_events_export_task.task_id, - public_project, - public_dataset, - table_names["contract_events"], - "", - partition=True, - cluster=True, - dataset_type="pub", -) - - -insert_enriched_hist_pub_task = build_bq_insert_job( - dag, - public_project, - public_dataset, - "enriched_history_operations", partition=True, cluster=True, + create=True, dataset_type="pub", ) +# Set Task dependencies ( time_task >> write_op_stats >> op_export_task - >> delete_old_op_pub_task - >> send_ops_to_pub_task + >> del_ins_tasks["operations"] >> delete_enrich_op_pub_task - >> insert_enriched_hist_pub_task + >> insert_enrich_op_pub_task ) - -( - time_task - >> write_trade_stats - >> trade_export_task - >> delete_old_trade_pub_task - >> send_trades_to_pub_task -) - - -( - time_task - >> write_effects_stats - >> effects_export_task - >> delete_old_effects_pub_task - >> send_effects_to_pub_task -) - - +(time_task >> write_trade_stats >> trade_export_task >> del_ins_tasks["trades"]) +(time_task >> write_effects_stats >> effects_export_task >> del_ins_tasks["effects"]) ( time_task >> write_tx_stats >> tx_export_task - >> delete_old_tx_pub_task - >> send_txs_to_pub_task + >> del_ins_tasks["transactions"] >> delete_enrich_op_pub_task ) - -dedup_assets_pub_task = build_bq_insert_job( - dag, - public_project, - public_dataset, - table_names["assets"], - partition=True, - cluster=True, - create=True, - dataset_type="pub", -) ( time_task >> write_ledger_stats >> ledger_export_task - >> delete_old_ledger_pub_task - >> send_ledgers_to_pub_task + >> del_ins_tasks["ledgers"] >> delete_enrich_op_pub_task ) ( time_task >> write_asset_stats >> asset_export_task - >> delete_old_asset_pub_task - >> send_assets_to_pub_task + >> del_ins_tasks["assets"] >> dedup_assets_pub_task ) - ( time_task >> write_contract_events_stats >> contract_events_export_task - >> delete_contract_events_task - >> send_contract_events_to_pub_task + >> del_ins_tasks["contract_events"] ) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 
06395991..236027f1 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -12,9 +12,14 @@ from kubernetes.client import models as k8s from stellar_etl_airflow import macros from stellar_etl_airflow.build_batch_stats import build_batch_stats -from stellar_etl_airflow.build_delete_data_task import build_delete_data_task +from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import ( + build_del_ins_from_gcs_to_bq_task, +) +from stellar_etl_airflow.build_del_ins_operator import ( + create_del_ins_task, + initialize_task_vars, +) from stellar_etl_airflow.build_export_task import build_export_task -from stellar_etl_airflow.build_gcs_to_bq_task import build_gcs_to_bq_task from stellar_etl_airflow.build_time_task import build_time_task from stellar_etl_airflow.default import ( alert_sla_miss, @@ -24,11 +29,11 @@ init_sentry() - +# Initialize the DAG dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime(2024, 6, 11, 17, 30), + start_date=datetime(2024, 7, 10, 14, 30), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/10 * * * *", params={ @@ -48,7 +53,11 @@ # sla_miss_callback=alert_sla_miss, ) +# Initialize batch metadata variables +batch_id = macros.get_batch_id() +batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" +# Fetch necessary variables table_names = Variable.get("table_ids", deserialize_json=True) internal_project = "{{ var.value.bq_project }}" internal_dataset = "{{ var.value.bq_dataset }}" @@ -59,8 +68,30 @@ use_captive_core = literal_eval(Variable.get("use_captive_core")) txmeta_datastore_path = "{{ var.value.txmeta_datastore_path }}" +# Ensure all required keys are present in table_names +required_keys = [ + "accounts", + "claimable_balances", + "offers", + "liquidity_pools", + "signers", + "trustlines", + "contract_data", + "contract_code", + "config_settings", + "ttl", +] + +missing_keys = [key for key in required_keys if key not in table_names] +if missing_keys: + raise KeyError(f"Missing Id in the table_ids Airflow Variable: {missing_keys}") +""" +The date task reads in the execution time of the current run, as well as the next +execution time. It converts these two times into ledger ranges. +""" date_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) + changes_task = build_export_task( dag, "bounded-core", @@ -73,7 +104,6 @@ txmeta_datastore_path=txmeta_datastore_path, ) - """ The write batch stats task will take a snapshot of the DAG run_id, execution date, start and end ledgers so that reconciliation and data validation are easier. The @@ -90,240 +120,69 @@ write_config_settings_stats = build_batch_stats(dag, table_names["config_settings"]) write_ttl_stats = build_batch_stats(dag, table_names["ttl"]) - """ -The delete partition task checks to see if the given partition/batch id exists in +The delete part of the task checks to see if the given partition/batch id exists in Bigquery. If it does, the records are deleted prior to reinserting the batch. 
-""" -delete_acc_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["accounts"], "pub" -) -delete_bal_pub_task = build_delete_data_task( - dag, - public_project, - public_dataset, - table_names["claimable_balances"], - "pub", -) -delete_off_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["offers"], "pub" -) -delete_pool_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["liquidity_pools"], "pub" -) -delete_sign_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["signers"], "pub" -) -delete_trust_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["trustlines"], "pub" -) -delete_contract_data_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["contract_data"], "pub" -) -delete_contract_code_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["contract_code"], "pub" -) -delete_config_settings_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["config_settings"], "pub" -) -delete_ttl_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["ttl"], "pub" -) - -""" -The apply tasks receive the location of the file in Google Cloud storage through Airflow's XCOM system. +The Insert part of the task receives the location of the file in Google Cloud storage through Airflow's XCOM system. Then, the task merges the entries in the file with the entries in the corresponding table in the public dataset. Entries are updated, deleted, or inserted as needed. """ -send_acc_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["accounts"], - "/*-accounts.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_bal_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["claimable_balances"], - "/*-claimable_balances.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_off_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["offers"], - "/*-offers.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_pool_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["liquidity_pools"], - "/*-liquidity_pools.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_sign_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["signers"], - "/*-signers.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_trust_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["trustlines"], - "/*-trustlines.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_contract_data_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["contract_data"], - "/*-contract_data.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_contract_code_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["contract_code"], - "/*-contract_code.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_config_settings_to_pub_task = 
build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["config_settings"], - "/*-config_settings.txt", - partition=True, - cluster=True, - dataset_type="pub", -) -send_ttl_to_pub_task = build_gcs_to_bq_task( - dag, - changes_task.task_id, - public_project, - public_dataset, - table_names["ttl"], - "/*-ttl.txt", - partition=True, - cluster=True, - dataset_type="pub", -) - -( - date_task - >> changes_task - >> write_acc_stats - >> delete_acc_pub_task - >> send_acc_to_pub_task -) - - -( - date_task - >> changes_task - >> write_bal_stats - >> delete_bal_pub_task - >> send_bal_to_pub_task -) - - -( - date_task - >> changes_task - >> write_off_stats - >> delete_off_pub_task - >> send_off_to_pub_task -) - - -( - date_task - >> changes_task - >> write_pool_stats - >> delete_pool_pub_task - >> send_pool_to_pub_task -) - - -( - date_task - >> changes_task - >> write_sign_stats - >> delete_sign_pub_task - >> send_sign_to_pub_task -) - - -( - date_task - >> changes_task - >> write_trust_stats - >> delete_trust_pub_task - >> send_trust_to_pub_task -) +del_ins_tasks = {} + +# Define the suffixes for the DAG related tables +source_object_suffix_mapping = { + "accounts": "/*-accounts.txt", + "claimable_balances": "/*-claimable_balances.txt", + "offers": "/*-offers.txt", + "liquidity_pools": "/*-liquidity_pools.txt", + "signers": "/*-signers.txt", + "trustlines": "/*-trustlines.txt", + "contract_data": "/*-contract_data.txt", + "contract_code": "/*-contract_code.txt", + "config_settings": "/*-config_settings.txt", + "ttl": "/*-ttl.txt", +} + +for table_id, source_object_suffix in source_object_suffix_mapping.items(): + table_name = table_names[table_id] # Get the expanded table name + task_vars = initialize_task_vars( + table_id, + table_name, + changes_task.task_id, + batch_id, + batch_date, + public_project, + public_dataset, + source_object_suffix=source_object_suffix, + ) + del_ins_tasks[table_id] = create_del_ins_task( + dag, task_vars, build_del_ins_from_gcs_to_bq_task + ) + +# Set task dependencies +(date_task >> changes_task >> write_acc_stats >> del_ins_tasks["accounts"]) +(date_task >> changes_task >> write_bal_stats >> del_ins_tasks["claimable_balances"]) +(date_task >> changes_task >> write_off_stats >> del_ins_tasks["offers"]) +(date_task >> changes_task >> write_pool_stats >> del_ins_tasks["liquidity_pools"]) +(date_task >> changes_task >> write_sign_stats >> del_ins_tasks["signers"]) +(date_task >> changes_task >> write_trust_stats >> del_ins_tasks["trustlines"]) ( date_task >> changes_task >> write_contract_data_stats - >> delete_contract_data_task - >> send_contract_data_to_pub_task + >> del_ins_tasks["contract_data"] ) ( date_task >> changes_task >> write_contract_code_stats - >> delete_contract_code_task - >> send_contract_code_to_pub_task + >> del_ins_tasks["contract_code"] ) ( date_task >> changes_task >> write_config_settings_stats - >> delete_config_settings_task - >> send_config_settings_to_pub_task -) -( - date_task - >> changes_task - >> write_ttl_stats - >> delete_ttl_task - >> send_ttl_to_pub_task + >> del_ins_tasks["config_settings"] ) +(date_task >> changes_task >> write_ttl_stats >> del_ins_tasks["ttl"]) diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index df34f292..bb8667a9 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -8,6 +8,7 @@ ) from kubernetes.client import models as k8s from stellar_etl_airflow.default 
import alert_after_max_retries +from stellar_etl_airflow.utils import skip_retry_dbt_errors def create_dbt_profile(project="prod"): @@ -142,6 +143,7 @@ def dbt_task( config_file=config_file_location, container_resources=container_resources, on_failure_callback=alert_after_max_retries, + on_retry_callback=skip_retry_dbt_errors, image_pull_policy="IfNotPresent", image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], sla=timedelta( @@ -223,6 +225,7 @@ def build_dbt_task( config_file=config_file_location, container_resources=resources_requests, on_failure_callback=alert_after_max_retries, + on_retry_callback=skip_retry_dbt_errors, image_pull_policy="IfNotPresent", image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], sla=timedelta( diff --git a/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py b/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py new file mode 100644 index 00000000..4d76a2ee --- /dev/null +++ b/dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py @@ -0,0 +1,204 @@ +from datetime import timedelta + +from airflow.models import Variable +from airflow.operators.python import PythonOperator +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from airflow.providers.google.cloud.transfers.gcs_to_bigquery import ( + GCSToBigQueryOperator, +) +from sentry_sdk import capture_message, push_scope +from stellar_etl_airflow import macros +from stellar_etl_airflow.build_apply_gcs_changes_to_bq_task import read_local_schema +from stellar_etl_airflow.default import alert_after_max_retries + + +class CustomGCSToBigQueryOperator(GCSToBigQueryOperator): + template_fields = list(GCSToBigQueryOperator.template_fields) + [ + "failed_transforms", + "max_failed_transforms", + "export_task_id", + ] + + def __init__( + self, failed_transforms, max_failed_transforms, export_task_id, **kwargs + ): + self.failed_transforms = failed_transforms + self.max_failed_transforms = max_failed_transforms + self.export_task_id = export_task_id + super().__init__(**kwargs) + + def pre_execute(self, context, **kwargs): + # Expand the template fields using Jinja2 templating from the context + self.failed_transforms = self.render_template(self.failed_transforms, context) + self.max_failed_transforms = self.render_template( + self.max_failed_transforms, context + ) + self.export_task_id = self.render_template(self.export_task_id, context) + + if int(self.failed_transforms) > self.max_failed_transforms: + with push_scope() as scope: + scope.set_tag("data-quality", "max-failed-transforms") + scope.set_extra("failed_transforms", self.failed_transforms) + scope.set_extra("file", f"gs://{self.bucket}/{self.source_objects[0]}") + scope.set_extra("task_id", self.task_id) + scope.set_extra("export_task_id", self.export_task_id) + scope.set_extra( + "destination_table", self.destination_project_dataset_table + ) + capture_message( + f"failed_transforms ({self.failed_transforms}) has exceeded the max value ({self.max_failed_transforms})", + "fatal", + ) + super().pre_execute(context, **kwargs) + + +def build_del_ins_from_gcs_to_bq_task( + project, + dataset, + table_id, + table_name, + export_task_id, + source_object_suffix, + partition, + cluster, + batch_id, + batch_date, + source_objects, + **context, +): + dag = context["dag"] + + # Delete operation + + DELETE_ROWS_QUERY = ( + f"DELETE FROM {dataset}.{table_name} " + f"WHERE batch_run_date = '{batch_date}'" + f"AND batch_id = '{batch_id}';" + ) + delete_task = 
BigQueryInsertJobOperator( + project_id=project, + task_id=f"delete_old_partition_{table_name}_bq", + execution_timeout=timedelta( + seconds=Variable.get("task_timeout", deserialize_json=True)[ + build_del_ins_from_gcs_to_bq_task.__name__ + ] + ), + on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_del_ins_from_gcs_to_bq_task.__name__ + ] + ), + configuration={ + "query": { + "query": DELETE_ROWS_QUERY, + "useLegacySql": False, + } + }, + ) + delete_task.execute(context) + + # Insert operation + + bucket_name = Variable.get("gcs_exported_data_bucket_name") + history_tables = [ + "ledgers", + "assets", + "transactions", + "operations", + "trades", + "effects", + "contract_events", + ] + + if cluster: + cluster_fields = Variable.get("cluster_fields", deserialize_json=True) + cluster_fields = ( + cluster_fields[f"{table_name}"] + if table_id in history_tables + else cluster_fields[table_name] + ) + else: + cluster_fields = None + + time_partition = {} + if partition: + partition_fields = Variable.get("partition_fields", deserialize_json=True) + partition_fields = ( + partition_fields[f"{table_name}"] + if table_id in history_tables + else partition_fields[table_name] + ) + time_partition["type"] = partition_fields["type"] + time_partition["field"] = partition_fields["field"] + + staging_table_suffix = "" + if table_name == "history_assets": + staging_table_suffix = "_staging" + + schema_fields = read_local_schema(f"{table_name}") + + if table_id in history_tables: + gcs_to_bq_operator = CustomGCSToBigQueryOperator( + task_id=f"send_{table_name}_to_bq", + execution_timeout=timedelta( + seconds=Variable.get("task_timeout", deserialize_json=True)[ + build_del_ins_from_gcs_to_bq_task.__name__ + ] + ), + bucket=bucket_name, + schema_fields=schema_fields, + schema_update_options=["ALLOW_FIELD_ADDITION"], + autodetect=False, + source_format="NEWLINE_DELIMITED_JSON", + source_objects=source_objects, + destination_project_dataset_table=f"{project}.{dataset}.{table_name}{staging_table_suffix}", + write_disposition="WRITE_APPEND", + create_disposition="CREATE_IF_NEEDED", + # schema_update_option="ALLOW_FIELD_ADDITION", + max_bad_records=0, + time_partitioning=time_partition, + cluster_fields=cluster_fields, + export_task_id=export_task_id, + failed_transforms="{{ task_instance.xcom_pull(task_ids='" + + export_task_id + + '\')["failed_transforms"] }}', + max_failed_transforms=0, + on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_del_ins_from_gcs_to_bq_task.__name__ + ] + ), + dag=dag, + ) + else: + gcs_to_bq_operator = GCSToBigQueryOperator( + task_id=f"send_{table_name}_to_bq", + execution_timeout=timedelta( + seconds=Variable.get("task_timeout", deserialize_json=True)[ + build_del_ins_from_gcs_to_bq_task.__name__ + ] + ), + bucket=bucket_name, + schema_fields=schema_fields, + schema_update_options=["ALLOW_FIELD_ADDITION"], + autodetect=False, + source_format="NEWLINE_DELIMITED_JSON", + source_objects=source_objects, + destination_project_dataset_table=f"{project}.{dataset}.{table_name}{staging_table_suffix}", + write_disposition="WRITE_APPEND", + create_disposition="CREATE_IF_NEEDED", + max_bad_records=0, + time_partitioning=time_partition, + cluster_fields=cluster_fields, + on_failure_callback=alert_after_max_retries, + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[ + build_del_ins_from_gcs_to_bq_task.__name__ + ] + ), 
+ dag=dag, + ) + + gcs_to_bq_operator.execute(context) diff --git a/dags/stellar_etl_airflow/build_del_ins_operator.py b/dags/stellar_etl_airflow/build_del_ins_operator.py new file mode 100644 index 00000000..2e1b7156 --- /dev/null +++ b/dags/stellar_etl_airflow/build_del_ins_operator.py @@ -0,0 +1,74 @@ +from airflow.operators.python import PythonOperator + + +def initialize_task_vars( + table_id, + table_name, + export_task_id, + batch_id, + batch_date, + public_project, + public_dataset, + source_object_suffix="", + source_objects=None, +): + """ + Initialize task variables for data export and import. + + Args: + table_id (str): The ID of the table (json key). + table_name (str): The name of the table (json value). + export_task_id (str): Task ID of the export task. + batch_id (str): Batch ID. + batch_date (str): Batch date. + public_project (str): Public project name. + public_dataset (str): Public dataset name. + source_object_suffix (str): Suffix for source objects. + source_objects (list): List of source objects. + + Returns: + dict: Task variables. + """ + if source_objects is None: + source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + export_task_id + + '\')["output"] }}' + + source_object_suffix + ] + task_id = f"del_ins_{table_name}_task" + return { + "task_id": task_id, + "project": public_project, + "dataset": public_dataset, + "table_name": table_name, + "export_task_id": export_task_id, + "source_object_suffix": source_object_suffix, + "partition": True, + "cluster": True, + "batch_id": batch_id, + "batch_date": batch_date, + "source_objects": source_objects, + "table_id": table_id, + } + + +def create_del_ins_task(dag, task_vars, del_ins_callable): + """ + Create a PythonOperator for delete and insert tasks. + + Args: + dag (DAG): The DAG to which the task belongs. + task_vars (dict): Task variables. + del_ins_callable (callable): The callable function to be used in the PythonOperator. + + Returns: + PythonOperator: The created PythonOperator. + """ + return PythonOperator( + task_id=task_vars["task_id"], + python_callable=del_ins_callable, + op_kwargs=task_vars, + provide_context=True, + dag=dag, + ) diff --git a/dags/stellar_etl_airflow/utils.py b/dags/stellar_etl_airflow/utils.py new file mode 100644 index 00000000..a260fd9b --- /dev/null +++ b/dags/stellar_etl_airflow/utils.py @@ -0,0 +1,102 @@ +import logging +import re +import time + +from airflow.configuration import conf +from airflow.models import Variable +from airflow.utils.state import TaskInstanceState + +base_log_folder = conf.get("logging", "base_log_folder") + + +def get_log_file_name(context): + ti = context["ti"] + log_params = [ + base_log_folder, + f"dag_id={ti.dag_id}", + f"run_id={ti.run_id}", + f"task_id={ti.task_id}", + f"attempt={ti.try_number - 1}.log", + ] + return "/".join(log_params) + + +def check_dbt_transient_errors(context): + """ + Searches through the logs to find failure messages + and returns True if the errors found are transient. 
+ """ + log_file_path = get_log_file_name(context) + log_contents = read_log_file(log_file_path) + + dbt_transient_error_patterns = Variable.get( + "dbt_transient_errors_patterns", deserialize_json=True + ) + + dbt_summary_line = None + for line in log_contents: + # Check for transient errors message patterns + for transient_error, patterns in dbt_transient_error_patterns.items(): + if all(sentence in line for sentence in patterns): + logging.info( + f"Found {transient_error} dbt transient error, proceeding to retry" + ) + return True + elif "Done. PASS=" in line: + dbt_summary_line = line + break + # Check if dbt summary has been logged + if dbt_summary_line: + match = re.search(r"ERROR=(\d+)", dbt_summary_line) + if match: + dbt_errors = int(match.group(1)) + # Check if dbt pipeline returned errors + if dbt_errors > 0: + logging.info("Could not find dbt transient errors, skipping retry") + return False + else: + logging.info( + "dbt pipeline finished without errors, task failed but will not retry" + ) + return False + # Logic could not identify the error and assumes it is transient + logging.info("Task failed due to unforeseen error, proceeding to retry") + return True + + +def read_log_file(log_file_path): + max_retries = 3 + retry_delay = 30 + log_contents = [] + + for attempt in range(max_retries): + try: + with open(log_file_path, "r") as log_file: + log_contents = log_file.readlines() + break + except FileNotFoundError as file_error: + if attempt < max_retries - 1: + logging.warn( + f"Log file {log_file_path} not found retrying in {retry_delay} seconds..." + ) + time.sleep(retry_delay) + else: + logging.error(file_error) + raise + return log_contents + + +def skip_retry_dbt_errors(context) -> None: + """ + Set task state to SKIPPED in case errors found in dbt are not transient. + """ + if not check_dbt_transient_errors(context): + ti = context["ti"] + logging.info( + f"Set task instance {ti} state to \ + {TaskInstanceState.SKIPPED} to skip retrying" + ) + ti.set_state(TaskInstanceState.SKIPPED) + return + else: + return diff --git a/documentation/images/history_table_export.png b/documentation/images/history_table_export.png index e02ce0ca..16c2952b 100644 Binary files a/documentation/images/history_table_export.png and b/documentation/images/history_table_export.png differ diff --git a/documentation/images/state_table_export.png b/documentation/images/state_table_export.png index 55683987..f1bef0d5 100644 Binary files a/documentation/images/state_table_export.png and b/documentation/images/state_table_export.png differ