Skip to content


Merge pull request #140 from sennetconsortium/libpitt/138-uploads
Browse files Browse the repository at this point in the history
Libpitt/138 uploads
  • Loading branch information
maxsibilla authored Sep 20, 2023
2 parents 6a6d7bf + 3f62b84 commit 533b9c9
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 3 deletions.
227 changes: 225 additions & 2 deletions src/routes/entity_CRUD/
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ def get_ds_path(ds_uuid: str,
group_uuid = __get_dict_prop(dset, 'group_uuid')
if ent_type is None or ent_type.strip() == '':
raise ResponseException(f"Entity with uuid:{ds_uuid} needs to be a Dataset or Upload.", 400)
# if ent_type.lower().strip() == 'upload':
# return ingest_helper.get_upload_directory_absolute_path(group_uuid=group_uuid, upload_uuid=ds_uuid)
if ent_type.lower().strip() == 'upload':
return ingest_helper.get_upload_directory_absolute_path(group_uuid=group_uuid, upload_uuid=ds_uuid)
is_phi = __get_dict_prop(dset, 'contains_human_genetic_sequences')
if ent_type is None or not (ent_type.lower().strip() == 'dataset' or ent_type.lower().strip() == 'publication'):
raise ResponseException(f"Entity with uuid:{ds_uuid} is not a Dataset, Publication or Upload", 400)
Expand Down Expand Up @@ -876,6 +876,229 @@ def dataset_is_primary(dataset_uuid):
return True

## Uploads API Endpoints

# This creates a new protected Uploads folder once a user creates a new Uploads datagroup
# example url: https://my.endpoint.server/uploads
# inputs:
# - The title of the new folder
# - The UUID
# - A valid nexus token in an Authorization Bearer header
# returns
# 200 json with Details about the new folder (@TODO: paste in once authed again)
# 400 if invalid json sent
# 401 if user does not have hubmap read access or the token is invalid
# Example json response:
# {
# "created_by_user_displayname": "Eris Pink",
# "created_by_user_email": "[email protected]",
# "created_by_user_sub": "12345678-abba-2468-wdwa-6484IDKSGGFF",
# "created_timestamp": 1587414020,
# "entity_type": "Upload",
# "group_name": "IEC Testing Group",
# "group_uuid": "UUID-OF-GROUP-HERE-0e006b0001e9",
# "sennet_id": "SNT664.XGCF.687",
# "last_modified_timestamp": 1587414020,
# "last_modified_user_displayname": "E Pink",
# "last_modified_user_email": "[email protected]",
# "last_modified_user_sub": "76f777all-abba-6971-hehe-125ea519865",
# "status": "New",
# "title": "TestTitle",
# "uuid": "4a583209bfe9ad6cda851d913ac44833915"
# }

@entity_CRUD_blueprint.route('/uploads', methods=['POST'])
def create_uploadstage():
    """Create a new Upload entity and its upload directory.

    The JSON request body is forwarded to the entity web service
    (``<ENTITY_WEBSERVICE_URL>/entities/upload``) after its ``group_uuid``
    has been replaced by the value returned from
    ``auth_helper.get_write_group_uuid``. When the entity service answers
    200, a directory for the new Upload is created through
    ``IngestFileHelper.create_upload_directory``.

    Returns:
        - 400 ``"json request required"`` when the request is not JSON.
        - The ``Response`` produced by ``getAuthorizationTokens`` when token
          extraction itself yields a ``Response``.
        - 401 when no usable token is found in the parsed tokens.
        - The entity service's text/status when it does not answer 200.
        - The new Upload as JSON on success.
        - 500 on any unexpected exception (the exception is logged).
    """
    if not request.is_json:
        return Response("json request required", 400)
    try:
        upload_request = request.json
        auth_helper = AuthHelper.configured_instance(
            current_app.config['APP_CLIENT_ID'],
            current_app.config['APP_CLIENT_SECRET'],
        )
        auth_tokens = auth_helper.getAuthorizationTokens(request.headers)
        if isinstance(auth_tokens, Response):
            # Token parsing already produced an error response; pass it on.
            return auth_tokens
        elif isinstance(auth_tokens, str):
            token = auth_tokens
        elif 'groups_token' in auth_tokens:
            token = auth_tokens['groups_token']
        else:
            return Response("Valid nexus auth token required", 401)

        requested_group_uuid = upload_request.get('group_uuid')

        ingest_helper = IngestFileHelper(current_app.config)
        # Replace the requested group with the one the helper resolves for
        # this token (the helper may raise, which is handled below).
        requested_group_uuid = auth_helper.get_write_group_uuid(token, requested_group_uuid)
        upload_request['group_uuid'] = requested_group_uuid

        post_url = (
            commons_file_helper.ensureTrailingSlashURL(current_app.config['ENTITY_WEBSERVICE_URL'])
            + 'entities/upload'
        )
        # NOTE(review): verify=False disables TLS certificate verification.
        response = requests.post(
            post_url,
            json=upload_request,
            headers=get_auth_header_dict(token),
            verify=False,
        )
        if response.status_code != 200:
            return Response(response.text, response.status_code)

        new_upload = response.json()
        ingest_helper.create_upload_directory(requested_group_uuid, new_upload['uuid'])
        return jsonify(new_upload)
    except HTTPException as hte:
        return Response(hte.get_description(), hte.get_status_code())
    except Exception as e:
        logger.error(e, exc_info=True)
        return Response("Unexpected error while creating a upload: " + str(e) + " Check the logs", 500)

#method to change the status of an Upload to "submitted"
#will also save any changes to title or description that are passed in
@entity_CRUD_blueprint.route('/uploads/<upload_uuid>/submit', methods=['PUT'])
def submit_upload(upload_uuid):
    """Set an Upload's status to "Submitted" via the entity web service.

    The JSON request body is used as the set of changes, so any other
    fields passed in are sent along with the status change.

    Args:
        upload_uuid: identifier of the Upload, taken from the URL.

    Returns:
        400 ``"json request required"`` when the request is not JSON;
        otherwise the entity service's response text and status code.
    """
    if not request.is_json:
        return Response("json request required", 400)

    upload_changes = request.json
    upload_changes['status'] = 'Submitted'

    # Forward the caller's Authorization header to the entity service.
    # NOTE(review): the original comment mentions "app specific header info";
    # if an application header entry was lost from this dict, restore it.
    http_headers = {
        'Authorization': request.headers["AUTHORIZATION"],
        'Content-Type': 'application/json',
    }

    update_url = (
        commons_file_helper.ensureTrailingSlashURL(current_app.config['ENTITY_WEBSERVICE_URL'])
        + 'entities/'
        + upload_uuid
    )
    # NOTE(review): verify=False disables ssl certificate verification.
    resp = requests.put(update_url, headers=http_headers, json=upload_changes, verify=False)

    #disable validations stuff for now...
    ##call the AirFlow validation workflow
    #validate_url = commons_file_helper.ensureTrailingSlashURL(app.config['INGEST_PIPELINE_URL']) + 'uploads/' + upload_uuid + "/validate"
    ## Disable ssl certificate verification
    #resp = requests.put(validate_url, headers=http_headers, json=upload_changes, verify = False)
    #if resp.status_code >= 300:
    #    return Response(resp.text, resp.status_code)

    return Response(resp.text, resp.status_code)

#method to validate an Upload
#saves the upload then calls the validate workflow via
#AirFlow interface
@entity_CRUD_blueprint.route('/uploads/<upload_uuid>/validate', methods=['PUT'])
def validate_upload(upload_uuid):
    """Save changes to an Upload, then trigger the pipeline's validate step.

    The JSON request body, with ``status`` forced to "Processing", is first
    PUT to the entity web service and then PUT to
    ``<INGEST_PIPELINE_URL>/uploads/<upload_uuid>/validate``.

    Args:
        upload_uuid: identifier of the Upload, taken from the URL.

    Returns:
        400 ``"json request required"`` when the request is not JSON; the
        failing service's text/status when either call answers >= 300;
        otherwise the text/status of the entity-service update (not of the
        pipeline call).
    """
    if not request.is_json:
        return Response("json request required", 400)

    upload_changes = request.json

    # Forward the caller's Authorization header to the downstream services.
    # NOTE(review): the original comment mentions "app specific header info";
    # if an application header entry was lost from this dict, restore it.
    http_headers = {
        'Authorization': request.headers["AUTHORIZATION"],
        'Content-Type': 'application/json',
    }

    # Update the Upload with any changes from the request and change the
    # status to "Processing"; per the original comments, the validate
    # pipeline updates the status when finished.
    upload_changes['status'] = 'Processing'
    update_url = (
        commons_file_helper.ensureTrailingSlashURL(current_app.config['ENTITY_WEBSERVICE_URL'])
        + 'entities/'
        + upload_uuid
    )

    # NOTE(review): verify=False disables ssl certificate verification.
    resp = requests.put(update_url, headers=http_headers, json=upload_changes, verify=False)
    if resp.status_code >= 300:
        return Response(resp.text, resp.status_code)

    # Call the validation workflow on the ingest pipeline.
    validate_url = (
        commons_file_helper.ensureTrailingSlashURL(current_app.config['INGEST_PIPELINE_URL'])
        + 'uploads/'
        + upload_uuid
        + "/validate"
    )
    resp2 = requests.put(validate_url, headers=http_headers, json=upload_changes, verify=False)
    if resp2.status_code >= 300:
        return Response(resp2.text, resp2.status_code)

    return Response(resp.text, resp.status_code)

#method to reorganize an Upload
#saves the upload then calls the reorganize workflow via
#AirFlow interface
@entity_CRUD_blueprint.route('/uploads/<upload_uuid>/reorganize', methods=['PUT'])
def reorganize_upload(upload_uuid):
    """Mark an Upload as "Processing", then trigger the pipeline's reorganize step.

    No request body is read. ``{"status": "Processing"}`` is PUT to the
    entity web service and then to
    ``<INGEST_PIPELINE_URL>/uploads/<upload_uuid>/reorganize``.

    Args:
        upload_uuid: identifier of the Upload, taken from the URL.

    Returns:
        The failing service's text/status when either call answers >= 300;
        otherwise the text/status of the entity-service update (not of the
        pipeline call).
    """
    # Forward the caller's Authorization header to the downstream services.
    # NOTE(review): the original comment mentions "app specific header info";
    # if an application header entry was lost from this dict, restore it.
    http_headers = {
        'Authorization': request.headers["AUTHORIZATION"],
        'Content-Type': 'application/json',
    }

    # Change the status to "Processing"; per the original comments, the
    # pipeline updates the status when finished.
    upload_changes = {'status': 'Processing'}
    update_url = (
        commons_file_helper.ensureTrailingSlashURL(current_app.config['ENTITY_WEBSERVICE_URL'])
        + 'entities/'
        + upload_uuid
    )

    # NOTE(review): verify=False disables ssl certificate verification.
    resp = requests.put(update_url, headers=http_headers, json=upload_changes, verify=False)
    if resp.status_code >= 300:
        return Response(resp.text, resp.status_code)

    # Call the reorganize workflow on the ingest pipeline.
    reorganize_url = (
        commons_file_helper.ensureTrailingSlashURL(current_app.config['INGEST_PIPELINE_URL'])
        + 'uploads/'
        + upload_uuid
        + "/reorganize"
    )
    resp2 = requests.put(reorganize_url, headers=http_headers, json=upload_changes, verify=False)
    if resp2.status_code >= 300:
        return Response(resp2.text, resp2.status_code)

    return Response(resp.text, resp.status_code)

@entity_CRUD_blueprint.route('/uploads/data-status', methods=['GET'])
def upload_data_status():
    """Return every Upload with display-ready field values as JSON.

    Each record carries ``uuid``, ``group_name``, ``sennet_id``, ``status``,
    ``title``, ``datasets`` (uuids of Datasets linked by ``IN_UPLOAD``) and
    a computed ``globus_url``. Values are normalized for display: lists are
    comma-joined, bools/ints are stringified, strings that look like a
    bracketed list are parsed and reduced to their first element, and
    ``None`` becomes a single space.
    """
    all_uploads_query = (
        "MATCH (up:Upload) "
        "OPTIONAL MATCH (up)<-[:IN_UPLOAD]-(ds:Dataset) "
        "RETURN up.uuid AS uuid, up.group_name AS group_name, up.sennet_id AS sennet_id, up.status AS status, "
        "up.title AS title, COLLECT(DISTINCT ds.uuid) AS datasets "
    )

    displayed_fields = [
        "uuid", "group_name", "sennet_id", "status", "title", "datasets"
    ]

    with Neo4jHelper.get_instance().session() as session:
        # NOTE(review): the right-hand side of this assignment was missing;
        # running the query and materializing the records as a list of dicts
        # is the assumed reconstruction - confirm against the repository.
        results = session.run(all_uploads_query).data()
        for upload in results:
            globus_url = get_globus_url('protected', upload.get('group_name'), upload.get('uuid'))
            upload['globus_url'] = globus_url
            # Only existing keys are reassigned below, so the dict's size
            # does not change while it is being iterated.
            for prop in upload:
                if isinstance(upload[prop], list):
                    upload[prop] = ", ".join(upload[prop])
                if isinstance(upload[prop], (bool, int)):
                    upload[prop] = str(upload[prop])
                # A string such as "['a', 'b']" is converted to JSON quoting,
                # parsed, and replaced by its first element.
                if upload[prop] and upload[prop][0] == "[" and upload[prop][-1] == "]":
                    upload[prop] = upload[prop].replace("'", '"')
                    upload[prop] = json.loads(upload[prop])
                    upload[prop] = upload[prop][0]
                if upload[prop] is None:
                    upload[prop] = " "
            # Make sure every displayed column is present and non-null.
            for field in displayed_fields:
                if upload.get(field) is None:
                    upload[field] = " "
    # TODO: Once url parameters are implemented in the front-end for the data-status dashboard, we'll need to return a
    # TODO: link to the datasets page only displaying datasets belonging to a given upload.
    return jsonify(results)

def _get_status_code__by_priority(codes):
if StatusCodes.SERVER_ERR in codes:
return StatusCodes.SERVER_ERR
Expand Down
18 changes: 18 additions & 0 deletions src/routes/entity_CRUD/
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import shutil
import subprocess
import threading

from hubmap_commons.hm_auth import AuthHelper
from hubmap_commons import file_helper
Expand Down Expand Up @@ -187,6 +188,23 @@ def move_dataset_files_for_publishing(self, uuid, group_uuid, dataset_access_lev

return None

def get_upload_directory_absolute_path(self, group_uuid, upload_uuid):
    """Build the path of an Upload's directory on the protected endpoint.

    The path is ``<GLOBUS_PROTECTED_ENDPOINT_FILEPATH>/<group display
    name>/<upload_uuid>``, where the display name comes from
    ``AuthHelper.getGroupDisplayName(group_uuid)``.

    Returns:
        The joined path as a ``str``.
    """
    group_display_name = AuthHelper.getGroupDisplayName(group_uuid)
    protected_root = self.appconfig['GLOBUS_PROTECTED_ENDPOINT_FILEPATH']
    return str(os.path.join(protected_root, group_display_name, upload_uuid))

def create_upload_directory(self, group_uuid, upload_uuid):
    """Create an Upload's directory and set its permissions in a thread.

    The directory path comes from ``get_upload_directory_absolute_path``.
    After the directory is made, ``set_dir_permissions('protected', path)``
    is run on a separate thread. Any exception raised while creating or
    starting that thread is logged and not re-raised.
    """
    new_directory_path = self.get_upload_directory_absolute_path(group_uuid, upload_uuid)
    IngestFileHelper.make_directory(new_directory_path, None)
    try:
        permissions_thread = threading.Thread(
            target=self.set_dir_permissions,
            args=['protected', new_directory_path],
        )
        # Without start() the permissions are never applied.
        permissions_thread.start()
    except Exception as e:
        self.logger.error(e, exc_info=True)

def set_dataset_permissions(self, dataset_uuid, group_uuid, dataset_access_level, published, trial_run=False):
    """Set permissions on a dataset's directory.

    Looks up the dataset directory with
    ``__dataset_directory_absolute_path`` and hands it to
    ``set_dir_permissions``, returning that call's result.
    """
    dataset_dir = self.__dataset_directory_absolute_path(
        dataset_access_level, group_uuid, dataset_uuid, published
    )
    return self.set_dir_permissions(
        dataset_access_level, dataset_dir, published, trial_run=trial_run
    )
Expand Down

0 comments on commit 533b9c9

Please sign in to comment.