Create teams, datasets, and account aliases when ingesting manifests #41

Merged Sep 6, 2024 (23 commits)

Commits:
4bacad3
Create a team when ingesting an experiment from a manifest file
karlhigley Sep 3, 2024
14ec332
Create a dataset when ingesting an experiment from a manifest file
karlhigley Sep 3, 2024
5331dd0
Create account aliases when ingesting an experiment from a manifest file
karlhigley Sep 3, 2024
4f4bb1b
Apply formatting
karlhigley Sep 4, 2024
7908ac2
Reorganize the tests
karlhigley Sep 4, 2024
101882d
Add a test for manifest parsing and conversion
karlhigley Sep 4, 2024
6c1a706
Move sample manifest to a separate file
karlhigley Sep 4, 2024
1412ddc
Add a simple test that exercises experiment storage
karlhigley Sep 4, 2024
e1f21fe
Apply injection decorator to a non-test function
karlhigley Sep 4, 2024
b258e6e
Remove injection from test
karlhigley Sep 4, 2024
380aafe
Add additional tables to `DbExperimentRepository`
karlhigley Sep 4, 2024
1ef6939
Fix method call
karlhigley Sep 4, 2024
8a0c773
Create account for experiment owner
karlhigley Sep 4, 2024
c7008bf
Assert on `store_account` result
karlhigley Sep 4, 2024
5e8d43e
Remove `dataset_name` column from `datasets` table
karlhigley Sep 4, 2024
1e602c0
Assign UUIDs to groups etc when parsing manifest
karlhigley Sep 4, 2024
4e89d9c
Fix experiment and group columns for inserts
karlhigley Sep 4, 2024
d77e60f
Commit instead of rolling back before experiment insert
karlhigley Sep 4, 2024
1879e9a
Create the experiment owner account if it doesn't already exist
karlhigley Sep 4, 2024
bb67589
Fix repo name in docstring
karlhigley Sep 5, 2024
4569770
Make `dataset_name` column nullable instead of removing it
karlhigley Sep 5, 2024
bef16dc
Clear the relevant tables before running the experiment test
karlhigley Sep 6, 2024
fcbf587
Apply `clear_tables` test helper in other tests
karlhigley Sep 6, 2024
Files changed
@@ -0,0 +1,25 @@
"""make datasets table name column nullable

Revision ID: 8bf414e0ddfb
Revises: dd50d8e7777e
Create Date: 2024-09-04 11:25:43.062068

"""

from typing import Sequence, Union

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "8bf414e0ddfb"
down_revision: Union[str, None] = "dd50d8e7777e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.alter_column("datasets", "dataset_name", nullable=True)


def downgrade() -> None:
op.alter_column("datasets", "dataset_name", nullable=False)
7 changes: 7 additions & 0 deletions src/poprox_storage/concepts/experiment.py
@@ -6,8 +6,15 @@
from pydantic import BaseModel, PositiveInt


class Team(BaseModel):
team_id: UUID | None = None
team_name: str
members: list[UUID]


class Experiment(BaseModel):
experiment_id: UUID | None = None
owner: Team
description: str
start_date: date
end_date: date
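For context, a minimal sketch (not part of this diff) of building an Experiment with the new Team owner. The names and dates are made up; the constructor call mirrors the Experiment(...) construction in manifest_to_experiment further down.

from datetime import date
from uuid import uuid4

from poprox_storage.concepts.experiment import Experiment, Team

# Illustrative values only.
owner = Team(team_name="example-team", members=[uuid4()])
experiment = Experiment(
    owner=owner,
    description="an experiment owned by a team",
    start_date=date(2024, 9, 9),
    end_date=date(2024, 9, 23),
    phases=[],
)
assert experiment.experiment_id is None  # defaults to None until one is assigned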
38 changes: 34 additions & 4 deletions src/poprox_storage/concepts/manifest.py
@@ -2,15 +2,17 @@

from copy import deepcopy
from datetime import date, timedelta
from uuid import UUID
from uuid import UUID, uuid4

import tomli
from pydantic import BaseModel, PositiveInt

from poprox_storage.concepts.experiment import (
Experiment,
Group,
Phase,
Recommender,
Team,
Treatment,
)

@@ -22,6 +24,7 @@ class ManifestFile(BaseModel):
"""

experiment: ManifestExperiment
owner: ManifestTeam
users: ManifestGroupSpec
recommenders: dict[str, ManifestRecommender]
phases: ManifestPhases
@@ -34,6 +37,12 @@ class ManifestExperiment(BaseModel):
start_date: date | None = None


class ManifestTeam(BaseModel):
team_id: UUID
team_name: str
members: list[UUID]


class ManifestPhases(BaseModel):
sequence: list[str]
phases: dict[str, ManifestPhase]
@@ -85,34 +94,43 @@ def manifest_to_experiment(manifest: ManifestFile) -> Experiment:
start_date = manifest.experiment.start_date or (date.today() + timedelta(days=1)) # noqa: DTZ011
end_date = start_date + convert_duration(manifest.experiment.duration)

owner = Team(
team_id=manifest.owner.team_id,
team_name=manifest.owner.team_name,
members=manifest.owner.members,

Review comments on this line:

Contributor:
Is the implication here that the manifest will contain a list of member UUIDs? Do we like this, or should we have the manifest list emails and then look up the UUIDs as part of ingress here?

Contributor:
Also -- from a type-safety standpoint (which might be uninteresting FWIW), would this be list[str] or list[UUID] at this point? I feel like it would be list[str], but the Team class has list[UUID]? Not sure we care about that, but it occurred to me that it's worth checking.

karlhigley (Collaborator, Author), Sep 5, 2024:
> Is the implication here that the manifest will contain a list of member UUIDs?
Yes!
> Do we like this, or should we have the manifest list emails and then look up the UUIDs as part of ingress here?
No! I do not like this, but it allowed me to punt on integrating account lookups, which complicate the picture as far as testing goes.
> Would this be list[str] or list[UUID] at this point?
It should be list[UUID] but it's actually list[str]. I'll fix this!

karlhigley (Collaborator, Author), Sep 5, 2024:
I sit corrected—after checking with a breakpoint it actually is list[UUID] here, so the types are correct. The conversion from string to UUID happens when Pydantic turns the manifest JSON into model objects.

Contributor:
> The conversion from string to UUID happens when Pydantic turns the manifest JSON into model objects.
Neat!

)

experiment = Experiment(
experiment_id=manifest.experiment.id,
owner=owner,
start_date=start_date,
end_date=end_date,
description=manifest.experiment.description,
phases=[],
)

recommenders = {
rec_name: Recommender(name=rec_name, endpoint_url=recommender.endpoint)
rec_name: Recommender(recommender_id=uuid4(), name=rec_name, endpoint_url=recommender.endpoint)
for rec_name, recommender in manifest.recommenders.items()
}

groups = {}
for group_name, group in manifest.users.groups.items():
if group.identical_to:
new_group = deepcopy(groups[group.identical_to])
new_group.group_id = uuid4()
new_group.name = group_name
groups[group_name] = new_group
else:
groups[group_name] = Group(name=group_name, minimum_size=group.minimum_size)
groups[group_name] = Group(group_id=uuid4(), name=group_name, minimum_size=group.minimum_size)

phase_start = start_date
for phase_name in manifest.phases.sequence:
manifest_phase = manifest.phases.phases[phase_name]
duration = convert_duration(manifest_phase.duration)
phase_start = start_date + sum([phase.duration for phase in experiment.phases], start=timedelta(0))
phase_end = phase_start + duration
phase = Phase(name=phase_name, start_date=phase_start, end_date=phase_end, treatments=[])
phase = Phase(phase_id=uuid4(), name=phase_name, start_date=phase_start, end_date=phase_end, treatments=[])
for group_name, assignment in manifest_phase.assignments.items():
recommender_name = assignment.recommender
phase.treatments.append(
@@ -137,3 +155,15 @@ def convert_duration(duration: str) -> timedelta:
msg = f"Unsupported duration unit: {unit}"
raise ValueError(msg)
return duration


def parse_manifest_toml(manifest_file: str):
manifest_dict = tomli.loads(manifest_file)
phases = {"sequence": manifest_dict["phases"]["sequence"], "phases": {}}
for name, phase in manifest_dict["phases"].items():
if name != "sequence":
phases["phases"][name] = phase

manifest_dict["phases"] = phases

return ManifestFile.model_validate(manifest_dict)
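
Two behaviors from this file are worth illustrating: the phase-table restructuring done by parse_manifest_toml, and the string-to-UUID coercion discussed in the review thread above. The sketch below is not part of the PR; the TOML snippet and UUID values are made up, and a local ManifestTeam stand-in is used purely to show the coercion.

from uuid import UUID

import tomli
from pydantic import BaseModel


# Stand-in matching the ManifestTeam model above, used only to show coercion.
class ManifestTeam(BaseModel):
    team_id: UUID
    team_name: str
    members: list[UUID]


# Hypothetical [phases] tables from a manifest; only "sequence" plus per-phase
# subtables are assumed here.
toml_text = """
[phases]
sequence = ["warmup", "main"]

[phases.warmup]
duration = "1w"

[phases.main]
duration = "2w"
"""

raw = tomli.loads(toml_text)["phases"]
# parse_manifest_toml moves every non-"sequence" key under a nested "phases"
# dict, which is the shape ManifestPhases expects.
nested = {
    "sequence": raw["sequence"],
    "phases": {name: phase for name, phase in raw.items() if name != "sequence"},
}
assert set(nested["phases"]) == {"warmup", "main"}

# Pydantic coerces string UUIDs into uuid.UUID objects during validation, which
# is why ManifestTeam.members ends up as list[UUID] even though the TOML holds strings.
team = ManifestTeam.model_validate(
    {
        "team_id": "0e4bdbc1-8de6-4bd8-8e0b-3c1d2f7a9a11",
        "team_name": "example-team",
        "members": ["8c5a1c2e-0f43-4b0c-9d4e-2a6f1b3c4d5e"],
    }
)
assert isinstance(team.members[0], UUID)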
61 changes: 61 additions & 0 deletions src/poprox_storage/paths.py
@@ -0,0 +1,61 @@
# pyright: strict
from __future__ import annotations

import logging
from pathlib import Path
from typing import overload

logger = logging.getLogger(__name__)
_cached_root: Path | None = None


@overload
def project_root() -> Path: ...
@overload
def project_root(*, require: bool) -> Path | None: ...
def project_root(*, require: bool = True) -> Path | None:
"""
Find the project root directory (when we are running in the project).

This searches upwards from the **current working directory** to find the
root of the project, which it identifies by the ``pyproject.toml`` file. If
this function is called from a directory that is not within a checkout of
the ``poprox-storage`` repository, it will raise an error.

Args:
require:
Whether to fail when the project root is not found, or return
``None``. If ``require=False`` this function will still fail on a
*defective* project root (contains an invalid ``pyproject.toml``).

Returns:
The full path to the project root directory. If the project root is
not found and ``require=False``, returns ``None``.
"""
global _cached_root
if _cached_root is None:
cwd = Path(".").resolve()
candidate = cwd
logger.debug("searching for project root upwards from %s", candidate)
while not _is_project_root(candidate):
candidate = candidate.parent
if not candidate or str(candidate) == "/":
if require:
msg = f"cannot find project root for {cwd}"
raise RuntimeError(msg)
else:
# don't cache None
return None

logger.debug("found project root at %s", candidate)
_cached_root = candidate

return _cached_root


def _is_project_root(path: Path) -> bool:
tomlf = path / "pyproject.toml"
if tomlf.exists():
return True
else:
return False
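
A brief usage sketch for the new helper (not part of the diff); the fixture path below is hypothetical.

from poprox_storage.paths import project_root

# Raises RuntimeError when called outside a poprox-storage checkout.
root = project_root()
sample_manifest = root / "tests" / "fixtures" / "sample_manifest.toml"  # hypothetical path

# With require=False, the helper returns None instead of raising.
maybe_root = project_root(require=False)
if maybe_root is None:
    print("not running inside the poprox-storage repository")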
15 changes: 15 additions & 0 deletions src/poprox_storage/repositories/accounts.py
@@ -55,6 +55,21 @@ def fetch_account_by_email(self, email: str) -> Account | None:
return accounts[0]
return None

def store_account(self, account: Account) -> UUID | None:

Review comments on this method:

Contributor:
Is it worth revising the web code that uses the following store_new_account to use this version instead?

Collaborator (Author):
I'm not totally sure, but maybe? I left the other method there so this wouldn't break anything, but I do kinda like having methods that accept domain objects instead of accepting the relevant info and creating domain objects internally.

account_tbl = self.tables["accounts"]
query = (
sqlalchemy.insert(account_tbl)
.values(
account_id=account.account_id,
email=account.email,
source=account.source,
status="new_account",
)
.returning(account_tbl.c.account_id)
)
row = self.conn.execute(query).one_or_none()
return row.account_id

def store_new_account(self, email: str, source: str) -> Account:
account_tbl = self.tables["accounts"]
query = (
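A hedged sketch contrasting the two methods discussed in the thread above. The repository class name, the Account constructor arguments, and the database URL are assumptions, not details taken from this diff.

from uuid import uuid4

import sqlalchemy
from poprox_concepts import Account
from poprox_storage.repositories.accounts import DbAccountRepository  # class name assumed

engine = sqlalchemy.create_engine("postgresql+psycopg2://localhost/poprox")  # URL assumed
with engine.connect() as conn:
    repo = DbAccountRepository(conn)

    # Existing style: the repository builds the domain object internally.
    account = repo.store_new_account("person@example.com", "manifest")

    # New style: the caller supplies a fully formed domain object.
    new_account = Account(account_id=uuid4(), email="person@example.com", source="manifest")  # fields assumed
    account_id = repo.store_account(new_account)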
99 changes: 78 additions & 21 deletions src/poprox_storage/repositories/experiments.py
@@ -1,7 +1,6 @@
import datetime
from uuid import UUID

import tomli
from sqlalchemy import Connection, Table, and_, select, update

from poprox_concepts import Account
@@ -11,9 +10,10 @@
Group,
Phase,
Recommender,
Team,
Treatment,
)
from poprox_storage.concepts.manifest import ManifestFile
from poprox_storage.concepts.manifest import ManifestFile, parse_manifest_toml
from poprox_storage.repositories.data_stores.db import DatabaseRepository
from poprox_storage.repositories.data_stores.s3 import S3Repository

@@ -22,23 +22,34 @@ class DbExperimentRepository(DatabaseRepository):
def __init__(self, connection: Connection):
super().__init__(connection)
self.tables: dict[str, Table] = self._load_tables(
"datasets",
"experiments",
"expt_assignments",
"expt_groups",
"expt_phases",
"expt_recommenders",
"expt_treatments",
"teams",
"team_memberships",
)

def store_experiment(self, experiment: Experiment, assignments: dict[str, list[Account]] | None = None):
def store_experiment(
self,
experiment: Experiment,
assignments: dict[str, list[Account]] | None = None,
):
assignments = assignments or {}
self.conn.rollback()
self.conn.commit()
with self.conn.begin():
experiment_id = self._insert_experiment(experiment)
experiment.owner.team_id = self._insert_expt_team(experiment.owner)
dataset_id = self._insert_expt_dataset(experiment.owner)
experiment_id = self._insert_experiment(dataset_id, experiment)

for group in experiment.groups:
group.group_id = self._insert_expt_group(experiment_id, group)
for account in assignments.get(group.name, []):
self._insert_account_alias(dataset_id, account)

assignment = Assignment(account_id=account.account_id, group_id=group.group_id)
self._insert_expt_assignment(assignment)

@@ -110,12 +121,21 @@ def fetch_active_expt_assignments(self, date: datetime.date | None = None) -> di
group_ids = self.fetch_active_expt_group_ids(date)

group_query = select(
assignments_tbl.c.assignment_id, assignments_tbl.c.account_id, assignments_tbl.c.group_id
).where(and_(assignments_tbl.c.group_id.in_(group_ids), assignments_tbl.c.opted_out is False))
assignments_tbl.c.assignment_id,
assignments_tbl.c.account_id,
assignments_tbl.c.group_id,
).where(
and_(
assignments_tbl.c.group_id.in_(group_ids),
assignments_tbl.c.opted_out is False,
)
)
result = self.conn.execute(group_query).fetchall()
group_lookup_by_account = {
row.account_id: Assignment(
assignment_id=row.assignment_id, account_id=row.account_id, group_id=row.group_id
assignment_id=row.assignment_id,
account_id=row.account_id,
group_id=row.group_id,
)
for row in result
}
@@ -140,16 +160,51 @@ def update_expt_assignment_to_opt_out(self, account_id: UUID) -> UUID | None:
)
self.conn.execute(assignment_query)

def _insert_experiment(self, experiment: Experiment) -> UUID | None:
return self._insert_model("experiments", experiment, exclude={"phases"}, commit=False)
def _insert_experiment(self, dataset_id: UUID, experiment: Experiment) -> UUID | None:
return self._insert_model(
"experiments",
experiment,
addl_fields={
"dataset_id": dataset_id,
"team_id": experiment.owner.team_id,
},
exclude={"owner", "phases"},
commit=False,
)

def _insert_expt_team(self, team: Team) -> UUID | None:
team_id = self._insert_model("teams", team, exclude={"members"}, commit=False)
for account_id in team.members:
self._insert_team_membership(team_id, account_id)
return team_id

def _insert_team_membership(self, team_id: UUID, account_id: UUID) -> UUID | None:
return self._upsert_and_return_id(
self.conn,
self.tables["team_memberships"],
{"team_id": team_id, "account_id": account_id},
commit=False,
)

def _insert_expt_dataset(self, team: Team) -> UUID | None:
return self._upsert_and_return_id(
self.conn,
self.tables["datasets"],
{"team_id": team.team_id},
commit=False,
)

def _insert_expt_group(
self,
experiment_id: UUID,
group: Group,
) -> UUID | None:
return self._insert_model(
"expt_groups", group, {"experiment_id": experiment_id}, exclude={"minimum_size"}, commit=False
"expt_groups",
group,
{"experiment_id": experiment_id, "group_name": group.name},
exclude={"minimum_size", "name"},
commit=False,
)

def _insert_expt_recommender(
@@ -198,6 +253,17 @@ def _insert_expt_treatment(
commit=False,
)

def _insert_account_alias(self, dataset_id: UUID, account: Account) -> UUID | None:
return self._upsert_and_return_id(
self.conn,
self.tables["account_aliases"],
values={
"dataset_id": dataset_id,
"account_id": account.account_id,
},
commit=False,
)

def _insert_expt_assignment(
self,
assignment: Assignment,
@@ -212,13 +278,4 @@ def _insert_expt_assignment(
class S3ExperimentRepository(S3Repository):
def fetch_manifest(self, manifest_file_key) -> ManifestFile:
manifest_toml = self._get_s3_file(manifest_file_key)
manifest_dict = tomli.loads(manifest_toml)

phases = {"sequence": manifest_dict["phases"]["sequence"], "phases": {}}
for name, phase in manifest_dict["phases"].items():
if name != "sequence":
phases["phases"][name] = phase

manifest_dict["phases"] = phases

return ManifestFile.model_validate(manifest_dict)
return parse_manifest_toml(manifest_toml)
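
Putting the pieces of this PR together, a sketch of the ingestion flow. The S3 repository constructor arguments, the manifest key, and the database URL are assumptions; the real wiring may differ.

import sqlalchemy

from poprox_storage.concepts.manifest import manifest_to_experiment
from poprox_storage.repositories.experiments import (
    DbExperimentRepository,
    S3ExperimentRepository,
)

engine = sqlalchemy.create_engine("postgresql+psycopg2://localhost/poprox")  # URL assumed

s3_repo = S3ExperimentRepository("poprox-manifests")  # constructor arguments assumed
manifest = s3_repo.fetch_manifest("experiments/example/manifest.toml")  # key assumed

# Convert the validated manifest into domain objects (team, groups, phases, recommenders).
experiment = manifest_to_experiment(manifest)

with engine.connect() as conn:
    repo = DbExperimentRepository(conn)
    # Inserts the owning team, a dataset, the experiment, and its groups in one
    # transaction; passing assignments ({group_name: [Account, ...]}) also
    # creates account aliases and group assignments.
    repo.store_experiment(experiment)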