Remove simhash
jmelot committed Sep 25, 2024
1 parent 125e966 commit c132c82
Showing 13 changed files with 36 additions and 485 deletions.
37 changes: 2 additions & 35 deletions linkage_dag.py
@@ -318,12 +318,6 @@
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_simhash_input",
source_project_dataset_table=f"{staging_dataset}.simhash_input",
destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/simhash_input/simhash_input*.jsonl",
export_format="NEWLINE_DELIMITED_JSON",
),
BigQueryToGCSOperator(
task_id="export_lid_input",
source_project_dataset_table=f"{staging_dataset}.lid_input",
@@ -344,8 +338,7 @@
),
]

# Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the
# merge ids
# Start up godzilla of article linkage, then create the merged ids
gce_instance_start = ComputeEngineStartInstanceOperator(
project_id=project_id,
zone=gce_zone,
Expand All @@ -365,13 +358,9 @@
"mkdir input_data",
"mkdir current_ids",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .",
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
"mkdir new_simhash_indexes",
]
prep_environment_vm_script = " && ".join(prep_environment_script_sequence)

@@ -380,24 +369,9 @@
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"',
)

update_simhash_index = BashOperator(
task_id="update_simhash_index",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"',
)

wait_for_simhash_index = GCSObjectExistenceSensor(
task_id="wait_for_simhash_index",
bucket=DATA_BUCKET,
object=f"{tmp_dir}/done_files/simhash_is_done",
deferrable=True,
)

create_cset_ids = BashOperator(
task_id="create_cset_ids",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
# These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task -
# but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
# outside the VM it runs on and this task depends on all of these things, directly or indirectly
inlets=[
BigQueryTable(
project_id=project_id, dataset_id=production_dataset, table_id="sources"
@@ -407,11 +381,6 @@
dataset_id=staging_dataset,
table_id="all_match_pairs_with_um",
),
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="simhash_input",
),
BigQueryTable(
project_id=project_id, dataset_id=staging_dataset, table_id="unlink"
),
@@ -577,7 +546,7 @@
BigQueryCheckOperator(
task_id="all_trivial_matches_survived",
sql=f"""
-- check that all article pairs generated by exact matches make it through the simhash and
-- check that all article pairs generated by exact matches make it through the
-- merged id assignment, except ones we've deliberately unlinked
select
count(0) = 0
@@ -696,8 +665,6 @@
>> heavy_compute_inputs
>> gce_instance_start
>> prep_environment
>> update_simhash_index
>> wait_for_simhash_index
>> create_cset_ids
>> wait_for_cset_ids
>> gce_instance_stop
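For reference, the pattern that survives this removal in linkage_dag.py is a BashOperator that launches run_ids_scripts.sh on the VM over SSH in the background, paired with a deferrable GCSObjectExistenceSensor that waits for a done-file marker. The following is a minimal sketch, not part of the commit: it reuses the DAG's existing variables (gce_resource_id, gce_zone, DATA_BUCKET, tmp_dir), and the done-file object path is an assumption modeled on the removed simhash sensor.

from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# Launch the long-running ID script in the background so the SSH command returns immediately
create_cset_ids = BashOperator(
    task_id="create_cset_ids",
    bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} '
    f'--command "bash run_ids_scripts.sh &> log &"',
)

# Poll GCS without holding a worker slot (deferrable) until the script writes its marker file
wait_for_cset_ids = GCSObjectExistenceSensor(
    task_id="wait_for_cset_ids",
    bucket=DATA_BUCKET,
    object=f"{tmp_dir}/done_files/ids_are_done",  # assumed marker path, mirroring the removed simhash one
    deferrable=True,
)

create_cset_ids >> wait_for_cset_ids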
1 change: 0 additions & 1 deletion sequences/merge_combined_metadata.tsv
@@ -1,6 +1,5 @@
arxiv_id_match
metadata_match
all_match_pairs_with_um
simhash_input
lid_input
ids_to_drop
18 changes: 0 additions & 18 deletions sql/simhash_input.sql

This file was deleted.

Empty file.

This file was deleted.

This file was deleted.

This file was deleted.

47 changes: 7 additions & 40 deletions tests/test_create_merge_ids.py
@@ -6,7 +6,6 @@
from utils.create_merge_ids import create_match_sets, create_matches

static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
empty_simhash_dir = os.path.join(static_dir, "simhash_empty")


class TestGetCombinedMap(unittest.TestCase):
@@ -16,30 +15,22 @@ class TestGetCombinedMap(unittest.TestCase):
def test_get_combined_map1(self):
match_dir = os.path.join(static_dir, "test_get_combined_map1")
expected_result = [{"A", "B", "C"}]
self.assertEqual(
create_match_sets(match_dir, empty_simhash_dir), expected_result
)
self.assertEqual(create_match_sets(match_dir), expected_result)

def test_get_combined_map2(self):
match_dir = os.path.join(static_dir, "test_get_combined_map2")
expected_result = [{"A", "B", "C", "D"}]
self.assertEqual(
create_match_sets(match_dir, empty_simhash_dir), expected_result
)
self.assertEqual(create_match_sets(match_dir), expected_result)

def test_get_combined_map3(self):
match_dir = os.path.join(static_dir, "test_get_combined_map3")
expected_result = [{"A", "B", "C", "D", "E"}]
self.assertEqual(
create_match_sets(match_dir, empty_simhash_dir), expected_result
)
self.assertEqual(create_match_sets(match_dir), expected_result)

def test_get_combined_map4(self):
match_dir = os.path.join(static_dir, "test_get_combined_map4")
expected_result = [{"A", "B", "C", "D", "E", "F", "G", "H"}]
self.assertEqual(
create_match_sets(match_dir, empty_simhash_dir), expected_result
)
self.assertEqual(create_match_sets(match_dir), expected_result)

def test_get_combined_map5(self):
# test with two disconnected sets
@@ -49,43 +40,19 @@ def test_get_combined_map5(self):
expected_result = sorted(
[result_set_small, result_set_large], key=lambda k: len(k)
)
actual_result = sorted(
create_match_sets(match_dir, empty_simhash_dir), key=lambda k: len(k)
)
self.assertEqual(actual_result, expected_result)

def test_get_match_sets_with_extra_id(self):
# test with three disconnected sets. The set A - E will have one simhash match (E-A) that should be included,
# and matches involving the obsolete id D that should be filtered. The "small" set F-H contains simhash-only
# ids and should be filtered. The other small set I-J should be included.
match_dir = os.path.join(
static_dir, "test_get_match_sets_with_extra_id", "match_pairs"
)
simhash_match_dir = os.path.join(
static_dir, "test_get_match_sets_with_extra_id", "simhash_match_pairs"
)
result_set_large = {"A", "B", "C", "E"}
result_set_small = {"I", "J"}
expected_result = sorted(
[result_set_small, result_set_large], key=lambda k: len(k)
)
actual_result = sorted(
create_match_sets(match_dir, simhash_match_dir), key=lambda k: len(k)
)
actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k))
self.assertEqual(actual_result, expected_result)

def test_skip_matches(self):
# test without matches excluded
match_dir = os.path.join(static_dir, "test_skip_matches_ids")
expected_result_no_excludes = [{"A", "B", "C"}, {"D", "E"}]
self.assertEqual(
create_match_sets(match_dir, empty_simhash_dir), expected_result_no_excludes
)
self.assertEqual(create_match_sets(match_dir), expected_result_no_excludes)
# test with matches excluded
exclude_dir = os.path.join(static_dir, "test_skip_matches_ids_to_skip")
expected_result_excludes = [{"A", "B"}, {"C"}, {"D"}, {"E"}]
self.assertEqual(
create_match_sets(match_dir, empty_simhash_dir, exclude_dir=exclude_dir),
create_match_sets(match_dir, exclude_dir=exclude_dir),
expected_result_excludes,
)

67 changes: 26 additions & 41 deletions utils/create_merge_ids.py
@@ -61,42 +61,34 @@ def get_exclude_matches(exclude_dir: str) -> dict:
return dont_match


def create_match_sets(
exact_match_dir: str, simhash_match_dir: str, exclude_dir: str = None
) -> list:
def create_match_sets(match_dir: str, exclude_dir: str = None) -> list:
"""
Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles,
including "transitive matches". We will use the ids present in the exact matches to filter the simhash matches,
since it's possible for obsolete ids to live on in the simhash index
:param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match
:param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash
including "transitive matches".
:param match_dir: directory of jsonls containing matched orig_ids from exact metadata match
:param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
:return: list of sets of matched articles
"""
print("reading pairs to not match")
dont_match = get_exclude_matches(exclude_dir)
print("getting adjacency lists")
adj_list = {}
for match_dir, is_simhash in [(exact_match_dir, False), (simhash_match_dir, True)]:
for fi in os.listdir(match_dir):
with open(os.path.join(match_dir, fi)) as f:
for line in f:
js = json.loads(line)
key1 = js["id1"]
key2 = js["id2"]
if is_simhash:
if (key1 not in adj_list) or (key2 not in adj_list):
continue
if key1 not in adj_list:
adj_list[key1] = set()
if key2 not in dont_match.get(key1, set()):
adj_list[key1].add(key2)
# even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
# this will ensure they get added to the same match set
if key2 not in adj_list:
adj_list[key2] = set()
if key1 not in dont_match.get(key2, set()):
adj_list[key2].add(key1)
for fi in os.listdir(match_dir):
with open(os.path.join(match_dir, fi)) as f:
for line in f:
js = json.loads(line)
key1 = js["id1"]
key2 = js["id2"]
if key1 not in adj_list:
adj_list[key1] = set()
if key2 not in dont_match.get(key1, set()):
adj_list[key1].add(key2)
# even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A,
# this will ensure they get added to the same match set
if key2 not in adj_list:
adj_list[key2] = set()
if key1 not in dont_match.get(key2, set()):
adj_list[key2].add(key1)
print("getting connected articles")
seen_ids = set()
match_sets = []
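With the simhash branch removed, create_match_sets reduces to building a symmetric adjacency list from the exact-match pairs and then collecting connected components, the "transitive matches" the docstring refers to. Below is a minimal, self-contained sketch of that technique with toy ids (not the project's real data, and ignoring the exclude list), assuming a simple BFS over the adjacency list as in the code above.

from collections import deque

def group_matches(pairs):
    # Build a symmetric adjacency list from (id1, id2) pairs
    adj = {}
    for a, b in pairs:
        adj.setdefault(a, set()).add(b)
        adj.setdefault(b, set()).add(a)
    # Collect connected components so transitive matches end up in one set
    seen, groups = set(), []
    for node in adj:
        if node in seen:
            continue
        component, queue = set(), deque([node])
        while queue:
            current = queue.popleft()
            if current in seen:
                continue
            seen.add(current)
            component.add(current)
            queue.extend(adj[current] - seen)
        groups.append(component)
    return groups

# group_matches([("A", "B"), ("B", "C"), ("D", "E")]) -> [{"A", "B", "C"}, {"D", "E"}]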
@@ -178,13 +170,13 @@ def create_matches(
yield batch, batch_count


def write_batch(match_batch: tuple, output_dir: str) -> None:
def write_batch(match_batch_with_output_dir: tuple) -> None:
"""
Write a batch of matches to disk
:param match_batch: tuple of a list of jsons containing a merged id and orig id, and an identifier for the batch
:param output_dir: directory where matches should be written
:param match_batch_with_output_dir: tuple of (match batch, output directory); the match batch is itself a tuple of a list of jsons containing a merged id and orig id plus an identifier for the batch, and the output directory is where matches should be written
:return: None
"""
match_batch, output_dir = match_batch_with_output_dir
matches, batch_id = match_batch
with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f:
for match in matches:
@@ -193,7 +185,6 @@ def write_batch(match_batch: tuple, output_dir: str) -> None:

def write_matches(
exact_match_dir,
simhash_match_dir,
exclude_dir,
ids_to_drop,
prev_id_mapping_dir,
@@ -202,19 +193,19 @@
"""
Generate merged id-orig id pairs and write them out as a directory of jsonls
:param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match
:param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash
:param exclude_dir: directory of article pairs that should not be matched
:param ids_to_drop: file containing ids that should not be used
:param prev_id_mapping_dir: directory of jsonl containing previous mapping between orig ids and merged ids
:param output_dir: directory where jsonls containing new mappings between orig ids and merged ids should be written
:return: None
"""
match_sets = create_match_sets(exact_match_dir, simhash_match_dir, exclude_dir)
match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
with multiprocessing.Pool() as p:
p.starmap(write_batch, ((mb, output_dir) for mb in match_batches))
match_sets = create_match_sets(exact_match_dir, exclude_dir)
match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir)
output_batches = ((mb, output_dir) for mb in match_batches)
list(p.imap(write_batch, output_batches))


if __name__ == "__main__":
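The switch from Pool.starmap to Pool.imap in the hunk above matters because imap passes a single argument per item, which is why the output directory is now packed into each tuple and unpacked inside write_batch. A toy sketch of that calling convention, with hypothetical batch data rather than the project's real matches:

import multiprocessing

def demo_worker(batch_with_dir):
    # imap hands the worker one object, so the (batch, output_dir) pair is unpacked here
    batch, output_dir = batch_with_dir
    return f"{output_dir}: {len(batch)} matches"

if __name__ == "__main__":
    batches = [(["m1", "m2"], "out"), (["m3"], "out")]
    with multiprocessing.Pool(2) as p:
        # list() drains the lazy imap iterator so all batches are processed
        print(list(p.imap(demo_worker, batches)))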
@@ -224,11 +215,6 @@ def write_matches(
required=True,
help="directory of jsonls containing matched orig_ids from exact metadata match",
)
parser.add_argument(
"--simhash_match_dir",
required=True,
help="directory of jsonls containing matched orig_ids from simhash",
)
parser.add_argument(
"--exclude_dir",
required=True,
Expand All @@ -253,7 +239,6 @@ def write_matches(

write_matches(
args.exact_match_dir,
args.simhash_match_dir,
args.exclude_dir,
args.ids_to_drop,
args.prev_id_mapping_dir,