diff --git a/airflow/knesset_data_pipelines/cli.py b/airflow/knesset_data_pipelines/cli.py index fb10891..7957acd 100644 --- a/airflow/knesset_data_pipelines/cli.py +++ b/airflow/knesset_data_pipelines/cli.py @@ -16,6 +16,7 @@ def main(load_dotenv): for module_name, function_name in [ ('.google_drive_upload.cli', 'google_drive_upload'), ('.committees.cli', 'committees'), + ( '.members_eng.cli', 'members_eng'), ]: main.add_command(getattr(importlib.import_module(module_name, __package__), function_name)) diff --git a/airflow/knesset_data_pipelines/committees/common.py b/airflow/knesset_data_pipelines/committees/common.py index f09eb25..4bf990a 100644 --- a/airflow/knesset_data_pipelines/committees/common.py +++ b/airflow/knesset_data_pipelines/committees/common.py @@ -1,5 +1,5 @@ from .. import db - +from sqlalchemy import text def get_committee_parents(committee, committees): parent_committee_id = committee['parent_committee_id'] @@ -21,14 +21,14 @@ def get_committees_tree(): 'parent_committee_id': row.parent_committee_id, 'name': row.name, 'category_desc': row.category_desc, - } for row in conn.execute(''' + } for row in conn.execute(text(''' select "CommitteeID" as committee_id, "ParentCommitteeID" as parent_committee_id, "Name" as name, "CategoryDesc" as category_desc from committees_kns_committee - ''') + ''')) } for committee in committees.values(): committee['parent_committee_ids'] = get_committee_parents(committee, committees) diff --git a/airflow/knesset_data_pipelines/members_eng/__init__.py b/airflow/knesset_data_pipelines/members_eng/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow/knesset_data_pipelines/members_eng/cli.py b/airflow/knesset_data_pipelines/members_eng/cli.py new file mode 100644 index 0000000..3ccec6b --- /dev/null +++ b/airflow/knesset_data_pipelines/members_eng/cli.py @@ -0,0 +1,14 @@ +import click + + +@click.group() +def members_eng(): + pass + +@members_eng.command() +@click.option('--slow', is_flag=True) +def members_eng(**kwargs): + '''Create a table of knesset member names in english + ''' + from .members_eng import main + main(**kwargs) diff --git a/airflow/knesset_data_pipelines/members_eng/members_eng.py b/airflow/knesset_data_pipelines/members_eng/members_eng.py new file mode 100644 index 0000000..504a258 --- /dev/null +++ b/airflow/knesset_data_pipelines/members_eng/members_eng.py @@ -0,0 +1,72 @@ +import os +import requests +from textwrap import dedent +import time +import traceback +import json + +import dataflows as DF +from pyquery import PyQuery as pq + +from .. import db, config +from ..get_retry_response_content import get_retry_response_content + +def get_members_id(): + """Return an iterable of all valid mk_individual_id + """ + url = "https://backend.oknesset.org/members" + for item in requests.get(f"{url}?is_current=true").json(): + yield item['mk_individual_id'] + for item in requests.get(f"{url}?is_current=false").json(): + yield item['mk_individual_id'] + +def iterate_members(slow: bool = False): + delay=10 + if slow: + delay = 20 + for member_id in get_members_id(): + URL = f"https://knesset.gov.il/WebSiteApi/knessetapi/MKs/GetMkdetailsHeader?mkId={member_id}&languageKey=en" + print(f"getting {URL}") + try: + content = get_retry_response_content( + URL, None, None, None, retry_num=1, + num_retries=10, seconds_between_retries=delay, + skip_not_found_errors=True) + if slow: + time.sleep(1) + except Exception: + traceback.print_exc() + print(f'failed to get {URL}') + else: + data = json.loads(content) + if not data: + print(f" failed to parse {content=}") + continue + name = data.get('Name', '') + if not name: + continue + yield { + "NameEng": name, + "mk_individual_id": member_id, + } + +def main(slow=False): + table_name = 'member_english_names' + temp_table_name = f'__temp__{table_name}' + DF.Flow( + iterate_members(slow=slow), + DF.update_resource(-1, name='member_english_names', path='member_english_names.csv'), + DF.dump_to_path(os.path.join(config.KNESSET_PIPELINES_DATA_PATH, 'members', 'member_english_names')), + DF.dump_to_sql( + {temp_table_name: {'resource-name': 'member_english_names'}}, + db.get_db_engine(), + batch_size=100000, + ), + ).process() + with db.get_db_engine().connect() as conn: + with conn.begin(): + conn.execute(dedent(f''' + drop table if exists {table_name}; + alter table {temp_table_name} rename to {table_name}; + ''')) +