From 82a4769b41f0704c82b7801ddaa3ba9a2d09d3b7 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 2 Jun 2022 09:56:08 +0200 Subject: [PATCH 01/11] Remove support for mover resolves #31 --- .../8a4cc1553379_decommission_mover.py | 36 ++++ config/app.config | 1 - delivery/app.py | 76 +++---- delivery/exceptions.py | 27 ++- delivery/handlers/delivery_handlers.py | 44 ++-- delivery/models/db_models.py | 18 +- .../repositories/deliveries_repository.py | 11 +- delivery/services/dds_service.py | 81 +++++--- delivery/services/delivery_service.py | 39 ++-- delivery/services/mover_service.py | 158 -------------- delivery/services/staging_service.py | 8 +- tests/integration_tests/base.py | 6 +- tests/integration_tests/test_integration.py | 8 +- .../integration_tests/test_integration_dds.py | 25 ++- .../test_integration_mover.py | 193 ------------------ tests/unit_tests/models/test_db_models.py | 19 +- tests/unit_tests/services/test_dds.py | 18 +- .../services/test_delivery_service.py | 108 +++++----- .../services/test_mover_delivery_service.py | 181 ---------------- 19 files changed, 290 insertions(+), 767 deletions(-) create mode 100644 alembic/versions/8a4cc1553379_decommission_mover.py delete mode 100644 delivery/services/mover_service.py delete mode 100644 tests/integration_tests/test_integration_mover.py delete mode 100644 tests/unit_tests/services/test_mover_delivery_service.py diff --git a/alembic/versions/8a4cc1553379_decommission_mover.py b/alembic/versions/8a4cc1553379_decommission_mover.py new file mode 100644 index 0000000..4f564d9 --- /dev/null +++ b/alembic/versions/8a4cc1553379_decommission_mover.py @@ -0,0 +1,36 @@ +"""decommission mover + +Revision ID: 8a4cc1553379 +Revises: 85082f64ccd0 +Create Date: 2022-06-02 10:35:30.789457 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = '8a4cc1553379' +down_revision = '85082f64ccd0' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + #with op.batch_alter_table('delivery_orders', schema=None) as batch_op: + #op.alter_column('delivery_orders', 'mover_pid', new_column_name='dds_pid') + with op.batch_alter_table('delivery_orders') as batch_op: + batch_op.drop_column('mover_delivery_id') + batch_op.drop_column('md5sum_file') + batch_op.alter_column('mover_pid', new_column_name='dds_pid') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('delivery_orders') as batch_op: + batch_op.alter_column('dds_pid', new_column_name='mover_pid') + op.add_column('delivery_orders', sa.Column('md5sum_file', sa.String(), nullable=True)) + op.add_column('delivery_orders', sa.Column('mover_delivery_id', sa.String(), nullable=True)) + # ### end Alembic commands ### diff --git a/config/app.config b/config/app.config index d373aaf..d04b828 100644 --- a/config/app.config +++ b/config/app.config @@ -8,7 +8,6 @@ runfolder_directory: tests/resources/runfolders general_project_directory: tests/resources/projects staging_directory: /tmp/ project_links_directory: /tmp/ -path_to_mover: '/usr/local/mover/1.0.0/' dds_conf: log_path: dds.log mount_dir: /tmp/ diff --git a/delivery/app.py b/delivery/app.py index dec8027..ca427df 100644 --- a/delivery/app.py +++ b/delivery/app.py @@ -30,7 +30,6 @@ from delivery.repositories.sample_repository import RunfolderProjectBasedSampleRepository -from delivery.services.mover_service import MoverDeliveryService from delivery.services.dds_service import DDSService from delivery.services.external_program_service import ExternalProgramService from delivery.services.staging_service import StagingService @@ -100,16 +99,17 @@ def create_and_migrate_db(db_engine, alembic_path, db_connection_string): def 
compose_application(config): """ - Instantiates all service, repos, etc which are then used by the application. - The resulting dictionary can then be passed on to routes which instantiates the - http handlers. + Instantiates all service, repos, etc which are then used by the + application. The resulting dictionary can then be passed on to routes + which instantiates the http handlers. :param config: a configuration instance :return: a dictionary with references to any relevant resources """ def _assert_is_dir(directory): if not FileSystemService.isdir(directory): - raise AssertionError("{} is not a directory".format(os.path.abspath(directory))) + raise AssertionError( + "{} is not a directory".format(os.path.abspath(directory))) staging_dir = config['staging_directory'] _assert_is_dir(staging_dir) @@ -132,7 +132,8 @@ def _assert_is_dir(directory): general_project_dir = config['general_project_directory'] _assert_is_dir(general_project_dir) - general_project_repo = GeneralProjectRepository(root_directory=general_project_dir) + general_project_repo = GeneralProjectRepository( + root_directory=general_project_dir) external_program_service = ExternalProgramService() db_connection_string = config["db_connection_string"] @@ -144,45 +145,45 @@ def _assert_is_dir(directory): session_factory = scoped_session(sessionmaker()) session_factory.configure(bind=engine) - staging_repo = DatabaseBasedStagingRepository(session_factory=session_factory) + staging_repo = DatabaseBasedStagingRepository( + session_factory=session_factory) - staging_service = StagingService(external_program_service=external_program_service, - runfolder_repo=runfolder_repo, - project_dir_repo=general_project_repo, - staging_repo=staging_repo, - staging_dir=staging_dir, - project_links_directory=project_links_directory, - session_factory=session_factory) + staging_service = StagingService( + external_program_service=external_program_service, + runfolder_repo=runfolder_repo, + 
project_dir_repo=general_project_repo, + staging_repo=staging_repo, + staging_dir=staging_dir, + project_links_directory=project_links_directory, + session_factory=session_factory) - delivery_repo = DatabaseBasedDeliveriesRepository(session_factory=session_factory) - - path_to_mover = config['path_to_mover'] - mover_delivery_service = MoverDeliveryService(external_program_service=external_program_service, - staging_service=staging_service, - delivery_repo=delivery_repo, - session_factory=session_factory, - path_to_mover=path_to_mover) + delivery_repo = DatabaseBasedDeliveriesRepository( + session_factory=session_factory) dds_conf = config['dds_conf'] dds_project_repo = DDSProjectRepository(session_factory=session_factory) - dds_service = DDSService(external_program_service=external_program_service, - staging_service=staging_service, - delivery_repo=delivery_repo, - dds_project_repo=dds_project_repo, - session_factory=session_factory, - dds_conf=dds_conf) - - delivery_sources_repo = DatabaseBasedDeliverySourcesRepository(session_factory=session_factory) + dds_service = DDSService( + external_program_service=external_program_service, + staging_service=staging_service, + delivery_repo=delivery_repo, + dds_project_repo=dds_project_repo, + session_factory=session_factory, + dds_conf=dds_conf) + + delivery_sources_repo = DatabaseBasedDeliverySourcesRepository( + session_factory=session_factory) runfolder_service = RunfolderService(runfolder_repo) - delivery_service = DeliveryService(mover_service=mover_delivery_service, - staging_service=staging_service, - delivery_sources_repo=delivery_sources_repo, - general_project_repo=general_project_repo, - runfolder_service=runfolder_service, - project_links_directory=project_links_directory) + delivery_service = DeliveryService( + dds_service=dds_service, + staging_service=staging_service, + delivery_sources_repo=delivery_sources_repo, + general_project_repo=general_project_repo, + runfolder_service=runfolder_service, + 
project_links_directory=project_links_directory) - best_practice_analysis_service = BestPracticeAnalysisService(general_project_repo) + best_practice_analysis_service = BestPracticeAnalysisService( + general_project_repo) organise_service = OrganiseService( runfolder_service=RunfolderService(unorganised_runfolder_repo)) @@ -191,7 +192,6 @@ def _assert_is_dir(directory): runfolder_repo=runfolder_repo, external_program_service=external_program_service, staging_service=staging_service, - mover_delivery_service=mover_delivery_service, dds_service=dds_service, delivery_service=delivery_service, general_project_repo=general_project_repo, diff --git a/delivery/exceptions.py b/delivery/exceptions.py index 27ecee7..3c95e84 100644 --- a/delivery/exceptions.py +++ b/delivery/exceptions.py @@ -2,21 +2,22 @@ class RunfolderNotFoundException(Exception): """ - Should be raised when a runfolder is not found + Should be raised when a runfolder is not found. """ pass class ChecksumNotFoundException(Exception): """ - Should be raised when a file checksum could not be found in the list of checksums + Should be raised when a file checksum could not be found in the list of + checksums. """ pass class ChecksumFileNotFoundException(Exception): """ - Should be raised when an expected checksum file could not be found + Should be raised when an expected checksum file could not be found. """ pass @@ -37,22 +38,16 @@ class ProjectReportNotFoundException(Exception): class TooManyProjectsFound(Exception): """ - Should be raise when to many projects match some specific criteria + Should be raise when to many projects match some specific criteria. """ pass class InvalidStatusException(Exception): """ - Should be raised when an object is found to be in a invalid state, e.g. if the program tries to start staging - on a StagingOrder which is already `in_progress` - """ - pass - - -class CannotParseMoverOutputException(Exception): - """ - Should be raised when movers output cannot be parsed for e.g. 
a mover delivery id. + Should be raised when an object is found to be in a invalid state, e.g. if + the program tries to start staging on a StagingOrder which is already + `in_progress`. """ pass @@ -81,12 +76,14 @@ class SamplesheetNotFoundException(Exception): class ProjectsDirNotfoundException(Exception): """ - Should be raised when a directory containing projects could not be found + Should be raised when a directory containing projects could not be found. """ pass + class CannotParseDDSOutputException(Exception): """ - Should be raised when DDS's output cannot be parsed for e.g. creating a project. + Should be raised when DDS's output cannot be parsed for e.g. creating a + project. """ pass diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index 7e254fc..9e53bc4 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -16,15 +16,15 @@ class DeliverByStageIdHandler(ArteriaDeliveryBaseHandler): """ def initialize(self, **kwargs): - self.dds = self.body_as_object().get('dds', False) - self.delivery_service = kwargs["dds_service"] if self.dds else kwargs["mover_delivery_service"] + self.delivery_service = kwargs["dds_service"] super(DeliverByStageIdHandler, self).initialize(kwargs) @coroutine def post(self, staging_id): - required_members = ["delivery_project_id"] - if self.dds: - required_members += ["token_path"] + required_members = [ + "delivery_project_id", + "token_path" + ] request_data = self.body_as_object(required_members=required_members) delivery_project_id = request_data["delivery_project_id"] @@ -36,24 +36,25 @@ def post(self, staging_id): extra_args['token_path'] = token_path # This should only be used for testing purposes /JD 20170202 - skip_mover_request = request_data.get("skip_mover") - if skip_mover_request and skip_mover_request == True: - log.info("Got the command to skip Mover...") - skip_mover = True + skip_delivery_request = 
request_data.get("skip_delivery") + if skip_delivery_request and skip_delivery_request == True: + log.info("Got the command to skip delivery...") + skip_delivery = True else: - log.debug("Will not skip running mover!") - skip_mover = False + log.debug("Will not skip running delivery!") + skip_delivery = False delivery_id = yield self.delivery_service.deliver_by_staging_id( staging_id=staging_id, delivery_project=delivery_project_id, md5sum_file=md5sum_file, - skip_mover=skip_mover, + skip_delivery=skip_delivery, **extra_args) - status_end_point = "{0}://{1}{2}".format(self.request.protocol, - self.request.host, - self.reverse_url("delivery_status", delivery_id)) + status_end_point = "{0}://{1}{2}".format( + self.request.protocol, + self.request.host, + self.reverse_url("delivery_status", delivery_id)) self.set_status(ACCEPTED) self.write_json({'delivery_order_id': delivery_id, @@ -63,25 +64,20 @@ def post(self, staging_id): class DeliveryStatusHandler(ArteriaDeliveryBaseHandler): def initialize(self, **kwargs): - self.mover_delivery_service = kwargs["mover_delivery_service"] - self.dds_delivery_service = kwargs["dds_service"] + self.delivery_service = kwargs["dds_service"] super(DeliveryStatusHandler, self).initialize(kwargs) @coroutine def get(self, delivery_order_id): - delivery_order = self.mover_delivery_service\ + delivery_order = self.delivery_service\ .get_delivery_order_by_id(delivery_order_id) - delivery_service = self.dds_delivery_service\ - if delivery_order.is_dds()\ - else self.mover_delivery_service - - delivery_order = yield delivery_service.update_delivery_status(delivery_order_id) + delivery_order = yield self.delivery_service.update_delivery_status( + delivery_order_id) body = { 'id': delivery_order.id, 'status': delivery_order.delivery_status.name, - 'mover_delivery_id': delivery_order.mover_delivery_id } self.write_json(body) diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index 0d2aff9..58555f6 100644 --- 
a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -6,9 +6,10 @@ from sqlalchemy.ext.declarative import declarative_base """ -Use this as the base for all database based models. This is used by alembic to know what the tables -should look like in the database, so defining new base classes elsewhere will mean that they will not -be updated properly in the actual database. +Use this as the base for all database based models. This is used by alembic to +know what the tables should look like in the database, so defining new base +classes elsewhere will mean that they will not be updated properly in the +actual database. """ SQLAlchemyBase = declarative_base() @@ -117,8 +118,6 @@ class DeliveryStatus(base_enum.Enum): pending = 'pending' - mover_processing_delivery = 'mover_processing_delivery' - mover_failed_delivery = 'mover_failed_delivery' delivery_in_progress = 'delivery_in_progress' delivery_successful = 'delivery_successful' delivery_failed = 'delivery_failed' @@ -136,15 +135,8 @@ class DeliveryOrder(SQLAlchemyBase): delivery_source = Column(String, nullable=False) delivery_project = Column(String, nullable=False) - # Optional path to md5sum file - md5sum_file = Column(String) - # Process id of Mover process used to start the delivery - mover_pid = Column(Integer) - - # Mover delivery id - the id that is needed to query mover about - # a delivery status - mover_delivery_id = Column(String) + dds_pid = Column(Integer) delivery_status = Column(Enum(DeliveryStatus)) # TODO This should really be enforcing a foreign key constraint diff --git a/delivery/repositories/deliveries_repository.py b/delivery/repositories/deliveries_repository.py index 9a539d1..4cba258 100644 --- a/delivery/repositories/deliveries_repository.py +++ b/delivery/repositories/deliveries_repository.py @@ -57,23 +57,22 @@ def create_delivery_order(self, delivery_project, delivery_status, staging_order_id, - md5sum_file=None): + ): """ Create a new delivery order and commit it to the 
database :param delivery_source: the source directory to be delivered :param delivery_project: the project code for the project to deliver to :param delivery_status: status of the delivery - :param staging_order_id: NOTA BENE: this will need to be verified against the staging table before - inserting it here, because at this point there is no validation that the - value is valid! - :param md5sum_file: Optional path to an md5sum file that mover to check files against. + :param staging_order_id: NOTA BENE: this will need to be verified + against the staging table before inserting it here, because at this + point there is no validation that the value is valid! :return: the created delivery order """ order = DeliveryOrder(delivery_source=delivery_source, delivery_project=delivery_project, delivery_status=delivery_status, staging_order_id=staging_order_id, - md5sum_file=md5sum_file) + ) self.session.add(order) self.session.commit() diff --git a/delivery/services/dds_service.py b/delivery/services/dds_service.py index 744a997..78c47f2 100644 --- a/delivery/services/dds_service.py +++ b/delivery/services/dds_service.py @@ -22,7 +22,7 @@ def __init__( session_factory, dds_conf): self.external_program_service = external_program_service - self.mover_external_program_service = self.external_program_service + self.dds_external_program_service = self.external_program_service self.staging_service = staging_service self.delivery_repo = delivery_repo self.dds_project_repo = dds_project_repo @@ -94,13 +94,22 @@ async def create_dds_project(self, project_name, project_metadata): @staticmethod @gen.coroutine - def _run_dds_put(delivery_order_id, delivery_order_repo, external_program_service, session_factory, token_path, dds_conf): + def _run_dds_put( + delivery_order_id, + delivery_order_repo, + external_program_service, + session_factory, + token_path, + dds_conf, + ): session = session_factory() - # This is a somewhat hacky work-around to the problem that objects created in one - # 
thread, and thus associated with another session cannot be accessed by another - # thread, therefore it is re-materialized in here... - delivery_order = delivery_order_repo.get_delivery_order_by_id(delivery_order_id, session) + # This is a somewhat hacky work-around to the problem that objects + # created in one thread, and thus associated with another session + # cannot be accessed by another thread, therefore it is re-materialized + # in here... + delivery_order = delivery_order_repo.get_delivery_order_by_id( + delivery_order_id, session) try: cmd = [ 'dds', @@ -120,7 +129,7 @@ def _run_dds_put(delivery_order_id, delivery_order_repo, external_program_servic execution = external_program_service.run(cmd) delivery_order.delivery_status = DeliveryStatus.delivery_in_progress - delivery_order.mover_pid = execution.pid + delivery_order.dds_pid = execution.pid session.commit() execution_result = yield external_program_service.wait_for_execution(execution) @@ -130,41 +139,55 @@ def _run_dds_put(delivery_order_id, delivery_order_repo, external_program_servic log.info(f"Successfully delivered: {delivery_order}") else: delivery_order.delivery_status = DeliveryStatus.delivery_failed - error_msg = f"Failed to deliver: {delivery_order}. DDS returned status code: {execution_result.status_code}" + error_msg = \ + f"Failed to deliver: {delivery_order}." 
\ + f"DDS returned status code: {execution_result.status_code}" log.error(error_msg) raise RuntimeError(error_msg) except Exception as e: delivery_order.delivery_status = DeliveryStatus.delivery_failed - log.error(f"Failed to deliver: {delivery_order} because this exception was logged: {e}") + log.error( + f"Failed to deliver: {delivery_order}" + f"because this exception was logged: {e}") raise e finally: # Always commit the state change to the database session.commit() @gen.coroutine - def deliver_by_staging_id(self, staging_id, delivery_project, md5sum_file, token_path, skip_mover=False): + def deliver_by_staging_id( + self, + staging_id, + delivery_project, + md5sum_file, + token_path, + skip_delivery=False): stage_order = self.staging_service.get_stage_order_by_id(staging_id) - if not stage_order or not stage_order.status == StagingStatus.staging_successful: - raise InvalidStatusException("Only deliver by staging_id if it has a successful status!" - "Staging order was: {}".format(stage_order)) - - delivery_order = self.delivery_repo.create_delivery_order(delivery_source=stage_order.get_staging_path(), - delivery_project=delivery_project, - delivery_status=DeliveryStatus.pending, - staging_order_id=staging_id, - md5sum_file=md5sum_file) - - args_for_run_dds_put = {'delivery_order_id': delivery_order.id, - 'delivery_order_repo': self.delivery_repo, - 'external_program_service': self.mover_external_program_service, - 'session_factory': self.session_factory, - 'token_path': token_path, - 'dds_conf': self.dds_conf, - } - - if skip_mover: + if not stage_order \ + or not stage_order.status == StagingStatus.staging_successful: + raise InvalidStatusException( + "Only deliver by staging_id if it has a successful status!" 
+ "Staging order was: {}".format(stage_order)) + + delivery_order = self.delivery_repo.create_delivery_order( + delivery_source=stage_order.get_staging_path(), + delivery_project=delivery_project, + delivery_status=DeliveryStatus.pending, + staging_order_id=staging_id, + ) + + args_for_run_dds_put = { + 'delivery_order_id': delivery_order.id, + 'delivery_order_repo': self.delivery_repo, + 'external_program_service': self.dds_external_program_service, + 'session_factory': self.session_factory, + 'token_path': token_path, + 'dds_conf': self.dds_conf, + } + + if skip_delivery: session = self.session_factory() delivery_order.delivery_status = DeliveryStatus.delivery_skipped session.commit() diff --git a/delivery/services/delivery_service.py b/delivery/services/delivery_service.py index 255f7ab..d09c968 100644 --- a/delivery/services/delivery_service.py +++ b/delivery/services/delivery_service.py @@ -18,12 +18,12 @@ def __init__(self, general_project_repo, runfolder_service, staging_service, - mover_service, + dds_service, project_links_directory, file_system_service=FileSystemService()): self.delivery_sources_repo = delivery_sources_repo self.staging_service = staging_service - self.mover_service = mover_service + self.dds_service = dds_service self.general_project_repo = general_project_repo self.runfolder_service = runfolder_service self.project_links_directory = project_links_directory @@ -131,21 +131,26 @@ def _get_projects_to_deliver(self, projects, mode, batch_nbr): def deliver_all_runfolders_for_project(self, project_name, mode): """ - This method will attempt to deliver all runfolders for the specified project. - - Since the process is somewhat involved, here's a explanation of what's going on and why. - - First, there are three modes of delivery which needs to be handled. CLEAN, which denotes - that this project is not allowed to be delivered previously. BATCH, which will deliver any - runfolders which have not previously been delivered. 
And finally, FORCE, which will deliver - all the runfolders regardless of their previous status. - - Two steps are then required to enable the staging, that require some explanation. - Reading the code you will note that the _get_projects_to_deliver will create a - DeliverySource and then a new DeliverySource will be created by this method. The reason - for this is that since we create a intermediate directory in which links to all the - runfolders which are to be delivered together are created. This directory is then passed - as a DeliverySource when creating a new StagingOrder (which is goes on to be staged). + This method will attempt to deliver all runfolders for the specified + project. + + Since the process is somewhat involved, here's a explanation of what's + going on and why. + + First, there are three modes of delivery which needs to be handled. + CLEAN, which denotes that this project is not allowed to be delivered + previously. BATCH, which will deliver any runfolders which have not + previously been delivered. And finally, FORCE, which will deliver all + the runfolders regardless of their previous status. + + Two steps are then required to enable the staging, that require some + explanation. Reading the code you will note that the + _get_projects_to_deliver will create a DeliverySource and then a new + DeliverySource will be created by this method. The reason for this is + that since we create a intermediate directory in which links to all the + runfolders which are to be delivered together are created. This + directory is then passed as a DeliverySource when creating a new + StagingOrder (which is goes on to be staged). 
:param project_name: of project to deliver :param mode: A DeliveryMode diff --git a/delivery/services/mover_service.py b/delivery/services/mover_service.py deleted file mode 100644 index 6902a5a..0000000 --- a/delivery/services/mover_service.py +++ /dev/null @@ -1,158 +0,0 @@ -import os.path -import logging -import re -from tornado import gen - -from delivery.exceptions import InvalidStatusException, CannotParseMoverOutputException -from delivery.models.db_models import StagingStatus, DeliveryStatus - -log = logging.getLogger(__name__) - - -class MoverDeliveryService(object): - - def __init__(self, external_program_service, staging_service, delivery_repo, session_factory, path_to_mover): - self.external_program_service = external_program_service - self.mover_external_program_service = self.external_program_service - self.moverinfo_external_program_service = self.external_program_service - self.staging_service = staging_service - self.delivery_repo = delivery_repo - self.session_factory = session_factory - self.path_to_mover = path_to_mover - - @staticmethod - def _parse_mover_id_from_mover_output(mover_output): - log.debug('Mover output was: {}'.format(mover_output)) - pattern = re.compile('^(.+-\w+-\d+)$') - hits = pattern.match(mover_output) - if hits: - return hits.group(1) - else: - raise CannotParseMoverOutputException("Could not parse mover id from: {}".format(mover_output)) - - @staticmethod - @gen.coroutine - def _run_mover(delivery_order_id, delivery_order_repo, external_program_service, session_factory, path_to_mover): - session = session_factory() - - # This is a somewhat hacky work-around to the problem that objects created in one - # thread, and thus associated with another session cannot be accessed by another - # thread, there fore it is re-materialized in here... 
- delivery_order = delivery_order_repo.get_delivery_order_by_id(delivery_order_id, session) - try: - - cmd = [os.path.join(path_to_mover, 'to_outbox'), - delivery_order.delivery_source, - delivery_order.delivery_project] - - if delivery_order.md5sum_file: - cmd += delivery_order.md5sum_file - - log.debug("Running mover with cmd: {}".format(" ".join(cmd))) - - execution = external_program_service.run(cmd) - delivery_order.delivery_status = DeliveryStatus.mover_processing_delivery - delivery_order.mover_pid = execution.pid - session.commit() - - execution_result = yield external_program_service.wait_for_execution(execution) - - if execution_result.status_code == 0: - delivery_order.delivery_status = DeliveryStatus.delivery_in_progress - delivery_order.mover_delivery_id = MoverDeliveryService.\ - _parse_mover_id_from_mover_output(execution_result.stdout) - log.info("Successfully started delivery with Mover of: {}".format(delivery_order)) - else: - delivery_order.delivery_status = DeliveryStatus.mover_failed_delivery - log.info("Failed to start Mover delivery: {}. Mover returned status code: {}". - format(delivery_order, execution_result.status_code)) - - # TODO Better exception handling here... - except Exception as e: - delivery_order.delivery_status = DeliveryStatus.delivery_failed - log.info("Failed in starting delivery: {} because this exception was logged: {}". - format(delivery_order, e)) - finally: - # Always commit the state change to the database - session.commit() - - @gen.coroutine - def deliver_by_staging_id(self, staging_id, delivery_project, md5sum_file, skip_mover=False): - - stage_order = self.staging_service.get_stage_order_by_id(staging_id) - if not stage_order or not stage_order.status == StagingStatus.staging_successful: - raise InvalidStatusException("Only deliver by staging_id if it has a successful status!" 
- "Staging order was: {}".format(stage_order)) - - delivery_order = self.delivery_repo.create_delivery_order(delivery_source=stage_order.get_staging_path(), - delivery_project=delivery_project, - delivery_status=DeliveryStatus.pending, - staging_order_id=staging_id, - md5sum_file=md5sum_file) - - args_for_run_mover = {'delivery_order_id': delivery_order.id, - 'delivery_order_repo': self.delivery_repo, - 'external_program_service': self.mover_external_program_service, - 'session_factory': self.session_factory, - 'path_to_mover': self.path_to_mover} - - if skip_mover: - session = self.session_factory() - delivery_order.delivery_status = DeliveryStatus.delivery_skipped - session.commit() - else: - yield MoverDeliveryService._run_mover(**args_for_run_mover) - - return delivery_order.id - - @staticmethod - def _parse_status_from_mover_info_result(mover_info_result): - #Parse status from this type of example string: - # Delivered: Jan 19 00:23:31 [1484781811UTC] - pattern = re.compile('^(\w+):\s') - hits = pattern.match(mover_info_result) - if hits: - return hits.group(1) - else: - raise CannotParseMoverOutputException("Could not parse mover info status from: {}". - format(mover_info_result)) - - @gen.coroutine - def _run_mover_info(self, mover_delivery_order_id): - - cmd = [os.path.join(self.path_to_mover, 'moverinfo'), '-i', mover_delivery_order_id] - execution_result = yield self.moverinfo_external_program_service.run_and_wait(cmd) - - if execution_result.status_code == 0: - mover_status = MoverDeliveryService._parse_status_from_mover_info_result(execution_result.stdout) - else: - raise CannotParseMoverOutputException("moverinfo returned a non-zero exit status: {}". 
- format(execution_result)) - return mover_status - - @gen.coroutine - def update_delivery_status(self, delivery_order_id): - """ - Check delivery status and update the delivery database accordingly - """ - delivery_order = self.get_delivery_order_by_id(delivery_order_id) - - if delivery_order.mover_delivery_id and delivery_order.delivery_status == DeliveryStatus.delivery_in_progress: - mover_info_result = yield self._run_mover_info(delivery_order.mover_delivery_id) - session = self.session_factory() - - if mover_info_result == 'Delivered': - log.info("Got successful status from Mover for delivery order: {}".format(delivery_order.id)) - delivery_order.delivery_status = DeliveryStatus.delivery_successful - else: - log.info("Got \"in progress\" status from Mover. Status was: {}".format(mover_info_result)) - - session.commit() - - return delivery_order - - def get_delivery_order_by_id(self, delivery_order_id): - return self.delivery_repo.get_delivery_order_by_id(delivery_order_id) - - def get_status_of_delivery_order(self, delivery_order_id): - return self.get_delivery_order_by_id(delivery_order_id).delivery_status diff --git a/delivery/services/staging_service.py b/delivery/services/staging_service.py index c383836..54fa63b 100644 --- a/delivery/services/staging_service.py +++ b/delivery/services/staging_service.py @@ -17,9 +17,11 @@ class StagingService(object): """ - Starting in this context means copying a directory or file to a separate directory before delivering it. - This service handles that in a asynchronous way. Copying operations (right nwo powered by rsync) can be - started, and their status monitored by querying the underlying database for their status. + Starting in this context means copying a directory or file to a separate + directory before delivering it. This service handles that in a + asynchronous way. 
Copying operations (right now powered by rsync) can be + started, and their status monitored by querying the underlying database for + their status. """ # TODO On initiation of a Staging service, restart any ongoing stagings diff --git a/tests/integration_tests/base.py b/tests/integration_tests/base.py index 8a6af82..106804e 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -87,12 +87,10 @@ def mock_delivery(cmd): project""" log.debug(f"Mock is called with {cmd}") shell = False - if any( - cmd[0].endswith(delivery_prgm) - for delivery_prgm in ['dds', 'moverinfo', 'to_outbox']): + if cmd[0].endswith('dds'): new_cmd = ['sleep', str(self.mock_duration)] - if cmd[0].endswith('dds') and 'project' in cmd: + if 'project' in cmd: new_cmd += ['&&', 'echo', f'"{dds_output}"'] new_cmd = " ".join(new_cmd) shell = True diff --git a/tests/integration_tests/test_integration.py b/tests/integration_tests/test_integration.py index d19784a..b2334cc 100644 --- a/tests/integration_tests/test_integration.py +++ b/tests/integration_tests/test_integration.py @@ -119,8 +119,8 @@ def _verify_checksum(file_path, expected_checksum): _verify_checksum(relative_file_path, sample_file.checksum) def test_cannot_stage_the_same_runfolder_twice(self): - # Note that this is a test which skips mover (since to_outbox is not expected to be installed on the system - # where this runs) + # Note that this is a test which skips delivery (since to_outbox is not + # expected to be installed on the system where this runs) with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: @@ -141,8 +141,8 @@ def test_cannot_stage_the_same_runfolder_twice(self): self.assertEqual(response.code, 202) def test_cannot_stage_the_same_project_twice(self): - # Note that this is a test which skips mover (since to_outbox is not expected to be installed on the system - # where this runs) + # Note that this is a test which skips 
delivery (since to_outbox is not + # expected to be installed on the system where this runs) with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index ee3f46a..beeeab7 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -20,13 +20,16 @@ from tests.test_utils import assert_eventually_equals, unorganised_runfolder, samplesheet_file_from_runfolder, \ project_report_files + class TestIntegrationDDS(BaseIntegration): @gen_test def test_can_stage_and_delivery_runfolder(self): - # Note that this is a test which skips mover (since to_outbox is not expected to be installed on the system - # where this runs) + # Note that this is a test which skips delivery (since to_outbox is not + # expected to be installed on the system where this runs) - with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: + with tempfile.TemporaryDirectory( + dir='./tests/resources/runfolders/', + prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: dir_name = os.path.basename(tmp_dir) self._create_projects_dir_with_random_data(tmp_dir) @@ -59,11 +62,13 @@ def test_can_stage_and_delivery_runfolder(self): delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { 'delivery_project_id': 'fakedeliveryid2016', - 'dds': True, 'token_path': 'token_path', - 'skip_mover': True, + 'skip_delivery': True, } - delivery_resp = yield self.http_client.fetch(self.get_url(delivery_url), method='POST', body=json.dumps(delivery_body)) + delivery_resp = yield self.http_client.fetch( + self.get_url(delivery_url), + method='POST', + body=json.dumps(delivery_body)) delivery_resp_as_json = json.loads(delivery_resp.body) delivery_link = delivery_resp_as_json['delivery_order_link'] @@ -74,8 +79,8 @@ def 
test_can_stage_and_delivery_runfolder(self): @gen_test def test_can_stage_and_delivery_project_dir(self): - # Note that this is a test which skips mover (since to_outbox is not expected to be installed on the system - # where this runs) + # Note that this is a test which skips delivery (since to_outbox is not + # expected to be installed on the system where this runs) with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: @@ -100,7 +105,7 @@ def test_can_stage_and_delivery_project_dir(self): delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { 'delivery_project_id': 'fakedeliveryid2016', - 'skip_mover': True, + 'skip_delivery': True, 'dds': True, 'token_path': 'token_path', } @@ -295,7 +300,7 @@ def test_can_deliver_and_respond(self): 'delivery_project_id': 'fakedeliveryid2016', 'dds': True, 'token_path': 'token_path', - 'skip_mover': False, + 'skip_delivery': False, } delivery_response = self.http_client.fetch(self.get_url(delivery_url), method='POST', body=json.dumps(delivery_body)) diff --git a/tests/integration_tests/test_integration_mover.py b/tests/integration_tests/test_integration_mover.py deleted file mode 100644 index f785871..0000000 --- a/tests/integration_tests/test_integration_mover.py +++ /dev/null @@ -1,193 +0,0 @@ - - -import json -from functools import partial -import sys -import time -import tempfile - -from tornado.testing import * -from tornado.web import Application - -from arteria.web.app import AppService - -from delivery.app import routes as app_routes, compose_application -from delivery.models.db_models import StagingStatus, DeliveryStatus -from delivery.services.metadata_service import MetadataService - -from tests.integration_tests.base import BaseIntegration -from tests.test_utils import assert_eventually_equals, unorganised_runfolder, samplesheet_file_from_runfolder, \ - project_report_files - -class TestIntegrationMover(BaseIntegration): - @gen_test - def 
test_can_stage_and_delivery_runfolder(self): - # Note that this is a test which skips mover (since to_outbox is not expected to be installed on the system - # where this runs) - - with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: - - dir_name = os.path.basename(tmp_dir) - self._create_projects_dir_with_random_data(tmp_dir) - self._create_checksums_file(tmp_dir) - - url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) - response = yield self.http_client.fetch(self.get_url(url), method='POST', body='') - self.assertEqual(response.code, 202) - - response_json = json.loads(response.body) - - staging_status_links = response_json.get("staging_order_links") - - for project, link in staging_status_links.items(): - - self.assertEqual(project, "ABC_123") - - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) - - - # The size of the fake project is 1024 bytes - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["size"], 1024) - - staging_order_project_and_id = response_json.get("staging_order_ids") - - for project, staging_id in staging_order_project_and_id.items(): - delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) - delivery_body = {'delivery_project_id': 'fakedeliveryid2016', - 'skip_mover': True} - delivery_resp = yield self.http_client.fetch(self.get_url(delivery_url), method='POST', body=json.dumps(delivery_body)) - delivery_resp_as_json = json.loads(delivery_resp.body) - delivery_link = delivery_resp_as_json['delivery_order_link'] - - status_response = yield self.http_client.fetch(delivery_link) - self.assertEqual(json.loads(status_response.body)["status"], DeliveryStatus.delivery_skipped.name) - - @gen_test - def test_can_stage_and_delivery_project_dir(self): - # Note that this is a test which 
skips mover (since to_outbox is not expected to be installed on the system - # where this runs) - - with tempfile.TemporaryDirectory(dir='./tests/resources/projects') as tmp_dir: - - dir_name = os.path.basename(tmp_dir) - url = "/".join([self.API_BASE, "stage", "project", dir_name]) - response = yield self.http_client.fetch(self.get_url(url), method='POST', body='') - self.assertEqual(response.code, 202) - - response_json = json.loads(response.body) - - staging_status_links = response_json.get("staging_order_links") - - for project, link in staging_status_links.items(): - self.assertEqual(project, dir_name) - - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) - - staging_order_project_and_id = response_json.get("staging_order_ids") - - for project, staging_id in staging_order_project_and_id.items(): - delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) - delivery_body = {'delivery_project_id': 'fakedeliveryid2016', - 'skip_mover': True} - delivery_resp = yield self.http_client.fetch(self.get_url(delivery_url), method='POST', body=json.dumps(delivery_body)) - delivery_resp_as_json = json.loads(delivery_resp.body) - delivery_link = delivery_resp_as_json['delivery_order_link'] - - status_response = yield self.http_client.fetch(delivery_link) - self.assertEqual(json.loads(status_response.body)["status"], DeliveryStatus.delivery_skipped.name) - - @gen_test - def test_can_stage_and_deliver_clean_flowcells(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', - prefix='160930_ST-E00216_0555_BH37CWALXX_') as tmpdir1,\ - tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', - prefix='160930_ST-E00216_0556_BH37CWALXX_') as tmpdir2: - self._create_projects_dir_with_random_data(tmpdir1, 'XYZ_123') - self._create_projects_dir_with_random_data(tmpdir2, 'XYZ_123') - - url = "/".join([self.API_BASE, "stage", 
"project", 'runfolders', 'XYZ_123']) - payload = {'delivery_mode': 'CLEAN'} - response = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload)) - self.assertEqual(response.code, 202) - - payload = {'delivery_mode': 'CLEAN'} - response_failed = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload), raise_error=False) - self.assertEqual(response_failed.code, 403) - - response_json = json.loads(response.body) - - staging_status_links = response_json.get("staging_order_links") - - for project, link in staging_status_links.items(): - self.assertEqual(project, 'XYZ_123') - - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) - - @gen_test - def test_can_stage_and_deliver_batch_flowcells(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', - prefix='160930_ST-E00216_0555_BH37CWALXX_') as tmpdir1, \ - tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', - prefix='160930_ST-E00216_0556_BH37CWALXX_') as tmpdir2: - self._create_projects_dir_with_random_data(tmpdir1, 'XYZ_123') - self._create_projects_dir_with_random_data(tmpdir2, 'XYZ_123') - - url = "/".join([self.API_BASE, "stage", "project", 'runfolders', 'XYZ_123']) - payload = {'delivery_mode': 'BATCH'} - response = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload)) - self.assertEqual(response.code, 202) - - payload = {'delivery_mode': 'BATCH'} - response_failed = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload), raise_error=False) - self.assertEqual(response_failed.code, 403) - - response_json = json.loads(response.body) - - staging_status_links = response_json.get("staging_order_links") - - time.sleep(1) - - for project, link in staging_status_links.items(): - self.assertEqual(project, 'XYZ_123') - - status_response = yield 
self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) - - @gen_test - def test_can_stage_and_deliver_force_flowcells(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', - prefix='160930_ST-E00216_0555_BH37CWALXX_') as tmpdir1, \ - tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', - prefix='160930_ST-E00216_0556_BH37CWALXX_') as tmpdir2: - self._create_projects_dir_with_random_data(tmpdir1, 'XYZ_123') - self._create_projects_dir_with_random_data(tmpdir2, 'XYZ_123') - - # First just stage it - url = "/".join([self.API_BASE, "stage", "project", 'runfolders', 'XYZ_123']) - payload = {'delivery_mode': 'BATCH'} - response = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload)) - self.assertEqual(response.code, 202) - - # The it should be denied (since if has already been staged) - payload = {'delivery_mode': 'BATCH'} - response_failed = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload), raise_error=False) - self.assertEqual(response_failed.code, 403) - - # Then it should work once force is specified. 
- payload = {'delivery_mode': 'FORCE'} - response_forced = yield self.http_client.fetch(self.get_url(url), method='POST', body=json.dumps(payload)) - self.assertEqual(response_forced.code, 202) - - response_json = json.loads(response_forced.body) - - staging_status_links = response_json.get("staging_order_links") - - for project, link in staging_status_links.items(): - self.assertEqual(project, 'XYZ_123') - - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) diff --git a/tests/unit_tests/models/test_db_models.py b/tests/unit_tests/models/test_db_models.py index ee6a5d2..42d5e32 100644 --- a/tests/unit_tests/models/test_db_models.py +++ b/tests/unit_tests/models/test_db_models.py @@ -1,22 +1,7 @@ import unittest -from delivery.models.db_models import DeliveryOrder from delivery.models.db_models import StagingOrder, StagingStatus -class TestDeliveryOrder(unittest.TestCase): - def test_is_dds(self): - mover_order = DeliveryOrder( - delivery_source="/foo/bar", - delivery_project="delivery123456", - ) - - dds_order = DeliveryOrder( - delivery_source="/foo/bar", - delivery_project="snpseq00001", - ) - - self.assertFalse(mover_order.is_dds()) - self.assertTrue(dds_order.is_dds()) class TestStagingOrder(unittest.TestCase): def test_get_staging_path(self): @@ -26,4 +11,6 @@ def test_get_staging_path(self): status=StagingStatus.pending, ) - self.assertEquals(staging_order.get_staging_path(), '/staging/target/data') + self.assertEquals( + staging_order.get_staging_path(), + '/staging/target/data') diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index f3a6697..fbb5555 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -9,7 +9,7 @@ from delivery.services.dds_service import DDSService from delivery.models.db_models import DeliveryOrder, StagingOrder, StagingStatus, DeliveryStatus, 
DDSProject from delivery.models.execution import ExecutionResult, Execution -from delivery.exceptions import InvalidStatusException, CannotParseMoverOutputException +from delivery.exceptions import InvalidStatusException from tests.test_utils import MockIOLoop, assert_eventually_equals @@ -39,16 +39,16 @@ def setUp(self): """ - self.mock_mover_runner = create_autospec(ExternalProgramService) + self.mock_dds_runner = create_autospec(ExternalProgramService) mock_process = MagicMock() mock_execution = Execution(pid=random.randint(1, 1000), process_obj=mock_process) - self.mock_mover_runner.run.return_value = mock_execution + self.mock_dds_runner.run.return_value = mock_execution @coroutine def wait_as_coroutine(x): return ExecutionResult(stdout=example_dds_project_ls_stdout, stderr="", status_code=0) - self.mock_mover_runner.wait_for_execution = wait_as_coroutine + self.mock_dds_runner.wait_for_execution = wait_as_coroutine self.mock_staging_service = MagicMock() @@ -80,7 +80,7 @@ def wait_as_coroutine(x): # Inject separate external runner instances for the tests, since they need to return # different information - self.dds_service.mover_external_program_service = self.mock_mover_runner + self.dds_service.dds_external_program_service = self.mock_dds_runner super(TestDDSService, self).setUp() @@ -105,7 +105,7 @@ def test_deliver_by_staging_id(self): def _get_delivery_order(): return self.delivery_order.delivery_status assert_eventually_equals(self, 1, _get_delivery_order, DeliveryStatus.delivery_successful) - self.mock_mover_runner.run.assert_called_with([ + self.mock_dds_runner.run.assert_called_with([ 'dds', '--token-path', 'token_path', '--log-file', '/foo/bar/log', @@ -151,12 +151,12 @@ def test_delivery_order_by_id(self): delivery_project='snpseq00001', delivery_status=DeliveryStatus.delivery_in_progress, staging_order_id=11, - md5sum_file='file') + ) self.mock_delivery_repo.get_delivery_order_by_id.return_value = delivery_order actual = 
self.dds_service.get_delivery_order_by_id(1) self.assertEqual(actual.id, 1) - def test_possible_to_delivery_by_staging_id_and_skip_mover(self): + def test_possible_to_delivery_by_staging_id_and_skip_delivery(self): staging_order = StagingOrder(source='/foo/bar', staging_target='/staging/dir/bar') staging_order.status = StagingStatus.staging_successful @@ -169,7 +169,7 @@ def test_possible_to_delivery_by_staging_id_and_skip_mover(self): delivery_project='snpseq00001', md5sum_file='md5sum_file', token_path='token_path', - skip_mover=True, + skip_delivery=True, ) def _get_delivery_order(): diff --git a/tests/unit_tests/services/test_delivery_service.py b/tests/unit_tests/services/test_delivery_service.py index 63e36aa..c30b3a4 100644 --- a/tests/unit_tests/services/test_delivery_service.py +++ b/tests/unit_tests/services/test_delivery_service.py @@ -11,7 +11,7 @@ from delivery.models.delivery_modes import DeliveryMode from delivery.services.delivery_service import DeliveryService -from delivery.services.mover_service import MoverDeliveryService +from delivery.services.dds_service import DDSService from delivery.services.staging_service import StagingService from delivery.services.runfolder_service import RunfolderService @@ -20,37 +20,46 @@ class TestDeliveryService(unittest.TestCase): - runfolder_projects = [RunfolderProject(name="ABC_123", - path="/foo/160930_ST-E00216_0112_BH37CWALXX/Projects/ABC_123", - runfolder_path="/foo/160930_ST-E00216_0112_BH37CWALXX", - runfolder_name="160930_ST-E00216_0112_BH37CWALXX"), - RunfolderProject(name="ABC_123", - path="/foo/160930_ST-E00216_0111_BH37CWALXX/Projects/ABC_123", - runfolder_path="/foo/160930_ST-E00216_0111_BH37CWALXX/", - runfolder_name="160930_ST-E00216_0111_BH37CWALXX")] + runfolder_projects = [ + RunfolderProject( + name="ABC_123", + path="/foo/160930_ST-E00216_0112_BH37CWALXX/Projects/ABC_123", + runfolder_path="/foo/160930_ST-E00216_0112_BH37CWALXX", + runfolder_name="160930_ST-E00216_0112_BH37CWALXX"), + 
RunfolderProject( + name="ABC_123", + path="/foo/160930_ST-E00216_0111_BH37CWALXX/Projects/ABC_123", + runfolder_path="/foo/160930_ST-E00216_0111_BH37CWALXX/", + runfolder_name="160930_ST-E00216_0111_BH37CWALXX"), + ] general_project = GeneralProject(name="ABC_123", path="/foo/bar/ABC_123") - def _compose_delivery_service(self, - mover_delivery_service=mock.create_autospec(MoverDeliveryService), - staging_service=mock.create_autospec(StagingService), - delivery_sources_repo=mock.create_autospec(DatabaseBasedDeliverySourcesRepository), - general_project_repo=mock.create_autospec(GeneralProjectRepository), - runfolder_service=mock.create_autospec(RunfolderService), - project_links_dir=mock.MagicMock()): - mover_delivery_service = mover_delivery_service + def _compose_delivery_service( + self, + dds_delivery_service=mock.create_autospec(DDSService), + staging_service=mock.create_autospec(StagingService), + delivery_sources_repo=mock.create_autospec( + DatabaseBasedDeliverySourcesRepository), + general_project_repo=mock.create_autospec( + GeneralProjectRepository), + runfolder_service=mock.create_autospec(RunfolderService), + project_links_dir=mock.MagicMock(), + ): + dds_delivery_service = dds_delivery_service self.staging_service = staging_service delivery_sources_repo = delivery_sources_repo general_project_repo = general_project_repo runfolder_service = runfolder_service self.project_links_dir = project_links_dir - self.delivery_service = DeliveryService(mover_service=mover_delivery_service, - staging_service=self.staging_service, - delivery_sources_repo=delivery_sources_repo, - general_project_repo=general_project_repo, - runfolder_service=runfolder_service, - project_links_directory=self.project_links_dir) + self.delivery_service = DeliveryService( + dds_service=dds_delivery_service, + staging_service=self.staging_service, + delivery_sources_repo=delivery_sources_repo, + general_project_repo=general_project_repo, + runfolder_service=runfolder_service, + 
project_links_directory=self.project_links_dir) def setUp(self): self._compose_delivery_service() @@ -61,13 +70,16 @@ def test__create_links_area_for_project_runfolders(self): self.delivery_service.project_links_directory = tmpdirname batch_nbr = 1337 - project_link_area = self.delivery_service._create_links_area_for_project_runfolders("ABC_123", - self.runfolder_projects, - batch_nbr) - - project_linking_area_base = os.path.join(self.delivery_service.project_links_directory, - "ABC_123", - str(batch_nbr)) + project_link_area = self.delivery_service\ + ._create_links_area_for_project_runfolders( + "ABC_123", + self.runfolder_projects, + batch_nbr) + + project_linking_area_base = os.path.join( + self.delivery_service.project_links_directory, + "ABC_123", + str(batch_nbr)) self.assertEqual(project_link_area, project_linking_area_base) @@ -114,25 +126,29 @@ def test_deliver_arbitrary_directory_project_force(self): staging_service_mock = mock.create_autospec(StagingService) staging_service_mock.create_new_stage_order.return_value = \ - StagingOrder(id=1, - source=self.general_project.path, - status=StagingStatus.pending, - staging_target='/foo/bar', - size=1024 - ) - - general_project_repo_mock = mock.create_autospec(GeneralProjectRepository) + StagingOrder( + id=1, + source=self.general_project.path, + status=StagingStatus.pending, + staging_target='/foo/bar', + size=1024, + ) + + general_project_repo_mock = mock.create_autospec( + GeneralProjectRepository) general_project_repo_mock.get_project.return_value = self.general_project - delivery_sources_repo_mock = mock.create_autospec(DatabaseBasedDeliverySourcesRepository) + delivery_sources_repo_mock = mock.create_autospec( + DatabaseBasedDeliverySourcesRepository) delivery_sources_repo_mock.source_exists.return_value = True - delivery_sources_repo_mock.create_source.return_value = DeliverySource(project_name="ABC_123", - source_name=self.general_project.name, - path=self.general_project.path) - - 
self._compose_delivery_service(general_project_repo=general_project_repo_mock, - delivery_sources_repo=delivery_sources_repo_mock, - staging_service=staging_service_mock) + delivery_sources_repo_mock.create_source.return_value = DeliverySource( + project_name="ABC_123", source_name=self.general_project.name, + path=self.general_project.path) + + self._compose_delivery_service( + general_project_repo=general_project_repo_mock, + delivery_sources_repo=delivery_sources_repo_mock, + staging_service=staging_service_mock) with self.assertRaises(ProjectAlreadyDeliveredException): self.delivery_service.deliver_arbitrary_directory_project("ABC_123", force_delivery=False) diff --git a/tests/unit_tests/services/test_mover_delivery_service.py b/tests/unit_tests/services/test_mover_delivery_service.py deleted file mode 100644 index e652ab3..0000000 --- a/tests/unit_tests/services/test_mover_delivery_service.py +++ /dev/null @@ -1,181 +0,0 @@ - -import random -from mock import MagicMock, create_autospec - -from tornado.testing import AsyncTestCase, gen_test -from tornado.gen import coroutine - -from delivery.services.external_program_service import ExternalProgramService -from delivery.services.mover_service import MoverDeliveryService -from delivery.models.db_models import DeliveryOrder, StagingOrder, StagingStatus, DeliveryStatus -from delivery.models.execution import ExecutionResult, Execution -from delivery.exceptions import InvalidStatusException, CannotParseMoverOutputException - -from tests.test_utils import MockIOLoop, assert_eventually_equals - - -class TestMoverDeliveryService(AsyncTestCase): - - def setUp(self): - - example_mover_stdout = """TestCase_31-ngi2016001-1484739218""" - - example_moverinfo_stdout = """Delivered: Jan 19 00:23:31 [1484781811UTC]""" - - self.mock_mover_runner = create_autospec(ExternalProgramService) - mock_process = MagicMock() - mock_execution = Execution(pid=random.randint(1, 1000), process_obj=mock_process) - 
self.mock_mover_runner.run.return_value = mock_execution - - @coroutine - def wait_as_coroutine(x): - return ExecutionResult(stdout=example_mover_stdout, stderr="", status_code=0) - - self.mock_mover_runner.wait_for_execution = wait_as_coroutine - - self.mock_moverinfo_runner = create_autospec(ExternalProgramService) - - @coroutine - def mover_info_wait_as_coroutine(x): - return ExecutionResult(stdout=example_moverinfo_stdout, stderr="", status_code=0) - - self.mock_moverinfo_runner.run_and_wait = MagicMock(wraps=mover_info_wait_as_coroutine) - - self.mock_staging_service = MagicMock() - self.mock_delivery_repo = MagicMock() - - self.delivery_order = DeliveryOrder(id=1, delivery_source="/foo", delivery_project="TestProj") - - self.mock_delivery_repo.create_delivery_order.return_value = self.delivery_order - self.mock_delivery_repo.get_delivery_order_by_id.return_value = self.delivery_order - - self.mock_session_factory = MagicMock() - self.mock_path_to_mover = "/foo/bar/" - self.mover_delivery_service = MoverDeliveryService(external_program_service=None, - staging_service=self.mock_staging_service, - delivery_repo=self.mock_delivery_repo, - session_factory=self.mock_session_factory, - path_to_mover=self.mock_path_to_mover) - - # Inject separate external runner instances for the tests, since they need to return - # different information - self.mover_delivery_service.mover_external_program_service = self.mock_mover_runner - self.mover_delivery_service.moverinfo_external_program_service = self.mock_moverinfo_runner - - super(TestMoverDeliveryService, self).setUp() - - @gen_test - def test_deliver_by_staging_id(self): - staging_order = StagingOrder(source='/foo/bar', staging_target='/staging/dir/bar') - staging_order.status = StagingStatus.staging_successful - self.mock_staging_service.get_stage_order_by_id.return_value = staging_order - - self.mock_staging_service.get_delivery_order_by_id.return_value = self.delivery_order - - res = yield 
self.mover_delivery_service.deliver_by_staging_id(staging_id=1, - delivery_project='xyz123', - md5sum_file='md5sum_file') - - def _get_delivery_order(): - return self.delivery_order.delivery_status - assert_eventually_equals(self, 1, _get_delivery_order, DeliveryStatus.delivery_in_progress) - self.mock_mover_runner.run.assert_called_once_with(['/foo/bar/to_outbox', '/foo', 'TestProj']) - - @gen_test - def test_update_delivery_status(self): - delivery_order = DeliveryOrder(mover_delivery_id="TestCase_31-ngi2016001-1484739218 ", - delivery_status=DeliveryStatus.delivery_in_progress) - self.mock_delivery_repo.get_delivery_order_by_id.return_value = delivery_order - result = yield self.mover_delivery_service.update_delivery_status(self.delivery_order.id) - self.assertEqual(result.delivery_status, DeliveryStatus.delivery_successful) - - self.mock_moverinfo_runner.run_and_wait.assert_called_once_with(['/foo/bar/moverinfo', '-i', 'TestCase_31-ngi2016001-1484739218 ']) - - @gen_test - def test_deliver_by_staging_id_raises_on_non_existent_stage_id(self): - self.mock_staging_service.get_stage_order_by_id.return_value = None - - with self.assertRaises(InvalidStatusException): - - yield self.mover_delivery_service.deliver_by_staging_id(staging_id=1, - delivery_project='foo', - md5sum_file='md5sum_file') - - @gen_test - def test_deliver_by_staging_id_raises_on_non_successful_stage_id(self): - - staging_order = StagingOrder() - staging_order.status = StagingStatus.staging_failed - self.mock_staging_service.get_stage_order_by_id.return_value = staging_order - - with self.assertRaises(InvalidStatusException): - - yield self.mover_delivery_service.deliver_by_staging_id(staging_id=1, - delivery_project='foo', - md5sum_file='md5sum_file') - - def test_get_status_of_delivery_order(self): - delivery_order = DeliveryOrder(id=1, - delivery_source='src', - delivery_project='xyz123', - delivery_status=DeliveryStatus.mover_processing_delivery, - staging_order_id=11, - md5sum_file='file') - 
self.mock_delivery_repo.get_delivery_order_by_id.return_value = delivery_order - actual = self.mover_delivery_service.get_status_of_delivery_order(1) - self.assertEqual(actual, DeliveryStatus.mover_processing_delivery) - - def test_delivery_order_by_id(self): - delivery_order = DeliveryOrder(id=1, - delivery_source='src', - delivery_project='xyz123', - delivery_status=DeliveryStatus.mover_processing_delivery, - staging_order_id=11, - md5sum_file='file') - self.mock_delivery_repo.get_delivery_order_by_id.return_value = delivery_order - actual = self.mover_delivery_service.get_delivery_order_by_id(1) - self.assertEqual(actual.id, 1) - - def test_possible_to_delivery_by_staging_id_and_skip_mover(self): - - staging_order = StagingOrder(source='/foo/bar', staging_target='/staging/dir/bar') - staging_order.status = StagingStatus.staging_successful - self.mock_staging_service.get_stage_order_by_id.return_value = staging_order - - self.mock_staging_service.get_delivery_order_by_id.return_value = self.delivery_order - - self.mover_delivery_service.deliver_by_staging_id(staging_id=1, - delivery_project='xyz123', - md5sum_file='md5sum_file', - skip_mover=True) - - def _get_delivery_order(): - return self.delivery_order.delivery_status - assert_eventually_equals(self, 1, _get_delivery_order, DeliveryStatus.delivery_skipped) - - def test__parse_mover_id_from_mover_output(self): - example_mover_output = """TestCase_31-ngi2016001-1484739218""" - - actual = self.mover_delivery_service._parse_mover_id_from_mover_output(example_mover_output) - self.assertEqual(actual, "TestCase_31-ngi2016001-1484739218") - - def test__parse_mover_id_from_mover_output_with_dash(self): - example_mover_output = """TestCase-31-ngi2016001-1484739218""" - - actual = self.mover_delivery_service._parse_mover_id_from_mover_output(example_mover_output) - self.assertEqual(actual, "TestCase-31-ngi2016001-1484739218") - - def test__parse_mover_id_from_mover_output_raises_on_invalid_output(self): - 
example_mover_output = """Found receiver delivery00001 with end date: 2017-03-11 - TestCase_31 queued for delivery to delivery00001, id = TestCase_31-ngi2016001-1484739218""" - with self.assertRaises(CannotParseMoverOutputException): - self.mover_delivery_service._parse_mover_id_from_mover_output(example_mover_output) - - def test__parse_status_from_mover_info_result(self): - example_moverinfo_stdout = """Delivered: Jan 19 00:23:31 [1484781811UTC]""" - actual = self.mover_delivery_service._parse_status_from_mover_info_result(example_moverinfo_stdout) - self.assertEqual(actual, "Delivered") - - def test__parse_status_from_mover_info_result_raises_on_invalid_output(self): - with self.assertRaises(CannotParseMoverOutputException): - self.mover_delivery_service._parse_status_from_mover_info_result("Invalid input...") From c195a40ebe7fb055a2da27dff2c989010e224f02 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 3 Jun 2022 14:27:48 +0200 Subject: [PATCH 02/11] Use token_path directly instead of through extra_args --- delivery/handlers/delivery_handlers.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index 9e53bc4..a53977a 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -31,10 +31,6 @@ def post(self, staging_id): token_path = request_data.get("token_path") md5sum_file = request_data.get("md5sums_file") - extra_args = {} - if token_path: - extra_args['token_path'] = token_path - # This should only be used for testing purposes /JD 20170202 skip_delivery_request = request_data.get("skip_delivery") if skip_delivery_request and skip_delivery_request == True: @@ -49,7 +45,8 @@ def post(self, staging_id): delivery_project=delivery_project_id, md5sum_file=md5sum_file, skip_delivery=skip_delivery, - **extra_args) + token_path=token_path, + ) status_end_point = "{0}://{1}{2}".format( self.request.protocol, From 
260b9f821741b1d586f26e9d07df74837754a030 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 10 Jun 2022 15:17:46 +0200 Subject: [PATCH 03/11] Improve DDS token handling --- delivery/handlers/dds_handlers.py | 11 +---- delivery/handlers/delivery_handlers.py | 11 +---- delivery/services/dds_service.py | 59 ++++++++++++++++++++++++-- tests/unit_tests/services/test_dds.py | 36 +++++++++++++++- 4 files changed, 94 insertions(+), 23 deletions(-) diff --git a/delivery/handlers/dds_handlers.py b/delivery/handlers/dds_handlers.py index ed5e946..e1191cb 100644 --- a/delivery/handlers/dds_handlers.py +++ b/delivery/handlers/dds_handlers.py @@ -2,6 +2,7 @@ from delivery.handlers import * from delivery.handlers.utility_handlers import ArteriaDeliveryBaseHandler +from delivery.services.dds_service import DDSToken import os import tempfile @@ -50,15 +51,7 @@ async def post(self, project_name): project_metadata = self.body_as_object( required_members=required_members) - with tempfile.NamedTemporaryFile(mode='w', delete=True) as token_file: - if os.path.exists(project_metadata["auth_token"]): - token_path = project_metadata["auth_token"] - else: - token_file.write(project_metadata["auth_token"]) - token_file.flush() - - token_path = token_file.name - + with DDSToken(project_metadata["auth_token"]) as token_path: dds_project_id = await self.dds_service.create_dds_project( project_name, project_metadata, diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index 2683c1b..95d2c07 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -8,6 +8,7 @@ from delivery.handlers import * from delivery.handlers.utility_handlers import ArteriaDeliveryBaseHandler +from delivery.services.dds_service import DDSToken log = logging.getLogger(__name__) @@ -42,15 +43,7 @@ def post(self, staging_id): log.debug("Will not skip running delivery!") skip_delivery = False - with tempfile.NamedTemporaryFile(mode='w', 
class DDSToken:
    """
    Context manager resolving a DDS token to a file path.

    The ``auth_token`` argument may be either a path to an existing token
    file (used as-is) or the raw token string, in which case the string is
    written to a temporary file that is removed again on exit.

    Raises
    ------
    FileNotFoundError
        If the token is expired, DDS will delete it when attempting to create
        a project or deliver data.
    """

    def __init__(self, auth_token):
        """
        Parameters
        ----------
        auth_token : str
            can be either the token string or a path to the token file
        """
        self.auth_token = auth_token

    def __enter__(self):
        # An existing file wins; otherwise spill the raw token to a
        # NamedTemporaryFile so the dds CLI can read it from disk.
        if not os.path.exists(self.auth_token):
            self.temporary_token = tempfile.NamedTemporaryFile(
                mode='w', delete=True)
            self.temporary_token.write(self.auth_token)
            self.temporary_token.flush()

            self.token_path = self.temporary_token.name
        else:
            self.token_path = self.auth_token

        return self.token_path

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only a temporary file (if any) needs cleanup; a caller-provided
        # token file is left untouched.
        temporary_token = getattr(self, 'temporary_token', None)
        if temporary_token is None:
            return

        try:
            temporary_token.close()
        except FileNotFoundError:
            log.error(
                "Token was deleted during delivery (probably by DDS)."
                " Check token expiry date (`dds auth info`).")
            raise
class TestDDSToken(unittest.TestCase):
    """Unit tests for the DDSToken context manager."""
    # Removed stale TODOs ("test can handle existing file", "raise error if
    # file is deleted"): both cases are covered by the tests below.

    def test_can_write_token(self):
        # A raw token string is written to a temporary file which is removed
        # again when the context manager exits.
        auth_token = "1234"
        with DDSToken(auth_token) as token_path:
            with open(token_path, mode='r') as token_file:
                self.assertEqual(auth_token, token_file.read())

        self.assertFalse(os.path.exists(token_path))

    def test_can_handle_existing_token(self):
        # A path to an existing token file is used as-is (no copy is made).
        auth_token = "1234"
        with tempfile.NamedTemporaryFile(mode='w', delete=True) as token_file:
            token_file.write(auth_token)
            token_file.flush()

            with DDSToken(token_file.name) as token_path:
                self.assertEqual(token_file.name, token_path)

                with open(token_path, mode='r') as dds_token:
                    self.assertEqual(auth_token, dds_token.read())

    def test_raise_error_if_file_is_deleted(self):
        # DDS deletes expired tokens; closing the temporary file then raises.
        auth_token = "1234"
        with self.assertRaises(FileNotFoundError):
            with DDSToken(auth_token) as token_path:
                os.remove(token_path)
delivery_id = yield self.delivery_service.deliver_by_staging_id( staging_id=staging_id, delivery_project=delivery_project_id, - md5sum_file=md5sum_file, skip_delivery=skip_delivery, token_path=token_path ) diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index 58555f6..2d94f9d 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -154,6 +154,3 @@ def __repr__(self): f"status: {self.delivery_status}, " " }" ) - - def is_dds(self): - return self.delivery_project.startswith("snpseq") diff --git a/delivery/services/dds_service.py b/delivery/services/dds_service.py index 8972fb0..a501fa7 100644 --- a/delivery/services/dds_service.py +++ b/delivery/services/dds_service.py @@ -223,7 +223,6 @@ def deliver_by_staging_id( self, staging_id, delivery_project, - md5sum_file, token_path, skip_delivery=False): diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index d504304..aeb83f4 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -131,7 +131,7 @@ def test_deliver_by_staging_id(self): staging_id=1, delivery_project='snpseq00001', token_path='token_path', - md5sum_file='md5sum_file') + ) mock_rmtree.assert_called_once_with(staging_target) def _get_delivery_order(): @@ -158,7 +158,6 @@ def test_deliver_by_staging_id_raises_on_non_existent_stage_id(self): yield self.dds_service.deliver_by_staging_id( staging_id=1, delivery_project='snpseq00001', - md5sum_file='md5sum_file', token_path='token_path', ) @@ -174,7 +173,6 @@ def test_deliver_by_staging_id_raises_on_non_successful_stage_id(self): yield self.dds_service.deliver_by_staging_id( staging_id=1, delivery_project='snpseq00001', - md5sum_file='md5sum_file', token_path='token_path', ) @@ -200,7 +198,6 @@ def test_possible_to_delivery_by_staging_id_and_skip_delivery(self): self.dds_service.deliver_by_staging_id( staging_id=1, delivery_project='snpseq00001', - md5sum_file='md5sum_file', 
token_path='token_path', skip_delivery=True, ) From 5517e078e9e63873c2c7fab8aa92b6929257a832 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 17 Jun 2022 15:48:20 +0200 Subject: [PATCH 05/11] Implement new DDSProject class --- .../74b309c44134_drop_ddsproject_table.py | 33 ++ .../85082f64ccd0_add_dds_project_table.py | 7 +- .../8a4cc1553379_decommission_mover.py | 6 - delivery/app.py | 4 +- delivery/handlers/dds_handlers.py | 15 +- delivery/handlers/delivery_handlers.py | 27 +- delivery/models/db_models.py | 18 +- delivery/models/project.py | 378 +++++++++++++++++ .../repositories/deliveries_repository.py | 22 +- delivery/repositories/project_repository.py | 28 -- delivery/services/dds_service.py | 240 ----------- requirements/prod | 2 +- tests/integration_tests/base.py | 28 +- tests/integration_tests/test_integration.py | 10 +- .../integration_tests/test_integration_dds.py | 121 ++++-- tests/unit_tests/models/test_db_models.py | 2 +- .../repositories/test_delivery_repository.py | 11 +- .../repositories/test_project_repository.py | 43 +- tests/unit_tests/services/test_dds.py | 395 ++++++++++++------ 19 files changed, 827 insertions(+), 563 deletions(-) create mode 100644 alembic/versions/74b309c44134_drop_ddsproject_table.py diff --git a/alembic/versions/74b309c44134_drop_ddsproject_table.py b/alembic/versions/74b309c44134_drop_ddsproject_table.py new file mode 100644 index 0000000..433e747 --- /dev/null +++ b/alembic/versions/74b309c44134_drop_ddsproject_table.py @@ -0,0 +1,33 @@ +"""Drop DDSProject table + +Revision ID: 74b309c44134 +Revises: 8a4cc1553379 +Create Date: 2022-06-16 15:37:24.124003 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
# revision identifiers, used by Alembic.
revision = '74b309c44134'
down_revision = '8a4cc1553379'
branch_labels = None
depends_on = None


