From c132c8210ce9ea9242181326ee973e7ec1697cf1 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 11 Sep 2024 21:41:06 -0700 Subject: [PATCH] Remove simhash --- linkage_dag.py | 37 +-- sequences/merge_combined_metadata.tsv | 1 - sql/simhash_input.sql | 18 -- tests/static/simhash_empty/empty.jsonl | 0 .../match_pairs/file1 | 4 - .../simhash_match_pairs/ids1.jsonl | 4 - .../simhash_match_pairs/ids2.jsonl | 2 - tests/test_create_merge_ids.py | 47 +--- utils/create_merge_ids.py | 67 +++--- utils/my_simhash.py | 210 ------------------ utils/run_ids_scripts.sh | 5 +- utils/run_simhash.py | 121 ---------- utils/run_simhash_scripts.sh | 5 - 13 files changed, 36 insertions(+), 485 deletions(-) delete mode 100644 sql/simhash_input.sql delete mode 100644 tests/static/simhash_empty/empty.jsonl delete mode 100644 tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 delete mode 100644 tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl delete mode 100644 tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl delete mode 100644 utils/my_simhash.py delete mode 100644 utils/run_simhash.py delete mode 100644 utils/run_simhash_scripts.sh diff --git a/linkage_dag.py b/linkage_dag.py index 9485326..64c7dbf 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -318,12 +318,6 @@ destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/exact_matches/article_pairs*.jsonl", export_format="NEWLINE_DELIMITED_JSON", ), - BigQueryToGCSOperator( - task_id="export_simhash_input", - source_project_dataset_table=f"{staging_dataset}.simhash_input", - destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/simhash_input/simhash_input*.jsonl", - export_format="NEWLINE_DELIMITED_JSON", - ), BigQueryToGCSOperator( task_id="export_lid_input", source_project_dataset_table=f"{staging_dataset}.lid_input", @@ -344,8 +338,7 @@ ), ] - # Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the - 
# merge ids + # Start up godzilla of article linkage, create the merged ids gce_instance_start = ComputeEngineStartInstanceOperator( project_id=project_id, zone=gce_zone, @@ -365,13 +358,9 @@ "mkdir input_data", "mkdir current_ids", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/exact_matches .", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .", - f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .", - "mkdir new_simhash_indexes", ] prep_environment_vm_script = " && ".join(prep_environment_script_sequence) @@ -380,24 +369,9 @@ bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"', ) - update_simhash_index = BashOperator( - task_id="update_simhash_index", - bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"', - ) - - wait_for_simhash_index = GCSObjectExistenceSensor( - task_id="wait_for_simhash_index", - bucket=DATA_BUCKET, - object=f"{tmp_dir}/done_files/simhash_is_done", - deferrable=True, - ) - create_cset_ids = BashOperator( task_id="create_cset_ids", bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"', - # These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task - - # but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files - # outside the VM it runs on and this task depends on all of these things, directly or indirectly inlets=[ BigQueryTable( project_id=project_id, dataset_id=production_dataset, table_id="sources" @@ -407,11 
+381,6 @@ dataset_id=staging_dataset, table_id="all_match_pairs_with_um", ), - BigQueryTable( - project_id=project_id, - dataset_id=staging_dataset, - table_id="simhash_input", - ), BigQueryTable( project_id=project_id, dataset_id=staging_dataset, table_id="unlink" ), @@ -577,7 +546,7 @@ BigQueryCheckOperator( task_id="all_trivial_matches_survived", sql=f""" - -- check that all article pairs generated by exact matches make it through the simhash and + -- check that all article pairs generated by exact matches make it through the -- merged id assignment, except ones we've deliberately unlinked select count(0) = 0 @@ -696,8 +665,6 @@ >> heavy_compute_inputs >> gce_instance_start >> prep_environment - >> update_simhash_index - >> wait_for_simhash_index >> create_cset_ids >> wait_for_cset_ids >> gce_instance_stop diff --git a/sequences/merge_combined_metadata.tsv b/sequences/merge_combined_metadata.tsv index 73b4071..09bf778 100644 --- a/sequences/merge_combined_metadata.tsv +++ b/sequences/merge_combined_metadata.tsv @@ -1,6 +1,5 @@ arxiv_id_match metadata_match all_match_pairs_with_um -simhash_input lid_input ids_to_drop diff --git a/sql/simhash_input.sql b/sql/simhash_input.sql deleted file mode 100644 index 3a04e6f..0000000 --- a/sql/simhash_input.sql +++ /dev/null @@ -1,18 +0,0 @@ --- get input for simhash, which is articles with not null titles and abstracts that have not already been matched -SELECT - id, - year, - concat(title_norm, abstract_norm) AS normalized_text -FROM {{ staging_dataset }}.all_metadata_norm_filt -WHERE - (year IS NOT NULL) - AND (title_norm IS NOT NULL) AND (title_norm != "") - AND (abstract_norm IS NOT NULL) AND (abstract_norm != "") - AND id NOT IN ( - SELECT a.id FROM {{ staging_dataset }}.all_metadata_norm_filt AS a - LEFT JOIN - {{ staging_dataset }}.all_metadata_with_cld2_lid_last_run AS b - ON a.id = b.id - WHERE (a.title = b.title) AND (a.abstract = b.abstract) AND (a.year = b.year) AND (a.title != "") - AND (a.title IS NOT NULL) 
AND (a.abstract != "") AND (a.abstract IS NOT NULL) AND (a.year IS NOT NULL) - ) diff --git a/tests/static/simhash_empty/empty.jsonl b/tests/static/simhash_empty/empty.jsonl deleted file mode 100644 index e69de29..0000000 diff --git a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 b/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 deleted file mode 100644 index 61d3beb..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/match_pairs/file1 +++ /dev/null @@ -1,4 +0,0 @@ -{"id1": "A", "id2": "B"} -{"id1": "B", "id2": "C"} -{"id1": "J", "id2": "I"} -{"id1": "E", "id2": "E"} diff --git a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl b/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl deleted file mode 100644 index 12baa68..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids1.jsonl +++ /dev/null @@ -1,4 +0,0 @@ -{"id1": "D", "id2": "B"} -{"id1": "D", "id2": "E"} -{"id1": "E", "id2": "A"} -{"id1": "B", "id2": "C"} diff --git a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl b/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl deleted file mode 100644 index 7ce7f16..0000000 --- a/tests/static/test_get_match_sets_with_extra_id/simhash_match_pairs/ids2.jsonl +++ /dev/null @@ -1,2 +0,0 @@ -{"id1": "F", "id2": "G"} -{"id1": "F", "id2": "H"} diff --git a/tests/test_create_merge_ids.py b/tests/test_create_merge_ids.py index 30a6c96..4dac11a 100644 --- a/tests/test_create_merge_ids.py +++ b/tests/test_create_merge_ids.py @@ -6,7 +6,6 @@ from utils.create_merge_ids import create_match_sets, create_matches static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") -empty_simhash_dir = os.path.join(static_dir, "simhash_empty") class TestGetCombinedMap(unittest.TestCase): @@ -16,30 +15,22 @@ class TestGetCombinedMap(unittest.TestCase): def test_get_combined_map1(self): 
match_dir = os.path.join(static_dir, "test_get_combined_map1") expected_result = [{"A", "B", "C"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map2(self): match_dir = os.path.join(static_dir, "test_get_combined_map2") expected_result = [{"A", "B", "C", "D"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map3(self): match_dir = os.path.join(static_dir, "test_get_combined_map3") expected_result = [{"A", "B", "C", "D", "E"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map4(self): match_dir = os.path.join(static_dir, "test_get_combined_map4") expected_result = [{"A", "B", "C", "D", "E", "F", "G", "H"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result - ) + self.assertEqual(create_match_sets(match_dir), expected_result) def test_get_combined_map5(self): # test with two disconnected sets @@ -49,43 +40,19 @@ def test_get_combined_map5(self): expected_result = sorted( [result_set_small, result_set_large], key=lambda k: len(k) ) - actual_result = sorted( - create_match_sets(match_dir, empty_simhash_dir), key=lambda k: len(k) - ) - self.assertEqual(actual_result, expected_result) - - def test_get_match_sets_with_extra_id(self): - # test with three disconnected sets. The set A - E will have one simhash match (E-A) that should be included, - # and matches involving the obsolete id D that should be filtered. The "small" set F-H contains simhash-only - # ids and should be filtered. The other small set I-J should be included. 
- match_dir = os.path.join( - static_dir, "test_get_match_sets_with_extra_id", "match_pairs" - ) - simhash_match_dir = os.path.join( - static_dir, "test_get_match_sets_with_extra_id", "simhash_match_pairs" - ) - result_set_large = {"A", "B", "C", "E"} - result_set_small = {"I", "J"} - expected_result = sorted( - [result_set_small, result_set_large], key=lambda k: len(k) - ) - actual_result = sorted( - create_match_sets(match_dir, simhash_match_dir), key=lambda k: len(k) - ) + actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k)) self.assertEqual(actual_result, expected_result) def test_skip_matches(self): # test without matches excluded match_dir = os.path.join(static_dir, "test_skip_matches_ids") expected_result_no_excludes = [{"A", "B", "C"}, {"D", "E"}] - self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir), expected_result_no_excludes - ) + self.assertEqual(create_match_sets(match_dir), expected_result_no_excludes) # test with matches excluded exclude_dir = os.path.join(static_dir, "test_skip_matches_ids_to_skip") expected_result_excludes = [{"A", "B"}, {"C"}, {"D"}, {"E"}] self.assertEqual( - create_match_sets(match_dir, empty_simhash_dir, exclude_dir=exclude_dir), + create_match_sets(match_dir, exclude_dir=exclude_dir), expected_result_excludes, ) diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py index deed02d..fd3d7a5 100644 --- a/utils/create_merge_ids.py +++ b/utils/create_merge_ids.py @@ -61,15 +61,11 @@ def get_exclude_matches(exclude_dir: str) -> dict: return dont_match -def create_match_sets( - exact_match_dir: str, simhash_match_dir: str, exclude_dir: str = None -) -> list: +def create_match_sets(match_dir: str, exclude_dir: str = None) -> list: """ Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles, - including "transitive matches". 
We will use the ids present in the exact matches to filter the simhash matches, - since it's possible for obsolete ids to live on in the simhash index - :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match - :param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash + including "transitive matches". + :param match_dir: directory of jsonls containing matched orig_ids from exact metadata match :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together :return: list of sets of matched articles """ @@ -77,26 +73,22 @@ def create_match_sets( dont_match = get_exclude_matches(exclude_dir) print("getting adjacency lists") adj_list = {} - for match_dir, is_simhash in [(exact_match_dir, False), (simhash_match_dir, True)]: - for fi in os.listdir(match_dir): - with open(os.path.join(match_dir, fi)) as f: - for line in f: - js = json.loads(line) - key1 = js["id1"] - key2 = js["id2"] - if is_simhash: - if (key1 not in adj_list) or (key2 not in adj_list): - continue - if key1 not in adj_list: - adj_list[key1] = set() - if key2 not in dont_match.get(key1, set()): - adj_list[key1].add(key2) - # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A, - # this will ensure they get added to the same match set - if key2 not in adj_list: - adj_list[key2] = set() - if key1 not in dont_match.get(key2, set()): - adj_list[key2].add(key1) + for fi in os.listdir(match_dir): + with open(os.path.join(match_dir, fi)) as f: + for line in f: + js = json.loads(line) + key1 = js["id1"] + key2 = js["id2"] + if key1 not in adj_list: + adj_list[key1] = set() + if key2 not in dont_match.get(key1, set()): + adj_list[key1].add(key2) + # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A, + # this will ensure they get added to the same match set + if key2 not in adj_list: + adj_list[key2] = set() 
+            if key1 not in dont_match.get(key2, set()): +                adj_list[key2].add(key1) print("getting connected articles") seen_ids = set() match_sets = [] @@ -178,13 +170,13 @@ def create_matches( yield batch, batch_count -def write_batch(match_batch: tuple, output_dir: str) -> None: +def write_batch(match_batch_with_output_dir: tuple) -> None: """ Write a batch of matches to disk - :param match_batch: tuple of a list of jsons containing a merged id and orig id, and an identifier for the batch - :param output_dir: directory where matches should be written + :param match_batch_with_output_dir: tuple of (a tuple containing a list of jsons containing a merged id and orig id, and an identifier for the batch), and a directory where matches should be written :return: None """ + match_batch, output_dir = match_batch_with_output_dir matches, batch_id = match_batch with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f: for match in matches: @@ -193,7 +185,6 @@ def write_batch(match_batch: tuple, output_dir: str) -> None: def write_matches( exact_match_dir, - simhash_match_dir, exclude_dir, ids_to_drop, prev_id_mapping_dir, @@ -202,19 +193,19 @@ """ Generate merged id-orig id pairs and write them out as a directory of jsonls :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match - :param simhash_match_dir: directory of jsonls containing matched orig_ids from simhash :param exclude_dir: directory of article pairs that should not be matched :param ids_to_drop: file containing ids that should not be used :param prev_id_mapping_dir: directory of jsonl containing previous mapping between orig ids and merged ids :param output_dir: directory where jsonls containing new mappings between orig ids and merged ids should be written :return: None """ - match_sets = create_match_sets(exact_match_dir, simhash_match_dir, exclude_dir) - match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir) if not 
os.path.exists(output_dir): os.makedirs(output_dir) with multiprocessing.Pool() as p: - p.starmap(write_batch, ((mb, output_dir) for mb in match_batches)) + match_sets = create_match_sets(exact_match_dir, exclude_dir) + match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir) + output_batches = ((mb, output_dir) for mb in match_batches) + list(p.imap(write_batch, output_batches)) if __name__ == "__main__": @@ -224,11 +215,6 @@ def write_matches( required=True, help="directory of jsonls containing matched orig_ids from exact metadata match", ) - parser.add_argument( - "--simhash_match_dir", - required=True, - help="directory of jsonls containing matched orig_ids from simhash", - ) parser.add_argument( "--exclude_dir", required=True, @@ -253,7 +239,6 @@ def write_matches( write_matches( args.exact_match_dir, - args.simhash_match_dir, args.exclude_dir, args.ids_to_drop, args.prev_id_mapping_dir, diff --git a/utils/my_simhash.py b/utils/my_simhash.py deleted file mode 100644 index 7a15493..0000000 --- a/utils/my_simhash.py +++ /dev/null @@ -1,210 +0,0 @@ -# Created by 1e0n in 2013 -# this file is identical to https://github.com/leonsim/simhash/commit/7337c9ae353dbdc32666e88ba07a75170c19b79c, except -# it has some logging which was overwhelming airflow removed -from __future__ import division, unicode_literals - -import collections -import hashlib -import logging -import numbers -import re -import sys -from itertools import groupby - -if sys.version_info[0] >= 3: - basestring = str - unicode = str - long = int - - -def _hashfunc(x): - return int(hashlib.md5(x).hexdigest(), 16) - - -class Simhash(object): - def __init__(self, value, f=64, reg=r"[\w\u4e00-\u9fcc]+", hashfunc=None, log=None): - """ - `f` is the dimensions of fingerprints - - `reg` is meaningful only when `value` is basestring and describes - what is considered to be a letter inside parsed string. 
Regexp - object can also be specified (some attempt to handle any letters - is to specify reg=re.compile(r'\w', re.UNICODE)) # noqa: W605 - - `hashfunc` accepts a utf-8 encoded string and returns a unsigned - integer in at least `f` bits. - """ - - self.f = f - self.reg = reg - self.value = None - - if hashfunc is None: - self.hashfunc = _hashfunc - else: - self.hashfunc = hashfunc - - if log is None: - self.log = logging.getLogger("simhash") - else: - self.log = log - - if isinstance(value, Simhash): - self.value = value.value - elif isinstance(value, basestring): - self.build_by_text(unicode(value)) - elif isinstance(value, collections.Iterable): - self.build_by_features(value) - elif isinstance(value, numbers.Integral): - self.value = value - else: - raise Exception("Bad parameter with type {}".format(type(value))) - - def __eq__(self, other): - """ - Compare two simhashes by their value. - - :param Simhash other: The Simhash object to compare to - """ - return self.value == other.value - - def _slide(self, content, width=4): - return [content[i : i + width] for i in range(max(len(content) - width + 1, 1))] - - def _tokenize(self, content): - content = content.lower() - content = "".join(re.findall(self.reg, content)) - ans = self._slide(content) - return ans - - def build_by_text(self, content): - features = self._tokenize(content) - features = {k: sum(1 for _ in g) for k, g in groupby(sorted(features))} - return self.build_by_features(features) - - def build_by_features(self, features): - """ - `features` might be a list of unweighted tokens (a weight of 1 - will be assumed), a list of (token, weight) tuples or - a token -> weight dict. 
- """ - v = [0] * self.f - masks = [1 << i for i in range(self.f)] - if isinstance(features, dict): - features = features.items() - for f in features: - if isinstance(f, basestring): - h = self.hashfunc(f.encode("utf-8")) - w = 1 - else: - assert isinstance(f, collections.Iterable) - h = self.hashfunc(f[0].encode("utf-8")) - w = f[1] - for i in range(self.f): - v[i] += w if h & masks[i] else -w - # use reversed binary str to keep the backward compatibility - binary_str = "".join(["0" if i <= 0 else "1" for i in v[::-1]]) - self.value = int(binary_str, 2) - - def distance(self, another): - assert self.f == another.f - x = (self.value ^ another.value) & ((1 << self.f) - 1) - ans = 0 - while x: - ans += 1 - x &= x - 1 - return ans - - -class SimhashIndex(object): - def __init__(self, objs, f=64, k=2, log=None): - """ - `objs` is a list of (obj_id, simhash) - obj_id is a string, simhash is an instance of Simhash - `f` is the same with the one for Simhash - `k` is the tolerance - """ - self.k = k - self.f = f - count = len(objs) - - if log is None: - self.log = logging.getLogger("simhash") - else: - self.log = log - - self.log.info("Initializing %s data.", count) - - self.bucket = collections.defaultdict(set) - - for i, q in enumerate(objs): - if i % 10000 == 0 or i == count - 1: - self.log.info("%s/%s", i + 1, count) - - self.add(*q) - - def get_near_dups(self, simhash): - """ - `simhash` is an instance of Simhash - return a list of obj_id, which is in type of str - """ - assert simhash.f == self.f - - ans = set() - - for key in self.get_keys(simhash): - dups = self.bucket[key] - self.log.debug("key:%s", key) - # if len(dups) > 200: - # self.log.warning('Big bucket found. 
key:%s, len:%s', key, len(dups)) - - for dup in dups: - sim2, obj_id = dup.split(",", 1) - sim2 = Simhash(long(sim2, 16), self.f) - - d = simhash.distance(sim2) - if d <= self.k: - ans.add(obj_id) - return list(ans) - - def add(self, obj_id, simhash): - """ - `obj_id` is a string - `simhash` is an instance of Simhash - """ - assert simhash.f == self.f - - for key in self.get_keys(simhash): - v = "%x,%s" % (simhash.value, obj_id) - self.bucket[key].add(v) - - def delete(self, obj_id, simhash): - """ - `obj_id` is a string - `simhash` is an instance of Simhash - """ - assert simhash.f == self.f - - for key in self.get_keys(simhash): - v = "%x,%s" % (simhash.value, obj_id) - if v in self.bucket[key]: - self.bucket[key].remove(v) - - @property - def offsets(self): - """ - You may optimize this method according to - """ - return [self.f // (self.k + 1) * i for i in range(self.k + 1)] - - def get_keys(self, simhash): - for i, offset in enumerate(self.offsets): - if i == (len(self.offsets) - 1): - m = 2 ** (self.f - offset) - 1 - else: - m = 2 ** (self.offsets[i + 1] - offset) - 1 - c = simhash.value >> offset & m - yield "%x:%x" % (c, i) - - def bucket_size(self): - return len(self.bucket) diff --git a/utils/run_ids_scripts.sh b/utils/run_ids_scripts.sh index e1751c4..6380930 100644 --- a/utils/run_ids_scripts.sh +++ b/utils/run_ids_scripts.sh @@ -1,9 +1,6 @@ cd /mnt/disks/data/run gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done -python3 create_merge_ids.py --exact_match_dir exact_matches --simhash_match_dir simhash_results --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings +python3 create_merge_ids.py --exact_match_dir exact_matches --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings /snap/bin/gsutil -m cp -r new_id_mappings gs://airflow-data-exchange/article_linkage/tmp/ -/snap/bin/gsutil -m cp simhash_results/* 
gs://airflow-data-exchange/article_linkage/simhash_results/ -/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/ -/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes_archive/$(date +%F)/ touch ids_are_done gsutil cp ids_are_done gs://airflow-data-exchange/article_linkage/tmp/done_files/ diff --git a/utils/run_simhash.py b/utils/run_simhash.py deleted file mode 100644 index 3ca56c0..0000000 --- a/utils/run_simhash.py +++ /dev/null @@ -1,121 +0,0 @@ -import argparse -import json -import multiprocessing -import os -import pickle -import re -from datetime import datetime - -from my_simhash import Simhash, SimhashIndex - - -def get_features(s: str) -> list: - """ - The default feature extraction method, from https://github.com/leonsim/simhash - """ - width = 3 - s = s.lower() - s = re.sub(r"[^\w]+", "", s) - return [s[i : i + width] for i in range(max(len(s) - width + 1, 1))] - - -def write_sim_strings( - data_fi: str, output_fi: str, input_index: str = None, output_index: str = None -) -> None: - """ - Does the similarity matching and writes out the outputs. 
Basic method from from https://github.com/leonsim/simhash - """ - data_ids_and_values = [ - line.strip().split("\t") for line in open(data_fi).readlines() - ] - objs = [ - (article_id, Simhash(get_features(article_text))) - for article_id, article_text in data_ids_and_values - ] - index = None - if (input_index is None) or not os.path.exists(input_index): - index = SimhashIndex(objs, k=3) - else: - index = pickle.load(open(input_index, mode="rb")) - for obj_id, obj in objs: - index.add(obj_id, obj) - print("writing updated index to " + output_index) - pickle.dump(index, open(output_index, mode="wb")) - - with open(output_fi, mode="w") as out: - for article_id, article_text in data_ids_and_values: - feats = Simhash(get_features(article_text)) - dup_ids = index.get_near_dups(feats) - for dup_id in dup_ids: - if dup_id != article_id: - out.write(json.dumps({"id1": article_id, "id2": dup_id}) + "\n") - - -def get_year_partition(input_dir: str, output_dir: str) -> list: - """ - Takes an input directory of jsonl containing three fields: id, year, and normalized_text. Constructs a map - mapping year to tuples of id, normalized_text, and writes each year's data as a tsv - - Initially I tried passing the arrays of id, normalized text for each year around in memory. However, - the multiprocessing library pickles its inputs and some years' data exceeded the maximum pickle size. - For the same reason, we write to tsv instead of pickling here. 
- - :param input_dir: directory of jsonl - :param output_dir: dir where each year's worth of pairs should be written as pkl - :return: list of years - """ - print("getting year partition") - year_to_outfi = {} - if not os.path.exists(output_dir): - os.mkdir(output_dir) - for fi in os.listdir(input_dir): - for line in open(os.path.join(input_dir, fi)): - js = json.loads(line) - year = js["year"] - if year not in year_to_outfi: - year_to_outfi[year] = open( - os.path.join(output_dir, year + ".tsv"), mode="w" - ) - year_to_outfi[year].write(f"{js['id']}\t{js['normalized_text']}\n") - for year in year_to_outfi: - year_to_outfi[year].close() - return list(year_to_outfi.keys()) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("input_dir", help="directory of jsonl") - parser.add_argument("--tmp_dir", default="simhash-tmp") - parser.add_argument("--simhash_indexes", help="current simhash indexes") - parser.add_argument( - "--new_simhash_indexes", help="location where updated indexes should be written" - ) - parser.add_argument( - "output_dir", - help=( - "directory where output matches should be written. 
" - "Outputs will be in the form `year`.jsonl" - ), - ) - args = parser.parse_args() - - years = get_year_partition(args.input_dir, args.tmp_dir) - print("running simhash") - day = datetime.now().strftime("%Y-%m-%d") - with multiprocessing.Pool() as p: - p.starmap( - write_sim_strings, - [ - ( - os.path.join(args.tmp_dir, year + ".tsv"), - os.path.join(args.output_dir, f"{year}_{day}.jsonl"), - None - if args.simhash_indexes is None - else os.path.join(args.simhash_indexes, f"{year}.pkl"), - None - if args.new_simhash_indexes is None - else os.path.join(args.new_simhash_indexes, f"{year}.pkl"), - ) - for year in years - ], - ) diff --git a/utils/run_simhash_scripts.sh b/utils/run_simhash_scripts.sh deleted file mode 100644 index 95650e9..0000000 --- a/utils/run_simhash_scripts.sh +++ /dev/null @@ -1,5 +0,0 @@ -cd /mnt/disks/data/run -gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/simhash_is_done -python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes -touch simhash_is_done -gsutil cp simhash_is_done gs://airflow-data-exchange/article_linkage/tmp/done_files/