Refactor simhash and ids update into multiple tasks; add completion sensors for long-running tasks
jmelot committed Jan 12, 2024
1 parent 5f6a524 commit 66d1e5c
Showing 3 changed files with 56 additions and 18 deletions.
62 changes: 44 additions & 18 deletions linkage_dag.py
@@ -6,6 +6,7 @@
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
@@ -337,7 +338,7 @@
task_id="start-" + gce_resource_id,
)

vm_script_sequence = [
prep_environment_script_sequence = [
"cd /mnt/disks/data",
"rm -rf run",
"mkdir run",
@@ -353,12 +354,27 @@
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
"mkdir new_simhash_indexes",
(
"python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes "
"--new_simhash_indexes new_simhash_indexes"
),
"cp -r article_pairs usable_ids",
"cp simhash_results/* article_pairs/",
]
prep_environment_vm_script = " && ".join(prep_environment_script_sequence)

prep_environment = BashOperator(
    task_id="prep_environment",
    bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"',
)

update_simhash_index = BashOperator(
    task_id="update_simhash_index",
    bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &"',
)

wait_for_simhash_index = GCSObjectExistenceSensor(
    task_id="wait_for_simhash_index",
    bucket=DATA_BUCKET,
    object=f"{tmp_dir}/done_files/simhash_is_done",
    deferrable=True
)

ids_script_sequence = [
    (
        "python3 create_merge_ids.py --match_dir usable_ids --prev_id_mapping_dir prev_id_mapping "
        "--merge_file id_mapping.jsonl --current_ids_dir article_pairs"
@@ -367,11 +383,23 @@
f"/snap/bin/gsutil -m cp simhash_results/* gs://{bucket}/{gcs_folder}/simhash_results/",
f"/snap/bin/gsutil -m cp new_simhash_indexes/* gs://{bucket}/{gcs_folder}/simhash_indexes/",
]
vm_script = " && ".join(vm_script_sequence)
ids_vm_script = " && ".join(ids_script_sequence)

create_cset_ids = BashOperator(
    task_id="create_cset_ids",
    bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{vm_script}"',
    bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{ids_vm_script}"',
)

wait_for_cset_ids = GCSObjectExistenceSensor(
    task_id="wait_for_cset_ids",
    bucket=DATA_BUCKET,
    object=f"{tmp_dir}/done_files/ids_are_done",
    deferrable=True
)

push_to_gcs = BashOperator(
    task_id="push_to_gcs",
    bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &"',
)

# while the article ids are updating, run lid on the titles and abstracts
@@ -614,14 +642,12 @@
    >> wait_for_combine
)

(
    last_combination_query
    >> heavy_compute_inputs
    >> gce_instance_start
    >> [create_cset_ids, run_lid]
    >> gce_instance_stop
    >> [import_id_mapping, import_lid]
    >> start_final_transform_queries
)
(
    last_combination_query
    >> heavy_compute_inputs
    >> gce_instance_start
    >> prep_environment
    >> update_simhash_index
    >> wait_for_simhash_index
    >> create_cset_ids
    >> wait_for_cset_ids
    >> push_to_gcs
    >> gce_instance_stop
)

gce_instance_start >> run_lid >> gce_instance_stop

gce_instance_stop >> [import_id_mapping, import_lid] >> start_final_transform_queries

last_transform_query >> check_queries >> start_production_cp
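
The core pattern in this change: each long-running VM step is launched fire-and-forget over SSH (note the trailing "&"), and a deferrable GCSObjectExistenceSensor then waits for a sentinel object that the script uploads when it finishes, so no Airflow worker slot is held while the VM churns. Below is a minimal self-contained sketch of the same pattern; the bucket, VM, zone, script, and task names are placeholders, not values from this repo.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# Placeholder values, not taken from this repo
BUCKET = "my-data-bucket"
TMP_DIR = "pipeline/tmp"
VM, ZONE = "my-vm", "us-east1-c"

with DAG("sentinel_pattern_demo", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # The trailing "&" detaches the script on the VM, so the SSH session
    # (and this task) returns immediately instead of holding a worker
    # for the hours the job may run.
    run_job = BashOperator(
        task_id="run_job",
        bash_command=f'gcloud compute ssh {VM} --zone {ZONE} --command "bash long_job.sh &"',
    )

    # long_job.sh is assumed to end by uploading a sentinel object, as the
    # scripts in this commit do. deferrable=True hands the polling off to
    # the triggerer process, so no worker slot is occupied during the wait.
    wait_for_job = GCSObjectExistenceSensor(
        task_id="wait_for_job",
        bucket=BUCKET,
        object=f"{TMP_DIR}/done_files/job_is_done",
        deferrable=True,
    )

    run_job >> wait_for_job
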
7 changes: 7 additions & 0 deletions utils/run_ids_scripts.sh
@@ -0,0 +1,7 @@
python3 create_merge_ids.py --match_dir usable_ids --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
/snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/
/snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/
/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/
/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes_archive/$(date +%F)/
touch ids_are_done
gsutil cp ids_are_done gs://airflow-data-exchange/article_linkage/tmp/done_files/
5 changes: 5 additions & 0 deletions utils/run_simhash_scripts.sh
@@ -0,0 +1,5 @@
python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes
cp -r article_pairs usable_ids
cp simhash_results/* article_pairs/
touch simhash_is_done
gsutil cp simhash_is_done gs://airflow-data-exchange/article_linkage/tmp/done_files/
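
One caveat of the sentinel approach worth noting: the done_files markers persist in GCS between runs, so a leftover simhash_is_done or ids_are_done from a previous run would satisfy the sensors immediately. The commit itself does not show where the markers are cleared; the following is a hedged sketch of one way to do it at the start of a run, with an illustrative task name and placement that are not from this repo.

from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator

# Hypothetical cleanup task, not part of this commit: delete any stale
# sentinel objects before the VM work starts. Deleting by prefix (rather
# than by explicit object names) makes this a no-op when no markers exist.
clear_done_files = GCSDeleteObjectsOperator(
    task_id="clear_done_files",
    bucket_name="airflow-data-exchange",
    prefix="article_linkage/tmp/done_files/",
)

# Possible placement: gce_instance_start >> clear_done_files >> prep_environment
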
