Commit 1475901

Updated the documentation images
Removed redundant code
Added checks for missing key values

harsha-stellar-data committed Jul 12, 2024
1 parent d0a2c2f commit 1475901
Showing 4 changed files with 21 additions and 10 deletions.
dags/state_table_dag.py (19 additions, 8 deletions)

@@ -68,6 +68,24 @@
 use_captive_core = literal_eval(Variable.get("use_captive_core"))
 txmeta_datastore_path = "{{ var.value.txmeta_datastore_path }}"

+# Ensure all required keys are present in table_names
+required_keys = [
+    "accounts",
+    "claimable_balances",
+    "offers",
+    "liquidity_pools",
+    "signers",
+    "trustlines",
+    "contract_data",
+    "contract_code",
+    "config_settings",
+    "ttl",
+]
+
+missing_keys = [key for key in required_keys if key not in table_names]
+if missing_keys:
+    raise KeyError(f"Missing Id in the table_ids Airflow Variable: {missing_keys}")
+
 """
 The date task reads in the execution time of the current run, as well as the next
 execution time. It converts these two times into ledger ranges.
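For context (commentary, not part of the diff): the new block validates table_names, which is assumed to be the dict parsed from the table_ids Airflow Variable earlier in the file. A minimal standalone sketch of the same check, run against a deliberately incomplete, hypothetical mapping:

required_keys = [
    "accounts", "claimable_balances", "offers", "liquidity_pools", "signers",
    "trustlines", "contract_data", "contract_code", "config_settings", "ttl",
]

# Hypothetical, deliberately incomplete stand-in for the parsed table_ids Variable.
table_names = {"accounts": "accounts_current", "offers": "offers_current"}

missing_keys = [key for key in required_keys if key not in table_names]
if missing_keys:
    # With the stand-in above this raises, for example:
    # KeyError: "Missing Id in the table_ids Airflow Variable: ['claimable_balances', ...]"
    raise KeyError(f"Missing Id in the table_ids Airflow Variable: {missing_keys}")

Because the block sits at module level, a misconfigured table_ids Variable should fail when the DAG file is parsed rather than partway through task generation.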
@@ -126,14 +144,7 @@
"ttl": "/*-ttl.txt",
}

# filter for only the required tables pertaining to the DAG
table_id_and_suffixes = {
key: suffix
for key, suffix in source_object_suffix_mapping.items()
if key in table_names
}

for table_id, source_object_suffix in table_id_and_suffixes.items():
for table_id, source_object_suffix in source_object_suffix_mapping.items():
table_name = table_names[table_id] # Get the expanded table name
task_vars = initialize_task_vars(
table_id,
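A note on this hunk (commentary, not part of the diff): with the required-keys check now at the top of the file, every key of source_object_suffix_mapping (the same ten state tables listed in required_keys) is guaranteed to be present in table_names, so the dict-comprehension filter was a no-op and iterating the mapping directly is equivalent. A small sketch with hypothetical stand-in dicts:

source_object_suffix_mapping = {"accounts": "/*-accounts.txt", "ttl": "/*-ttl.txt"}  # hypothetical subset
table_names = {"accounts": "accounts_current", "ttl": "ttl_current"}  # hypothetical subset

# The removed filter kept only keys present in table_names; once the key check
# passes, that is every key, so the filtered dict equals the original mapping.
table_id_and_suffixes = {
    key: suffix
    for key, suffix in source_object_suffix_mapping.items()
    if key in table_names
}
assert table_id_and_suffixes == source_object_suffix_mapping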
dags/stellar_etl_airflow/build_del_ins_from_gcs_to_bq_task.py (2 additions, 2 deletions)

@@ -136,8 +136,9 @@ def build_del_ins_from_gcs_to_bq_task(
     if table_name == "history_assets":
         staging_table_suffix = "_staging"

+    schema_fields = read_local_schema(f"{table_name}")
+
     if table_id in history_tables:
-        schema_fields = read_local_schema(f"history_{table_id}")
         gcs_to_bq_operator = CustomGCSToBigQueryOperator(
             task_id=f"send_{table_name}_to_bq",
             execution_timeout=timedelta(
@@ -172,7 +173,6 @@ def build_del_ins_from_gcs_to_bq_task(
             dag=dag,
         )
     else:
-        schema_fields = read_local_schema(f"{table_name}")
         gcs_to_bq_operator = GCSToBigQueryOperator(
             task_id=f"send_{table_name}_to_bq",
             execution_timeout=timedelta(
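A note on this change (commentary, not part of the diff): the schema read appears to be consolidated into a single read_local_schema(f"{table_name}") call before the branch, on the assumption that for history tables the expanded table name already carries the history_ prefix, which made the per-branch reads redundant. A minimal sketch under that assumption, with a hypothetical stub standing in for read_local_schema:

def read_local_schema(name):
    # Hypothetical stand-in for the real helper, which loads the schema for `name`.
    return f"schemas/{name}_schema.json"

history_tables = ["operations"]  # hypothetical subset

for table_id, table_name in [("operations", "history_operations"), ("accounts", "accounts_current")]:
    schema_fields = read_local_schema(f"{table_name}")
    if table_id in history_tables:
        # Same schema the removed read_local_schema(f"history_{table_id}") call would load.
        assert schema_fields == read_local_schema(f"history_{table_id}")
    print(table_id, "->", schema_fields)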
Binary file modified documentation/images/history_table_export.png
Binary file modified documentation/images/state_table_export.png
