diff --git a/src/api/src/backend/views/ETLPipelines.py b/src/api/src/backend/views/ETLPipelines.py index fc129ae8..f0fdd119 100644 --- a/src/api/src/backend/views/ETLPipelines.py +++ b/src/api/src/backend/views/ETLPipelines.py @@ -43,6 +43,7 @@ class ETLPipelines(RestrictedAPIView): def post(self, request, group_id, *_, **__): """ETL Pipelines""" + print("START") # Get the group group = group_service.get(group_id, request.tenant_id) if group == None: @@ -51,7 +52,7 @@ def post(self, request, group_id, *_, **__): # Check that the user belongs to the group if not group_service.user_in_group(request.username, group_id, request.tenant_id): return Forbidden(message="You do not have access to this group") - + print("AFTER GROUP CHECK") # Git repository that contains the pipeline and task definitions for the # tapis etl pipeline uses = Uses( @@ -61,36 +62,37 @@ def post(self, request, group_id, *_, **__): branch=TAPIS_ETL_TEMPLATE_REPO_BRANCH ) ) - + print("AFTER USES") # Validate the request body based on the type of pipeline specified prepared_request = self.prepare( TapisETLPipeline, uses=uses ) - + print("AFTER REQUEST PREP") # Return the failure view instance if validation failed if not prepared_request.is_valid: return prepared_request.failure_view - + print("AFTER PREPARED REQUEST CHECK") # Get the JSON encoded body from the validation result body = prepared_request.body # Check that the id of the pipeline is unique if PipelineModel.objects.filter(id=body.id, group=group).exists(): return Conflict(f"A Pipeline already exists with the id '{body.id}'") - + print("AFTER PIPELINE CONFLICT CHECK") # Clone the git repository that contains the pipeline and task definitions that will be used tapis_owe_templates_dir = "/tmp/git/tapis-owe-templates" try: Repo.clone(uses.source.url, tapis_owe_templates_dir) except Exception as e: return ServerErrorResp(f"Error cloning the Tapis OWE Template repository: {str(e)}") - + print("AFTER GIT CLONE") try: # Open the owe-config.json file with open(os.path.join(tapis_owe_templates_dir, "owe-config.json")) as file: owe_config = json.loads(file.read()) + print("AFTER CONFIG LOAD") # Open the etl pipeline schema.json with open( os.path.join( @@ -99,10 +101,10 @@ def post(self, request, group_id, *_, **__): ) ) as file: pipeline_template = json.loads(file.read()) + print("AFTER SCHEMA LOAD") except Exception as e: return UnprocessableEntity(f"Configuration Error (owe-config.json): {str(e)}") - # Create the pipeline try: pipeline = PipelineModel.objects.create( @@ -177,6 +179,7 @@ def post(self, request, group_id, *_, **__): return BadRequest(message=e.__cause__) except Exception as e: return ServerErrorResp(f"{e}") + print("AFTER PIPELINE CREATE") # Fetch the archives specified in the request then create relations # between them and the pipline @@ -205,7 +208,7 @@ def post(self, request, group_id, *_, **__): # Delete the pipeline archive relationships that were just created [pipeline_archive.delete() for pipeline_archive in pipeline_archives] return BadRequest(message=e.__cause__) - + print("AFTER ARCHIVE CREATE") # The first tapis job should be dependent on the gen-inbound-manifests task last_task_id = "gen-inbound-manifests" @@ -226,6 +229,7 @@ def post(self, request, group_id, *_, **__): # Add the tasks from the template to the tasks list tasks.extend([TemplateTask(**task) for task in pipeline_template.tasks]) + print("AFTER TASK REQUEST CREATE") # Update the dependecies of the gen-outbound-manifests task to # include the last tapis job task gen_outbound_manifests_task = next(filter(lambda t: t.id == "gen-outbound-manifests", tasks)) @@ -251,7 +255,7 @@ def post(self, request, group_id, *_, **__): task_service.delete(tasks) pipeline.delete() return ServerErrorResp(message=e) - + print("AFTER TASK CREATE") return ResourceURLResponse( url=resource_url_builder(request.url, pipeline.id) )