diff --git a/8Knot/cache_manager/cache_facade.py b/8Knot/cache_manager/cache_facade.py
new file mode 100644
index 00000000..672005e9
--- /dev/null
+++ b/8Knot/cache_manager/cache_facade.py
@@ -0,0 +1,128 @@
+import psycopg2 as pg
+from psycopg2.extras import execute_values
+from psycopg2 import sql as pg_sql
+import logging
+from uuid import uuid4
+
+
+def cache_query_results(
+    db_connection_string: str,
+    query: str,
+    vars: tuple[tuple],
+    target_table: str,
+    bookkeeping_data: tuple[dict],
+    server_pagination=2000,
+    client_pagination=2000,
+) -> None:
+    """Runs {query} against the primary database specified by {db_connection_string} with variables {vars}.
+    Retrieves results in pages of {server_pagination} rows and writes them to {target_table} in the cache in batches of {client_pagination}.
+
+    Args:
+        db_connection_string (str): psycopg2-style connection string for the primary (Augur) database.
+        query (str): parameterized SQL query to run against the primary database.
+        vars (tuple(tuple)): variables bound to the query's placeholders.
+        target_table (str): cache table the results are inserted into.
+        bookkeeping_data (tuple(dict)): (cache_func, repo_id) records written to cache_bookkeeping on success.
+        server_pagination (int, optional): rows fetched per round-trip from the primary database. Defaults to 2000.
+        client_pagination (int, optional): rows written per batch to the cache. Defaults to 2000.
+    """
+    logging.warning("TESTING -- CACHE_QUERY_RESULTS BEGIN")
+    with pg.connect(db_connection_string) as augur_conn:
+        with augur_conn.cursor(name=f"{target_table}-{uuid4()}") as augur_cur:
+            # set number of rows we want from primary db at a time
+            augur_cur.itersize = server_pagination
+
+            logging.warning("TESTING -- EXECUTING QUERY")
+            # execute query
+            logging.warning(vars)
+            augur_cur.execute(query, vars)
+
+            logging.warning("TESTING -- STARTING TRANSACTION")
+            # connect to cache
+            with pg.connect(
+                dbname="augur_cache",
+                user="postgres",
+                password="password",
+                host="postgres-cache",
+                port="5432",
+            ) as cache_conn:
+                logging.warning("TESTING -- COMPOSING SQL")
+                # compose SQL w/ table name
+                # ref: https://www.psycopg.org/docs/sql.html
+                composed_query = pg_sql.SQL(
+                    "INSERT INTO {tbl_name} VALUES %s ON CONFLICT DO NOTHING".format(
+                        tbl_name=target_table
+                    )
+                ).as_string(cache_conn)
+
+                # iterate through pages of rows from server.
+                logging.warning("TESTING -- FETCHING AND STORING ROWS")
+                while rows := augur_cur.fetchmany(client_pagination):
+                    if not rows:
+                        # we're out of rows
+                        break
+
+                    # write available rows to cache.
+                    with cache_conn.cursor() as cache_cur:
+                        execute_values(
+                            cur=cache_cur,
+                            sql=composed_query,
+                            argslist=rows,
+                            page_size=client_pagination,
+                        )
+
+                # after all data has successfully been written to cache from the primary db,
+                # insert record of existence for each (cache_func, repo_id) pair.
+                logging.warning("TESTING -- UPDATING BOOKKEEPING")
+                with cache_conn.cursor() as cache_cur:
+                    execute_values(
+                        cur=cache_cur,
+                        sql="""
+                        INSERT INTO cache_bookkeeping (cache_func, repo_id)
+                        VALUES %s
+                        """,
+                        template="(%(cache_func)s, %(repo_id)s)",
+                        argslist=bookkeeping_data,
+                    )
+
+                logging.warning("TESTING -- COMMITTING TRANSACTION")
+                # TODO: end of context block, on success, should commit. On failure, should rollback. Need to write test for this.
+
+    # don't need to commit on primary db
+    logging.warning("TESTING -- SUCCESS")
+
+
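Note on the composed INSERT above: the table name is spliced into the statement with str.format, even though psycopg2's sql module (the linked docs) exists to quote identifiers safely. A minimal sketch of that alternative, reusing the names from cache_query_results (hypothetical helper, not part of this patch):

from psycopg2 import sql as pg_sql
from psycopg2.extras import execute_values


def _insert_page(cache_conn, target_table: str, rows: list[tuple], page_size: int = 2000) -> None:
    # Quote the table name as an identifier instead of interpolating it as text.
    stmt = pg_sql.SQL("INSERT INTO {tbl} VALUES %s ON CONFLICT DO NOTHING").format(
        tbl=pg_sql.Identifier(target_table)
    )
    with cache_conn.cursor() as cache_cur:
        # execute_values still receives a plain string, as in the patch.
        execute_values(cache_cur, stmt.as_string(cache_conn), rows, page_size=page_size)
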
+def get_uncached(func_name: str, repolist: list[int]) -> list[int]:  # or None
+    with pg.connect(
+        dbname="augur_cache",
+        user="postgres",
+        password="password",
+        host="postgres-cache",
+        port="5432",
+    ) as cache_conn:
+        with cache_conn.cursor() as cache_cur:
+            composed_query = pg_sql.SQL(
+                """
+                SELECT cb.repo_id
+                FROM cache_bookkeeping cb
+                WHERE cb.cache_func = '{cache_func_name}' AND cb.repo_id in %s
+                """.format(
+                    cache_func_name=func_name
+                )
+            ).as_string(cache_conn)
+
+            # exec query
+            cache_cur.execute(query=composed_query, vars=(tuple(repolist),))
+
+            # get list of cached repos
+            already_cached: list[tuple] = cache_cur.fetchall()
+
+            # process list of single-value tuples to get list of values.
+            # looks like: [(val1,), (val2,), ...]
+            already_cached: set[int] = set([v[0] for v in already_cached])
+
+            # repos that are already cached will be removed from repolist set,
+            # leaving uncached remaining.
+            not_cached: list[int] = list(set(repolist) - already_cached)
+
+            return not_cached
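get_uncached above interpolates cache_func_name with str.format; func_name always comes from __name__ today, but the same lookup works with both values passed as bind parameters, which keeps all quoting inside psycopg2. A sketch under that assumption (hypothetical variant, not part of this patch):

def get_uncached_bound(cache_conn, func_name: str, repolist: list[int]) -> list[int]:
    # psycopg2 adapts the Python tuple to a SQL list for the IN clause.
    with cache_conn.cursor() as cache_cur:
        cache_cur.execute(
            """
            SELECT cb.repo_id
            FROM cache_bookkeeping cb
            WHERE cb.cache_func = %s AND cb.repo_id IN %s
            """,
            (func_name, tuple(repolist)),
        )
        already_cached = {row[0] for row in cache_cur.fetchall()}
    return list(set(repolist) - already_cached)
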
diff --git a/8Knot/pages/contributions/visualizations/commits_over_time.py b/8Knot/pages/contributions/visualizations/commits_over_time.py
index 7578bd3a..1c071ca3 100644
--- a/8Knot/pages/contributions/visualizations/commits_over_time.py
+++ b/8Knot/pages/contributions/visualizations/commits_over_time.py
@@ -12,6 +12,8 @@
 from pages.utils.job_utils import nodata_graph
 import io
 import time
+import psycopg2
+import cache_manager.cache_facade as cf
 
 PAGE = "contributions"
 VIZ_ID = "commits-over-time"
@@ -116,16 +118,57 @@ def toggle_popover(n, is_open):
 )
 def commits_over_time_graph(repolist, interval):
     # wait for data to asynchronously download and become available.
-    cache = cm()
-    df = cache.grabm(func=cmq, repos=repolist)
-    while df is None:
-        time.sleep(1.0)
-        df = cache.grabm(func=cmq, repos=repolist)
+    while not_cached := cf.get_uncached(func_name=cmq.__name__, repolist=repolist):
+        logging.warning(f"COMMITS_OVER_TIME_VIZ - WAITING ON DATA TO BECOME AVAILABLE")
+        time.sleep(0.5)
 
     # data ready.
     start = time.perf_counter()
     logging.warning("COMMITS_OVER_TIME_VIZ - START")
 
+    # GET ALL DATA FROM POSTGRES CACHE
+    df = None
+    with psycopg2.connect(
+        dbname="augur_cache",
+        user="postgres",
+        password="password",
+        host="postgres-cache",
+        port="5432",
+    ) as cache_conn:
+        with cache_conn.cursor() as cache_cur:
+            logging.critical(
+                cache_cur.mogrify(
+                    """
+                    SELECT *
+                    FROM commits c
+                    WHERE c.repo_id IN %s;
+                    """,
+                    (tuple(repolist),),
+                )
+            )
+            cache_cur.execute(
+                """
+                SELECT *
+                FROM commits c
+                WHERE c.repo_id IN %s;
+                """,
+                (tuple(repolist),),
+            )
+
+            logging.warning("COMMITS OVER TIME - LOADING DATA FROM CACHE")
+            df = pd.DataFrame(
+                cache_cur.fetchall(),
+                columns=[
+                    "id",
+                    "commits",
+                    "author_email",
+                    "date",
+                    "author_timestamp",
+                    "committer_timestamp",
+                ],
+            )
+            logging.warning(f"COMMITS OVER TIME - DATA LOADED {df.shape}, {df.head()}")
+
     # test if there is data
     if df.empty:
         logging.warning("COMMITS OVER TIME - NO DATA AVAILABLE")
@@ -162,7 +205,9 @@ def process_data(df: pd.DataFrame, interval):
     # converts date column to a datetime object, converts to string first to handle period information
     # the period slice is to handle weekly corner case
-    df_created["Date"] = pd.to_datetime(df_created["Date"].astype(str).str[:period_slice])
+    df_created["Date"] = pd.to_datetime(
+        df_created["Date"].astype(str).str[:period_slice]
+    )
 
     return df_created
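The callback above hard-codes the cache table's column names when building the DataFrame. After execute(), psycopg2 exposes the result columns on cursor.description, so the names could be derived from the query itself. A small sketch assuming the same cursor (illustrative, not part of this patch):

import pandas as pd


def df_from_cursor(cache_cur) -> pd.DataFrame:
    # Each entry in cursor.description is a Column tuple whose first field is the column name.
    columns = [col[0] for col in cache_cur.description]
    return pd.DataFrame(cache_cur.fetchall(), columns=columns)
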
diff --git a/8Knot/queries/commits_query.py b/8Knot/queries/commits_query.py
index 7f9b0a86..14dc109d 100644
--- a/8Knot/queries/commits_query.py
+++ b/8Knot/queries/commits_query.py
@@ -6,6 +6,9 @@
 import io
 import datetime as dt
 from sqlalchemy.exc import SQLAlchemyError
+import psycopg2 as pg
+from psycopg2.extras import execute_values
+import cache_manager.cache_facade as cf
 
 # DEBUGGING
 import os
@@ -41,7 +44,7 @@ def commits_query(self, repos):
     # commenting-out unused query components. only need the repo_id and the
     # authorship date for our current queries. remove the '--' to re-add
     # the now-removed values.
-    query_string = f"""
+    query_string = """
                 SELECT
                     distinct
                     r.repo_id AS id,
@@ -56,64 +59,146 @@ def commits_query(self, repos):
                     c.cmt_committer_timestamp AS committer_timestamp
                 FROM
-                    repo r
-                JOIN commits c
+                    augur_data.repo r
+                JOIN augur_data.commits c
                     ON r.repo_id = c.repo_id
                 WHERE
-                    c.repo_id in ({str(repos)[1:-1]})
+                    c.repo_id in %s
         """
 
+    # tuple of dictionaries
+    func_name = commits_query.__name__
+
     try:
-        dbm = AugurManager()
-        engine = dbm.get_engine()
-    except KeyError:
-        # noack, data wasn't successfully set.
-        logging.error(f"{QUERY_NAME}_DATA_QUERY - INCOMPLETE ENVIRONMENT")
-        return False
-    except SQLAlchemyError:
-        logging.error(f"{QUERY_NAME}_DATA_QUERY - COULDN'T CONNECT TO DB")
-        # allow retry via Celery rules.
-        raise SQLAlchemyError("DBConnect failed")
-
-    df = dbm.run_query(query_string)
-
-    # change to compatible type and remove all data that has been incorrectly formated
-    df["author_timestamp"] = pd.to_datetime(df["author_timestamp"], utc=True).dt.date
-    df = df[df.author_timestamp < dt.date.today()]
-
-    # break apart returned data per repo
-    # and temporarily store in List to be
-    # stored in Redis.
-    pic = []
-    for r in repos:
-        # convert series to a dataframe
-        # once we've stored the data by ID we no longer need the column.
-        c_df = pd.DataFrame(df.loc[df["id"] == r].drop(columns=["id"])).reset_index(drop=True)
-
-        # bytes buffer to be written to
-        b = io.BytesIO()
-
-        # write dataframe in feather format to BytesIO buffer
-        bs = c_df.to_feather(b)
-
-        # move head of buffer to the beginning
-        b.seek(0)
-
-        # write the bytes of the buffer into the array
-        bs = b.read()
-        pic.append(bs)
-
-    del df
-
-    # store results in Redis
-    cm_o = cm()
-
-    # 'ack' is a boolean of whether data was set correctly or not.
-    ack = cm_o.setm(
-        func=commits_query,
-        repos=repos,
-        datas=pic,
-    )
+        # STEP 1: Which repos need to be queried for?
+        # some might already be in cache.
+        uncached_repos: list[int] | None = cf.get_uncached(
+            func_name=func_name, repolist=repos
+        )
+        if not uncached_repos:
+            logging.warning(f"{QUERY_NAME}_DATA_QUERY - ALL REQUESTED REPOS IN CACHE")
+            return 0
+        else:
+            logging.warning(
+                f"{QUERY_NAME}_DATA_QUERY - CACHING {len(uncached_repos)} NEW REPOS"
+            )
+            uncached_repos: tuple[tuple] = tuple([tuple(uncached_repos)])
+
+        # STEP 2: Query for those repos
+        logging.warning(f"{QUERY_NAME}_DATA_QUERY - EXECUTING CACHING QUERY")
+        cf.cache_query_results(
+            db_connection_string="dbname=padres user=cali password=!baseball21 host=chaoss.tv port=5432",
+            query=query_string,
+            vars=uncached_repos,
+            target_table="commits",
+            bookkeeping_data=tuple(
+                {"cache_func": func_name, "repo_id": r} for r in repos
+            ),
+        )
+    except Exception as e:
+        logging.critical(f"{QUERY_NAME}_POSTGRES ERROR: {e}")
+        return 1
+
+    # try:
+    #     dbm = AugurManager()
+    #     engine = dbm.get_engine()
+    # except KeyError:
+    #     # noack, data wasn't successfully set.
+    #     logging.error(f"{QUERY_NAME}_DATA_QUERY - INCOMPLETE ENVIRONMENT")
+    #     return False
+    # except SQLAlchemyError:
+    #     logging.error(f"{QUERY_NAME}_DATA_QUERY - COULDN'T CONNECT TO DB")
+    #     # allow retry via Celery rules.
+    #     raise SQLAlchemyError("DBConnect failed")
+
+    # df = dbm.run_query(query_string)
+
+    # # change to compatible type and remove all data that has been incorrectly formated
+    # df["author_timestamp"] = pd.to_datetime(df["author_timestamp"], utc=True).dt.date
+    # df = df[df.author_timestamp < dt.date.today()]
+
+    # # break apart returned data per repo
+    # # and temporarily store in List to be
+    # # stored in Redis.
+    # pic = []
+    # for r in repos:
+    #     # convert series to a dataframe
+    #     # once we've stored the data by ID we no longer need the column.
+    #     c_df = pd.DataFrame(df.loc[df["id"] == r].drop(columns=["id"])).reset_index(
+    #         drop=True
+    #     )
+
+    #     # bytes buffer to be written to
+    #     b = io.BytesIO()
+
+    #     # write dataframe in feather format to BytesIO buffer
+    #     bs = c_df.to_feather(b)
+
+    #     # move head of buffer to the beginning
+    #     b.seek(0)
+
+    #     # write the bytes of the buffer into the array
+    #     bs = b.read()
+    #     pic.append(bs)
+
+    # del df
+
+    # # store results in Redis
+    # cm_o = cm()
+
+    # # 'ack' is a boolean of whether data was set correctly or not.
+    # ack = cm_o.setm(
+    #     func=commits_query,
+    #     repos=repos,
+    #     datas=pic,
+    # )
 
     logging.warning(f"{QUERY_NAME}_DATA_QUERY - END")
-    return ack
+    return 0
+
+
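A note on the parameter shape used above: the query has a single %s placeholder for the IN clause, and cache_query_results hands vars straight to cursor.execute, which expects one value per placeholder. That is why uncached_repos is wrapped as a tuple containing one tuple. A short illustration with made-up repo ids, reusing the cache connection settings from this patch:

import psycopg2 as pg

repos = [1, 25430, 25432]  # made-up ids
params = (tuple(repos),)  # one placeholder -> one parameter, itself a tuple

with pg.connect(
    dbname="augur_cache", user="postgres", password="password",
    host="postgres-cache", port="5432",
) as conn:
    with conn.cursor() as cur:
        # mogrify renders the statement psycopg2 would send:
        # ... WHERE repo_id IN (1, 25430, 25432)
        print(cur.mogrify("SELECT repo_id FROM commits WHERE repo_id IN %s", params))
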
+def _query_write(
+    server_sql, query_args, server_pagination=2000, client_pagination=2000
+):
+    # CONNECT TO PRIMARY DB
+    # WITH AUTOCOMMIT OFF, TRANSACTION COMMITS AT END OF CONTEXT
+    # BLOCK, BUT CONNECTION DOESN'T CLOSE.
+    # IF THIS IS PARALLELIZED, DON'T NEED TO WORRY ABOUT REENTRANCY.
+    with pg.connect(
+        dbname="padres",
+        user="cali",
+        password="!baseball21",
+        host="chaoss.tv",
+        port="5432",
+    ) as augur_conn:
+        with augur_conn.cursor("commits-query") as augur_cur:
+            # CONFIGURE PRIMARY DB AND EXECUTE QUERY
+            augur_cur.itersize = server_pagination
+            if query_args:
+                augur_cur.execute(server_sql, (query_args,))
+            else:
+                augur_cur.execute(server_sql)
+
+            # CONNECT TO CACHE DB, CONFIGURE w/ AUTOCOMMIT
+            with pg.connect(
+                dbname="augur_cache",
+                user="postgres",
+                password="password",
+                host="postgres-cache",
+                port="5432",
+            ) as cache_conn:
+                # ITERATE THROUGH BUFFERED ROWS
+                while rows := augur_cur.fetchmany(client_pagination):
+                    if not rows:
+                        break
+
+                    # WRITE ROWS TO CACHE
+                    with cache_conn.cursor() as cache_cur:
+                        execute_values(
+                            cache_cur,
+                            "INSERT INTO commits VALUES %s ON CONFLICT DO NOTHING",
+                            rows,
+                        )
+
+    augur_conn.close()
+    cache_conn.close()
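Nothing in this patch creates the cache tables it writes to; the inserts above assume a commits table mirroring the primary schema and a cache_bookkeeping table with at least cache_func and repo_id columns. A minimal, hypothetical bootstrap consistent with that usage (column types guessed; the real migration may differ):

import psycopg2 as pg

BOOKKEEPING_DDL = """
CREATE TABLE IF NOT EXISTS cache_bookkeeping (
    cache_func text NOT NULL,
    repo_id bigint NOT NULL
);
"""

with pg.connect(
    dbname="augur_cache", user="postgres", password="password",
    host="postgres-cache", port="5432",
) as conn:
    with conn.cursor() as cur:
        cur.execute(BOOKKEEPING_DDL)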