Skip to content

Commit

Permalink
Add "article_links_nested", "paper_references_merged" to snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
jmelot committed Apr 23, 2021
1 parent cba3577 commit 31e0c32
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions linkage_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,19 @@
)
)

snapshot_table = f"{backup_dataset}.article_links_"+datetime.now().strftime("%Y%m%d")
# mk the snapshot predictions table
snapshot = BigQueryToBigQueryOperator(
task_id="mk_snapshot",
source_project_dataset_tables=[f"{staging_dataset}.article_links"],
destination_project_dataset_table=snapshot_table,
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)
wait_for_production_copy = DummyOperator(task_id="wait_for_production_copy")

snapshots = []
curr_date = datetime.now().strftime("%Y%m%d")
for table in ["article_links", "article_links_nested", "paper_references_merged"]:
# mk the snapshot predictions table
snapshots.append(BigQueryToBigQueryOperator(
task_id=f"snapshot_{table}",
source_project_dataset_tables=[f"{production_dataset}.{table}"],
destination_project_dataset_table=f"{backup_dataset}.{table}_{curr_date}",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
))

success_alert = SlackWebhookOperator(
task_id="post_success",
Expand All @@ -468,5 +472,5 @@
(last_combination_query >> heavy_compute_inputs >> gce_instance_start >> [create_cset_ids, run_lid] >>
gce_instance_stop >> [import_id_mapping, import_lid] >> start_final_transform_queries)

(last_transform_query >> check_queries >> start_production_cp >> push_to_production >> snapshot >>
success_alert >> downstream_tasks)
(last_transform_query >> check_queries >> start_production_cp >> push_to_production >> wait_for_production_copy >>
snapshots >> success_alert >> downstream_tasks)

0 comments on commit 31e0c32

Please sign in to comment.