From 7a7e6ee7a6030883fa003a7a6468d7d747b55d35 Mon Sep 17 00:00:00 2001 From: tcezard Date: Sat, 15 Jun 2024 10:29:45 +0100 Subject: [PATCH 1/6] Add script to get the location of the existing IP address --- eva-usage-stats/ws_query_analysis.py | 12 ++- eva-usage-stats/ws_query_fill_in_location.py | 77 ++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 eva-usage-stats/ws_query_fill_in_location.py diff --git a/eva-usage-stats/ws_query_analysis.py b/eva-usage-stats/ws_query_analysis.py index c25ad237..083df337 100644 --- a/eva-usage-stats/ws_query_analysis.py +++ b/eva-usage-stats/ws_query_analysis.py @@ -31,7 +31,7 @@ def _get_location(ip_address): # } -@lru_cache +@lru_cache(maxsize=None) def get_location(ip_address): try: return _get_location(ip_address) @@ -104,10 +104,13 @@ def main(): 'client_country_code', 'client_country_name', 'client_city', 'client_postal', 'client_latitude', 'client_longitude', 'client_state' ) + formatted_column_names = ', '.join((f'"{c}"' for c in column_name_tuple)) query = ( - f"insert into eva_web_srvc_stats.ws_traffic values {column_name_tuple} (" - f"%s, %s, %s, %s, %s, %s, %s, %s, %s, cast(%s as timestamp with time zone), %s, %s, %s, %s, %s, %s, " - f"%s, %s, %s, %s, %s, %s, %s, %s, %s);") + f"insert into eva_web_srvc_stats.ws_traffic ({formatted_column_names}) values (" + f"%s, %s, %s, %s, %s, %s, %s, %s, %s, cast(%s as timestamp with time zone), " + f"%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, " + f"%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, " + f"%s, %s);") result_cursor.execute(query, ( data["@timestamp"], data["@timestamp"], data["type"], data["host"], @@ -132,6 +135,7 @@ def main(): data["request_query"] if "request_query" in data else '', data["cookie_header"] if "cookie_header" in data else '', tot_segment_length, + location_dict.get('country_code'), location_dict.get('country_name'), location_dict.get('city'), location_dict.get('postal'), diff --git 
a/eva-usage-stats/ws_query_fill_in_location.py b/eva-usage-stats/ws_query_fill_in_location.py new file mode 100644 index 00000000..ad025bca --- /dev/null +++ b/eva-usage-stats/ws_query_fill_in_location.py @@ -0,0 +1,77 @@ +#!/usr/bin/python +import datetime +from argparse import ArgumentParser +from functools import lru_cache + +import requests + +from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle +from requests import HTTPError +from retry import retry + + +@retry(tries=5, delay=8, backoff=1.2, jitter=(1, 3)) +def _get_location(ip_address): + response = requests.get('https://geolocation-db.com/json/' + ip_address) + response.raise_for_status() + return response.json() + # { + # "country_code":"NL", + # "country_name":"Netherlands", + # "city":"Amsterdam", + # "postal":"1105", + # "latitude":52.2965, + # "longitude":4.9542, + # "IPv4":"82.196.6.158", + # "state":"North Holland" + # } + + +@lru_cache(maxsize=None) +def get_location(ip_address): + try: + return _get_location(ip_address) + except HTTPError: + return {} + + +def main(): + parser = ArgumentParser(description='') + parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) + args = parser.parse_args() + print("Job ran at " + str(datetime.datetime.now())) + + postgres_conn_handle = get_metadata_connection_handle("production_processing", args.private_config_xml_file) + with postgres_conn_handle.cursor(name='fetch_large_result') as cursor: + chunk_size = 1000 + cursor.itersize = chunk_size + cursor.execute("SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic where client_country_code is null;;") + update_query = ( + 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, ' + 'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s ' + 'WHERE client_ip=%s AND client_country_code is null;' + ) + nb_ip = 0 + nb_row_updated = 0 + for row in cursor: + 
ip_address, = row + location_dict = get_location(ip_address) + with postgres_conn_handle.cursor() as update_cursor: + # execute the UPDATE statement + update_cursor.execute( + update_query, + (location_dict.get('country_code'), location_dict.get('country_name'), location_dict.get('city'), + location_dict.get('postal'), location_dict.get('latitude'), location_dict.get('longitude'), + location_dict.get('state'), ip_address) + ) + updated_row_count = update_cursor.rowcount + print(f'Updated {updated_row_count} record for {ip_address}') + nb_ip += 1 + nb_row_updated += updated_row_count + # commit the changes to the database + postgres_conn_handle.commit() + print(f'Updated {nb_row_updated} record for {nb_ip} IP addresses') + + +if __name__ == '__main__': + main() From 6bb0ed6f9082daac717ba29a6eea6c661477e9cd Mon Sep 17 00:00:00 2001 From: tcezard Date: Sat, 15 Jun 2024 10:39:10 +0100 Subject: [PATCH 2/6] commit at the end only --- eva-usage-stats/ws_query_fill_in_location.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eva-usage-stats/ws_query_fill_in_location.py b/eva-usage-stats/ws_query_fill_in_location.py index ad025bca..73d42ff6 100644 --- a/eva-usage-stats/ws_query_fill_in_location.py +++ b/eva-usage-stats/ws_query_fill_in_location.py @@ -69,7 +69,7 @@ def main(): nb_ip += 1 nb_row_updated += updated_row_count # commit the changes to the database - postgres_conn_handle.commit() + postgres_conn_handle.commit() print(f'Updated {nb_row_updated} record for {nb_ip} IP addresses') From 6ca4da00148a7e9ab409e5eda9a74676dd686b07 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 17 Jun 2024 10:25:26 +0100 Subject: [PATCH 3/6] Commit for every batch of 1000 and use prepare statement --- eva-usage-stats/ws_query_fill_in_location.py | 72 ++++++++++++-------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/eva-usage-stats/ws_query_fill_in_location.py b/eva-usage-stats/ws_query_fill_in_location.py index 73d42ff6..e92abffb 100644 --- 
a/eva-usage-stats/ws_query_fill_in_location.py +++ b/eva-usage-stats/ws_query_fill_in_location.py @@ -6,6 +6,8 @@ import requests from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle +from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query +from psycopg2.extras import execute_batch from requests import HTTPError from retry import retry @@ -34,7 +36,11 @@ def get_location(ip_address): except HTTPError: return {} - +update_query = ( + 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, ' + 'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s ' + 'WHERE client_ip=%s AND client_country_code is null;' +) def main(): parser = ArgumentParser(description='') parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) @@ -42,35 +48,41 @@ def main(): print("Job ran at " + str(datetime.datetime.now())) postgres_conn_handle = get_metadata_connection_handle("production_processing", args.private_config_xml_file) - with postgres_conn_handle.cursor(name='fetch_large_result') as cursor: - chunk_size = 1000 - cursor.itersize = chunk_size - cursor.execute("SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic where client_country_code is null;;") - update_query = ( - 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, ' - 'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s ' - 'WHERE client_ip=%s AND client_country_code is null;' - ) - nb_ip = 0 - nb_row_updated = 0 - for row in cursor: - ip_address, = row - location_dict = get_location(ip_address) - with postgres_conn_handle.cursor() as update_cursor: - # execute the UPDATE statement - update_cursor.execute( - update_query, - (location_dict.get('country_code'), location_dict.get('country_name'), location_dict.get('city'), - location_dict.get('postal'), location_dict.get('latitude'), 
location_dict.get('longitude'), - location_dict.get('state'), ip_address) - ) - updated_row_count = update_cursor.rowcount - print(f'Updated {updated_row_count} record for {ip_address}') - nb_ip += 1 - nb_row_updated += updated_row_count - # commit the changes to the database - postgres_conn_handle.commit() - print(f'Updated {nb_row_updated} record for {nb_ip} IP addresses') + # Get the number of IP to update + query = 'SELECT count(distinct client_ip) FROM eva_web_srvc_stats.ws_traffic where client_country_code is null;' + res = get_all_results_for_query(postgres_conn_handle, query) + nb_ip_address = res[0][0] + ip_updated = 0 + chunk_size = 1000 + + while ip_updated >= nb_ip_address: + with postgres_conn_handle.cursor(name='fetch_large_result') as cursor: + cursor.itersize = chunk_size + cursor.execute(f"SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic " + f"where client_country_code is null limit {chunk_size};") + + location_dict_list = [] + for row in cursor: + ip_address, = row + location_dict = get_location(ip_address) + location_dict['ip_address'] = ip_address + location_dict_list.append(location_dict) + + update_many_and_commit(postgres_conn_handle, location_dict_list, page_size=100) + ip_updated += len(location_dict_list) + print(f'Updated {chunk_size} ip addresses out of {nb_ip_address}') + + +def update_many_and_commit(postgres_conn_handle, location_dict_list, page_size=100): + cur = postgres_conn_handle.cursor() + cur.execute(f"PREPARE updateStmt AS {update_query}") + execute_batch(cur, + "EXECUTE updateStmt (%(country_code)s, %(country_name)s, %(city)s, %(postal)s, %(latitude)s, " + "%(longitude)s, %(state)s, %(ip_address)s)", + location_dict_list, + page_size=page_size) + cur.execute("DEALLOCATE updateStmt") + postgres_conn_handle.commit() if __name__ == '__main__': From 1a6a566345136495dbdfbc04acc7b6c4534abc91 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 17 Jun 2024 11:51:21 +0100 Subject: [PATCH 4/6] Use proper syntax for prepare 
statement --- eva-usage-stats/ws_query_fill_in_location.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/eva-usage-stats/ws_query_fill_in_location.py b/eva-usage-stats/ws_query_fill_in_location.py index e92abffb..3cb78227 100644 --- a/eva-usage-stats/ws_query_fill_in_location.py +++ b/eva-usage-stats/ws_query_fill_in_location.py @@ -36,11 +36,14 @@ def get_location(ip_address): except HTTPError: return {} + update_query = ( - 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, ' - 'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s ' - 'WHERE client_ip=%s AND client_country_code is null;' + 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=$1, client_country_name=$2, ' + 'client_city=$3, client_postal=$4, client_latitude=$5, client_longitude=$6, client_state=$7 ' + 'WHERE client_ip=$8 AND client_country_code is null;' ) + + def main(): parser = ArgumentParser(description='') parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) @@ -54,8 +57,9 @@ def main(): nb_ip_address = res[0][0] ip_updated = 0 chunk_size = 1000 + print(f'{nb_ip_address} IP addresses to update the location for {chunk_size} at a time') - while ip_updated >= nb_ip_address: + while ip_updated < nb_ip_address: with postgres_conn_handle.cursor(name='fetch_large_result') as cursor: cursor.itersize = chunk_size cursor.execute(f"SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic " From e67f774d9fe0f90c4cb1a552f51fc5d66c098432 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 17 Jun 2024 17:16:20 +0100 Subject: [PATCH 5/6] Revert back to simpler as prepare statement made no difference in speed --- eva-usage-stats/ws_query_fill_in_location.py | 80 +++++++++----------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/eva-usage-stats/ws_query_fill_in_location.py b/eva-usage-stats/ws_query_fill_in_location.py 
index 3cb78227..c68b5553 100644 --- a/eva-usage-stats/ws_query_fill_in_location.py +++ b/eva-usage-stats/ws_query_fill_in_location.py @@ -4,13 +4,14 @@ from functools import lru_cache import requests +from ebi_eva_common_pyutils.logger import logging_config from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query -from psycopg2.extras import execute_batch from requests import HTTPError from retry import retry +logger = logging_config.get_logger(__name__) @retry(tries=5, delay=8, backoff=1.2, jitter=(1, 3)) def _get_location(ip_address): @@ -37,56 +38,47 @@ def get_location(ip_address): return {} -update_query = ( - 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=$1, client_country_name=$2, ' - 'client_city=$3, client_postal=$4, client_latitude=$5, client_longitude=$6, client_state=$7 ' - 'WHERE client_ip=$8 AND client_country_code is null;' -) - - def main(): parser = ArgumentParser(description='') parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) args = parser.parse_args() - print("Job ran at " + str(datetime.datetime.now())) + logging_config.add_stdout_handler() + logger.info("Job ran at " + str(datetime.datetime.now())) postgres_conn_handle = get_metadata_connection_handle("production_processing", args.private_config_xml_file) - # Get the number of IP to update - query = 'SELECT count(distinct client_ip) FROM eva_web_srvc_stats.ws_traffic where client_country_code is null;' - res = get_all_results_for_query(postgres_conn_handle, query) - nb_ip_address = res[0][0] - ip_updated = 0 chunk_size = 1000 - print(f'{nb_ip_address} IP addresses to update the location for {chunk_size} at a time') - - while ip_updated < nb_ip_address: - with postgres_conn_handle.cursor(name='fetch_large_result') as cursor: - cursor.itersize = chunk_size - cursor.execute(f"SELECT distinct client_ip FROM 
eva_web_srvc_stats.ws_traffic " - f"where client_country_code is null limit {chunk_size};") - - location_dict_list = [] - for row in cursor: - ip_address, = row - location_dict = get_location(ip_address) - location_dict['ip_address'] = ip_address - location_dict_list.append(location_dict) - - update_many_and_commit(postgres_conn_handle, location_dict_list, page_size=100) - ip_updated += len(location_dict_list) - print(f'Updated {chunk_size} ip addresses out of {nb_ip_address}') - - -def update_many_and_commit(postgres_conn_handle, location_dict_list, page_size=100): - cur = postgres_conn_handle.cursor() - cur.execute(f"PREPARE updateStmt AS {update_query}") - execute_batch(cur, - "EXECUTE updateStmt (%(country_code)s, %(country_name)s, %(city)s, %(postal)s, %(latitude)s, " - "%(longitude)s, %(state)s, %(ip_address)s)", - location_dict_list, - page_size=page_size) - cur.execute("DEALLOCATE updateStmt") - postgres_conn_handle.commit() + update_query = ( + 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, ' + 'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s ' + 'WHERE client_ip=%s AND client_country_code is null;' + ) + nb_ip = 0 + nb_row_updated = 0 + while True: + query = (f"SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic " + f"where client_country_code is null limit {chunk_size};") + ip_addresses = list(get_all_results_for_query(postgres_conn_handle, query)) + + if len(ip_addresses) == 0: + break + for ip_address, in ip_addresses: + location_dict = get_location(ip_address) + with postgres_conn_handle.cursor() as update_cursor: + # execute the UPDATE statement + update_cursor.execute( + update_query, + (location_dict.get('country_code'), location_dict.get('country_name'), location_dict.get('city'), + location_dict.get('postal'), location_dict.get('latitude'), location_dict.get('longitude'), + location_dict.get('state'), ip_address) + ) + updated_row_count = 
update_cursor.rowcount + logger.info(f'Updated {updated_row_count} record for {ip_address}') + nb_ip += 1 + nb_row_updated += updated_row_count + # commit the changes to the database + logger.info(f'Committed {nb_row_updated} record for {nb_ip} IP addresses') + postgres_conn_handle.commit() + logger.info(f'Updated {nb_row_updated} record for {nb_ip} IP addresses') if __name__ == '__main__': From 517b644427417166336e8ad850e5b18bf11e9f11 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 17 Jun 2024 18:07:36 +0100 Subject: [PATCH 6/6] move commit outside the internal loop --- eva-usage-stats/ws_query_fill_in_location.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/eva-usage-stats/ws_query_fill_in_location.py b/eva-usage-stats/ws_query_fill_in_location.py index c68b5553..6b8a2ca3 100644 --- a/eva-usage-stats/ws_query_fill_in_location.py +++ b/eva-usage-stats/ws_query_fill_in_location.py @@ -46,7 +46,7 @@ def main(): logger.info("Job ran at " + str(datetime.datetime.now())) postgres_conn_handle = get_metadata_connection_handle("production_processing", args.private_config_xml_file) - chunk_size = 1000 + chunk_size = 100 update_query = ( 'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, ' 'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s ' @@ -75,9 +75,9 @@ def main(): logger.info(f'Updated {updated_row_count} record for {ip_address}') nb_ip += 1 nb_row_updated += updated_row_count - # commit the changes to the database - logger.info(f'Committed {nb_row_updated} record for {nb_ip} IP addresses') - postgres_conn_handle.commit() + # commit the changes to the database + logger.info(f'Committed {nb_row_updated} record for {nb_ip} IP addresses') + postgres_conn_handle.commit() logger.info(f'Updated {nb_row_updated} record for {nb_ip} IP addresses')