Speed up and simplify linking script
jmelot committed Aug 23, 2024
1 parent 017ac4d commit 2918847
Showing 4 changed files with 153 additions and 124 deletions.
6 changes: 3 additions & 3 deletions linkage_dag.py
@@ -315,7 +315,7 @@
BigQueryToGCSOperator(
task_id="export_article_pairs",
source_project_dataset_table=f"{staging_dataset}.all_match_pairs_with_um",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/article_pairs/article_pairs*.jsonl",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
@@ -364,7 +364,7 @@
"rm -rf current_ids",
"mkdir input_data",
"mkdir current_ids",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/article_pairs .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
@@ -483,7 +483,7 @@
import_id_mapping = GCSToBigQueryOperator(
task_id="import_id_mapping",
bucket=bucket,
source_objects=[f"{tmp_dir}/id_mapping.jsonl"],
source_objects=[f"{tmp_dir}/new_id_mappings/*"],
schema_object=f"{schema_dir}/id_mapping.json",
destination_project_dataset_table=f"{staging_dataset}.id_mapping",
source_format="NEWLINE_DELIMITED_JSON",
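
For context on the import change above: create_merge_ids.py now writes a directory of batch files (matches_0.jsonl, matches_1.jsonl, ...) instead of a single id_mapping.jsonl, so the load task points at a wildcard. Below is a minimal sketch of that pattern, not the DAG's exact code; the bucket, dataset, and path values are placeholders rather than the real configuration.

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Placeholder values standing in for the DAG's real configuration
bucket = "example-bucket"
tmp_dir = "article_linkage/tmp"
schema_dir = "article_linkage/schemas"
staging_dataset = "staging_dataset"

import_id_mapping = GCSToBigQueryOperator(
    task_id="import_id_mapping",
    bucket=bucket,
    # the wildcard picks up every batch file written under new_id_mappings/
    source_objects=[f"{tmp_dir}/new_id_mappings/*"],
    schema_object=f"{schema_dir}/id_mapping.json",
    destination_project_dataset_table=f"{staging_dataset}.id_mapping",
    source_format="NEWLINE_DELIMITED_JSON",
)
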
265 changes: 148 additions & 117 deletions utils/create_merge_ids.py
@@ -1,12 +1,13 @@
import argparse
import json
import os

"""
Creates match sets from pairs of linked articles, assigns each match set an id, and writes out a mapping from
each id to each article in its match set.
"""

import argparse
import json
import multiprocessing
import os


def create_cset_article_id(idx: int):
"""
@@ -38,24 +39,6 @@ def get_connected_edges(adj_list: dict, key: str) -> set:
return conn_edges


def get_usable_ids(ids_dir: str) -> set:
"""
Get the set of usable ids from a directory of jsonl, or returns None if ids_dir is None
:param ids_dir: None or name of a directory containing jsonl with one key, "id", which are the ids we want to keep
:return: the set of usable ids, or None
"""
if ids_dir is None:
return None
usable_ids = set()
for fi in os.listdir(ids_dir):
print("reading " + fi)
with open(os.path.join(ids_dir, fi)) as f:
for line in f:
js = json.loads(line)
usable_ids.add(js["id1"])
return usable_ids


def get_exclude_matches(exclude_dir: str) -> dict:
"""
Build dict mapping ids to sets of other ids they should not be matched to
@@ -79,41 +62,45 @@


def create_match_sets(
match_dir: str, current_ids_dir: str = None, exclude_dir: str = None
exact_match_dir: str, simhash_match_dir: str, exclude_dir: str = None
) -> list:
"""
Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles,
including "transitive matches".
:param match_dir: directory of exported jsonl files containing article matches
:param current_ids_dir: optional dir containing the current set of ids to use in jsonl form. If None, all ids will be used
including "transitive matches". We will use the ids present in the exact matches to filter the simhash matches,
since it's possible for obsolete ids to live on in the simhash index
:param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match
:param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash
:param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
:return: list of sets of matched articles
"""
print("reading pairs to not match")
dont_match = get_exclude_matches(exclude_dir)
print("getting adjacency lists")
adj_list = {}
usable_ids = get_usable_ids(current_ids_dir)
for fi in os.listdir(match_dir):
with open(os.path.join(match_dir, fi)) as f:
for line in f:
js = json.loads(line)
key1 = js["id1"]
key2 = js["id2"]
if (usable_ids is not None) and (
(key1 not in usable_ids) or (key2 not in usable_ids)
):
continue
if key1 not in adj_list:
adj_list[key1] = set()
if key2 not in dont_match.get(key1, set()):
adj_list[key1].add(key2)
# even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
# this will ensure they get added to the same match set
if key2 not in adj_list:
adj_list[key2] = set()
if key1 not in dont_match.get(key2, set()):
adj_list[key2].add(key1)
usable_ids = set()
for match_dir, is_simhash in [(exact_match_dir, False), (simhash_match_dir, True)]:
for fi in os.listdir(match_dir):
with open(os.path.join(match_dir, fi)) as f:
for line in f:
js = json.loads(line)
key1 = js["id1"]
key2 = js["id2"]
if is_simhash:
if (key1 not in usable_ids) or (key2 not in usable_ids):
continue
else:
usable_ids.add(key1)
usable_ids.add(key2)
if key1 not in adj_list:
adj_list[key1] = set()
if key2 not in dont_match.get(key1, set()):
adj_list[key1].add(key2)
# even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
# this will ensure they get added to the same match set
if key2 not in adj_list:
adj_list[key2] = set()
if key1 not in dont_match.get(key2, set()):
adj_list[key2].add(key1)
seen_ids = set()
match_sets = []
for k in adj_list.keys():
Expand All @@ -127,78 +114,125 @@ def create_match_sets(
return match_sets


def create_match_keys(
match_sets: list, match_file: str, ids_to_drop: str, prev_id_mapping_dir: str = None
):
def create_matches(
match_sets: list, ids_to_drop: str, prev_id_mapping_dir: str = None
) -> iter:
"""
Given a list of match sets, creates an id for each match set and generates, in batches, a mapping from each
article in the match set to that id
:param match_sets: list of match sets
:param match_file: file where id mapping should be written
:param ids_to_drop: directory containing merged ids that should not be used in jsonl form
:param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form
:return: None
:return: a generator of tuples with two elements: a list of jsons containing orig_id, merged_id matches to be
written, and an identifier for the batch
"""
print("Creating merged ids")
with open(match_file, mode="w") as out:
prev_orig_to_merg = {}
merg_to_orig = {}
max_merg = "carticle_0"
if prev_id_mapping_dir is not None:
for fi in os.listdir(prev_id_mapping_dir):
with open(os.path.join(prev_id_mapping_dir, fi)) as f:
for line in f:
js = json.loads(line.strip())
orig_id = js["orig_id"]
merg_id = js["merged_id"]
assert orig_id not in prev_orig_to_merg
prev_orig_to_merg[orig_id] = merg_id
if merg_id not in merg_to_orig:
merg_to_orig[merg_id] = set()
merg_to_orig[merg_id].add(orig_id)
if merg_id > max_merg:
max_merg = merg_id
ignore_ids = set()
for fi in os.listdir(ids_to_drop):
with open(os.path.join(ids_to_drop, fi)) as f:
prev_orig_to_merg = {}
merg_to_orig = {}
max_merg = "carticle_0"
if prev_id_mapping_dir is not None:
for fi in os.listdir(prev_id_mapping_dir):
with open(os.path.join(prev_id_mapping_dir, fi)) as f:
for line in f:
js = json.loads(line.strip())
ignore_ids.add(js["merged_id"])
match_id = int(max_merg.split("carticle_")[1]) + 1
num_new, num_old = 0, 0
for ms in match_sets:
cset_article_id = None
# if we have exactly one existing id, reuse it, even if new articles are matched to it.
# if two articles that previously had different carticle ids are now in the same match set,
# create a new carticle id
existing_ids = set(
[prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg]
)
if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids:
cset_article_id = existing_ids.pop()
# In some cases, merged ids can "split apart", if their constituent articles no longer
# match. We'll detect this case by checking whether the old set of articles assigned to
# this merged id contain any entries missing from our current set
if cset_article_id and (len(merg_to_orig[cset_article_id] - set(ms)) == 0):
num_old += 1
else:
cset_article_id = create_cset_article_id(match_id)
num_new += 1
match_id += 1
orig_id = js["orig_id"]
merg_id = js["merged_id"]
assert orig_id not in prev_orig_to_merg
prev_orig_to_merg[orig_id] = merg_id
if merg_id not in merg_to_orig:
merg_to_orig[merg_id] = set()
merg_to_orig[merg_id].add(orig_id)
if merg_id > max_merg:
max_merg = merg_id
ignore_ids = set()
for fi in os.listdir(ids_to_drop):
with open(os.path.join(ids_to_drop, fi)) as f:
for line in f:
js = json.loads(line.strip())
ignore_ids.add(js["merged_id"])
match_id = int(max_merg.split("carticle_")[1]) + 1
num_new, num_old = 0, 0
batch_size = 1_000_000
batch_count = 0
batch = []
for ms in match_sets:
cset_article_id = None
# if we have exactly one existing id, reuse it, even if new articles are matched to it.
# if two articles that previously had different carticle ids are now in the same match set,
# create a new carticle id
existing_ids = set([prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg])
if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids:
cset_article_id = existing_ids.pop()
# In some cases, merged ids can "split apart", if their constituent articles no longer
# match. We'll detect this case by checking whether the old set of articles assigned to
# this merged id contain any entries missing from our current set
if cset_article_id and (len(merg_to_orig[cset_article_id] - set(ms)) == 0):
num_old += 1
else:
cset_article_id = create_cset_article_id(match_id)
num_new += 1
match_id += 1
for article in ms:
out.write(
json.dumps({"merged_id": cset_article_id, "orig_id": article})
+ "\n"
)
print(f"wrote {num_new} new ids and reused {num_old} ids")
match = {"merged_id": cset_article_id, "orig_id": article}
if len(batch) == batch_size:
yield batch, batch_count
batch = [match]
batch_count += 1
else:
batch.append(match)
yield batch, batch_count


def write_batch(match_batch: tuple, output_dir: str) -> None:
"""
Write a batch of matches to disk
:param match_batch: tuple of a list of jsons containing a merged id and orig id, and an identifier for the batch
:param output_dir: directory where matches should be written
:return: None
"""
matches, batch_id = match_batch
with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f:
for match in matches:
f.write(json.dumps(match) + "\n")


def write_matches(
exact_match_dir,
simhash_match_dir,
exclude_dir,
ids_to_drop,
prev_id_mapping_dir,
output_dir,
) -> None:
"""
Generate merged id-orig id pairs and write them out as a directory of jsonls
:param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match
:param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash
:param exclude_dir: directory of article pairs that should not be matched
:param ids_to_drop: directory of jsonls containing merged ids that should not be used
:param prev_id_mapping_dir: directory of jsonl containing previous mapping between orig ids and merged ids
:param output_dir: directory where jsonls containing new mappings between orig ids and merged ids should be written
:return: None
"""
match_sets = create_match_sets(exact_match_dir, simhash_match_dir, exclude_dir)
match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
with multiprocessing.Pool() as p:
p.starmap(write_batch, ((mb, output_dir) for mb in match_batches))


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--match_dir",
"--exact_match_dir",
required=True,
help="directory of jsonls containing matched orig_ids from exact metadata match",
)
parser.add_argument(
"--simhash_match_dir",
required=True,
help="directory of exported jsonl from bigquery containing pairs of article matches",
help="directory of jsonls containing matched orig_ids from simhash",
)
parser.add_argument(
"--exclude_dir",
@@ -210,26 +244,23 @@ def create_match_keys(
required=True,
help="file containing ids that should not be used",
)
parser.add_argument(
"--merge_file", required=True, help="file where merged ids should be written"
)
parser.add_argument(
"--prev_id_mapping_dir",
help="directory of exported jsonl from bigquery containing pairs of article matches",
help="directory of jsonl containing previous mapping between orig ids and merged ids",
)
parser.add_argument(
"--current_ids_dir",
help=(
"directory containing jsonl with one key, 'id'. "
"These are the ids that should be included in output. "
"If None, no ids will be filtered."
),
"--output_dir",
required=True,
help="directory where jsonls containing new mappings between orig ids and "
"merged ids should be written",
)
args = parser.parse_args()

match_sets = create_match_sets(
args.match_dir, args.current_ids_dir, args.exclude_dir
)
create_match_keys(
match_sets, args.merge_file, args.ids_to_drop, args.prev_id_mapping_dir
write_matches(
args.exact_match_dir,
args.simhash_match_dir,
args.exclude_dir,
args.ids_to_drop,
args.prev_id_mapping_dir,
args.output_dir,
)
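
The core behavioral change in create_merge_ids.py is that simhash matches are only trusted when both ids also appear in the exact-match pairs, and matching remains transitive. The toy example below is a minimal, self-contained sketch (not part of the commit) of that behavior; it assumes the module is importable (e.g. run from the utils/ directory) and that an empty exclude directory is acceptable to get_exclude_matches.

import json
import os
import tempfile

from create_merge_ids import create_match_sets


def write_pairs(path: str, pairs: list) -> None:
    # Hypothetical helper: write pairs in the {"id1": ..., "id2": ...} jsonl shape the script reads
    with open(os.path.join(path, "pairs.jsonl"), "w") as f:
        for id1, id2 in pairs:
            f.write(json.dumps({"id1": id1, "id2": id2}) + "\n")


with tempfile.TemporaryDirectory() as exact_dir, tempfile.TemporaryDirectory() as simhash_dir, tempfile.TemporaryDirectory() as exclude_dir:
    write_pairs(exact_dir, [("A", "B"), ("B", "C")])
    # ("C", "D") should be dropped because "D" never appears in the exact matches;
    # ("A", "C") is kept and folds into the same transitive match set
    write_pairs(simhash_dir, [("C", "D"), ("A", "C")])
    match_sets = create_match_sets(exact_dir, simhash_dir, exclude_dir)
    print(match_sets)  # expected: one set containing "A", "B", and "C"
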
4 changes: 2 additions & 2 deletions utils/run_ids_scripts.sh
@@ -1,7 +1,7 @@
cd /mnt/disks/data/run
gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done
python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
/snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/
python3 create_merge_ids.py --exact_match_dir exact_matches --simhash_match_dir simhash_results --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings
/snap/bin/gsutil -m cp -r new_id_mappings gs://airflow-data-exchange/article_linkage/tmp/
/snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/
/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/
/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes_archive/$(date +%F)/
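
The switch from copying a single id_mapping.jsonl to a recursive copy of new_id_mappings follows from the batched output: create_matches yields (batch, batch_id) tuples and write_batch writes each one to its own matches_<id>.jsonl file. The sketch below is a simplified, standalone illustration of that pattern (tiny batch size, hypothetical batched() helper), not the script's actual code.

import json
import os
import tempfile


def batched(records, batch_size=2):
    # Simplified stand-in for create_matches' batching: yield (batch, batch_id) tuples
    batch, batch_count = [], 0
    for record in records:
        if len(batch) == batch_size:
            yield batch, batch_count
            batch = [record]
            batch_count += 1
        else:
            batch.append(record)
    yield batch, batch_count


with tempfile.TemporaryDirectory() as output_dir:
    records = [{"merged_id": f"carticle_{i}", "orig_id": f"orig_{i}"} for i in range(5)]
    for batch, batch_id in batched(records):
        with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f:
            for match in batch:
                f.write(json.dumps(match) + "\n")
    print(sorted(os.listdir(output_dir)))  # ['matches_0.jsonl', 'matches_1.jsonl', 'matches_2.jsonl']
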
2 changes: 0 additions & 2 deletions utils/run_simhash_scripts.sh
@@ -1,7 +1,5 @@
cd /mnt/disks/data/run
gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/simhash_is_done
python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes
cp -r article_pairs usable_ids
cp simhash_results/* article_pairs/
touch simhash_is_done
gsutil cp simhash_is_done gs://airflow-data-exchange/article_linkage/tmp/done_files/
