Split merged ids if they lose orig ids #46

Merged 4 commits on Oct 3, 2024
43 changes: 5 additions & 38 deletions linkage_dag.py
@@ -315,13 +315,7 @@
BigQueryToGCSOperator(
task_id="export_article_pairs",
source_project_dataset_table=f"{staging_dataset}.all_match_pairs_with_um",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/article_pairs/article_pairs*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_simhash_input",
source_project_dataset_table=f"{staging_dataset}.simhash_input",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/simhash_input/simhash_input*.jsonl",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
@@ -344,8 +338,7 @@
),
]

# Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the
# merge ids
# Start up godzilla of article linkage, create the merged ids
gce_instance_start = ComputeEngineStartInstanceOperator(
project_id=project_id,
zone=gce_zone,
@@ -364,14 +357,10 @@
"rm -rf current_ids",
"mkdir input_data",
"mkdir current_ids",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/article_pairs .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
"mkdir new_simhash_indexes",
]
prep_environment_vm_script = " && ".join(prep_environment_script_sequence)

@@ -380,24 +369,9 @@
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"',
)

update_simhash_index = BashOperator(
task_id="update_simhash_index",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"',
)

wait_for_simhash_index = GCSObjectExistenceSensor(
task_id="wait_for_simhash_index",
bucket=DATA_BUCKET,
object=f"{tmp_dir}/done_files/simhash_is_done",
deferrable=True,
)

create_cset_ids = BashOperator(
task_id="create_cset_ids",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
# These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task -
# but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
# outside the VM it runs on and this task depends on all of these things, directly or indirectly
inlets=[
BigQueryTable(
project_id=project_id, dataset_id=production_dataset, table_id="sources"
@@ -407,11 +381,6 @@
dataset_id=staging_dataset,
table_id="all_match_pairs_with_um",
),
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="simhash_input",
),
BigQueryTable(
project_id=project_id, dataset_id=staging_dataset, table_id="unlink"
),
@@ -483,7 +452,7 @@
import_id_mapping = GCSToBigQueryOperator(
task_id="import_id_mapping",
bucket=bucket,
source_objects=[f"{tmp_dir}/id_mapping.jsonl"],
source_objects=[f"{tmp_dir}/new_id_mappings/*"],
schema_object=f"{schema_dir}/id_mapping.json",
destination_project_dataset_table=f"{staging_dataset}.id_mapping",
source_format="NEWLINE_DELIMITED_JSON",
@@ -577,7 +546,7 @@
BigQueryCheckOperator(
task_id="all_trivial_matches_survived",
sql=f"""
-- check that all article pairs generated by exact matches make it through the simhash and
-- check that all article pairs generated by exact matches make it through the
-- merged id assignment, except ones we've deliberately unlinked
select
count(0) = 0
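
(The rest of this query is collapsed in the diff. For orientation, here is a hypothetical sketch of the kind of check being run. The table names all_match_pairs_with_um, id_mapping, and unlink appear elsewhere in this DAG, but the pair column names id1/id2 are assumptions for illustration, not taken from the repo.)

# Hypothetical sketch only -- the real SQL is collapsed above.
# Idea: every exact-match pair should end up under a single merged id,
# unless the pair was deliberately unlinked.
def trivial_match_check_sql(staging_dataset: str) -> str:
    return f"""
    select count(0) = 0
    from {staging_dataset}.all_match_pairs_with_um pairs
    left join {staging_dataset}.id_mapping m1 on pairs.id1 = m1.orig_id
    left join {staging_dataset}.id_mapping m2 on pairs.id2 = m2.orig_id
    left join {staging_dataset}.unlink u
        on u.id1 = pairs.id1 and u.id2 = pairs.id2
    where (m1.merged_id is null or m1.merged_id != m2.merged_id)
        and u.id1 is null  -- excuse deliberately unlinked pairs
    """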
@@ -696,8 +665,6 @@
>> heavy_compute_inputs
>> gce_instance_start
>> prep_environment
>> update_simhash_index
>> wait_for_simhash_index
>> create_cset_ids
>> wait_for_cset_ids
>> gce_instance_stop
1 change: 0 additions & 1 deletion sequences/merge_combined_metadata.tsv
@@ -1,6 +1,5 @@
arxiv_id_match
metadata_match
all_match_pairs_with_um
simhash_input
lid_input
ids_to_drop
18 changes: 0 additions & 18 deletions sql/simhash_input.sql

This file was deleted.

4 changes: 4 additions & 0 deletions tests/static/test_create_match_keys/input/input.jsonl
@@ -5,3 +5,7 @@
{"orig_id": "F", "merged_id": "carticle_0000000001"}
{"orig_id": "I", "merged_id": "carticle_0000000003"}
{"orig_id": "J", "merged_id": "carticle_0000000003"}
{"orig_id": "K", "merged_id": "carticle_0000000004"}
{"orig_id": "L", "merged_id": "carticle_0000000004"}
{"orig_id": "M", "merged_id": "carticle_0000000005"}
{"orig_id": "N", "merged_id": "carticle_0000000005"}
3 changes: 0 additions & 3 deletions tests/static/test_get_match_sets_with_extra_id/ids/ids1.jsonl

This file was deleted.

3 changes: 0 additions & 3 deletions tests/static/test_get_match_sets_with_extra_id/ids/ids2.jsonl

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

83 changes: 39 additions & 44 deletions tests/test_create_merge_ids.py
@@ -3,7 +3,7 @@
import shutil
import unittest

from utils.create_merge_ids import create_match_keys, create_match_sets
from utils.create_merge_ids import create_match_sets, create_matches

static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")

@@ -43,24 +43,6 @@ def test_get_combined_map5(self):
actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k))
self.assertEqual(actual_result, expected_result)

def test_get_match_sets_with_extra_id(self):
# test with three disconnected sets. The set A - E will have one extra id (E) that should get filtered, and
# the "small" set F-H will all be extra ids that should be filtered. The other small set I-J will have ids
# distributed across two id files, but the set should be included.
match_dir = os.path.join(
static_dir, "test_get_match_sets_with_extra_id", "match_pairs"
)
ids_dir = os.path.join(static_dir, "test_get_match_sets_with_extra_id", "ids")
result_set_large = {"A", "B", "C", "D"}
result_set_small = {"I", "J"}
expected_result = sorted(
[result_set_small, result_set_large], key=lambda k: len(k)
)
actual_result = sorted(
create_match_sets(match_dir, ids_dir), key=lambda k: len(k)
)
self.assertEqual(actual_result, expected_result)

def test_skip_matches(self):
# test without matches excluded
match_dir = os.path.join(static_dir, "test_skip_matches_ids")
@@ -74,33 +56,46 @@ def test_skip_matches(self):
expected_result_excludes,
)

def test_create_match_keys(self):
# The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id.
# The next (D, E, F) contains one elt from one match set, two from another; should change ids.
# Another (G, H) contains only new ids; should get a new id.
# The last two (I and J) are two different match sets that share an old id and are in ids_to_drop;
# each should get a new id (this is in case of unlinking).
match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}]
out_dir = os.path.join(static_dir, "test_create_match_keys", "output")
if os.path.exists(out_dir):
shutil.rmtree(out_dir)
os.mkdir(out_dir)
out_fi = os.path.join(out_dir, "output.jsonl")
def test_create_matches(self):
match_sets = [
{"A", "B", "C"},
{"D", "E", "F"},
{"G", "H"},
{"I"},
{"J"},
{"K", "L"},
{"M", "N", "O"},
]
id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input")
ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop")
expected_output = [
{"orig_id": "A", "merged_id": "carticle_0000000001"},
{"orig_id": "B", "merged_id": "carticle_0000000001"},
{"orig_id": "C", "merged_id": "carticle_0000000001"},
{"orig_id": "D", "merged_id": "carticle_0000000004"},
{"orig_id": "E", "merged_id": "carticle_0000000004"},
{"orig_id": "F", "merged_id": "carticle_0000000004"},
{"orig_id": "G", "merged_id": "carticle_0000000005"},
{"orig_id": "H", "merged_id": "carticle_0000000005"},
{"orig_id": "I", "merged_id": "carticle_0000000006"},
{"orig_id": "J", "merged_id": "carticle_0000000007"},
# F was removed from this match set so A B and C should get a new merged id
{"orig_id": "A", "merged_id": "carticle_0000000006"},
{"orig_id": "B", "merged_id": "carticle_0000000006"},
{"orig_id": "C", "merged_id": "carticle_0000000006"},
# D, E, F contains one elt from one match set, two from another; should change ids
{"orig_id": "D", "merged_id": "carticle_0000000007"},
{"orig_id": "E", "merged_id": "carticle_0000000007"},
{"orig_id": "F", "merged_id": "carticle_0000000007"},
# G, H is a completely new match set with new ids, should get a new id
{"orig_id": "G", "merged_id": "carticle_0000000008"},
{"orig_id": "H", "merged_id": "carticle_0000000008"},
# The last two (I and J) are two different match sets that share an old id and are in ids_to_drop;
# each should get a new id
{"orig_id": "I", "merged_id": "carticle_0000000009"},
{"orig_id": "J", "merged_id": "carticle_0000000010"},
# Nothing changed for this match set so the merged id stays the same
{"orig_id": "K", "merged_id": "carticle_0000000004"},
{"orig_id": "L", "merged_id": "carticle_0000000004"},
# This match set got one new article so the merged id stays the same
{"orig_id": "M", "merged_id": "carticle_0000000005"},
{"orig_id": "N", "merged_id": "carticle_0000000005"},
{"orig_id": "O", "merged_id": "carticle_0000000005"},
]
print(expected_output)
create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir)
out = [json.loads(x) for x in open(out_fi).readlines()]
self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"]))
match_batches = create_matches(match_sets, ids_to_drop, id_mapping_dir)
matches = []
for match_batch, batch_id in match_batches:
matches.extend(match_batch)
print(sorted(matches, key=lambda x: x["orig_id"]))
self.assertEqual(expected_output, sorted(matches, key=lambda x: x["orig_id"]))
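
For readers skimming this diff, below is a minimal sketch of the splitting rule that test_create_matches exercises. It is not the utils/create_merge_ids.py implementation: the file layouts (jsonl rows keyed on orig_id/merged_id, a directory of ids_to_drop rows keyed on merged_id) and the one-batch-per-match-set yielding are assumptions chosen to mirror the fixtures above.

import json
import os


def load_prev_mapping(id_mapping_dir):
    # previous run's orig_id -> merged_id rows, one json object per line
    prev = {}
    for fname in os.listdir(id_mapping_dir):
        with open(os.path.join(id_mapping_dir, fname)) as f:
            for line in f:
                row = json.loads(line)
                prev[row["orig_id"]] = row["merged_id"]
    return prev


def create_matches_sketch(match_sets, ids_to_drop_dir, id_mapping_dir):
    # Yields (match_batch, batch_id) pairs, like the function under test.
    prev = load_prev_mapping(id_mapping_dir)
    drop = set()
    for fname in os.listdir(ids_to_drop_dir):
        with open(os.path.join(ids_to_drop_dir, fname)) as f:
            for line in f:
                drop.add(json.loads(line)["merged_id"])
    # which orig ids each previous merged id covered
    members = {}
    for orig_id, merged_id in prev.items():
        members.setdefault(merged_id, set()).add(orig_id)
    next_num = 1 + max((int(m.replace("carticle_", "")) for m in members), default=0)
    for batch_id, match_set in enumerate(match_sets):
        old_ids = {prev[o] for o in match_set if o in prev}
        # keep the old merged id only if the set maps to exactly one previous
        # merged id, that id is not being dropped, and no orig id was lost
        keep = (
            len(old_ids) == 1
            and not old_ids & drop
            and members[next(iter(old_ids))] <= match_set
        )
        if keep:
            merged_id = next(iter(old_ids))
        else:
            merged_id = f"carticle_{next_num:010d}"
            next_num += 1
        yield [{"orig_id": o, "merged_id": merged_id} for o in sorted(match_set)], batch_id

Under these assumptions the sketch should reproduce the expected_output above: sets that lost an orig id, or whose old merged id is in ids_to_drop, move to carticle_0000000006 onward, while {K, L} and {M, N, O} keep their previous merged ids.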