def upgrade():
    """Replace the dds_projects lookup table with a column on delivery_orders."""
    # The project name now lives directly on each delivery order, so the
    # separate mapping table is no longer needed. The two operations are
    # independent of each other.
    op.add_column(
        'delivery_orders',
        sa.Column('ngi_project_name', sa.String(), nullable=True))
    op.drop_table('dds_projects')


def downgrade():
    """Recreate the dds_projects table and drop the delivery_orders column."""
    op.create_table(
        'dds_projects',
        sa.Column('dds_project_id', sa.String(), nullable=False),
        sa.Column('project_name', sa.String(), nullable=True),
        sa.PrimaryKeyConstraint('dds_project_id')
    )
    op.drop_column('delivery_orders', 'ngi_project_name')
### with op.batch_alter_table('delivery_orders') as batch_op: batch_op.alter_column('dds_pid', new_column_name='mover_pid') op.add_column('delivery_orders', sa.Column('md5sum_file', sa.String(), nullable=True)) op.add_column('delivery_orders', sa.Column('mover_delivery_id', sa.String(), nullable=True)) - # ### end Alembic commands ### diff --git a/delivery/app.py b/delivery/app.py index 12c75af..6b4aa7a 100644 --- a/delivery/app.py +++ b/delivery/app.py @@ -25,7 +25,7 @@ FileSystemBasedUnorganisedRunfolderRepository from delivery.repositories.staging_repository import DatabaseBasedStagingRepository from delivery.repositories.deliveries_repository import DatabaseBasedDeliveriesRepository -from delivery.repositories.project_repository import GeneralProjectRepository, UnorganisedRunfolderProjectRepository, DDSProjectRepository +from delivery.repositories.project_repository import GeneralProjectRepository, UnorganisedRunfolderProjectRepository from delivery.repositories.delivery_sources_repository import DatabaseBasedDeliverySourcesRepository from delivery.repositories.sample_repository import RunfolderProjectBasedSampleRepository @@ -161,13 +161,11 @@ def _assert_is_dir(directory): session_factory=session_factory) dds_conf = config['dds_conf'] - dds_project_repo = DDSProjectRepository(session_factory=session_factory) dds_service = DDSService( external_program_service=external_program_service, staging_service=staging_service, staging_dir=staging_dir, delivery_repo=delivery_repo, - dds_project_repo=dds_project_repo, session_factory=session_factory, dds_conf=dds_conf) diff --git a/delivery/handlers/dds_handlers.py b/delivery/handlers/dds_handlers.py index e1191cb..4856b94 100644 --- a/delivery/handlers/dds_handlers.py +++ b/delivery/handlers/dds_handlers.py @@ -2,7 +2,7 @@ from delivery.handlers import * from delivery.handlers.utility_handlers import ArteriaDeliveryBaseHandler -from delivery.services.dds_service import DDSToken +from delivery.models.project import 
DDSProject import os import tempfile @@ -51,12 +51,11 @@ async def post(self, project_name): project_metadata = self.body_as_object( required_members=required_members) - with DDSToken(project_metadata["auth_token"]) as token_path: - dds_project_id = await self.dds_service.create_dds_project( - project_name, - project_metadata, - token_path, - ) + dds_project = await DDSProject.new( + project_name, + project_metadata, + project_metadata["auth_token"], + self.dds_service) self.set_status(ACCEPTED) - self.write_json({'dds_project_id': dds_project_id}) + self.write_json({'dds_project_id': dds_project.project_id}) diff --git a/delivery/handlers/delivery_handlers.py b/delivery/handlers/delivery_handlers.py index eabc4dd..1bc9614 100644 --- a/delivery/handlers/delivery_handlers.py +++ b/delivery/handlers/delivery_handlers.py @@ -8,7 +8,7 @@ from delivery.handlers import * from delivery.handlers.utility_handlers import ArteriaDeliveryBaseHandler -from delivery.services.dds_service import DDSToken +from delivery.models.project import DDSProject log = logging.getLogger(__name__) @@ -26,12 +26,15 @@ def initialize(self, **kwargs): def post(self, staging_id): required_members = [ "delivery_project_id", - "auth_token" + "ngi_project_name", + "auth_token", ] request_data = self.body_as_object(required_members=required_members) delivery_project_id = request_data["delivery_project_id"] - auth_token = request_data.get("auth_token") + auth_token = request_data["auth_token"] + deadline = request_data.get("deadline") + release = request_data.get("release", True) # This should only be used for testing purposes /JD 20170202 skip_delivery_request = request_data.get("skip_delivery") @@ -42,13 +45,17 @@ def post(self, staging_id): log.debug("Will not skip running delivery!") skip_delivery = False - with DDSToken(auth_token) as token_path: - delivery_id = yield self.delivery_service.deliver_by_staging_id( - staging_id=staging_id, - delivery_project=delivery_project_id, - 
skip_delivery=skip_delivery, - token_path=token_path - ) + dds_project = DDSProject( + self.delivery_service, + auth_token, + delivery_project_id) + + delivery_id = yield dds_project.put( + staging_id, + skip_delivery=skip_delivery, + deadline=deadline, + release=release, + ) status_end_point = "{0}://{1}{2}".format( self.request.protocol, diff --git a/delivery/models/db_models.py b/delivery/models/db_models.py index 2d94f9d..52577c0 100644 --- a/delivery/models/db_models.py +++ b/delivery/models/db_models.py @@ -39,23 +39,6 @@ def __repr__(self): self.batch) -class DDSProject(SQLAlchemyBase): - """ - Keeps track of project names and project IDs in DDS - """ - __tablename__ = 'dds_projects' - dds_project_id = Column(String, nullable=False, primary_key=True) - project_name = Column(String) - - def __repr__(self): - return ( - "DDS Project: { " - f"dds_project_id: {self.dds_project_id}, " - f"project_name: {self.project_name} " - "}" - ) - - class StagingStatus(base_enum.Enum): """ Enumerate possible staging statuses @@ -134,6 +117,7 @@ class DeliveryOrder(SQLAlchemyBase): id = Column(Integer, primary_key=True, autoincrement=True) delivery_source = Column(String, nullable=False) delivery_project = Column(String, nullable=False) + ngi_project_name = Column(String, nullable=True) # Process id of Mover process used to start the delivery dds_pid = Column(Integer) diff --git a/delivery/models/project.py b/delivery/models/project.py index b61c34e..3fcd171 100644 --- a/delivery/models/project.py +++ b/delivery/models/project.py @@ -1,6 +1,17 @@ import os +import re +import json +import shutil +import tempfile +import logging +from tornado import gen from delivery.models import BaseModel +from delivery.exceptions import CannotParseDDSOutputException, \ + InvalidStatusException, ProjectNotFoundException +from delivery.models.db_models import StagingStatus, DeliveryStatus + +log = logging.getLogger(__name__) class BaseProject(BaseModel): @@ -79,3 +90,370 @@ def __init__(self, 
class DDSProject:
    """
    Model representing a project in DDS.

    Attributes
    ----------
    project_id: str
        id of the project in DDS
    dds_service: DDSService
        arteria-delivery config instance
    """

    def __init__(
            self,
            dds_service,
            auth_token,
            dds_project_id,
    ):
        """
        Parameters
        ----------
        dds_service: DDSService
            service handling config and relations with other instances in
            arteria-delivery (e.g. staging_service, ...)
        auth_token: str
            either DDS token string or path to the token file
        dds_project_id: str
            project id in DDS (may be None while the project is being
            created, see `new`)
        """
        # The dds CLI reads the token from a file. If the caller handed us
        # the raw token string, spill it to a temporary file that is cleaned
        # up in __del__.
        if os.path.exists(auth_token):
            token_path = auth_token
        else:
            self.temporary_token = tempfile.NamedTemporaryFile(
                mode='w', delete=True)
            self.temporary_token.write(auth_token)
            self.temporary_token.flush()

            token_path = self.temporary_token.name

        self.dds_service = dds_service
        self.project_id = dds_project_id

        # Common prefix for every dds invocation made by this instance.
        self._base_cmd = [
            'dds',
            '--token-path', token_path,
            '--log-file', dds_service.dds_conf["log_path"],
            '--no-prompt',
        ]

    def __del__(self):
        # Best-effort cleanup of the temporary token file, if one was created.
        try:
            self.temporary_token.close()
        except AttributeError:
            # No temporary file was created, nothing to do here
            pass
        except FileNotFoundError:
            log.error(
                "Token was deleted during delivery (probably by DDS)."
                " Check token expiry date (`dds auth info`).")
            raise

    @classmethod
    @gen.coroutine
    def new(
            cls,
            ngi_project_name,
            project_metadata,
            auth_token,
            dds_service,
    ):
        """
        Create a new project in DDS.

        Parameters
        ----------
        ngi_project_name: str
            NGI project name (e.g. AB-1234).
        project_metadata: dict
            Project metadata to be sent to DDS, must contain fields
            "description" (str) and "pi" (str). Can also include fields
            "owners" (list(str)), "researchers" (list(str)) and
            "non-sensitive" (bool).
        auth_token: str
            either DDS token string or path to the token file
        dds_service: DDSService
            service handling config and relations with other instances in
            arteria-delivery

        Returns
        -------
        DDSProject
            A DDSProject instance
        """
        self = cls(
            dds_service=dds_service,
            auth_token=auth_token,
            dds_project_id=None,
        )

        cmd = self._base_cmd[:]

        cmd += [
            'project', 'create',
            # DDS titles omit the dash, e.g. AB-1234 -> AB1234 (see the
            # inverse mapping in get_ngi_project_name)
            '--title', ngi_project_name.replace('-', ''),
            '--description', '"{}"'.format(project_metadata['description']),
            '-pi', project_metadata['pi']
        ]

        cmd += [
            args
            for owner in project_metadata.get('owners', [])
            for args in ['--owner', owner]
        ]

        cmd += [
            args
            for researcher in project_metadata.get('researchers', [])
            for args in ['--researcher', researcher]
        ]

        if project_metadata.get('non-sensitive', False):
            cmd += ['--non-sensitive']

        stdout = yield self._run(cmd)
        self.project_id = cls._parse_dds_project_id(stdout)

        self._ngi_project_name = ngi_project_name

        return self

    @gen.coroutine
    def get_ngi_project_name(self):
        """
        NGI project name (e.g. AB-1234).

        If the attribute is not set, it will be fetched from DDS.

        Raises
        ------
        ProjectNotFoundException
            If `self.project_id` is not listed by `dds ls`.
        """
        try:
            return self._ngi_project_name
        except AttributeError:
            cmd = self._base_cmd[:]
            cmd += [
                'ls',
                '--json',
            ]

            dds_output = yield self._run(cmd)
            try:
                dds_project_title = next(
                    project["Title"]
                    for project in json.loads(dds_output)
                    if project["Project ID"] == self.project_id
                )

                # Re-insert the dash dropped from the DDS title,
                # e.g. AB1234 -> AB-1234
                self._ngi_project_name = re.sub(
                    r"(\D{2})(\d{4})",
                    r"\1-\2",
                    dds_project_title)
            except StopIteration:
                # BUGFIX: the f-string prefix was missing, so the project id
                # was never interpolated into the message.
                err_msg = f"Project {self.project_id} not found in DDS."
                log.error(err_msg)
                raise ProjectNotFoundException(err_msg)

            return self._ngi_project_name

    @gen.coroutine
    def put(
            self,
            staging_id,
            skip_delivery=False,
            deadline=None,
            release=True,
    ):
        """
        Upload staged data to DDS

        Parameters
        ----------
        staging_id: int
            id of the staging order to deliver
        skip_delivery: bool
            whether or not to skip the delivery step and only create a
            DeliveryOrder (for testing purposes only).
        deadline: int
            project deadline in days.
        release: bool
            whether or not to release the project on DDS

        Returns
        -------
        int
            Delivery order id, can be used to retrieve delivery status.

        Raises
        ------
        InvalidStatusException
            If the staging order does not exist or was not successful.
        """
        staging_order = self.dds_service.staging_service \
            .get_stage_order_by_id(staging_id)
        if not staging_order or \
                not staging_order.status == StagingStatus.staging_successful:
            raise InvalidStatusException(
                "Only deliver by staging_id if it has a successful status! "
                "Staging order was: {}".format(staging_order))

        ngi_project_name = yield self.get_ngi_project_name()

        delivery_order = self.dds_service.delivery_repo.create_delivery_order(
            delivery_source=staging_order.get_staging_path(),
            delivery_project=self.project_id,
            ngi_project_name=ngi_project_name,
            delivery_status=DeliveryStatus.pending,
            staging_order_id=staging_id,
        )

        cmd = self._base_cmd[:]

        cmd += [
            'data', 'put',
            '--mount-dir', self.dds_service.staging_dir,
            '--source', delivery_order.delivery_source,
            '--project', delivery_order.delivery_project,
            '--silent',
        ]

        if skip_delivery:
            session = self.dds_service.session_factory()
            delivery_order.delivery_status = DeliveryStatus.delivery_skipped
            session.commit()
        else:
            # NOTE(review): _run_delivery is a coroutine launched without
            # yield, so the upload continues in the background after put()
            # returns and failures only surface through delivery_status —
            # presumably intentional, confirm.
            self._run_delivery(
                cmd,
                delivery_order,
                staging_order,
                deadline=deadline,
                release=release)

        return delivery_order.id

    @gen.coroutine
    def release(self, deadline=None):
        """
        Release the project in DDS

        Parameters
        ----------
        deadline: int
            project deadline in days.
        """
        cmd = self._base_cmd[:]

        cmd += [
            'project', 'status', 'release',
            '--project', self.project_id,
            '--no-mail',
        ]

        if deadline:
            cmd += [
                '--deadline', str(deadline),
            ]
        yield self._run(cmd)

    @gen.coroutine
    def _run(self, cmd):
        """
        Run a dds command and wait for its result.

        Parameters
        ----------
        cmd: list of str
            dds command and arguments to run (passed as an argument list,
            not a shell string).

        Returns
        -------
        str
            stdout of the command.

        Raises
        ------
        RuntimeError
            If the command exits with a non-zero status code.
        """
        log.debug(f"Running dds with command: {' '.join(cmd)}")
        execution = self.dds_service.external_program_service.run(cmd)
        execution_result = yield self.dds_service.external_program_service \
            .wait_for_execution(execution)

        if execution_result.status_code != 0:
            error_msg = (
                f"Failed to run DDS command: {execution_result.stderr}."
                f" DDS returned status code: {execution_result.status_code}")
            log.error(error_msg)
            raise RuntimeError(error_msg)

        return execution_result.stdout

    @gen.coroutine
    def _run_delivery(
            self,
            cmd,
            delivery_order,
            staging_order,
            deadline=None,
            release=True,
    ):
        """
        Start a delivery and release the project in DDS

        Parameters
        ----------
        cmd: list of str
            dds command to run to start the delivery.
        delivery_order: DeliveryOrder
            Delivery Order associated to the delivery
        staging_order: StagingOrder
            Staging Order to deliver
        deadline: int
            project deadline in days.
        release: bool
            whether or not to release the project on DDS

        Raises
        ------
        RuntimeError
            If the dds upload exits with a non-zero status code.
        """
        session = self.dds_service.session_factory()
        try:
            log.debug(f"Delivering {delivery_order}...")
            log.debug("Running dds with cmd: {}".format(" ".join(cmd)))

            execution = self.dds_service.dds_external_program_service.run(cmd)

            delivery_order.delivery_status = DeliveryStatus.delivery_in_progress
            delivery_order.dds_pid = execution.pid
            session.commit()

            execution_result = yield self.dds_service \
                .dds_external_program_service \
                .wait_for_execution(execution)

            if execution_result.status_code == 0:
                log.info(
                    f"Removing staged runfolder at {staging_order.staging_target}")
                shutil.rmtree(staging_order.staging_target)

                if release:
                    # OBS: in the future we might want to do this through a
                    # specific endpoint, e.g. if we want to do several
                    # deliveries before releasing a project /AC 2022-06-23
                    log.info(f"Releasing project {self.project_id}")
                    yield self.release(deadline=deadline)

                delivery_order.delivery_status = DeliveryStatus.delivery_successful
                log.info(f"Successfully delivered: {delivery_order}")
            else:
                delivery_order.delivery_status = DeliveryStatus.delivery_failed
                error_msg = \
                    f"Failed to deliver: {delivery_order}. " \
                    f"DDS returned status code: {execution_result.status_code}"
                log.error(error_msg)
                raise RuntimeError(error_msg)

        except Exception as e:
            delivery_order.delivery_status = DeliveryStatus.delivery_failed
            raise e
        finally:
            # Always persist the final delivery status, success or failure.
            session.commit()

    @staticmethod
    def _parse_dds_project_id(dds_output):
        """
        Parse dds project id from the output of "dds project create".

        Raises
        ------
        CannotParseDDSOutputException
            If no project id could be found in the output.
        """
        log.debug('DDS output was: {}'.format(dds_output))
        pattern = re.compile(r'Project created with id: (snpseq\d+)')
        hits = pattern.search(dds_output)
        if hits:
            return hits.group(1)
        else:
            raise CannotParseDDSOutputException(
                f"Could not parse DDS project ID from: {dds_output}")
:return: the created delivery order """ - order = DeliveryOrder(delivery_source=delivery_source, - delivery_project=delivery_project, - delivery_status=delivery_status, - staging_order_id=staging_order_id, + order = DeliveryOrder( + delivery_source=delivery_source, + delivery_project=delivery_project, + ngi_project_name=ngi_project_name, + delivery_status=delivery_status, + staging_order_id=staging_order_id, ) self.session.add(order) self.session.commit() diff --git a/delivery/repositories/project_repository.py b/delivery/repositories/project_repository.py index 4421acf..1ac7af9 100644 --- a/delivery/repositories/project_repository.py +++ b/delivery/repositories/project_repository.py @@ -5,7 +5,6 @@ from delivery.services.file_system_service import FileSystemService from delivery.services.metadata_service import MetadataService from delivery.models.project import GeneralProject, RunfolderProject -from delivery.models.db_models import DDSProject from delivery.models.runfolder import RunfolderFile from delivery.exceptions import TooManyProjectsFound, ProjectNotFoundException, ProjectReportNotFoundException, \ ProjectsDirNotfoundException @@ -13,33 +12,6 @@ log = logging.getLogger(__name__) -class DDSProjectRepository: - """ - A repository of DDS projects backed by a database. - """ - - def __init__(self, session_factory): - """ - Instantiate a new DDSProjectRepository - :param session_factory: factory method which can produce new sqlalchemy Session objects - """ - self.session = session_factory() - - def add_dds_project(self, dds_project_id, project_name): - """ - Add a DDS project and commit it to the database - :param dds_project_id: DDS project id - :param project_name: Clarity project name - - :return: DDSProject - """ - dds_project = DDSProject(dds_project_id=dds_project_id, project_name=project_name) - self.session.add(dds_project) - self.session.commit() - - return dds_project - - class GeneralProjectRepository(object): """ Repository for a general project. 
For this purpose a project is represented by any director in diff --git a/delivery/services/dds_service.py b/delivery/services/dds_service.py index a501fa7..bc32384 100644 --- a/delivery/services/dds_service.py +++ b/delivery/services/dds_service.py @@ -1,74 +1,17 @@ -import tempfile -import os.path -import shutil import logging -import re -import json from tornado import gen -from delivery.models.db_models import StagingStatus, DeliveryStatus -from delivery.exceptions import ProjectNotFoundException, TooManyProjectsFound, InvalidStatusException, CannotParseDDSOutputException log = logging.getLogger(__name__) -class DDSToken: - """ - A wrapper to handle DDS tokens either from the string itself or from an - existing token file. - - Raises - ------ - FileNotFoundError - If the token is expired, DDS will delete it when attempting to create - a project or deliver data. - """ - def __init__(self, auth_token): - """ - Parameters - ---------- - auth_token : str - can be either the token string or a path to the token file - """ - self.auth_token = auth_token - - def __enter__(self): - if os.path.exists(self.auth_token): - self.token_path = self.auth_token - else: - self.temporary_token = tempfile.NamedTemporaryFile( - mode='w', delete=True) - self.temporary_token.write(self.auth_token) - self.temporary_token.flush() - - self.token_path = self.temporary_token.name - - return self.token_path - - def __exit__(self, exc_type, exc_val, exc_tb): - try: - self.temporary_token.close() - - except AttributeError: - # No temporary file was created, nothing to do here - pass - - except FileNotFoundError: - log.error( - "Token was deleted during delivery (probably by DDS)." 
- " Check token expiry date (`dds auth info`).") - raise - - class DDSService(object): - def __init__( self, external_program_service, staging_service, staging_dir, delivery_repo, - dds_project_repo, session_factory, dds_conf): self.external_program_service = external_program_service @@ -76,192 +19,9 @@ def __init__( self.staging_service = staging_service self.staging_dir = staging_dir self.delivery_repo = delivery_repo - self.dds_project_repo = dds_project_repo self.session_factory = session_factory self.dds_conf = dds_conf - @staticmethod - def _parse_dds_project_id(dds_output): - log.debug('DDS output was: {}'.format(dds_output)) - pattern = re.compile(r'Project created with id: (snpseq\d+)') - hits = pattern.search(dds_output) - if hits: - return hits.group(1) - else: - raise CannotParseDDSOutputException( - f"Could not parse DDS project ID from: {dds_output}") - - async def create_dds_project( - self, - project_name, - project_metadata, - token_path): - """ - Create a new project in dds - :param project_name: Project name from Clarity - :param project_metadata: dictionnary containing pi email, project - description, owner and researcher emails as well as whether the data is - sensitive or not. - :param token_path: path to DDS authentication token. 
- :return: project id in dds - """ - cmd = [ - 'dds', - '--token-path', token_path, - '--log-file', self.dds_conf["log_path"], - '--no-prompt', - ] - - cmd += [ - 'project', 'create', - '--title', project_name, - '--description', f"\"{project_metadata['description']}\"", - '-pi', project_metadata['pi'] - ] - - cmd += [ - args - for owner in project_metadata.get('owners', []) - for args in ['--owner', owner] - ] - - cmd += [ - args - for researcher in project_metadata.get('researchers', []) - for args in ['--researcher', researcher] - ] - - if project_metadata.get('non-sensitive', False): - cmd += ['--non-sensitive'] - - log.debug(f"Running dds with command: {' '.join(cmd)}") - execution_result = await self.external_program_service.run_and_wait(cmd) - - if execution_result.status_code == 0: - dds_project_id = DDSService._parse_dds_project_id( - execution_result.stdout) - else: - error_msg = ( - f"Failed to create project in DDS: {execution_result.stderr}." - " DDS returned status code: {execution_result.status_code}") - log.error(error_msg) - raise RuntimeError(error_msg) - - self.dds_project_repo.add_dds_project( - project_name=project_name, - dds_project_id=dds_project_id) - - return dds_project_id - - @staticmethod - @gen.coroutine - def _run_dds_put( - delivery_order_id, - delivery_order_repo, - staging_dir, - external_program_service, - session_factory, - token_path, - dds_conf - ): - session = session_factory() - - # This is a somewhat hacky work-around to the problem that objects - # created in one thread, and thus associated with another session - # cannot be accessed by another thread, therefore it is re-materialized - # in here... 
- delivery_order = delivery_order_repo.get_delivery_order_by_id( - delivery_order_id, session) - try: - cmd = [ - 'dds', - '--token-path', token_path, - '--log-file', dds_conf["log_path"], - '--no-prompt', - ] - - cmd += [ - 'data', 'put', - '--mount-dir', staging_dir, - '--source', delivery_order.delivery_source, - '--project', delivery_order.delivery_project, - '--silent', - ] - - log.debug("Running dds with cmd: {}".format(" ".join(cmd))) - - execution = external_program_service.run(cmd) - delivery_order.delivery_status = DeliveryStatus.delivery_in_progress - delivery_order.dds_pid = execution.pid - session.commit() - - execution_result = yield external_program_service.wait_for_execution(execution) - - if execution_result.status_code == 0: - delivery_order.delivery_status = DeliveryStatus.delivery_successful - log.info(f"Successfully delivered: {delivery_order}") - else: - delivery_order.delivery_status = DeliveryStatus.delivery_failed - error_msg = \ - f"Failed to deliver: {delivery_order}." \ - f"DDS returned status code: {execution_result.status_code}" - log.error(error_msg) - raise RuntimeError(error_msg) - - except Exception as e: - delivery_order.delivery_status = DeliveryStatus.delivery_failed - log.error( - f"Failed to deliver: {delivery_order}" - f"because this exception was logged: {e}") - raise e - finally: - # Always commit the state change to the database - session.commit() - - @gen.coroutine - def deliver_by_staging_id( - self, - staging_id, - delivery_project, - token_path, - skip_delivery=False): - - stage_order = self.staging_service.get_stage_order_by_id(staging_id) - if not stage_order \ - or not stage_order.status == StagingStatus.staging_successful: - raise InvalidStatusException( - "Only deliver by staging_id if it has a successful status!" 
- "Staging order was: {}".format(stage_order)) - - delivery_order = self.delivery_repo.create_delivery_order( - delivery_source=stage_order.get_staging_path(), - delivery_project=delivery_project, - delivery_status=DeliveryStatus.pending, - staging_order_id=staging_id, - ) - - args_for_run_dds_put = { - 'delivery_order_id': delivery_order.id, - 'delivery_order_repo': self.delivery_repo, - 'staging_dir': self.staging_dir, - 'external_program_service': self.dds_external_program_service, - 'session_factory': self.session_factory, - 'token_path': token_path, - 'dds_conf': self.dds_conf, - } - - if skip_delivery: - session = self.session_factory() - delivery_order.delivery_status = DeliveryStatus.delivery_skipped - session.commit() - else: - yield DDSService._run_dds_put(**args_for_run_dds_put) - - log.info(f"Removing staged runfolder at {stage_order.staging_target}") - shutil.rmtree(stage_order.staging_target) - - return delivery_order.id - def get_delivery_order_by_id(self, delivery_order_id): return self.delivery_repo.get_delivery_order_by_id(delivery_order_id) diff --git a/requirements/prod b/requirements/prod index 92c999c..a298f45 100644 --- a/requirements/prod +++ b/requirements/prod @@ -5,4 +5,4 @@ sqlalchemy==1.4.35 alembic==1.7.7 enum34==1.1.10 arteria==1.1.3 -dds-cli==1.0.5 +dds-cli==1.0.6 diff --git a/tests/integration_tests/base.py b/tests/integration_tests/base.py index 2bd2d65..ea70282 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -1,6 +1,8 @@ import os +import json import mock import random +import logging from subprocess import PIPE @@ -9,7 +11,6 @@ from arteria.web.app import AppService -import delivery from delivery.app import routes as app_routes, compose_application from delivery.services.metadata_service import MetadataService from delivery.models.execution import Execution @@ -18,6 +19,7 @@ log = logging.getLogger(__name__) + class BaseIntegration(AsyncHTTPTestCase): def __init__(self, *args): 
super().__init__(*args) @@ -86,20 +88,34 @@ def get_app(self): if self.mock_delivery: def mock_delivery(cmd): project_id = f"snpseq{random.randint(0, 10**10):010d}" - dds_output = f"""Current user: bio -Project created with id: {project_id} -User forskare was associated with Project {project_id} as Owner=True. An e-mail notification has not been sent. -Invitation sent to email@adress.com. The user should have a valid account to be added to a -project""" log.debug(f"Mock is called with {cmd}") shell = False if cmd[0].endswith('dds'): new_cmd = ['sleep', str(self.mock_duration)] if 'project' in cmd: + dds_output = f"""Current user: bio + Project created with id: {project_id} + User forskare was associated with Project {project_id} as Owner=True. An e-mail notification has not been sent. + Invitation sent to email@adress.com. The user should have a valid account to be added to a + project""" new_cmd += ['&&', 'echo', f'"{dds_output}"'] new_cmd = " ".join(new_cmd) shell = True + elif cmd[-2:] == ['ls', '--json']: + new_cmd = ['sleep', str(0.01)] + dds_output = json.dumps([{ + "Access": True, + "Last updated": "Fri, 01 Jul 2022 14:31:13 CEST", + "PI": "matilda.aslin@medsci.uu.se", + "Project ID": "snpseq00025", + "Size": 25856185058, + "Status": "In Progress", + "Title": "AB1234" + }]) + new_cmd += ['&&', 'echo', f"'{dds_output}'"] + new_cmd = " ".join(new_cmd) + shell = True else: new_cmd = cmd diff --git a/tests/integration_tests/test_integration.py b/tests/integration_tests/test_integration.py index b2334cc..71d54da 100644 --- a/tests/integration_tests/test_integration.py +++ b/tests/integration_tests/test_integration.py @@ -1,23 +1,15 @@ import json -from functools import partial import sys -import time import tempfile from tornado.testing import * -from tornado.web import Application -from arteria.web.app import AppService - -from delivery.app import routes as app_routes, compose_application -from delivery.models.db_models import StagingStatus, DeliveryStatus from 
delivery.services.metadata_service import MetadataService from tests.integration_tests.base import BaseIntegration -from tests.test_utils import assert_eventually_equals, unorganised_runfolder, samplesheet_file_from_runfolder, \ - project_report_files +from tests.test_utils import unorganised_runfolder class TestPythonVersion(unittest.TestCase): """ diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index 5a4fd0b..4d76d73 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -1,24 +1,12 @@ import json -from functools import partial -import sys import time import tempfile -import mock - from tornado.testing import * -from tornado.web import Application - -from arteria.web.app import AppService -from delivery.app import routes as app_routes, compose_application from delivery.models.db_models import StagingStatus, DeliveryStatus -from delivery.services.metadata_service import MetadataService -from delivery.services.external_program_service import ExternalProgramService from tests.integration_tests.base import BaseIntegration -from tests.test_utils import assert_eventually_equals, unorganised_runfolder, samplesheet_file_from_runfolder, \ - project_report_files class TestIntegrationDDS(BaseIntegration): @@ -50,7 +38,6 @@ def test_can_stage_and_delivery_runfolder(self): status_response = yield self.http_client.fetch(link) self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) - # The size of the fake project is 1024 bytes status_response = yield self.http_client.fetch(link) self.assertEqual(json.loads(status_response.body)["size"], 1024) @@ -61,7 +48,8 @@ def test_can_stage_and_delivery_runfolder(self): self.assertTrue(os.path.exists(f"/tmp/{staging_id}/{project}")) delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 
'fakedeliveryid2016', + 'delivery_project_id': 'snpseq00025', + 'ngi_project_name': 'AB-1234', 'auth_token': '1234', 'skip_delivery': True, } @@ -75,8 +63,6 @@ def test_can_stage_and_delivery_runfolder(self): status_response = yield self.http_client.fetch(delivery_link) self.assertEqual(json.loads(status_response.body)["status"], DeliveryStatus.delivery_skipped.name) - self.assertFalse(os.path.exists(f"/tmp/{staging_id}/{project}")) - @gen_test def test_can_stage_and_delivery_project_dir(self): # Note that this is a test which skips delivery (since to_outbox is not @@ -104,7 +90,8 @@ def test_can_stage_and_delivery_project_dir(self): for project, staging_id in staging_order_project_and_id.items(): delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'fakedeliveryid2016', + 'delivery_project_id': 'snpseq00025', + 'ngi_project_name': 'AB-1234', 'skip_delivery': True, 'dds': True, 'auth_token': '1234', @@ -257,55 +244,109 @@ def test_can_create_two_projects(self): self.assertNotEqual(dds_project_id1, dds_project_id2) -class TestIntegrationDDSLongWait(BaseIntegration): +class TestIntegrationDDSShortWait(BaseIntegration): def __init__(self, *args): super().__init__(*args) - self.mock_duration = 10 + self.mock_duration = 2 - @gen_test - def test_can_deliver_and_respond(self): - with tempfile.TemporaryDirectory(dir='./tests/resources/runfolders/', prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: + @gen_test(timeout=5) + def test_mock_duration_is_2(self): + with tempfile.TemporaryDirectory( + dir='./tests/resources/runfolders/', + prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: dir_name = os.path.basename(tmp_dir) self._create_projects_dir_with_random_data(tmp_dir) self._create_checksums_file(tmp_dir) url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) - response = yield self.http_client.fetch(self.get_url(url), method='POST', body='') - self.assertEqual(response.code, 202) + 
response = yield self.http_client.fetch( + self.get_url(url), + method='POST', + body='') response_json = json.loads(response.body) - staging_status_links = response_json.get("staging_order_links") + staging_order_project_and_id = response_json.get("staging_order_ids") - for project, link in staging_status_links.items(): + for project, staging_id in staging_order_project_and_id.items(): + delivery_url = '/'.join([ + self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) + delivery_body = { + 'delivery_project_id': 'snpseq00025', + 'ngi_project_name': 'AB-1234', + 'dds': True, + 'auth_token': '1234', + 'skip_mover': False, + } - self.assertEqual(project, "ABC_123") + start = time.time() - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["status"], StagingStatus.staging_successful.name) + delivery_resp = yield self.http_client.fetch( + self.get_url(delivery_url), + method='POST', + body=json.dumps(delivery_body)) + delivery_resp_as_json = json.loads(delivery_resp.body) + delivery_link = delivery_resp_as_json['delivery_order_link'] - # The size of the fake project is 1024 bytes - status_response = yield self.http_client.fetch(link) - self.assertEqual(json.loads(status_response.body)["size"], 1024) + while True: + status_response = yield self.http_client.fetch( + delivery_link) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break - staging_order_project_and_id = response_json.get("staging_order_ids") + stop = time.time() + self.assertTrue(stop - start >= self.mock_duration) + + +class TestIntegrationDDSLongWait(BaseIntegration): + def __init__(self, *args): + super().__init__(*args) + + self.mock_duration = 10 + + @gen_test + def test_can_deliver_and_not_timeout(self): + """ + This test checks that the service does not wait for the full duration + of the delivery (10s in this case) to respond. 
If it does wait, it will + raise a time-out error after 5s (default duration of tornado tests). + """ + with tempfile.TemporaryDirectory( + dir='./tests/resources/runfolders/', + prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: + + dir_name = os.path.basename(tmp_dir) + self._create_projects_dir_with_random_data(tmp_dir) + self._create_checksums_file(tmp_dir) + + url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) + response = yield self.http_client.fetch( + self.get_url(url), method='POST', body='') + + response_json = json.loads(response.body) + + staging_order_project_and_id = response_json.get( + "staging_order_ids") for project, staging_id in staging_order_project_and_id.items(): - self.assertTrue(os.path.exists(f"/tmp/{staging_id}")) - delivery_url = '/'.join([self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) + delivery_url = '/'.join([ + self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) delivery_body = { - 'delivery_project_id': 'fakedeliveryid2016', + 'delivery_project_id': 'snpseq00025', + 'ngi_project_name': 'AB-1234', 'dds': True, 'auth_token': '1234', 'skip_delivery': False, } - delivery_response = self.http_client.fetch(self.get_url(delivery_url), method='POST', body=json.dumps(delivery_body)) - - staging_response = yield self.http_client.fetch(staging_status_links["ABC_123"]) - self.assertEqual(json.loads(staging_response.body)["status"], StagingStatus.staging_successful.name) + delivery_response = yield self.http_client.fetch( + self.get_url(delivery_url), + method='POST', + body=json.dumps(delivery_body)) + self.assertEqual(delivery_response.code, 202) class TestIntegrationDDSUnmocked(BaseIntegration): diff --git a/tests/unit_tests/models/test_db_models.py b/tests/unit_tests/models/test_db_models.py index 42d5e32..ef47f61 100644 --- a/tests/unit_tests/models/test_db_models.py +++ b/tests/unit_tests/models/test_db_models.py @@ -11,6 +11,6 @@ def test_get_staging_path(self): status=StagingStatus.pending, ) - 
self.assertEquals( + self.assertEqual( staging_order.get_staging_path(), '/staging/target/data') diff --git a/tests/unit_tests/repositories/test_delivery_repository.py b/tests/unit_tests/repositories/test_delivery_repository.py index 94cafa2..7b4b9c4 100644 --- a/tests/unit_tests/repositories/test_delivery_repository.py +++ b/tests/unit_tests/repositories/test_delivery_repository.py @@ -26,6 +26,7 @@ def setUp(self): self.session = session_factory() self.delivery_order_1 = DeliveryOrder(delivery_source='/foo/source', + ngi_project_name='AB-1234', delivery_project='bar', delivery_status=DeliveryStatus.pending, staging_order_id=1) @@ -53,10 +54,12 @@ def test_get_delivery_orders(self): def test_create_delivery_order(self): - actual = self.delivery_repo.create_delivery_order(delivery_source='/foo/source2', - delivery_project='snpseq00001', - delivery_status=DeliveryStatus.pending, - staging_order_id=2) + actual = self.delivery_repo.create_delivery_order( + delivery_source='/foo/source2', + delivery_project='snpseq00001', + ngi_project_name='AB-1234', + delivery_status=DeliveryStatus.pending, + staging_order_id=2) self.assertEqual(actual.id, 2) self.assertEqual(actual.delivery_source, '/foo/source2') diff --git a/tests/unit_tests/repositories/test_project_repository.py b/tests/unit_tests/repositories/test_project_repository.py index c938e2d..2eb262f 100644 --- a/tests/unit_tests/repositories/test_project_repository.py +++ b/tests/unit_tests/repositories/test_project_repository.py @@ -8,8 +8,7 @@ from delivery.exceptions import ProjectReportNotFoundException from delivery.models.project import GeneralProject, RunfolderProject -from delivery.repositories.project_repository import GeneralProjectRepository, UnorganisedRunfolderProjectRepository, DDSProjectRepository -from delivery.models.db_models import SQLAlchemyBase, DDSProject +from delivery.repositories.project_repository import GeneralProjectRepository, UnorganisedRunfolderProjectRepository from 
delivery.repositories.sample_repository import RunfolderProjectBasedSampleRepository from delivery.services.file_system_service import FileSystemService from delivery.services.metadata_service import MetadataService @@ -17,46 +16,6 @@ from tests.test_utils import UNORGANISED_RUNFOLDER -class TestDDSProjectRepository(unittest.TestCase): - def setUp(self): - # NOTE setting echo to true is very useful to se which sql statements get - # executed, but since it fills up the logs a lot it's been disabled by - # default here. - engine = create_engine('sqlite:///:memory:', echo=False) - SQLAlchemyBase.metadata.create_all(engine) - - # Throw some data into the in-memory db - session_factory = sessionmaker() - session_factory.configure(bind=engine) - - self.session = session_factory() - - self.dds_project_1 = DDSProject(project_name="CD-1234", dds_project_id="snpseq00001") - self.session.add(self.dds_project_1) - - self.dds_project_2 = DDSProject(project_name="EF-5678", dds_project_id="snpseq00002") - self.session.add(self.dds_project_2) - - self.session.commit() - - # Prep the repo - self.dds_project_repo = DDSProjectRepository(session_factory) - - def test_add_dds_project(self): - dds_project = self.dds_project_repo\ - .add_dds_project(project_name="GH-9012", dds_project_id="snpseq00003") - - self.assertIsInstance(dds_project, DDSProject) - self.assertEqual(dds_project.project_name, "GH-9012") - self.assertEqual(dds_project.dds_project_id, "snpseq00003") - - # Check that the object has been committed, i.e. 
there are no 'dirty' objects in session - self.assertEqual(len(self.session.dirty), 0) - project_from_session = self.session.query( - DDSProject).filter(DDSProject.project_name == dds_project.project_name).one() - self.assertEqual(project_from_session.dds_project_id, dds_project.dds_project_id) - - class TestGeneralProjectRepository(unittest.TestCase): class FakeFileSystemService(FileSystemService): diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index aeb83f4..5fe54ae 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -1,91 +1,42 @@ -import os +import json import random import tempfile -import unittest -from mock import MagicMock, AsyncMock, create_autospec, patch +from mock import MagicMock, AsyncMock, create_autospec, patch, call from tornado.testing import AsyncTestCase, gen_test from tornado.gen import coroutine from delivery.services.external_program_service import ExternalProgramService -from delivery.services.dds_service import DDSToken, DDSService -from delivery.models.db_models import DeliveryOrder, StagingOrder, StagingStatus, DeliveryStatus, DDSProject +from delivery.services.dds_service import DDSService +from delivery.models.db_models import DeliveryOrder, StagingOrder, StagingStatus, DeliveryStatus from delivery.models.execution import ExecutionResult, Execution +from delivery.models.project import DDSProject from delivery.exceptions import InvalidStatusException -from tests.test_utils import MockIOLoop, assert_eventually_equals - - -class TestDDSToken(unittest.TestCase): - # TODO test can handle existing file - # TODO raise error if file is deleted - def test_can_write_token(self): - auth_token = "1234" - with DDSToken(auth_token) as token_path: - with open(token_path, mode='r') as token_file: - self.assertEqual(auth_token, token_file.read()) - - self.assertFalse(os.path.exists(token_path)) - - def test_can_handle_existing_token(self): - auth_token = "1234" - 
with tempfile.NamedTemporaryFile(mode='w', delete=True) as token_file: - token_file.write(auth_token) - token_file.flush() - - with DDSToken(token_file.name) as token_path: - self.assertEqual(token_file.name, token_path) - - with open(token_path, mode='r') as dds_token: - self.assertEqual(auth_token, dds_token.read()) - - def test_raise_error_if_file_is_deleted(self): - auth_token = "1234" - with self.assertRaises(FileNotFoundError): - with DDSToken(auth_token) as token_path: - os.remove(token_path) +from tests.test_utils import assert_eventually_equals class TestDDSService(AsyncTestCase): def setUp(self): - - example_dds_project_ls_stdout = """[ - { - "Last updated": "Thu, 03 Mar 2022 11:46:31 CET", - "PI": "Dungeon master", - "Project ID": "snpseq00001", - "Size": 26956752654, - "Status": "In Progress", - "Title": "Bullywug anatomy" - }, - { - "Last updated": "Thu, 03 Mar 2022 10:34:05 CET", - "PI": "matas618", - "Project ID": "snpseq00002", - "Size": 0, - "Status": "In Progress", - "Title": "Site admins project" - } -] - """ - - self.mock_dds_runner = create_autospec(ExternalProgramService) mock_process = MagicMock() - mock_execution = Execution(pid=random.randint(1, 1000), process_obj=mock_process) + mock_execution = Execution( + pid=random.randint(1, 1000), + process_obj=mock_process) self.mock_dds_runner.run.return_value = mock_execution @coroutine def wait_as_coroutine(x): - return ExecutionResult(stdout=example_dds_project_ls_stdout, stderr="", status_code=0) + return ExecutionResult( + stdout="", + stderr="", + status_code=0) self.mock_dds_runner.wait_for_execution = wait_as_coroutine - self.mock_staging_service = MagicMock() self.mock_delivery_repo = MagicMock() - self.mock_dds_project_repo = MagicMock() self.delivery_order = DeliveryOrder( id=1, @@ -105,106 +56,237 @@ def wait_as_coroutine(x): staging_service=self.mock_staging_service, staging_dir='/foo/bar/staging_dir', delivery_repo=self.mock_delivery_repo, - 
dds_project_repo=self.mock_dds_project_repo, session_factory=self.mock_session_factory, - dds_conf=self.mock_dds_config + dds_conf=self.mock_dds_config, ) - # Inject separate external runner instances for the tests, since they need to return - # different information + # Inject separate external runner instances for the tests, since they + # need to return different information self.dds_service.dds_external_program_service = self.mock_dds_runner + self.dds_service.external_program_service = self.mock_dds_runner + + self.token_file = tempfile.NamedTemporaryFile(mode='w+') + self.token_file.write('ddstoken') + self.token_file.flush() super(TestDDSService, self).setUp() @gen_test - def test_deliver_by_staging_id(self): + def test_dds_put(self): source = '/foo/bar' staging_target = '/staging/dir/bar' - staging_order = StagingOrder(source=source, staging_target=staging_target) + project_id = 'snpseq00001' + deadline = '90' + + staging_order = StagingOrder( + source=source, + staging_target=staging_target) staging_order.status = StagingStatus.staging_successful self.mock_staging_service.get_stage_order_by_id.return_value = staging_order self.mock_staging_service.get_delivery_order_by_id.return_value = self.delivery_order + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id=project_id) + with patch('shutil.rmtree') as mock_rmtree: - res = yield self.dds_service.deliver_by_staging_id( - staging_id=1, - delivery_project='snpseq00001', - token_path='token_path', - ) - mock_rmtree.assert_called_once_with(staging_target) + with patch( + 'delivery.models.project' + '.DDSProject.get_ngi_project_name', + new_callable=AsyncMock, + return_value='AB-1234'): + yield dds_project.put( + staging_id=1, + deadline=deadline, + ) + + def _get_delivery_order(): + return self.delivery_order.delivery_status + + assert_eventually_equals( + self, 1, + _get_delivery_order, + DeliveryStatus.delivery_successful) + + 
mock_rmtree.assert_called_once_with(staging_target) + + self.mock_dds_runner.run.assert_has_calls([ + call([ + 'dds', + '--token-path', self.token_file.name, + '--log-file', '/foo/bar/log', + '--no-prompt', + 'data', 'put', + '--mount-dir', '/foo/bar/staging_dir', + '--source', staging_target, + '--project', project_id, + '--silent' + ]), + call([ + 'dds', + '--token-path', self.token_file.name, + '--log-file', '/foo/bar/log', + '--no-prompt', + 'project', 'status', 'release', + '--project', project_id, + '--no-mail', + '--deadline', deadline, + ]), + ]) - def _get_delivery_order(): - return self.delivery_order.delivery_status - assert_eventually_equals(self, 1, _get_delivery_order, DeliveryStatus.delivery_successful) - self.mock_dds_runner.run.assert_called_with([ - 'dds', - '--token-path', 'token_path', - '--log-file', '/foo/bar/log', - '--no-prompt', - 'data', 'put', - '--mount-dir', '/foo/bar/staging_dir', - '--source', '/staging/dir/bar', - '--project', 'snpseq00001', - '--silent' - ]) + @gen_test + def test_dds_put_no_release(self): + source = '/foo/bar' + staging_target = '/staging/dir/bar' + project_id = 'snpseq00001' + deadline = '90' + + staging_order = StagingOrder( + source=source, + staging_target=staging_target) + staging_order.status = StagingStatus.staging_successful + self.mock_staging_service.get_stage_order_by_id.return_value = staging_order + + self.mock_staging_service.get_delivery_order_by_id.return_value = self.delivery_order + + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id=project_id) + + with patch('shutil.rmtree') as mock_rmtree: + with patch( + 'delivery.models.project' + '.DDSProject.get_ngi_project_name', + new_callable=AsyncMock, + return_value='AB-1234'): + yield dds_project.put( + staging_id=1, + deadline=deadline, + release=False, + ) + + def _get_delivery_order(): + return self.delivery_order.delivery_status + + assert_eventually_equals( + self, 1, + 
_get_delivery_order, + DeliveryStatus.delivery_successful) + + mock_rmtree.assert_called_once_with(staging_target) + + self.mock_dds_runner.run.assert_called_once_with([ + 'dds', + '--token-path', self.token_file.name, + '--log-file', '/foo/bar/log', + '--no-prompt', + 'data', 'put', + '--mount-dir', '/foo/bar/staging_dir', + '--source', staging_target, + '--project', project_id, + '--silent' + ]) + + def test_dds_project_with_token_string(self): + expected_token_string = "supersecretstring" + + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=expected_token_string, + dds_project_id='snpseq00001') + + with open(dds_project.temporary_token.name) as token: + actual_token_string = token.read() + + self.assertEqual(actual_token_string, expected_token_string) + self.assertEqual( + dds_project.temporary_token.name, + dds_project._base_cmd[2]) @gen_test - def test_deliver_by_staging_id_raises_on_non_existent_stage_id(self): + def test_dds_put_raises_on_non_existent_stage_id(self): self.mock_staging_service.get_stage_order_by_id.return_value = None with self.assertRaises(InvalidStatusException): - - yield self.dds_service.deliver_by_staging_id( - staging_id=1, - delivery_project='snpseq00001', - token_path='token_path', - ) + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id='snpseq00001') + + with patch( + 'delivery.models.project' + '.DDSProject.get_ngi_project_name', + new_callable=AsyncMock, + return_value='AB-1234'): + yield dds_project.put(staging_id=1) @gen_test - def test_deliver_by_staging_id_raises_on_non_successful_stage_id(self): + def test_dds_put_raises_on_non_successful_stage_id(self): staging_order = StagingOrder() staging_order.status = StagingStatus.staging_failed self.mock_staging_service.get_stage_order_by_id.return_value = staging_order with self.assertRaises(InvalidStatusException): - - yield self.dds_service.deliver_by_staging_id( - staging_id=1, - 
delivery_project='snpseq00001', - token_path='token_path', - ) + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id='snpseq00001') + + with patch( + 'delivery.models.project' + '.DDSProject.get_ngi_project_name', + new_callable=AsyncMock, + return_value='AB-1234'): + yield dds_project.put(staging_id=1) def test_delivery_order_by_id(self): - delivery_order = DeliveryOrder(id=1, - delivery_source='src', - delivery_project='snpseq00001', - delivery_status=DeliveryStatus.delivery_in_progress, - staging_order_id=11, - ) + delivery_order = DeliveryOrder( + id=1, + delivery_source='src', + delivery_project='snpseq00001', + delivery_status=DeliveryStatus.delivery_in_progress, + staging_order_id=11, + ) self.mock_delivery_repo.get_delivery_order_by_id.return_value = delivery_order actual = self.dds_service.get_delivery_order_by_id(1) self.assertEqual(actual.id, 1) + @gen_test def test_possible_to_delivery_by_staging_id_and_skip_delivery(self): - - staging_order = StagingOrder(source='/foo/bar', staging_target='/staging/dir/bar') + source = '/foo/bar' + staging_target = '/staging/dir/bar' + staging_order = StagingOrder( + source=source, + staging_target=staging_target) staging_order.status = StagingStatus.staging_successful self.mock_staging_service.get_stage_order_by_id.return_value = staging_order self.mock_staging_service.get_delivery_order_by_id.return_value = self.delivery_order - self.dds_service.deliver_by_staging_id( - staging_id=1, - delivery_project='snpseq00001', - token_path='token_path', - skip_delivery=True, - ) + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id='snpseq00001') + + with patch( + 'delivery.models.project' + '.DDSProject.get_ngi_project_name', + new_callable=AsyncMock, + return_value='AB-1234'): + yield dds_project.put(staging_id=1, skip_delivery=True) def _get_delivery_order(): return self.delivery_order.delivery_status - 
assert_eventually_equals(self, 1, _get_delivery_order, DeliveryStatus.delivery_skipped) + + assert_eventually_equals( + self, + 1, + _get_delivery_order, + DeliveryStatus.delivery_skipped) def test_parse_dds_project_id(self): dds_output = """Current user: bio @@ -213,10 +295,12 @@ def test_parse_dds_project_id(self): Invitation sent to email@adress.com. The user should have a valid account to be added to a project""" - self.assertEqual(DDSService._parse_dds_project_id(dds_output), "snpseq00003") + self.assertEqual( + DDSProject._parse_dds_project_id(dds_output), + "snpseq00003") @gen_test - def test_create_project_token_file(self): + def test_create_project(self): project_name = "AA-1221" project_metadata = { "description": "Dummy project", @@ -226,38 +310,75 @@ def test_create_project_token_file(self): "non-sensitive": False, } - token_path = "/foo/bar/auth" - with patch( - 'delivery.services.external_program_service' - '.ExternalProgramService.run_and_wait', - new_callable=AsyncMock) as mock_run,\ - patch( - 'delivery.services.dds_service' - '.DDSService._parse_dds_project_id' + 'delivery.models.project' + '.DDSProject._parse_dds_project_id' ) as mock_parse_dds_project_id: - mock_run.return_value.status_code = 0 mock_parse_dds_project_id.return_value = "snpseq00001" - yield self.dds_service.create_dds_project( + yield DDSProject.new( project_name, project_metadata, - token_path) + auth_token=self.token_file.name, + dds_service=self.dds_service) - mock_run.assert_called_once_with([ + self.mock_dds_runner.run.assert_called_with([ 'dds', - '--token-path', token_path, + '--token-path', self.token_file.name, '--log-file', '/foo/bar/log', '--no-prompt', 'project', 'create', - '--title', project_name, + '--title', project_name.replace('-', ''), '--description', f'"{project_metadata["description"]}"', '-pi', project_metadata['pi'], '--owner', project_metadata['owners'][0], '--researcher', project_metadata['researchers'][0], '--researcher', 
project_metadata['researchers'][1], ]) - self.mock_dds_project_repo.add_dds_project\ - .assert_called_once_with( - project_name=project_name, - dds_project_id=mock_parse_dds_project_id.return_value) + + @gen_test + def test_release_project(self): + project_id = 'snpseq00001' + deadline = '90' + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id=project_id) + + yield dds_project.release(deadline=deadline) + + self.mock_dds_runner.run.assert_called_with([ + 'dds', + '--token-path', self.token_file.name, + '--log-file', '/foo/bar/log', + '--no-prompt', + 'project', 'status', 'release', + '--project', project_id, + '--no-mail', + '--deadline', deadline, + ]) + + @gen_test + def test_get_dds_project_title(self): + mock_dds_project = [{ + "Access": True, + "Last updated": "Fri, 01 Jul 2022 14:31:13 CEST", + "PI": "matilda.aslin@medsci.uu.se", + "Project ID": "snpseq00025", + "Size": 25856185058, + "Status": "In Progress", + "Title": "AB1234" + }] + + with patch( 'delivery.models.project.DDSProject._run', + new_callable=AsyncMock, + return_value=json.dumps(mock_dds_project), + ): + dds_project = DDSProject( + dds_service=self.dds_service, + auth_token=self.token_file.name, + dds_project_id=mock_dds_project[0]["Project ID"], + ) + + ngi_project_name = yield dds_project.get_ngi_project_name() + self.assertEqual(ngi_project_name, "AB-1234") From f8046f5a98890428ffd864835e0c87896acc2066 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 13 Jul 2022 13:19:38 +0200 Subject: [PATCH 06/11] Ignore args from sys.argv when testing --- requirements/prod | 2 +- tests/integration_tests/base.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/requirements/prod b/requirements/prod index a298f45..b35a29a 100644 --- a/requirements/prod +++ b/requirements/prod @@ -4,5 +4,5 @@ tornado==6.1 sqlalchemy==1.4.35 alembic==1.7.7 enum34==1.1.10 -arteria==1.1.3 +arteria==1.1.4 dds-cli==1.0.6 diff --git 
a/tests/integration_tests/base.py b/tests/integration_tests/base.py index ea70282..085e4f4 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -78,8 +78,10 @@ def get_app(self): # app service start method to start up the the application path_to_this_file = os.path.abspath( os.path.dirname(os.path.realpath(__file__))) - app_svc = AppService.create(product_name="test_delivery_service", - config_root="{}/../../config/".format(path_to_this_file)) + app_svc = AppService.create( + product_name="test_delivery_service", + config_root="{}/../../config/".format(path_to_this_file), + args=[]) config = app_svc.config_svc composed_application = compose_application(config) From 05b18744ddf59abf91b203df70720e247dde7832 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Wed, 20 Jul 2022 17:00:23 +0200 Subject: [PATCH 07/11] Lint code --- delivery/services/staging_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delivery/services/staging_service.py b/delivery/services/staging_service.py index 54fa63b..0052487 100644 --- a/delivery/services/staging_service.py +++ b/delivery/services/staging_service.py @@ -103,7 +103,7 @@ def _copy_dir(staging_order_id, external_program_service, session_factory, stagi # Parse the file size from the output of rsync stats: # Total file size: 207,707,566 bytes - match = re.search('Total file size: ([\d,]+) bytes', + match = re.search(r'Total file size: ([\d,]+) bytes', execution_result.stdout, re.MULTILINE) size_of_transfer = match.group(1) From 89b2ca2caa9f944808f203b71adfbb4930e19481 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 8 Sep 2022 08:22:08 +0200 Subject: [PATCH 08/11] Add test to make sure staging dir is not deleted early This checks that the staging target still exists after the mocked version of dds terminates. 
--- tests/integration_tests/base.py | 5 ++ .../integration_tests/test_integration_dds.py | 49 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/tests/integration_tests/base.py b/tests/integration_tests/base.py index 085e4f4..982d7e5 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -118,6 +118,11 @@ def mock_delivery(cmd): new_cmd += ['&&', 'echo', f"'{dds_output}'"] new_cmd = " ".join(new_cmd) shell = True + elif 'put' in cmd: + source_file = cmd[cmd.index("--source") + 1] + new_cmd += ['&&', 'test', '-e', source_file] + new_cmd = " ".join(new_cmd) + shell = True else: new_cmd = cmd diff --git a/tests/integration_tests/test_integration_dds.py b/tests/integration_tests/test_integration_dds.py index 4d76d73..cdd8761 100644 --- a/tests/integration_tests/test_integration_dds.py +++ b/tests/integration_tests/test_integration_dds.py @@ -301,6 +301,55 @@ def test_mock_duration_is_2(self): stop = time.time() self.assertTrue(stop - start >= self.mock_duration) + @gen_test(timeout=5) + def test_can_delivery_data_asynchronously(self): + with tempfile.TemporaryDirectory( + dir='./tests/resources/runfolders/', + prefix='160930_ST-E00216_0111_BH37CWALXX_') as tmp_dir: + + dir_name = os.path.basename(tmp_dir) + self._create_projects_dir_with_random_data(tmp_dir) + self._create_checksums_file(tmp_dir) + + url = "/".join([self.API_BASE, "stage", "runfolder", dir_name]) + response = yield self.http_client.fetch( + self.get_url(url), + method='POST', + body='') + + response_json = json.loads(response.body) + + staging_order_project_and_id = response_json.get("staging_order_ids") + + for project, staging_id in staging_order_project_and_id.items(): + delivery_url = '/'.join([ + self.API_BASE, 'deliver', 'stage_id', str(staging_id)]) + delivery_body = { + 'delivery_project_id': 'snpseq00025', + 'ngi_project_name': 'AB-1234', + 'dds': True, + 'auth_token': '1234', + 'skip_mover': False, + } + + delivery_resp = yield 
self.http_client.fetch( + self.get_url(delivery_url), + method='POST', + body=json.dumps(delivery_body)) + + delivery_resp_as_json = json.loads(delivery_resp.body) + delivery_link = delivery_resp_as_json['delivery_order_link'] + + while True: + status_response = yield self.http_client.fetch( + delivery_link) + if (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_successful.name): + break + elif (json.loads(status_response.body)["status"] + == DeliveryStatus.delivery_failed.name): + raise Exception("Delivery failed") + class TestIntegrationDDSLongWait(BaseIntegration): def __init__(self, *args): From e5960f04008ecfe0daac6db354368f22578add93 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 8 Sep 2022 11:06:54 +0200 Subject: [PATCH 09/11] Unpin dds-cli's version On Miarka, the latest version of dds is installed through the module system and we want to mimic that here. --- requirements/prod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/prod b/requirements/prod index b35a29a..fbec63a 100644 --- a/requirements/prod +++ b/requirements/prod @@ -5,4 +5,4 @@ sqlalchemy==1.4.35 alembic==1.7.7 enum34==1.1.10 arteria==1.1.4 -dds-cli==1.0.6 +dds-cli From 803880d17afa3ec18671973f3bc4a82694a03c62 Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Fri, 23 Sep 2022 12:51:44 +0200 Subject: [PATCH 10/11] Add test to make sure that token exists when delivering --- tests/integration_tests/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration_tests/base.py b/tests/integration_tests/base.py index 982d7e5..eee9377 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -120,7 +120,9 @@ def mock_delivery(cmd): shell = True elif 'put' in cmd: source_file = cmd[cmd.index("--source") + 1] + auth_token = cmd[cmd.index("--token-path") + 1] new_cmd += ['&&', 'test', '-e', source_file] + new_cmd += ['&&', 'test', '-e', auth_token] new_cmd = " ".join(new_cmd) shell = True else: 
From 728cfbc3d594c8a914765a4f3a4b3bc77422983d Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Thu, 13 Oct 2022 08:55:29 +0200 Subject: [PATCH 11/11] Apply Matilda's comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Matilda Åslin --- tests/integration_tests/base.py | 2 +- tests/unit_tests/services/test_dds.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/base.py b/tests/integration_tests/base.py index eee9377..afdccdc 100644 --- a/tests/integration_tests/base.py +++ b/tests/integration_tests/base.py @@ -109,7 +109,7 @@ def mock_delivery(cmd): dds_output = json.dumps([{ "Access": True, "Last updated": "Fri, 01 Jul 2022 14:31:13 CEST", - "PI": "matilda.aslin@medsci.uu.se", + "PI": "pi@email.com", "Project ID": "snpseq00025", "Size": 25856185058, "Status": "In Progress", diff --git a/tests/unit_tests/services/test_dds.py b/tests/unit_tests/services/test_dds.py index 5fe54ae..53eedf8 100644 --- a/tests/unit_tests/services/test_dds.py +++ b/tests/unit_tests/services/test_dds.py @@ -363,14 +363,15 @@ def test_get_dds_project_title(self): mock_dds_project = [{ "Access": True, "Last updated": "Fri, 01 Jul 2022 14:31:13 CEST", - "PI": "matilda.aslin@medsci.uu.se", + "PI": "pi@email.com", "Project ID": "snpseq00025", "Size": 25856185058, "Status": "In Progress", "Title": "AB1234" }] - with patch( 'delivery.models.project.DDSProject._run', + with patch( + 'delivery.models.project.DDSProject._run', new_callable=AsyncMock, return_value=json.dumps(mock_dds_project), ):