Skip to content

Commit

Permalink
Revert to the simpler implementation, as the prepared statement made no difference in speed
Browse files Browse the repository at this point in the history
  • Loading branch information
tcezard committed Jun 17, 2024
1 parent 1a6a566 commit e67f774
Showing 1 changed file with 36 additions and 44 deletions.
80 changes: 36 additions & 44 deletions eva-usage-stats/ws_query_fill_in_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from functools import lru_cache

import requests
from ebi_eva_common_pyutils.logger import logging_config

from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query
from psycopg2.extras import execute_batch
from requests import HTTPError
from retry import retry

logger = logging_config.get_logger(__name__)

@retry(tries=5, delay=8, backoff=1.2, jitter=(1, 3))
def _get_location(ip_address):
Expand All @@ -37,56 +38,47 @@ def get_location(ip_address):
return {}


update_query = (
'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=$1, client_country_name=$2, '
'client_city=$3, client_postal=$4, client_latitude=$5, client_longitude=$6, client_state=$7 '
'WHERE client_ip=$8 AND client_country_code is null;'
)


def main():
parser = ArgumentParser(description='')
parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True)
args = parser.parse_args()
print("Job ran at " + str(datetime.datetime.now()))
logging_config.add_stdout_handler()
logger.info("Job ran at " + str(datetime.datetime.now()))

postgres_conn_handle = get_metadata_connection_handle("production_processing", args.private_config_xml_file)
# Get the number of IP to update
query = 'SELECT count(distinct client_ip) FROM eva_web_srvc_stats.ws_traffic where client_country_code is null;'
res = get_all_results_for_query(postgres_conn_handle, query)
nb_ip_address = res[0][0]
ip_updated = 0
chunk_size = 1000
print(f'{nb_ip_address} IP addresses to update the location for {chunk_size} at a time')

while ip_updated < nb_ip_address:
with postgres_conn_handle.cursor(name='fetch_large_result') as cursor:
cursor.itersize = chunk_size
cursor.execute(f"SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic "
f"where client_country_code is null limit {chunk_size};")

location_dict_list = []
for row in cursor:
ip_address, = row
location_dict = get_location(ip_address)
location_dict['ip_address'] = ip_address
location_dict_list.append(location_dict)

update_many_and_commit(postgres_conn_handle, location_dict_list, page_size=100)
ip_updated += len(location_dict_list)
print(f'Updated {chunk_size} ip addresses out of {nb_ip_address}')


def update_many_and_commit(postgres_conn_handle, location_dict_list, page_size=100):
cur = postgres_conn_handle.cursor()
cur.execute(f"PREPARE updateStmt AS {update_query}")
execute_batch(cur,
"EXECUTE updateStmt (%(country_code)s, %(country_name)s, %(city)s, %(postal)s, %(latitude)s, "
"%(longitude)s, %(state)s, %(ip_address)s)",
location_dict_list,
page_size=page_size)
cur.execute("DEALLOCATE updateStmt")
postgres_conn_handle.commit()
update_query = (
'UPDATE eva_web_srvc_stats.ws_traffic SET client_country_code=%s, client_country_name=%s, '
'client_city=%s, client_postal=%s, client_latitude=%s, client_longitude=%s, client_state=%s '
'WHERE client_ip=%s AND client_country_code is null;'
)
nb_ip = 0
nb_row_updated = 0
while True:
query = (f"SELECT distinct client_ip FROM eva_web_srvc_stats.ws_traffic "
f"where client_country_code is null limit {chunk_size};")
ip_addresses = list(get_all_results_for_query(postgres_conn_handle, query))

if len(ip_addresses) == 0:
break
for ip_address, in ip_addresses:
location_dict = get_location(ip_address)
with postgres_conn_handle.cursor() as update_cursor:
# execute the UPDATE statement
update_cursor.execute(
update_query,
(location_dict.get('country_code'), location_dict.get('country_name'), location_dict.get('city'),
location_dict.get('postal'), location_dict.get('latitude'), location_dict.get('longitude'),
location_dict.get('state'), ip_address)
)
updated_row_count = update_cursor.rowcount
logger.info(f'Updated {updated_row_count} record for {ip_address}')
nb_ip += 1
nb_row_updated += updated_row_count
# commit the changes to the database
logger.info(f'Committed {nb_row_updated} record for {nb_ip} IP addresses')
postgres_conn_handle.commit()
logger.info(f'Updated {nb_row_updated} record for {nb_ip} IP addresses')


if __name__ == '__main__':
Expand Down

0 comments on commit e67f774

Please sign in to comment.