Merge branch 'chouinar/2808-create-a-jwt' into chouinar/2809-parse-a-jwt
chouinar committed Nov 20, 2024
2 parents 92664ed + 3dc50ad commit 5b8dab5
Showing 19 changed files with 550 additions and 277 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci-analytics.yml
@@ -39,12 +39,12 @@ jobs:
- name: Run linting
run: make lint

- name: Run database initialization
run: docker compose down --volumes && make init-db

- name: Run tests
run: make test-audit

- name: Run database initialization
run: make init-db

      # Both of these tasks require GitHub and Slack auth
# - name: Export GitHub data
# run: make gh-data-export
5 changes: 3 additions & 2 deletions DEVELOPMENT.md
@@ -115,8 +115,9 @@ Any changes to features, tools, or workflows should include updates or additions
To run the load test:

1. If you haven't already, install artillery locally with `npm install -g artillery@latest`
2. `$ cd api` or `$ cd frontend`
3. `$ make load-test-<env>` where env is either `local`, `dev`, `staging`, or `production`
2. For the frontend, download the required data from https://drive.google.com/file/d/1zknvVSRqL7xs8VGinztuKelfppYlgRoP and save "params.json" to `frontend/tests/artillery/params.json`
3. `$ cd api` or `$ cd frontend`
4. `$ make load-test-<env>` where env is either `local`, `dev`, `staging`, or `production`

- `make load-test-local`
- requires running a local container in another console
4 changes: 2 additions & 2 deletions analytics/src/analytics/cli.py
@@ -269,7 +269,7 @@ def export_json_to_database(delivery_file: Annotated[str, ISSUE_FILE_ARG]) -> None
def initialize_database() -> None:
"""Initialize etl database."""
logger.info("initializing database")
etldb.init_db()
etldb.initialize_database()
logger.info("done")


@@ -296,7 +296,7 @@ def transform_and_load(
dataset = EtlDataset.load_from_json_file(file_path=issue_file)

# sync data to db
etldb.sync_db(dataset, datestamp)
etldb.sync_data(dataset, datestamp)

# finish
print("transform and load is done")
8 changes: 4 additions & 4 deletions analytics/src/analytics/integrations/etldb/__init__.py
@@ -1,11 +1,11 @@
"""Read and write data from/to delivery metrics database."""

__all__ = [
"init_db",
"sync_db",
"initialize_database",
"sync_data",
]

from analytics.integrations.etldb.main import (
init_db,
sync_db,
initialize_database,
sync_data,
)
84 changes: 83 additions & 1 deletion analytics/src/analytics/integrations/etldb/etldb.py
@@ -2,7 +2,7 @@

from enum import Enum

from sqlalchemy import Connection
from sqlalchemy import Connection, text

from analytics.integrations.db import PostgresDbClient

@@ -36,6 +36,88 @@ def commit(self, connection: Connection) -> None:
"""Commit an open transaction."""
connection.commit()

def get_schema_version(self) -> int:
"""Select schema version from etl database."""
version = 0

if self.schema_versioning_exists():
result = self.connection().execute(
text("select version from schema_version"),
)
row = result.fetchone()
if row:
version = row[0]

return version

def set_schema_version(self, new_value: int) -> bool:
"""Set schema version number."""
if not self.schema_versioning_exists():
return False

# sanity check new version number
current_version = self.get_schema_version()
if new_value < current_version:
message = (
"WARNING: cannot bump schema version "
f"from {current_version} to {new_value}"
)
print(message)
return False

if new_value > current_version:
cursor = self.connection()
cursor.execute(
text(
"insert into schema_version (version) values (:new_value) "
"on conflict(one_row) do update "
"set version = :new_value",
),
{"new_value": new_value},
)
self.commit(cursor)
return True

return False

def revert_to_schema_version(self, new_value: int) -> bool:
"""Revert schema version number to the previous version."""
if not self.schema_versioning_exists():
return False

# sanity check new version number
current_version = self.get_schema_version()
if new_value != current_version - 1 or new_value < 0:
            message = (
                "WARNING: cannot revert schema version "
                f"from {current_version} to {new_value}"
            )
print(message)
return False

cursor = self.connection()
cursor.execute(
text(
"insert into schema_version (version) values (:new_value) "
"on conflict(one_row) do update "
"set version = :new_value",
),
{"new_value": new_value},
)
self.commit(cursor)
return True

def schema_versioning_exists(self) -> bool:
"""Determine whether schema version table exists."""
result = self.connection().execute(
text(
"select table_name from information_schema.tables "
"where table_name = 'schema_version'",
),
)
row = result.fetchone()
return bool(row and row[0] == "schema_version")


class EtlChangeType(Enum):
"""An enum to describe ETL change types."""
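
Taken together, the methods above give `EtlDb` a small schema-versioning API: read the current version, bump it forward, or step it back exactly one version. A minimal usage sketch (the import path is assumed from this file's location; connection settings come from whatever `PostgresDbClient` already reads):

```python
from analytics.integrations.etldb.etldb import EtlDb  # import path assumed

db = EtlDb()
if db.schema_versioning_exists():
    current = db.get_schema_version()
    # forward moves upsert the single schema_version row;
    # backward moves are refused and return False
    db.set_schema_version(current + 1)
    # reverting only accepts exactly one version below the current one
    db.revert_to_schema_version(current)
```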
139 changes: 98 additions & 41 deletions analytics/src/analytics/integrations/etldb/main.py
@@ -1,5 +1,7 @@
"""Integrate with database to read and write etl data."""

import os
import re
from pathlib import Path

from psycopg.errors import InsufficientPrivilege
@@ -17,35 +17,55 @@
VERBOSE = False


def init_db() -> None:
"""Initialize etl database."""
# define the path to the sql file
parent_path = Path(__file__).resolve().parent
sql_path = f"{parent_path}/create_etl_db.sql"

# read sql file
with open(sql_path) as f:
sql = f.read()
def initialize_database() -> None:
"""
Create and/or update an etl database by applying a sequential set of migration scripts.
It applies the migrations using the following steps:
- Check the current schema version listed in the database
- Retrieve the list of migration scripts ordered by version
- If the current schema version is less than the version of the latest migration script
run the remaining migrations in order
- Bump the schema version in the database to the latest version
- If the current schema version matches the latest script, do nothing
"""
# get connection to database
etldb = EtlDb()
current_version = etldb.get_schema_version()
print(f"current schema version: {current_version}")

# get all sql file paths and respective version numbers
sql_file_path_map = get_sql_file_paths()
all_versions = sorted(sql_file_path_map.keys())

# iterate sql files
migration_count = 0
for next_version in all_versions:
if next_version <= current_version:
continue
# read sql file
with open(sql_file_path_map[next_version]) as f:
sql = f.read()
# execute sql
print(f"applying migration for schema version: {next_version}")
print(f"migration source file: {sql_file_path_map[next_version]}")
cursor = etldb.connection()
cursor.execute(
text(sql),
)
# commit changes
etldb.commit(cursor)
# bump schema version number
_ = etldb.set_schema_version(next_version)
current_version = next_version
migration_count += 1

# execute sql
try:
db = EtlDb()
cursor = db.connection()
cursor.execute(
text(sql),
)
db.commit(cursor)
except (
RuntimeError,
ProgrammingError,
OperationalError,
InsufficientPrivilege,
) as e:
message = f"FATAL: Failed to initialize db: {e}"
raise RuntimeError(message) from e
# summarize results in output
print(f"total migrations applied: {migration_count}")
print(f"new schema version: {current_version}")


def sync_db(dataset: EtlDataset, effective: str) -> None:
def sync_data(dataset: EtlDataset, effective: str) -> None:
"""Write github data to etl database."""
# initialize a map of github id to db row id
ghid_map: dict[EtlEntityType, dict[str, int]] = {
@@ -63,10 +63,10 @@ def sync_db(dataset: EtlDataset, effective: str) -> None:
ghid_map[EtlEntityType.QUAD] = sync_quads(db, dataset)
print(f"quad row(s) processed: {len(ghid_map[EtlEntityType.QUAD])}")
except (
RuntimeError,
ProgrammingError,
OperationalError,
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync quad data: {e}"
raise RuntimeError(message) from e
@@ -82,10 +82,10 @@ def sync_db(dataset: EtlDataset, effective: str) -> None:
f"deliverable row(s) processed: {len(ghid_map[EtlEntityType.DELIVERABLE])}",
)
except (
RuntimeError,
ProgrammingError,
OperationalError,
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync deliverable data: {e}"
raise RuntimeError(message) from e
@@ -95,10 +95,10 @@ def sync_db(dataset: EtlDataset, effective: str) -> None:
ghid_map[EtlEntityType.SPRINT] = sync_sprints(db, dataset, ghid_map)
print(f"sprint row(s) processed: {len(ghid_map[EtlEntityType.SPRINT])}")
except (
RuntimeError,
ProgrammingError,
OperationalError,
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync sprint data: {e}"
raise RuntimeError(message) from e
@@ -108,10 +108,10 @@ def sync_db(dataset: EtlDataset, effective: str) -> None:
ghid_map[EtlEntityType.EPIC] = sync_epics(db, dataset, ghid_map)
print(f"epic row(s) processed: {len(ghid_map[EtlEntityType.EPIC])}")
except (
RuntimeError,
ProgrammingError,
OperationalError,
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync epic data: {e}"
raise RuntimeError(message) from e
@@ -121,10 +121,10 @@ def sync_db(dataset: EtlDataset, effective: str) -> None:
issue_map = sync_issues(db, dataset, ghid_map)
print(f"issue row(s) processed: {len(issue_map)}")
except (
RuntimeError,
ProgrammingError,
OperationalError,
InsufficientPrivilege,
OperationalError,
ProgrammingError,
RuntimeError,
) as e:
message = f"FATAL: Failed to sync issue data: {e}"
raise RuntimeError(message) from e
@@ -190,3 +190,38 @@ def sync_quads(db: EtlDb, dataset: EtlDataset) -> dict:
f"QUAD '{ghid}' title = '{quad_df['quad_name']}', row_id = {result[ghid]}",
)
return result


def get_sql_file_paths() -> dict[int, str]:
"""Get all sql files needed for database initialization."""
result = {}

# define the path to the sql files
sql_file_directory = f"{Path(__file__).resolve().parent}/migrations/versions"

# get list of sorted filenames
filename_list = sorted(os.listdir(sql_file_directory))

# expected filename format: {4_digit_version_number}_{short_description}.sql
# example: 0003_alter_tables_set_default_timestamp.sql
pattern = re.compile(r"^\d\d\d\d_.+\.sql$")

# compile dict of results
for filename in filename_list:
# validate filename format
if not pattern.match(filename):
message = f"FATAL: malformed db migration filename: {filename}"
raise RuntimeError(message)

        # extract version number from filename
version = int(filename[:4])

# do not allow duplicate version number
if version in result:
message = f"FATAL: Duplicate db migration version number: {version} "
raise RuntimeError(message)

# map the version number to the file path
result[version] = f"{sql_file_directory}/{filename}"

return result
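
A short sketch of how these helpers fit together, assuming the import path below; the mapping printed depends on whatever files exist under `migrations/versions` (only the `0003_alter_tables_set_default_timestamp.sql` example is named above):

```python
from analytics.integrations.etldb.main import (  # import path assumed
    get_sql_file_paths,
    initialize_database,
)

# list the discovered migrations in the order they would be applied
for version, path in sorted(get_sql_file_paths().items()):
    print(f"{version:04d} -> {path}")

initialize_database()  # applies only migrations newer than the stored version
```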
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS schema_version (
one_row BOOL GENERATED ALWAYS AS (TRUE) STORED,
version INTEGER NOT NULL,
t_created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
t_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX IF NOT EXISTS sv_i1 ON schema_version(one_row);
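
The `one_row` column is generated as a constant `TRUE` and backed by a unique index, so `schema_version` can never hold more than one row; that is what the `on conflict(one_row) do update` statements in `set_schema_version` and `revert_to_schema_version` rely on. A standalone sketch of the pattern, assuming a reachable Postgres (the connection URL is a placeholder):

```python
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg://user:pass@localhost/etl")  # placeholder URL
with engine.begin() as conn:
    # the second insert collides on the unique one_row index and is
    # turned into an UPDATE, so exactly one row ever exists
    for v in (1, 2):
        conn.execute(
            text(
                "insert into schema_version (version) values (:v) "
                "on conflict(one_row) do update set version = :v",
            ),
            {"v": v},
        )
    count, version = conn.execute(
        text("select count(*), max(version) from schema_version"),
    ).one()
    print(count, version)  # 1 2
```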
@@ -0,0 +1,9 @@
ALTER TABLE gh_deliverable ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_deliverable_quad_map ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_epic ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_epic_deliverable_map ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_issue ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_issue_history ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_issue_sprint_map ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_sprint ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE gh_quad ALTER COLUMN t_modified SET DEFAULT CURRENT_TIMESTAMP;