Ensure matched, then unlinked ids don't get rematched in create_match_keys
jmelot committed Feb 20, 2024
1 parent d16494c commit 48bcd51
Showing 8 changed files with 49 additions and 14 deletions.
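At its core, the change makes create_match_keys stop reusing a previously assigned merged_id once that id has been unlinked and exported to ids_to_drop. A minimal sketch of the guard, assuming the behavior shown in the utils/create_merge_ids.py diff below (the helper name is illustrative, not part of the repo):

# Sketch: reuse a prior merged_id only if the match set maps to exactly one
# previous id and that id was not exported to ids_to_drop via the unlink table.
def pick_merged_id(existing_ids: set, ignore_ids: set, next_new_id: str) -> str:
    if len(existing_ids) == 1 and next(iter(existing_ids)) not in ignore_ids:
        return next(iter(existing_ids))  # keep the old merged_id
    return next_new_id  # otherwise mint a fresh carticle id

# The unlinked id carticle_0000000003 is never reused, even for a singleton match set.
print(pick_merged_id({"carticle_0000000003"}, {"carticle_0000000003"}, "carticle_0000000006"))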
7 changes: 7 additions & 0 deletions linkage_dag.py
@@ -335,6 +335,12 @@
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/unlink/data*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_ids_to_drop",
source_project_dataset_table=f"{staging_dataset}.ids_to_drop",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/ids_to_drop/data*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
]

# Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the
@@ -362,6 +368,7 @@
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
"mkdir new_simhash_indexes",
]
1 change: 1 addition & 0 deletions sequences/merge_combined_metadata.tsv
@@ -3,3 +3,4 @@ metadata_match
all_match_pairs_with_um
simhash_input
lid_input
ids_to_drop
6 changes: 6 additions & 0 deletions sql/ids_to_drop.sql
@@ -0,0 +1,6 @@
select distinct
merged_id
from
literature.sources
where
orig_id in (select id1 from staging_literature.unlink)
1 change: 1 addition & 0 deletions tests/static/test_create_match_keys/ids_to_drop/data.jsonl
@@ -0,0 +1 @@
{"merged_id": "carticle_0000000003"}
2 changes: 2 additions & 0 deletions tests/static/test_create_match_keys/input/input.jsonl
@@ -3,3 +3,5 @@
{"orig_id": "D", "merged_id": "carticle_0000000002"}
{"orig_id": "E", "merged_id": "carticle_0000000002"}
{"orig_id": "F", "merged_id": "carticle_0000000001"}
{"orig_id": "I", "merged_id": "carticle_0000000003"}
{"orig_id": "J", "merged_id": "carticle_0000000003"}
26 changes: 16 additions & 10 deletions tests/test_create_merge_ids.py
@@ -75,26 +75,32 @@ def test_skip_matches(self):
)

def test_create_match_keys(self):
# the first set will contain two old elts from the same match set and one new elt; should keep its id
# the next will contain one elt from one match set, two from another; should change ids
# the last will contain only new ids; should get a new id
match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}]
# The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id.
# The next (D, E, F) contains one elt from one match set, two from another; should change ids.
# Another (G, H) contains only new ids; should get a new id.
# The last two (I and J) are two different match sets that share an old id and are in ids_to_drop;
# each should get a new id (this is in case of unlinking).
match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}]
out_dir = os.path.join(static_dir, "test_create_match_keys", "output")
if os.path.exists(out_dir):
shutil.rmtree(out_dir)
os.mkdir(out_dir)
out_fi = os.path.join(out_dir, "output.jsonl")
id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input")
ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop")
expected_output = [
{"orig_id": "A", "merged_id": "carticle_0000000001"},
{"orig_id": "B", "merged_id": "carticle_0000000001"},
{"orig_id": "C", "merged_id": "carticle_0000000001"},
{"orig_id": "D", "merged_id": "carticle_0000000003"},
{"orig_id": "E", "merged_id": "carticle_0000000003"},
{"orig_id": "F", "merged_id": "carticle_0000000003"},
{"orig_id": "G", "merged_id": "carticle_0000000004"},
{"orig_id": "H", "merged_id": "carticle_0000000004"},
{"orig_id": "D", "merged_id": "carticle_0000000004"},
{"orig_id": "E", "merged_id": "carticle_0000000004"},
{"orig_id": "F", "merged_id": "carticle_0000000004"},
{"orig_id": "G", "merged_id": "carticle_0000000005"},
{"orig_id": "H", "merged_id": "carticle_0000000005"},
{"orig_id": "I", "merged_id": "carticle_0000000006"},
{"orig_id": "J", "merged_id": "carticle_0000000007"},
]
create_match_keys(match_sets, out_fi, id_mapping_dir)
print(expected_output)
create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir)
out = [json.loads(x) for x in open(out_fi).readlines()]
self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"]))
18 changes: 15 additions & 3 deletions utils/create_merge_ids.py
@@ -128,13 +128,14 @@ def create_match_sets(


def create_match_keys(
match_sets: list, match_file: str, prev_id_mapping_dir: str = None
match_sets: list, match_file: str, ids_to_drop: str, prev_id_mapping_dir: str = None
):
"""
Given a match set, creates an id for that match set, and writes out a jsonl mapping each article in the match
set to that id
:param match_sets: list of match sets
:param match_file: file where id mapping should be written
:param ids_to_drop: directory containing merged ids that should not be used in jsonl form
:param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form
:return: None
"""
@@ -152,6 +153,12 @@ def create_match_keys(
prev_orig_to_merg[orig_id] = merg_id
if merg_id > max_merg:
max_merg = merg_id
ignore_ids = set()
for fi in os.listdir(ids_to_drop):
with open(os.path.join(ids_to_drop, fi)) as f:
for line in f:
js = json.loads(line.strip())
ignore_ids.add(js["merged_id"])
match_id = int(max_merg.split("carticle_")[1]) + 1
num_new, num_old = 0, 0
for ms in match_sets:
@@ -162,7 +169,7 @@
existing_ids = set(
[prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg]
)
if len(existing_ids) == 1:
if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids:
cset_article_id = existing_ids.pop()
num_old += 1
else:
@@ -189,6 +196,11 @@ def create_match_keys(
required=True,
help="directory of article pairs that should not be matched",
)
parser.add_argument(
"--ids_to_drop",
required=True,
help="file containing ids that should not be used",
)
parser.add_argument(
"--merge_file", required=True, help="file where merged ids should be written"
)
@@ -209,4 +221,4 @@ def create_match_keys(
match_sets = create_match_sets(
args.match_dir, args.current_ids_dir, args.exclude_dir
)
create_match_keys(match_sets, args.merge_file, args.prev_id_mapping_dir)
create_match_keys(match_sets, args.merge_file, args.ids_to_drop, args.prev_id_mapping_dir)
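For context, a minimal usage sketch of the updated signature, mirroring the test fixtures above (the import path and file names are assumptions, not taken from the repo):

import json
import os

from utils.create_merge_ids import create_match_keys

# Previous run: orig ids I and J both mapped to carticle_0000000003.
os.makedirs("prev_id_mapping", exist_ok=True)
with open("prev_id_mapping/ids.jsonl", "w") as f:
    for orig_id in ("I", "J"):
        f.write(json.dumps({"orig_id": orig_id, "merged_id": "carticle_0000000003"}) + "\n")

# That merged id has since been unlinked, so it is listed in ids_to_drop.
os.makedirs("ids_to_drop", exist_ok=True)
with open("ids_to_drop/data.jsonl", "w") as f:
    f.write(json.dumps({"merged_id": "carticle_0000000003"}) + "\n")

# carticle_0000000003 is never reused; {"I"} and {"J"} each get a freshly minted id.
create_match_keys([{"I"}, {"J"}], "id_mapping.jsonl", "ids_to_drop", "prev_id_mapping")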
2 changes: 1 addition & 1 deletion utils/run_ids_scripts.sh
@@ -1,6 +1,6 @@
cd /mnt/disks/data/run
gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done
python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
/snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/
/snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/
/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/
