diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py
index b865f67b98a9..fdfa36bd4069 100644
--- a/lib/galaxy/managers/jobs.py
+++ b/lib/galaxy/managers/jobs.py
@@ -357,11 +357,11 @@ def replace_dataset_ids(path, key, value):
 
         stmt_sq = self._build_job_subquery(tool_id, user.id, tool_version, job_state, wildcard_param_dump)
 
-        query = select(Job.id).select_from(Job.table.join(stmt_sq, stmt_sq.c.id == Job.id))
+        stmt = select(Job.id).select_from(Job.table.join(stmt_sq, stmt_sq.c.id == Job.id))
 
         data_conditions = []
 
-        # We now build the query filters that relate to the input datasets
+        # We now build the stmt filters that relate to the input datasets
         # that this job uses. We keep track of the requested dataset id in `requested_ids`,
         # the type (hda, hdca or lda) in `data_types`
         # and the ids that have been used in the job that has already been run in `used_ids`.
@@ -377,139 +377,20 @@ def replace_dataset_ids(path, key, value):
                 v = type_values["id"]
                 requested_ids.append(v)
                 data_types.append(t)
-                identifier = type_values["identifier"]
                 if t == "hda":
-                    a = aliased(model.JobToInputDatasetAssociation)
-                    b = aliased(model.HistoryDatasetAssociation)
-                    c = aliased(model.HistoryDatasetAssociation)
-                    d = aliased(model.JobParameter)
-                    e = aliased(model.HistoryDatasetAssociationHistory)
-                    query = query.add_columns(a.dataset_id)
-                    used_ids.append(a.dataset_id)
-                    query = query.join(a, a.job_id == model.Job.id)
-                    stmt = select(model.HistoryDatasetAssociation.id).where(
-                        model.HistoryDatasetAssociation.id == e.history_dataset_association_id
-                    )
-                    # b is the HDA used for the job
-                    query = query.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
-                    name_condition = []
-                    if identifier:
-                        query = query.join(d)
-                        data_conditions.append(
-                            and_(
-                                d.name.in_({f"{_}|__identifier__" for _ in k}),
-                                d.value == json.dumps(identifier),
-                            )
-                        )
-                    else:
-                        stmt = stmt.where(e.name == c.name)
-                        name_condition.append(b.name == c.name)
-                    stmt = (
-                        stmt.where(
-                            e.extension == c.extension,
-                        )
-                        .where(
-                            a.dataset_version == e.version,
-                        )
-                        .where(
-                            e._metadata == c._metadata,
-                        )
-                    )
-                    data_conditions.append(
-                        and_(
-                            a.name.in_(k),
-                            c.id == v,  # c is the requested job input HDA
-                            # We need to make sure that the job we are looking for has been run with identical inputs.
-                            # Here we deal with 3 requirements:
-                            # - the jobs' input dataset (=b) version is 0, meaning the job's input dataset is not yet ready
-                            # - b's update_time is older than the job create time, meaning no changes occurred
-                            # - the job has a dataset_version recorded, and that versions' metadata matches c's metadata.
-                            or_(
-                                and_(
-                                    or_(a.dataset_version.in_([0, b.version]), b.update_time < model.Job.create_time),
-                                    b.extension == c.extension,
-                                    b.metadata == c.metadata,
-                                    *name_condition,
-                                ),
-                                b.id.in_(stmt),
-                            ),
-                            or_(b.deleted == false(), c.deleted == false()),
-                        )
-                    )
+                    stmt = self._build_stmt_for_hda(stmt, data_conditions, used_ids, k, v, type_values["identifier"])
                 elif t == "ldda":
-                    a = aliased(model.JobToInputLibraryDatasetAssociation)
-                    query = query.add_columns(a.ldda_id)
-                    query = query.join(a, a.job_id == model.Job.id)
-                    data_conditions.append(and_(a.name.in_(k), a.ldda_id == v))
-                    used_ids.append(a.ldda_id)
+                    stmt = self._build_stmt_for_ldda(stmt, data_conditions, used_ids, k, v)
                 elif t == "hdca":
-                    a = aliased(model.JobToInputDatasetCollectionAssociation)
-                    b = aliased(model.HistoryDatasetCollectionAssociation)
-                    c = aliased(model.HistoryDatasetCollectionAssociation)
-                    query = query.add_columns(a.dataset_collection_id)
-                    query = (
-                        query.join(a, a.job_id == model.Job.id)
-                        .join(b, b.id == a.dataset_collection_id)
-                        .join(c, b.name == c.name)
-                    )
-                    data_conditions.append(
-                        and_(
-                            a.name.in_(k),
-                            c.id == v,
-                            or_(
-                                and_(b.deleted == false(), b.id == v),
-                                and_(
-                                    or_(
-                                        c.copied_from_history_dataset_collection_association_id == b.id,
-                                        b.copied_from_history_dataset_collection_association_id == c.id,
-                                    ),
-                                    c.deleted == false(),
-                                ),
-                            ),
-                        )
-                    )
-                    used_ids.append(a.dataset_collection_id)
+                    stmt = self._build_stmt_for_hdca(stmt, data_conditions, used_ids, k, v)
                 elif t == "dce":
-                    a = aliased(model.JobToInputDatasetCollectionElementAssociation)
-                    b = aliased(model.DatasetCollectionElement)
-                    c = aliased(model.DatasetCollectionElement)
-                    d = aliased(model.HistoryDatasetAssociation)
-                    e = aliased(model.HistoryDatasetAssociation)
-                    query = query.add_columns(a.dataset_collection_element_id)
-                    query = (
-                        query.join(a, a.job_id == model.Job.id)
-                        .join(b, b.id == a.dataset_collection_element_id)
-                        .join(
-                            c,
-                            and_(
-                                c.element_identifier == b.element_identifier,
-                                or_(c.hda_id == b.hda_id, c.child_collection_id == b.child_collection_id),
-                            ),
-                        )
-                        .outerjoin(d, d.id == c.hda_id)
-                        .outerjoin(e, e.dataset_id == d.dataset_id)
-                    )
-                    data_conditions.append(
-                        and_(
-                            a.name.in_(k),
-                            or_(
-                                c.child_collection_id == b.child_collection_id,
-                                and_(
-                                    c.hda_id == b.hda_id,
-                                    d.id == c.hda_id,
-                                    e.dataset_id == d.dataset_id,
-                                ),
-                            ),
-                            c.id == v,
-                        )
-                    )
-                    used_ids.append(a.dataset_collection_element_id)
+                    stmt = self._build_stmt_for_dce(stmt, data_conditions, used_ids, k, v)
                 else:
                     return []
 
-        query = query.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc())
+        stmt = stmt.where(*data_conditions).group_by(model.Job.id, *used_ids).order_by(model.Job.id.desc())
 
-        for job in self.sa_session.execute(query):
+        for job in self.sa_session.execute(stmt):
             # We found a job that is equal in terms of tool_id, user, state and input datasets,
             # but to be able to verify that the parameters match we need to modify all instances of
             # dataset_ids (HDA, LDDA, HDCA) in the incoming param_dump to point to those used by the
@@ -630,6 +511,136 @@ def _build_job_subquery(self, tool_id, user_id, tool_version, job_state, wildcar
 
         return stmt.subquery()
 
+    def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier):
+        a = aliased(model.JobToInputDatasetAssociation)
+        b = aliased(model.HistoryDatasetAssociation)
+        c = aliased(model.HistoryDatasetAssociation)
+        d = aliased(model.JobParameter)
+        e = aliased(model.HistoryDatasetAssociationHistory)
+        stmt = stmt.add_columns(a.dataset_id)
+        used_ids.append(a.dataset_id)
+        stmt = stmt.join(a, a.job_id == model.Job.id)
+        hda_stmt = select(model.HistoryDatasetAssociation.id).where(
+            model.HistoryDatasetAssociation.id == e.history_dataset_association_id
+        )
+        # b is the HDA used for the job
+        stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id)
+        name_condition = []
+        if identifier:
+            stmt = stmt.join(d)
+            data_conditions.append(
+                and_(
+                    d.name.in_({f"{_}|__identifier__" for _ in k}),
+                    d.value == json.dumps(identifier),
+                )
+            )
+        else:
+            hda_stmt = hda_stmt.where(e.name == c.name)
+            name_condition.append(b.name == c.name)
+        hda_stmt = (
+            hda_stmt.where(
+                e.extension == c.extension,
+            )
+            .where(
+                a.dataset_version == e.version,
+            )
+            .where(
+                e._metadata == c._metadata,
+            )
+        )
+        data_conditions.append(
+            and_(
+                a.name.in_(k),
+                c.id == v,  # c is the requested job input HDA
+                # We need to make sure that the job we are looking for has been run with identical inputs.
+                # Here we deal with 3 requirements:
+                # - the job's input dataset (=b) version is 0, meaning the job's input dataset is not yet ready
+                # - b's update_time is older than the job create time, meaning no changes occurred
+                # - the job has a dataset_version recorded, and that version's metadata matches c's metadata.
+                or_(
+                    and_(
+                        or_(a.dataset_version.in_([0, b.version]), b.update_time < model.Job.create_time),
+                        b.extension == c.extension,
+                        b.metadata == c.metadata,
+                        *name_condition,
+                    ),
+                    b.id.in_(hda_stmt),
+                ),
+                or_(b.deleted == false(), c.deleted == false()),
+            )
+        )
+        return stmt
+
+    def _build_stmt_for_ldda(self, stmt, data_conditions, used_ids, k, v):
+        a = aliased(model.JobToInputLibraryDatasetAssociation)
+        stmt = stmt.add_columns(a.ldda_id)
+        stmt = stmt.join(a, a.job_id == model.Job.id)
+        data_conditions.append(and_(a.name.in_(k), a.ldda_id == v))
+        used_ids.append(a.ldda_id)
+        return stmt
+
+    def _build_stmt_for_hdca(self, stmt, data_conditions, used_ids, k, v):
+        a = aliased(model.JobToInputDatasetCollectionAssociation)
+        b = aliased(model.HistoryDatasetCollectionAssociation)
+        c = aliased(model.HistoryDatasetCollectionAssociation)
+        stmt = stmt.add_columns(a.dataset_collection_id)
+        stmt = stmt.join(a, a.job_id == model.Job.id).join(b, b.id == a.dataset_collection_id).join(c, b.name == c.name)
+        data_conditions.append(
+            and_(
+                a.name.in_(k),
+                c.id == v,
+                or_(
+                    and_(b.deleted == false(), b.id == v),
+                    and_(
+                        or_(
+                            c.copied_from_history_dataset_collection_association_id == b.id,
+                            b.copied_from_history_dataset_collection_association_id == c.id,
+                        ),
+                        c.deleted == false(),
+                    ),
+                ),
+            )
+        )
+        used_ids.append(a.dataset_collection_id)
+        return stmt
+
+    def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v):
+        a = aliased(model.JobToInputDatasetCollectionElementAssociation)
+        b = aliased(model.DatasetCollectionElement)
+        c = aliased(model.DatasetCollectionElement)
+        d = aliased(model.HistoryDatasetAssociation)
+        e = aliased(model.HistoryDatasetAssociation)
+        stmt = stmt.add_columns(a.dataset_collection_element_id)
+        stmt = (
+            stmt.join(a, a.job_id == model.Job.id)
+            .join(b, b.id == a.dataset_collection_element_id)
+            .join(
+                c,
+                and_(
+                    c.element_identifier == b.element_identifier,
+                    or_(c.hda_id == b.hda_id, c.child_collection_id == b.child_collection_id),
+                ),
+            )
+            .outerjoin(d, d.id == c.hda_id)
+            .outerjoin(e, e.dataset_id == d.dataset_id)
+        )
+        data_conditions.append(
+            and_(
+                a.name.in_(k),
+                or_(
+                    c.child_collection_id == b.child_collection_id,
+                    and_(
+                        c.hda_id == b.hda_id,
+                        d.id == c.hda_id,
+                        e.dataset_id == d.dataset_id,
+                    ),
+                ),
+                c.id == v,
+            )
+        )
+        used_ids.append(a.dataset_collection_element_id)
+        return stmt
+
 
 def view_show_job(trans, job: Job, full: bool) -> typing.Dict:
     is_admin = trans.user_is_admin
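All four extracted helpers share one contract: each takes the statement built so far, appends its filter clauses to the shared `data_conditions` list and its selected id column to `used_ids`, and returns the extended statement, so the caller can apply everything in a single final `where(...).group_by(...)`. Below is a minimal, self-contained sketch of that contract; it is not Galaxy code, and the `Job` and `JobToInputDatasetAssociation` models and the `build_stmt_for_input` helper are simplified stand-ins invented for illustration.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, and_, create_engine, select
from sqlalchemy.orm import Session, aliased, declarative_base

Base = declarative_base()


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    tool_id = Column(String)


class JobToInputDatasetAssociation(Base):
    __tablename__ = "job_to_input_dataset"
    id = Column(Integer, primary_key=True)
    job_id = Column(Integer, ForeignKey("job.id"))
    dataset_id = Column(Integer)
    name = Column(String)


def build_stmt_for_input(stmt, data_conditions, used_ids, name, dataset_id):
    # Same contract as the _build_stmt_for_* helpers: extend the statement,
    # record the condition and the selected id column, return the statement.
    # A fresh alias per call keeps repeated joins against the same
    # association table from colliding.
    a = aliased(JobToInputDatasetAssociation)
    stmt = stmt.add_columns(a.dataset_id)
    stmt = stmt.join(a, a.job_id == Job.id)
    data_conditions.append(and_(a.name == name, a.dataset_id == dataset_id))
    used_ids.append(a.dataset_id)
    return stmt


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    stmt = select(Job.id)
    data_conditions, used_ids = [], []
    # One helper call per requested input, mirroring the t == "hda"/"ldda"/... dispatch.
    stmt = build_stmt_for_input(stmt, data_conditions, used_ids, "input1", 42)
    stmt = build_stmt_for_input(stmt, data_conditions, used_ids, "input2", 7)
    stmt = stmt.where(*data_conditions).group_by(Job.id, *used_ids).order_by(Job.id.desc())
    for row in session.execute(stmt):
        print(row)
```

Returning the statement while mutating the two shared lists in place is what lets the caller stay a flat chain of `stmt = self._build_stmt_for_*(stmt, ...)` branches, with the accumulated conditions applied once at the end.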