step forward- test caching w/ postgres implemented
Signed-off-by: James Kunstle <[email protected]>
JamesKunstle committed Nov 1, 2023
1 parent f0a5c89 commit e5fbc79
Showing 3 changed files with 319 additions and 61 deletions.
128 changes: 128 additions & 0 deletions 8Knot/cache_manager/cache_facade.py
@@ -0,0 +1,128 @@
import psycopg2 as pg
from psycopg2.extras import execute_values
from psycopg2 import sql as pg_sql
import logging
from uuid import uuid4


def cache_query_results(
db_connection_string: str,
query: str,
vars: tuple[tuple],
target_table: str,
bookkeeping_data: tuple[dict],
server_pagination=2000,
client_pagination=2000,
) -> None:
"""Runs {query} against primary database specified by {db_connection_string} with variables {vars}.
Retrieves results from db with paginations {server_pagination} and {client_pagination}.
Args:
db_connection_string (str): _description_
query (str): _description_
vars (tuple(tuple)): _description_
target_table (str): _description_
bookkeeping_data (tuple(dict)): _description_
server_pagination (int, optional): _description_. Defaults to 2000.
client_pagination (int, optional): _description_. Defaults to 2000.
"""
logging.warning("TESTING -- CACHE_QUERY_RESULTS BEGIN")
with pg.connect(db_connection_string) as augur_conn:
with augur_conn.cursor(name=f"{target_table}-{uuid4()}") as augur_cur:
# set number of rows we want from primary db at a time
augur_cur.itersize = server_pagination

logging.warning("TESTING -- EXECUTING QUERY")
# execute query
logging.warning(vars)
augur_cur.execute(query, vars)

logging.warning("TESTING -- STARTING TRANSACTION")
# connect to cache
with pg.connect(
dbname="augur_cache",
user="postgres",
password="password",
host="postgres-cache",
port="5432",
) as cache_conn:
logging.warning("TESTING -- COMPOSING SQL")
# compose SQL w/ table name
# ref: https://www.psycopg.org/docs/sql.html
composed_query = pg_sql.SQL(
"INSERT INTO {tbl_name} VALUES %s ON CONFLICT DO NOTHING".format(
tbl_name=target_table
)
).as_string(cache_conn)

# iterate through pages of rows from server.
logging.warning("TESTING -- FETCHING AND STORING ROWS")
while rows := augur_cur.fetchmany(client_pagination):
if not rows:
# we're out of rows
break

# write available rows to cache.
with cache_conn.cursor() as cache_cur:
execute_values(
cur=cache_cur,
sql=composed_query,
argslist=rows,
page_size=client_pagination,
)

# after all data has successfully been written to cache from the primary db,
# insert record of existence for each (cache_func, repo_id) pair.
logging.warning("TESTING -- UPDATING BOOKKEEPING")
with cache_conn.cursor() as cache_cur:
execute_values(
cur=cache_cur,
sql="""
INSERT INTO cache_bookkeeping (cache_func, repo_id)
VALUES %s
""",
template="(%(cache_func)s, %(repo_id)s)",
argslist=bookkeeping_data,
)

logging.warning("TESTING -- COMMITTING TRANSACTION")
# TODO: end of context block, on success, should commit. On failure, should rollback. Need to write test for this.
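# psycopg2's documented behavior: leaving a connection's `with` block commits
# the open transaction if the block exits cleanly and rolls it back if an
# exception escapes; the connection itself is not closed.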

# don't need to commit on primary db
logging.warning("TESTING -- SUCCESS")


def get_uncached(func_name: str, repolist: list[int]) -> list[int]: # or None
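"""Returns the repos from {repolist} that have no cache_bookkeeping entry for
the query function named {func_name}, i.e. the repos that still need caching."""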
with pg.connect(
dbname="augur_cache",
user="postgres",
password="password",
host="postgres-cache",
port="5432",
) as cache_conn:
with cache_conn.cursor() as cache_cur:
composed_query = pg_sql.SQL(
"""
SELECT cb.repo_id
FROM cache_bookkeeping cb
WHERE cb.cache_func = '{cache_func_name}' AND cb.repo_id in %s
""".format(
cache_func_name=func_name
)
).as_string(cache_conn)

# exec query
cache_cur.execute(query=composed_query, vars=(tuple(repolist),))

# get list of cached repos
already_cached: list[tuple] = cache_cur.fetchall()

# process list of single-value tuples to get list of values.
# looks like: [(val1,), (val2,), ...]
already_cached: set[int] = set([v[0] for v in already_cached])

# repos that are already cached will be removed from repolist set,
# leaving uncached remaining.
not_cached: list[int] = list(set(repolist) - already_cached)

return not_cached
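
Taken together, the two functions above sketch the intended worker-side protocol: ask get_uncached which repos still need caching, run the primary-database query for only those repos, and let cache_query_results stream the rows into the cache table while recording a bookkeeping entry per repo. Below is a minimal sketch of that composition, assuming a hypothetical cache_commits helper and cache_func name, an elided query body, and an illustrative primary-database connection string; the cache credentials are the ones hard-coded above.

import cache_manager.cache_facade as cf

# Assumed connection string for the primary (Augur) database -- illustrative only.
AUGUR_CONN_STRING = "dbname=augur user=augur password=augur host=augur-db port=5432"


def cache_commits(repolist: list[int]) -> None:
    """Hypothetical worker helper: cache commit rows for repos not yet cached."""
    not_cached = cf.get_uncached(func_name="commits_query", repolist=repolist)
    if not not_cached:
        return  # every requested repo is already cached

    # Elided query body; the real query selects the columns the 'commits'
    # cache table expects.
    query = "SELECT ... FROM commits c WHERE c.repo_id IN %s"

    cf.cache_query_results(
        db_connection_string=AUGUR_CONN_STRING,
        query=query,
        vars=(tuple(not_cached),),
        target_table="commits",
        bookkeeping_data=tuple(
            {"cache_func": "commits_query", "repo_id": r} for r in not_cached
        ),
    )
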
57 changes: 51 additions & 6 deletions 8Knot/pages/contributions/visualizations/commits_over_time.py
@@ -12,6 +12,8 @@
from pages.utils.job_utils import nodata_graph
import io
import time
import psycopg2
import cache_manager.cache_facade as cf

PAGE = "contributions"
VIZ_ID = "commits-over-time"
@@ -116,16 +118,57 @@ def toggle_popover(n, is_open):
)
def commits_over_time_graph(repolist, interval):
# wait for data to asynchronously download and become available.
cache = cm()
df = cache.grabm(func=cmq, repos=repolist)
while df is None:
time.sleep(1.0)
df = cache.grabm(func=cmq, repos=repolist)
while not_cached := cf.get_uncached(func_name=cmq.__name__, repolist=repolist):
logging.warning(f"COMMITS_OVER_TIME_VIZ - WAITING ON DATA TO BECOME AVAILABLE")
time.sleep(0.5)

# data ready.
start = time.perf_counter()
logging.warning("COMMITS_OVER_TIME_VIZ - START")

# GET ALL DATA FROM POSTGRES CACHE
df = None
with psycopg2.connect(
dbname="augur_cache",
user="postgres",
password="password",
host="postgres-cache",
port="5432",
) as cache_conn:
with cache_conn.cursor() as cache_cur:
logging.critical(
cache_cur.mogrify(
"""
SELECT *
FROM commits c
WHERE c.repo_id IN %s;
""",
(tuple(repolist),),
)
)
cache_cur.execute(
"""
SELECT *
FROM commits c
WHERE c.repo_id IN %s;
""",
(tuple(repolist),),
)

logging.warning("COMMITS OVER TIME - LOADING DATA FROM CACHE")
df = pd.DataFrame(
cache_cur.fetchall(),
columns=[
"id",
"commits",
"author_email",
"date",
"author_timestamp",
"committer_timestamp",
],
)
logging.warning(f"COMMITS OVER TIME - DATA LOADED {df.shape}, {df.head()}")

# test if there is data
if df.empty:
logging.warning("COMMITS OVER TIME - NO DATA AVAILABLE")
@@ -162,7 +205,9 @@ def process_data(df: pd.DataFrame, interval):

# convert the Date column to datetime; cast to string first to handle pandas Period values,
# and slice the string (period_slice) to handle the weekly corner case
df_created["Date"] = pd.to_datetime(df_created["Date"].astype(str).str[:period_slice])
df_created["Date"] = pd.to_datetime(
df_created["Date"].astype(str).str[:period_slice]
)

return df_created
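
As a side note, the cache read in the callback above (SELECT * from the commits cache followed by building the DataFrame with an explicit column list) could also be written with pandas.read_sql, which takes the column names from the result set. This is only a sketch of an equivalent read under the same hard-coded cache credentials, not what the commit does; recent pandas versions warn when handed a raw DBAPI connection rather than a SQLAlchemy engine, but the query still runs through psycopg2.

import pandas as pd
import psycopg2


def load_cached_commits(repolist: list[int]) -> pd.DataFrame:
    """Sketch: load cached commit rows for the given repos into a DataFrame."""
    with psycopg2.connect(
        dbname="augur_cache",
        user="postgres",
        password="password",
        host="postgres-cache",
        port="5432",
    ) as cache_conn:
        # Column names come from the cursor description, so no manual list is needed.
        return pd.read_sql(
            sql="SELECT * FROM commits c WHERE c.repo_id IN %(repos)s",
            con=cache_conn,
            params={"repos": tuple(repolist)},
        )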

