From 8f0adbc194fef49833a395cccbb657257a8de2b3 Mon Sep 17 00:00:00 2001 From: mattip Date: Mon, 19 Feb 2024 12:36:45 +0200 Subject: [PATCH 1/5] fix for newer sqalchemy --- airflow/knesset_data_pipelines/committees/common.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/knesset_data_pipelines/committees/common.py b/airflow/knesset_data_pipelines/committees/common.py index f09eb25..4bf990a 100644 --- a/airflow/knesset_data_pipelines/committees/common.py +++ b/airflow/knesset_data_pipelines/committees/common.py @@ -1,5 +1,5 @@ from .. import db - +from sqlalchemy import text def get_committee_parents(committee, committees): parent_committee_id = committee['parent_committee_id'] @@ -21,14 +21,14 @@ def get_committees_tree(): 'parent_committee_id': row.parent_committee_id, 'name': row.name, 'category_desc': row.category_desc, - } for row in conn.execute(''' + } for row in conn.execute(text(''' select "CommitteeID" as committee_id, "ParentCommitteeID" as parent_committee_id, "Name" as name, "CategoryDesc" as category_desc from committees_kns_committee - ''') + ''')) } for committee in committees.values(): committee['parent_committee_ids'] = get_committee_parents(committee, committees) From 9adb9e3d4b0d8b49024a47f1c7069dd3e187c706 Mon Sep 17 00:00:00 2001 From: mattip Date: Mon, 19 Feb 2024 12:37:50 +0200 Subject: [PATCH 2/5] start to add a members_eng pipeline --- airflow/knesset_data_pipelines/cli.py | 1 + .../members_eng/__init__.py | 0 .../knesset_data_pipelines/members_eng/cli.py | 14 ++++++ .../members_eng/members_eng.py | 48 +++++++++++++++++++ 4 files changed, 63 insertions(+) create mode 100644 airflow/knesset_data_pipelines/members_eng/__init__.py create mode 100644 airflow/knesset_data_pipelines/members_eng/cli.py create mode 100644 airflow/knesset_data_pipelines/members_eng/members_eng.py diff --git a/airflow/knesset_data_pipelines/cli.py b/airflow/knesset_data_pipelines/cli.py index fb10891..7957acd 100644 --- a/airflow/knesset_data_pipelines/cli.py +++ b/airflow/knesset_data_pipelines/cli.py @@ -16,6 +16,7 @@ def main(load_dotenv): for module_name, function_name in [ ('.google_drive_upload.cli', 'google_drive_upload'), ('.committees.cli', 'committees'), + ( '.members_eng.cli', 'members_eng'), ]: main.add_command(getattr(importlib.import_module(module_name, __package__), function_name)) diff --git a/airflow/knesset_data_pipelines/members_eng/__init__.py b/airflow/knesset_data_pipelines/members_eng/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow/knesset_data_pipelines/members_eng/cli.py b/airflow/knesset_data_pipelines/members_eng/cli.py new file mode 100644 index 0000000..2eeef17 --- /dev/null +++ b/airflow/knesset_data_pipelines/members_eng/cli.py @@ -0,0 +1,14 @@ +import click + + +@click.group() +def members_eng(): + pass + +@members_eng.command() +def members_eng(**kwargs): + '''Create a table of knesset member names in english + ''' + from .members_eng import main + print("running main") + main(**kwargs) diff --git a/airflow/knesset_data_pipelines/members_eng/members_eng.py b/airflow/knesset_data_pipelines/members_eng/members_eng.py new file mode 100644 index 0000000..2630587 --- /dev/null +++ b/airflow/knesset_data_pipelines/members_eng/members_eng.py @@ -0,0 +1,48 @@ +import os +import traceback +from textwrap import dedent + +import dataflows as DF +from pyquery import PyQuery as pq + +from .. import db, config +from ..get_retry_response_content import get_retry_response_content + +URL = "https://main.knesset.gov.il/en/MK/APPS/mk/mk-personal-details" + +def iterate_members(): + for member_id in range(1, 121): + try: + content = get_retry_response_content( + f"{URL}/{member_id}", None, None, None, retry_num=1, + num_retries=10, seconds_between_retries=10, + skip_not_found_errors=True) + except Exception: + treaceback.print_exc() + content = None + print(f'failed to get {URL}/{member_id}') + else: + page = pq(content) + # Hmm, the page is rendered by javascript, so the get_retry_response is empty + import pdb;pdb.set_trace() + +def main(): + table_name = 'member_english_names' + temp_table_name = f'__temp__{table_name}' + DF.Flow( + iterate_members(), + DF.update_resource(-1, name='member_english_name', path='member_english_name.csv'), + DF.dump_to_path(os.path.join(config.KNESSET_PIPELINES_DATA_PATH, 'members', 'memger_english_names')), + DF.dump_to_sql( + {temp_table_name: {'resource-name': 'member_english_names'}}, + db.get_db_engine(), + batch_size=100000, + ), + ).process + with db.get_db_engine().connect() as conn: + with conn.begin(): + conn.execute(dedent(f''' + drop table if exists {table_name}; + alter table {temp_table_name} rename to {table_name}; + ''')) + From 6ac0d63584b376ab1d97467cb9692087bb98158c Mon Sep 17 00:00:00 2001 From: Matti Picus Date: Thu, 22 Feb 2024 21:26:53 +0200 Subject: [PATCH 3/5] data flow starts to work --- .../members_eng/members_eng.py | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/airflow/knesset_data_pipelines/members_eng/members_eng.py b/airflow/knesset_data_pipelines/members_eng/members_eng.py index 2630587..3608e5d 100644 --- a/airflow/knesset_data_pipelines/members_eng/members_eng.py +++ b/airflow/knesset_data_pipelines/members_eng/members_eng.py @@ -1,6 +1,7 @@ import os -import traceback from textwrap import dedent +import traceback +import json import dataflows as DF from pyquery import PyQuery as pq @@ -8,37 +9,46 @@ from .. import db, config from ..get_retry_response_content import get_retry_response_content -URL = "https://main.knesset.gov.il/en/MK/APPS/mk/mk-personal-details" +def get_members_id(): + """Return an iterable of all valid mk_individual_id + """ + return range(1, 6000) def iterate_members(): - for member_id in range(1, 121): + for member_id in get_members_id(): + URL = f"https://knesset.gov.il/WebSiteApi/knessetapi/MKs/GetMkdetailsHeader?mkId={member_id}&languageKey=en" + print(f"getting {URL}") try: content = get_retry_response_content( - f"{URL}/{member_id}", None, None, None, retry_num=1, + URL, None, None, None, retry_num=1, num_retries=10, seconds_between_retries=10, skip_not_found_errors=True) except Exception: - treaceback.print_exc() - content = None - print(f'failed to get {URL}/{member_id}') + traceback.print_exc() + print(f'failed to get {URL}') else: - page = pq(content) - # Hmm, the page is rendered by javascript, so the get_retry_response is empty - import pdb;pdb.set_trace() + data = json.loads(content) + name = data.get('Name', '') + if not name: + continue + yield { + "NameEng": name, + "mk_individual_id": member_id, + } def main(): table_name = 'member_english_names' temp_table_name = f'__temp__{table_name}' DF.Flow( iterate_members(), - DF.update_resource(-1, name='member_english_name', path='member_english_name.csv'), - DF.dump_to_path(os.path.join(config.KNESSET_PIPELINES_DATA_PATH, 'members', 'memger_english_names')), + DF.update_resource(-1, name='member_english_names', path='member_english_names.csv'), + DF.dump_to_path(os.path.join(config.KNESSET_PIPELINES_DATA_PATH, 'members', 'member_english_names')), DF.dump_to_sql( {temp_table_name: {'resource-name': 'member_english_names'}}, db.get_db_engine(), batch_size=100000, ), - ).process + ).process() with db.get_db_engine().connect() as conn: with conn.begin(): conn.execute(dedent(f''' From ea6aeefd4d018c3623066b4fbfd5dd7a068a33e3 Mon Sep 17 00:00:00 2001 From: Matti Picus Date: Tue, 19 Mar 2024 18:21:11 +0200 Subject: [PATCH 4/5] add a slow option --- .../knesset_data_pipelines/members_eng/cli.py | 2 +- .../members_eng/members_eng.py | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/airflow/knesset_data_pipelines/members_eng/cli.py b/airflow/knesset_data_pipelines/members_eng/cli.py index 2eeef17..3ccec6b 100644 --- a/airflow/knesset_data_pipelines/members_eng/cli.py +++ b/airflow/knesset_data_pipelines/members_eng/cli.py @@ -6,9 +6,9 @@ def members_eng(): pass @members_eng.command() +@click.option('--slow', is_flag=True) def members_eng(**kwargs): '''Create a table of knesset member names in english ''' from .members_eng import main - print("running main") main(**kwargs) diff --git a/airflow/knesset_data_pipelines/members_eng/members_eng.py b/airflow/knesset_data_pipelines/members_eng/members_eng.py index 3608e5d..f4188ce 100644 --- a/airflow/knesset_data_pipelines/members_eng/members_eng.py +++ b/airflow/knesset_data_pipelines/members_eng/members_eng.py @@ -1,5 +1,6 @@ import os from textwrap import dedent +import time import traceback import json @@ -12,22 +13,30 @@ def get_members_id(): """Return an iterable of all valid mk_individual_id """ - return range(1, 6000) + return range(1, 1000) -def iterate_members(): +def iterate_members(slow: bool = False): + delay=10 + if slow: + delay = 20 for member_id in get_members_id(): URL = f"https://knesset.gov.il/WebSiteApi/knessetapi/MKs/GetMkdetailsHeader?mkId={member_id}&languageKey=en" print(f"getting {URL}") try: content = get_retry_response_content( URL, None, None, None, retry_num=1, - num_retries=10, seconds_between_retries=10, + num_retries=10, seconds_between_retries=delay, skip_not_found_errors=True) + if slow: + time.sleep(1) except Exception: traceback.print_exc() print(f'failed to get {URL}') else: data = json.loads(content) + if not data: + print(f" failed to parse {content=}") + continue name = data.get('Name', '') if not name: continue @@ -36,11 +45,11 @@ def iterate_members(): "mk_individual_id": member_id, } -def main(): +def main(slow=False): table_name = 'member_english_names' temp_table_name = f'__temp__{table_name}' DF.Flow( - iterate_members(), + iterate_members(slow=slow), DF.update_resource(-1, name='member_english_names', path='member_english_names.csv'), DF.dump_to_path(os.path.join(config.KNESSET_PIPELINES_DATA_PATH, 'members', 'member_english_names')), DF.dump_to_sql( From cc16ece2c050a371510f4f2d4dcfcbcd678e9391 Mon Sep 17 00:00:00 2001 From: mattip Date: Thu, 21 Mar 2024 12:56:18 +0200 Subject: [PATCH 5/5] use query to get mk_individual_id --- airflow/knesset_data_pipelines/members_eng/members_eng.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/knesset_data_pipelines/members_eng/members_eng.py b/airflow/knesset_data_pipelines/members_eng/members_eng.py index f4188ce..504a258 100644 --- a/airflow/knesset_data_pipelines/members_eng/members_eng.py +++ b/airflow/knesset_data_pipelines/members_eng/members_eng.py @@ -1,4 +1,5 @@ import os +import requests from textwrap import dedent import time import traceback @@ -13,7 +14,11 @@ def get_members_id(): """Return an iterable of all valid mk_individual_id """ - return range(1, 1000) + url = "https://backend.oknesset.org/members" + for item in requests.get(f"{url}?is_current=true").json(): + yield item['mk_individual_id'] + for item in requests.get(f"{url}?is_current=false").json(): + yield item['mk_individual_id'] def iterate_members(slow: bool = False): delay=10