diff --git a/linkage_dag.py b/linkage_dag.py
index 20c0359..adf3a8b 100644
--- a/linkage_dag.py
+++ b/linkage_dag.py
@@ -41,6 +41,7 @@
     get_post_success,
 )
 from dataloader.airflow_utils.utils import clear_gcs_dir
+from dataloader.scripts.clean_backups import clean_backups
 from dataloader.scripts.populate_documentation import update_table_descriptions
 
 production_dataset = "literature"
@@ -629,18 +630,23 @@
     # We're done! Checks passed, so copy to production and post success to slack
     start_production_cp = DummyOperator(task_id="start_production_cp")
 
+    update_archive = PythonOperator(
+        task_id="update_archive",
+        op_kwargs={"dataset": backup_dataset, "backup_prefix": production_dataset},
+        python_callable=clean_backups,
+    )
     success_alert = get_post_success("Article linkage update succeeded!", dag)
 
+    trigger_org_fixes = TriggerDagRunOperator(
+        task_id="trigger_org_fixes",
+        trigger_dag_id="org_fixes",
+    )
+
     curr_date = datetime.now().strftime("%Y%m%d")
     with open(
         f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json"
     ) as f:
         table_desc = json.loads(f.read())
-    trigger_org_fixes = TriggerDagRunOperator(
-        task_id="trigger_org_fixes",
-        trigger_dag_id="org_fixes",
-    )
-
     for table in production_tables:
         push_to_production = BigQueryToBigQueryOperator(
             task_id="copy_" + table.lower(),
@@ -670,6 +676,7 @@
             >> push_to_production
             >> snapshot
             >> pop_descriptions
+            >> update_archive
             >> success_alert
             >> trigger_org_fixes
         )
@@ -691,7 +698,7 @@
         create_disposition="CREATE_IF_NEEDED",
         write_disposition="WRITE_TRUNCATE",
     )
-    start_production_cp >> copy_cld2 >> snapshot_cld2 >> success_alert
+    start_production_cp >> copy_cld2 >> snapshot_cld2 >> update_archive
 
     # task structure
     clear_tmp_dir >> metadata_sequences_start
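
Note: the new update_archive task passes op_kwargs={"dataset": backup_dataset, "backup_prefix": production_dataset}, so it assumes the imported clean_backups callable accepts dataset and backup_prefix keyword arguments. The sketch below is a hypothetical illustration of a compatible callable under that assumption only; the real implementation lives in dataloader/scripts/clean_backups.py and may differ (for example in how many backups it retains).

# Hypothetical sketch only: the DAG assumes clean_backups(dataset=..., backup_prefix=...)
# exists in dataloader.scripts.clean_backups; the real implementation may differ.
from google.cloud import bigquery


def clean_backups(dataset: str, backup_prefix: str, keep: int = 5, **context) -> None:
    """Keep only the most recent `keep` backup tables named `<backup_prefix>*` in `dataset`."""
    client = bigquery.Client()
    backup_tables = sorted(
        tbl.table_id
        for tbl in client.list_tables(dataset)
        if tbl.table_id.startswith(backup_prefix)
    )
    # Backup table names are assumed to end in a sortable date suffix, so the
    # oldest ones sort first; delete everything except the newest `keep`.
    for table_id in backup_tables[:-keep]:
        client.delete_table(f"{dataset}.{table_id}", not_found_ok=True)

With a callable shaped like this, the op_kwargs keys in the diff map directly onto its parameters, and Airflow's PythonOperator forwards them as keyword arguments when the update_archive task runs.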