Job search refactor 2: move build stmt into functions
jdavcs committed Jan 27, 2024
1 parent 6bd6c51 commit af53e10
Showing 1 changed file with 138 additions and 127 deletions.

lib/galaxy/managers/jobs.py
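The pattern throughout is a simple extract-method refactor: each per-input-type branch becomes a `_build_stmt_for_*` helper that receives the in-progress SELECT plus the `data_conditions` and `used_ids` accumulators, appends to those lists in place, and returns the extended statement for the caller to keep chaining. A minimal runnable sketch of that contract, using a hypothetical `Widget` model rather than Galaxy's code:

from sqlalchemy import Column, Integer, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Widget(Base):  # hypothetical model, for illustration only
    __tablename__ = "widget"
    id = Column(Integer, primary_key=True)
    owner_id = Column(Integer)


def build_stmt_for_widget(stmt, data_conditions, used_ids, v):
    # Append to the accumulators in place; return the extended statement.
    stmt = stmt.add_columns(Widget.owner_id)
    data_conditions.append(Widget.owner_id == v)
    used_ids.append(Widget.owner_id)
    return stmt


data_conditions, used_ids = [], []
stmt = select(Widget.id)
stmt = build_stmt_for_widget(stmt, data_conditions, used_ids, 42)
stmt = stmt.where(*data_conditions).group_by(Widget.id, *used_ids)
print(stmt)  # renders the fully composed SELECT

Because the helpers mutate `data_conditions` and `used_ids` in place, the caller can apply `.where(*data_conditions)` and `.group_by(...)` once, after all inputs have been processed — exactly what the loop in the diff below does.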
@@ -357,11 +357,11 @@ def replace_dataset_ids(path, key, value):
 
         stmt_sq = self._build_job_subquery(tool_id, user.id, tool_version, job_state, wildcard_param_dump)
 
-        query = select(Job.id).select_from(Job.table.join(stmt_sq, stmt_sq.c.id == Job.id))
+        stmt = select(Job.id).select_from(Job.table.join(stmt_sq, stmt_sq.c.id == Job.id))
 
         data_conditions = []
 
-        # We now build the query filters that relate to the input datasets
+        # We now build the stmt filters that relate to the input datasets
         # that this job uses. We keep track of the requested dataset id in `requested_ids`,
         # the type (hda, hdca or ldda) in `data_types`
         # and the ids that have been used in the job that has already been run in `used_ids`.
@@ -377,139 +377,20 @@ def replace_dataset_ids(path, key, value):
                 v = type_values["id"]
                 requested_ids.append(v)
                 data_types.append(t)
-                identifier = type_values["identifier"]
                 if t == "hda":
-                    a = aliased(model.JobToInputDatasetAssociation)
-                    b = aliased(model.HistoryDatasetAssociation)
-                    c = aliased(model.HistoryDatasetAssociation)
-                    d = aliased(model.JobParameter)
-                    e = aliased(model.HistoryDatasetAssociationHistory)
-                    query = query.add_columns(a.dataset_id)
-                    used_ids.append(a.dataset_id)
-                    query = query.join(a, a.job_id == model.Job.id)
-                    stmt = select(model.HistoryDatasetAssociation.id).where(
-                        model.HistoryDatasetAssociation.id == e.history_dataset_association_id
-                    )
-                    # b is the HDA used for the job
-                    query = query.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
-                    name_condition = []
-                    if identifier:
-                        query = query.join(d)
-                        data_conditions.append(
-                            and_(
-                                d.name.in_({f"{_}|__identifier__" for _ in k}),
-                                d.value == json.dumps(identifier),
-                            )
-                        )
-                    else:
-                        stmt = stmt.where(e.name == c.name)
-                        name_condition.append(b.name == c.name)
-                    stmt = (
-                        stmt.where(
-                            e.extension == c.extension,
-                        )
-                        .where(
-                            a.dataset_version == e.version,
-                        )
-                        .where(
-                            e._metadata == c._metadata,
-                        )
-                    )
-                    data_conditions.append(
-                        and_(
-                            a.name.in_(k),
-                            c.id == v,  # c is the requested job input HDA
-                            # We need to make sure that the job we are looking for has been run with identical inputs.
-                            # Here we deal with 3 requirements:
-                            # - the job's input dataset (=b) version is 0, meaning the job's input dataset is not yet ready
-                            # - b's update_time is older than the job create time, meaning no changes occurred
-                            # - the job has a dataset_version recorded, and that version's metadata matches c's metadata.
-                            or_(
-                                and_(
-                                    or_(a.dataset_version.in_([0, b.version]), b.update_time < model.Job.create_time),
-                                    b.extension == c.extension,
-                                    b.metadata == c.metadata,
-                                    *name_condition,
-                                ),
-                                b.id.in_(stmt),
-                            ),
-                            or_(b.deleted == false(), c.deleted == false()),
-                        )
-                    )
+                    stmt = self._build_stmt_for_hda(stmt, data_conditions, used_ids, k, v, type_values["identifier"])
                 elif t == "ldda":
-                    a = aliased(model.JobToInputLibraryDatasetAssociation)
-                    query = query.add_columns(a.ldda_id)
-                    query = query.join(a, a.job_id == model.Job.id)
-                    data_conditions.append(and_(a.name.in_(k), a.ldda_id == v))
-                    used_ids.append(a.ldda_id)
+                    stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v)
                 elif t == "hdca":
-                    a = aliased(model.JobToInputDatasetCollectionAssociation)
-                    b = aliased(model.HistoryDatasetCollectionAssociation)
-                    c = aliased(model.HistoryDatasetCollectionAssociation)
-                    query = query.add_columns(a.dataset_collection_id)
-                    query = (
-                        query.join(a, a.job_id == model.Job.id)
-                        .join(b, b.id == a.dataset_collection_id)
-                        .join(c, b.name == c.name)
-                    )
-                    data_conditions.append(
-                        and_(
-                            a.name.in_(k),
-                            c.id == v,
-                            or_(
-                                and_(b.deleted == false(), b.id == v),
-                                and_(
-                                    or_(
-                                        c.copied_from_history_dataset_collection_association_id == b.id,
-                                        b.copied_from_history_dataset_collection_association_id == c.id,
-                                    ),
-                                    c.deleted == false(),
-                                ),
-                            ),
-                        )
-                    )
-                    used_ids.append(a.dataset_collection_id)
+                    stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v)
                 elif t == "dce":
-                    a = aliased(model.JobToInputDatasetCollectionElementAssociation)
-                    b = aliased(model.DatasetCollectionElement)
-                    c = aliased(model.DatasetCollectionElement)
-                    d = aliased(model.HistoryDatasetAssociation)
-                    e = aliased(model.HistoryDatasetAssociation)
-                    query = query.add_columns(a.dataset_collection_element_id)
-                    query = (
-                        query.join(a, a.job_id == model.Job.id)
-                        .join(b, b.id == a.dataset_collection_element_id)
-                        .join(
-                            c,
-                            and_(
-                                c.element_identifier == b.element_identifier,
-                                or_(c.hda_id == b.hda_id, c.child_collection_id == b.child_collection_id),
-                            ),
-                        )
-                        .outerjoin(d, d.id == c.hda_id)
-                        .outerjoin(e, e.dataset_id == d.dataset_id)
-                    )
-                    data_conditions.append(
-                        and_(
-                            a.name.in_(k),
-                            or_(
-                                c.child_collection_id == b.child_collection_id,
-                                and_(
-                                    c.hda_id == b.hda_id,
-                                    d.id == c.hda_id,
-                                    e.dataset_id == d.dataset_id,
-                                ),
-                            ),
-                            c.id == v,
-                        )
-                    )
-                    used_ids.append(a.dataset_collection_element_id)
+                    stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v)
                 else:
                     return []
 
-        query = query.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc())
+        stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc())
 
-        for job in self.sa_session.execute(query):
+        for job in self.sa_session.execute(stmt):
             # We found a job that is equal in terms of tool_id, user, state and input datasets,
             # but to be able to verify that the parameters match we need to modify all instances of
             # dataset_ids (HDA, LDDA, HDCA) in the incoming param_dump to point to those used by the
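Both the removed inline blocks above and the new helpers below rely on the same SQLAlchemy device: two `aliased()` views of one mapped class, self-joined so that the dataset a previous job actually used (`b`) can be compared against the dataset requested now (`c`). A stripped-down runnable sketch of that self-join, with a toy `HDA` model standing in for Galaxy's mapped classes:

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, aliased, declarative_base

Base = declarative_base()


class HDA(Base):  # toy stand-in for HistoryDatasetAssociation
    __tablename__ = "hda"
    id = Column(Integer, primary_key=True)
    dataset_id = Column(Integer)
    name = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

b = aliased(HDA)  # the HDA a previous job actually used
c = aliased(HDA)  # the HDA requested for the new job
stmt = (
    select(b.id, c.id)
    .select_from(b)
    .join(c, c.dataset_id == b.dataset_id)  # copies share the underlying dataset
    .where(c.id == 42, b.name == c.name)
)

with Session(engine) as session:
    print(session.execute(stmt).all())  # [] on the empty toy database

`_build_stmt_for_hda` layers a third alias and a `HistoryDatasetAssociationHistory` subquery on top of this, but the join shape is the same.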
@@ -630,6 +511,136 @@ def _build_job_subquery(self, tool_id, user_id, tool_version, job_state, wildcard_param_dump):
 
         return stmt.subquery()
 
+    def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier):
+        a = aliased(model.JobToInputDatasetAssociation)
+        b = aliased(model.HistoryDatasetAssociation)
+        c = aliased(model.HistoryDatasetAssociation)
+        d = aliased(model.JobParameter)
+        e = aliased(model.HistoryDatasetAssociationHistory)
+        stmt = stmt.add_columns(a.dataset_id)
+        used_ids.append(a.dataset_id)
+        stmt = stmt.join(a, a.job_id == model.Job.id)
+        hda_stmt = select(model.HistoryDatasetAssociation.id).where(
+            model.HistoryDatasetAssociation.id == e.history_dataset_association_id
+        )
+        # b is the HDA used for the job
+        stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
+        name_condition = []
+        if identifier:
+            stmt = stmt.join(d)
+            data_conditions.append(
+                and_(
+                    d.name.in_({f"{_}|__identifier__" for _ in k}),
+                    d.value == json.dumps(identifier),
+                )
+            )
+        else:
+            hda_stmt = hda_stmt.where(e.name == c.name)
+            name_condition.append(b.name == c.name)
+        hda_stmt = (
+            hda_stmt.where(
+                e.extension == c.extension,
+            )
+            .where(
+                a.dataset_version == e.version,
+            )
+            .where(
+                e._metadata == c._metadata,
+            )
+        )
+        data_conditions.append(
+            and_(
+                a.name.in_(k),
+                c.id == v,  # c is the requested job input HDA
+                # We need to make sure that the job we are looking for has been run with identical inputs.
+                # Here we deal with 3 requirements:
+                # - the job's input dataset (=b) version is 0, meaning the job's input dataset is not yet ready
+                # - b's update_time is older than the job create time, meaning no changes occurred
+                # - the job has a dataset_version recorded, and that version's metadata matches c's metadata.
+                or_(
+                    and_(
+                        or_(a.dataset_version.in_([0, b.version]), b.update_time < model.Job.create_time),
+                        b.extension == c.extension,
+                        b.metadata == c.metadata,
+                        *name_condition,
+                    ),
+                    b.id.in_(hda_stmt),
+                ),
+                or_(b.deleted == false(), c.deleted == false()),
+            )
+        )
+        return stmt
+
+    def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v):
+        a = aliased(model.JobToInputLibraryDatasetAssociation)
+        stmt = stmt.add_columns(a.ldda_id)
+        stmt = stmt.join(a, a.job_id == model.Job.id)
+        data_conditions.append(and_(a.name.in_(k), a.ldda_id == v))
+        used_ids.append(a.ldda_id)
+        return stmt
+
+    def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v):
+        a = aliased(model.JobToInputDatasetCollectionAssociation)
+        b = aliased(model.HistoryDatasetCollectionAssociation)
+        c = aliased(model.HistoryDatasetCollectionAssociation)
+        stmt = stmt.add_columns(a.dataset_collection_id)
+        stmt = stmt.join(a, a.job_id == model.Job.id).join(b, b.id == a.dataset_collection_id).join(c, b.name == c.name)
+        data_conditions.append(
+            and_(
+                a.name.in_(k),
+                c.id == v,
+                or_(
+                    and_(b.deleted == false(), b.id == v),
+                    and_(
+                        or_(
+                            c.copied_from_history_dataset_collection_association_id == b.id,
+                            b.copied_from_history_dataset_collection_association_id == c.id,
+                        ),
+                        c.deleted == false(),
+                    ),
+                ),
+            )
+        )
+        used_ids.append(a.dataset_collection_id)
+        return stmt
+
+    def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v):
+        a = aliased(model.JobToInputDatasetCollectionElementAssociation)
+        b = aliased(model.DatasetCollectionElement)
+        c = aliased(model.DatasetCollectionElement)
+        d = aliased(model.HistoryDatasetAssociation)
+        e = aliased(model.HistoryDatasetAssociation)
+        stmt = stmt.add_columns(a.dataset_collection_element_id)
+        stmt = (
+            stmt.join(a, a.job_id == model.Job.id)
+            .join(b, b.id == a.dataset_collection_element_id)
+            .join(
+                c,
+                and_(
+                    c.element_identifier == b.element_identifier,
+                    or_(c.hda_id == b.hda_id, c.child_collection_id == b.child_collection_id),
+                ),
+            )
+            .outerjoin(d, d.id == c.hda_id)
+            .outerjoin(e, e.dataset_id == d.dataset_id)
+        )
+        data_conditions.append(
+            and_(
+                a.name.in_(k),
+                or_(
+                    c.child_collection_id == b.child_collection_id,
+                    and_(
+                        c.hda_id == b.hda_id,
+                        d.id == c.hda_id,
+                        e.dataset_id == d.dataset_id,
+                    ),
+                ),
+                c.id == v,
+            )
+        )
+        used_ids.append(a.dataset_collection_element_id)
+        return stmt
+
 
 def view_show_job(trans, job: Job, full: bool) -> typing.Dict:
     is_admin = trans.user_is_admin
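Because the commit is intended as a pure statement-building refactor, the inline and extracted code paths should render identical SQL. One generic way to sanity-check that kind of change (an illustrative sketch, not part of this commit or Galaxy's test suite):

from sqlalchemy import Column, Integer, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ToyJob(Base):  # toy model for illustration only
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    state = Column(Integer)


def build_inline():
    # the pre-refactor shape: everything built in one place
    return select(ToyJob.id).where(ToyJob.state == 1)


def build_refactored(stmt):
    # the post-refactor shape: a helper extends a statement it is handed
    return stmt.where(ToyJob.state == 1)


old_sql = str(build_inline().compile(compile_kwargs={"literal_binds": True}))
new_sql = str(build_refactored(select(ToyJob.id)).compile(compile_kwargs={"literal_binds": True}))
assert old_sql == new_sql  # identical rendered SQL means behavior is preserved

Compiling with `literal_binds` renders bound parameters inline, so the two strings match only if the refactor left the composed statement unchanged.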
