diff --git a/service/controllers/auth.py b/service/controllers/auth.py new file mode 100644 index 0000000..760c857 --- /dev/null +++ b/service/controllers/auth.py @@ -0,0 +1,137 @@ +from flask_restful import Resource +from flask import Flask, request +from tapisservice.errors import AuthenticationError, BaseTapisError + +import globus_sdk +from globus_sdk import TransferAPIError + +from utils import * +from errors import * + +logger = get_logger(__name__) +app = Flask(__name__) + + +class AuthURLResource(Resource): + """ + Return authorization URL given client Id + """ + + def get(self, client_id, endpoint_id): + # TODO: a call to get_identities needs to be authenticated + # there might not be a way to figure out if the client_id + # is valid or not without letting the user go to the url and find out + try: + client = globus_sdk.NativeAppAuthClient(client_id) + except Exception as e: + msg = f'Encountered exception while initializing globus_sdk for client {client_id}\n\t{e}' + logger.error(msg) + raise InternalServerError(msg=msg) + + DEPENDENT_SCOPE = f"https://auth.globus.org/scopes/{endpoint_id}/data_access" + SCOPE = f"urn:globus:auth:scope:transfer.api.globus.org:all[*{DEPENDENT_SCOPE}]" + + session_client = client.oauth2_start_flow(refresh_tokens=True, requested_scopes=SCOPE) + + authorize_url = client.oauth2_get_authorize_url() + # authorize_url = start_auth_flow(client) + logger.debug(f"successfully got auth url for client {client_id}") + return utils.ok( + result = {"url": authorize_url, "session_id": session_client.verifier}, + msg = f'Please go to the URL and login' + ) + +class TokensResource(Resource): + """ + exchange client_id, session_id, & auth code for access and refresh tokens + """ + + def get(self, client_id, session_id, auth_code): + + try: + client = globus_sdk.NativeAppAuthClient(client_id) + session_client = client.oauth2_start_flow(verifier=session_id) + except Exception as e: + msg = f'Encountered exception while initializing globus_sdk with client id:: {client_id}\n\t{e}\n\t' + logger.error(msg) + raise InternalServerError(msg=msg) + + + # get access / refresh tokens + try: + token_response = client.oauth2_exchange_code_for_tokens(auth_code).by_resource_server + except globus_sdk.services.auth.errors.AuthAPIError: + msg = f'Invalid auth code given for client {client_id}' + logger.error(msg) + raise AuthenticationError(msg=msg, code=401) + else: + try: + access_token = token_response['transfer.api.globus.org']['access_token'] + refresh_token = token_response['transfer.api.globus.org']['refresh_token'] + except KeyError as e: + msg = f'Could not parse tokens from response:: {e}' + logger.error(msg) + raise InternalServerError(msg=msg) + + return utils.ok( + result = {"access_token": access_token, "refresh_token": refresh_token}, + msg = "successfully authorized globus client" + ) + +class CheckTokensResource(Resource): + """ + check validity of auth / refresh tokens and exchange if needed + """ + + def get(self, endpoint_id): + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + + if access_token is None or refresh_token is None: + return utils.error( + msg = 'Access token and refresh token must be provided as query parameters' + ) + + # check access token + # check refresh token + # if access token is valid but refresh is not, return access token + # if access token is invalid but refresh token is valid, get new token + # if access token and refresh token is invalid, error + + ac_token = '' + rf_token = '' + + ### 1/23/2024 oauth2_validate_token has been deprecated + # check if given tokens are valid + client = globus_sdk.NativeAppAuthClient(endpoint_id) + # try: + # ac_response = client.oauth2_validate_token(access_token) + # rf_response = client.oauth2_validate_token(refresh_token) + # except: + # logger.debug('Unknown error checking validity of tokens') + # return utils.error( + # msg = 'Unable to check validity of given tokens' + # ) + # else: + # logger.debug(f'ac:: {ac_response}') + # logger.debug(f'rf:: {rf_response}') + # ac_token = access_token if ac_response['active'] == True else None + # rf_token = refresh_token if rf_response['active'] == True else None + + # if either token is none, get new tokens + if ac_token is None or rf_token is None: + try: + refresh_response = client.oauth2_refresh_token(str(refresh_token)) + except: + logger.debug('Unknown error generating new tokens. Please try again later.') + else: + logger.debug(f'have token response:: {refresh_response}') + return utils.ok( + msg = 'Successfully refreshed tokens', + result = {"access_token": refresh_response['access_token'], "refresh_token": rf_token} + ) + + return utils.ok( + result = {"access_token": ac_token, "refresh_token": rf_token}, + msg = 'Successfully validated tokens' + ) \ No newline at end of file diff --git a/service/controllers/healthcheck.py b/service/controllers/healthcheck.py new file mode 100644 index 0000000..93c2025 --- /dev/null +++ b/service/controllers/healthcheck.py @@ -0,0 +1,5 @@ +from tapisservice.tapisflask.resources import ReadyResource + + +class HealthcheckResource(ReadyResource): + pass \ No newline at end of file diff --git a/service/controllers/ops.py b/service/controllers/ops.py new file mode 100644 index 0000000..e606b9f --- /dev/null +++ b/service/controllers/ops.py @@ -0,0 +1,307 @@ +import os +from functools import partial +from typing import List + +from flask_restful import Resource +from flask import Flask, request +from tapisservice.errors import AuthenticationError, BaseTapisError + +import globus_sdk +from globus_sdk import TransferAPIError +from globus_sdk.experimental.auth_requirements_error import GlobusAuthRequirementsError + +from utils import * +from errors import * + +logger = get_logger(__name__) +app = Flask(__name__) + + +class OpsResource(Resource): + + def retry( + self, + fn: partial, + attempts=0, + max_attempts=1, + reraise: List[Exception] = [] + ): + try: + result = fn() + if result.http_status != 200: + raise Exception(f"Expected http status code 200 | Recieved {str(result.http_status)}") + return result + except Exception as e: + if type(e) in reraise: + logger.debug(f"Re-raising exception | {e}") + raise e + logger.warn(f"Error while retrying: {str(e)}") + attempts += 1 + if attempts == max_attempts: + logger.error(f"Error: Max retries reached | Last Error: {e}") + raise e + + return self.retry(fn, attempts=attempts, max_attempts=max_attempts, reraise=reraise) + + + # ls + def do_ls(self, transfer_client, endpoint_id, path, filter_, query_dict): + logger.debug(f'in do_ls with {endpoint_id} {path} {filter_} {query_dict}') + result = None + try: + result = transfer_client.operation_ls( + endpoint_id=(endpoint_id), + path=path, + show_hidden=False, + filter=filter_, + query_params=query_dict if query_dict != {} else None + ) + except globus_sdk.TransferAPIError as e: + if e.code == 'ConsentRequired': + pass # TODO + logger.debug(f'Got transfer api error in do_ls {e.code}\n\t{e}') + raise e + except Exception as e: + logger.error(f'exception while doing ls:: {e}') + raise e + + if not result: + logger.debug(f'ls operation returned no results with path {path}, filter {filter_}, and query {query_dict}') + else: + logger.debug(f'ls operation returned result {result.http_status}') + return result + + def get(self, client_id, endpoint_id, path): + # parse args & perform precheck + path = format_path(path) + logger.debug(f'beginning ls with client:: {client_id} & path:: {path}') + transfer_client = None + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + query_dict = { + "limit": request.args.get('limit'), + "offset": request.args.get('offset') + } + filter_ = request.args.get('filter') + + try: + transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) + except PythonAuthenticationError: + logger.error(f'Invalid token given for client {client_id}') + msg='Access token invalid. Please provide valid token.' + raise AuthenticationError(msg=msg, code=401) + + # perform ls op + logger.debug(f"performing ls with client {client_id} on endpoint {endpoint_id} and path {path}") + + list_files_op = partial( + self.do_ls, + transfer_client=transfer_client, + endpoint_id=endpoint_id, + path=path, + filter_=filter_, + query_dict=query_dict + ) + try: + result = self.retry(list_files_op, max_attempts=5, reraise=[TransferAPIError]) + except TransferAPIError as e: + logger.debug(f'encountered {e.code} while checking {path}') + if e.code == 'ExternalError.DirListingFailed.NotDirectory': + print('got not directory error - trying parent dir') + # requested object is not a dir - assume single file instead + + parent_dir, filename = os.path.split(path) + # If parent dir is empty string, assume it's root + parent_dir = parent_dir or "/" + logger.debug(f'trying single file filter "name:{filename}" on parent dir "{parent_dir}" / filename "{filename}"') + list_files_op = partial( + self.do_ls, + transfer_client=transfer_client, + endpoint_id=endpoint_id, + path=parent_dir, + filter_=f'name:{filename}', + query_dict=query_dict + ) + + result = self.retry(list_files_op, max_attempts=5) + + return utils.ok( + msg='Successfully listed files', + result=result.data, + metadata={'access_token': access_token} + ) + + raise handle_transfer_error(e) + except GlobusAuthRequirementsError as e: + logger.error(f'Got Globus auth requirements error in GET: {e}') + raise handle_transfer_error(e) + except Exception as e: + logger.error(f'this should not print. You got:: {e}') + raise handle_transfer_error(e) + # TODO: if the tokens get refreshed live, we could have a race condition + # figure out a way to test if refreshed access tokens will cause a call to fail + # especially for concurrent ops + # logger.debug(f'result is {result}') + logger.info(f'Successfully listed files on path:: {path} for client: {client_id}') + return utils.ok( + msg='Successfully listed files', + result=result.data, + metadata={'access_token': access_token} + ) + + # mkdir + def do_mkdir(self, transfer_client, endpoint_id, path): + result = transfer_client.operation_mkdir( + endpoint_id=endpoint_id, + path=path + ) + return result + + def post(self, client_id, endpoint_id, path): + # parse args and perform precheck + logger.debug(f'in mkdir, have path:: {path}') + transfer_client = None + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + + if not access_token or not refresh_token: + logger.error(f'error parsing args for client {client_id}') + raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') + + try: + transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) + except PythonAuthenticationError: + logger.error(f'Invalid token given for client {client_id}') + raise AuthenticationError(msg='Given tokens are not valid. Try again with active auth tokens') + except TransferAPIError as e: + logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') + # api errors come through as specific codes, each one can be handled separately + raise handle_transfer_error(e) + except Exception as e: + logger.error(f'exception performing mkdir for client {client_id} at path {path}:: {e}') + raise InternalServerError(msg=f"Unkown error performing mkdir at path {path}. Please check request syntax and try again.") + + # perform mkdir op + try: + result = self.do_mkdir(transfer_client, endpoint_id, path) + except TransferAPIError as e: + logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') + # api errors come through as specific codes, each one can be handled separately + raise handle_transfer_error(e) + # TODO: handle intermediate folder creation? e.g. path/to/thing instead of path/ + except Exception as e: + logger.error(f'exception while performing mkdir operation for client {client_id} at path {path}:: {e}') + raise InternalServerError(msg=f'Unknown error while performing mkdir operation at path {path}') + + logger.info(f'successfull mkdir for client {client_id} at path {path}') + return utils.ok( + msg=f'Successfully created directory at {path}', + result=result.data, + metadata={'access_token': access_token} + ) + + # delete + def do_delete(self, transfer_client, delete_task): + result = transfer_client.submit_delete(delete_task) + return result + + def delete(self, client_id, endpoint_id, path): + # parse args and perform precheck + logger.debug(f'in delete, have path:: {path}') + transfer_client = None + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + recurse = request.args.get('recurse', False) + + if not access_token or not refresh_token: + logger.error('error parsing args') + raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') + + try: + transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) + except PythonAuthenticationError: + logger.error(f'Invalid token given for client {client_id}') + raise AuthenticationError(msg='Given tokens are not valid. Try again with active auth tokens') + except Exception as e: + logger.error(f'exception while performing delete operation for client {client_id} at path {path}:: {e}') + raise InternalServerError() + # TODO: handle more exceptions? + + + # perform delete + try: + delete_task = globus_sdk.DeleteData( + transfer_client=transfer_client, + endpoint=endpoint_id, + recursive=bool(recurse) + ) + delete_task.add_item(path) + logger.debug(f'have delete task:: {delete_task}') + # result = transfer_client.submit_delete(delete_task) + result = self.do_delete(transfer_client, delete_task) + except TransferAPIError as e: + logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') + # api errors come through as specific codes, each one can be handled separately + raise handle_transfer_error(e) + except Exception as e: + # TODO: make sure path exists on endpoint + logger.error(f'exception while performing delete operation for client {client_id} at path {path}:: {e}') + raise InternalServerError() + logger.info(f'Successful delete for client {client_id} at path {path}') + return utils.ok( + result=result.data, + msg='Success' + ) + + + # rename + def do_rename(self, transfer_client, endpoint_id, oldpath, newpath): + result = transfer_client.operation_rename(endpoint_id, oldpath=oldpath, newpath=newpath) + return result + + def put(self, client_id, endpoint_id, path): + logger.debug(f'In rename op with client {client_id}, endpoint {endpoint_id}, and oldpath {path}') + # parse args and perform precheck + transfer_client = None + logger.debug(f'args:: {request.args}\njson:: {request.json}') + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + dest = request.json.get('destination') + + if not access_token or not refresh_token: + logger.error('error parsing args') + raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') + + try: + transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) + except PythonAuthenticationError: + logger.error(f'Invalid token given for client {client_id}') + raise AuthenticationError(msg='Given tokens are not valid. Try again with active auth tokens') + except Exception as e: + return utils.error(f'exception while performing rename operation for client {client_id} at path {path}:: {e}') + # TODO: handle more exceptions? + + # perform rename + try: + # result = transfer_client.operation_rename(endpoint_id, oldpath=path, newpath=dest) + result = self.do_rename(transfer_client, endpoint_id, path, dest) + except TransferAPIError as e: + logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') + if e.code == 'EndpointError': + logger.error(f'e code is endpoint error. text is {e.text} \nand reason is {e.http_reason} \nand message is {e.message}') + if 'No such file or directory' in e.text: + raise handle_transfer_error(PathNotFoundError()) + if e.http_reason == 'PATH_NOT_FOUND': + raise handle_transfer_error(PathNotFoundError()) + else: + logger.error(f'exception code is endpoint error but "No such file or directory" not in exception text') + logger.error(f'bro idk whats going on here') + raise handle_transfer_error(e) + except Exception as e: + logger.error(f'exception in rename with client {client_id}, endpoint {endpoint_id} and path {path}:: {e}') + raise handle_transfer_error(e) + logger.info(f'Successful rename for client {client_id} at path {path}') + return utils.ok( + msg=f'Success', + result = result.data + ) diff --git a/service/controllers/transfers.py b/service/controllers/transfers.py new file mode 100644 index 0000000..cabd83a --- /dev/null +++ b/service/controllers/transfers.py @@ -0,0 +1,107 @@ +from flask_restful import Resource +from flask import Flask, request +from tapisservice.errors import AuthenticationError, BaseTapisError + +import globus_sdk +from globus_sdk import TransferAPIError + +from utils import * +from errors import * + +logger = get_logger(__name__) +app = Flask(__name__) + + +class TransferResource(Resource): + def post(self, client_id): + logger.info(f'Beginning creation of transfer task for client: {client_id} with args: {request.args} and json: {request.json}') + logger.debug(f'request headers:: {request.headers}') + + ## parse args + try: + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + src = request.json.get('source_endpoint') + dest = request.json.get('destination_endpoint') + items = request.json.get('transfer_items') + except Exception as e: + logger.error(f'Error parsing args for request:: {e}') + + logger.debug(f'have setup args \n{access_token}\n{refresh_token}\n{src}\n{dest}\n{items}') + + if not access_token or not refresh_token: + logger.error('error parsing args') + raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') + + ## get xfer client + transfer_client = None + try: + transfer_client = precheck(client_id, [src, dest], access_token, refresh_token) + logger.debug(f'have tc:: {transfer_client}') + except TransferAPIError as e: + logger.error(f'got TransferAPIError trying to submit transfer job:: {e}') + raise handle_transfer_error(e.http_reason) + except GlobusError as e: + raise handle_transfer_error(e) + except Exception as e: + logger.error(f'failed to authenticate transfer client :: {e}') + raise InternalServerError(msg='Failed to authenticate transfer client') + + ## perform request + result = (transfer(transfer_client, src, dest, items)) + if "File Transfer Failed" in result: + logger.error(f'File transfer failed due to {result}') + raise GlobusError(msg='File transfer failed') + + logger.info(f'Successful transfer for client {client_id} from {src} to {dest}') + return utils.ok( + result=result.data, + msg=f'Success' + ) + +class ModifyTransferResource(Resource): + # TODO: + # Make sure tokens are valid + # + def get(self, client_id, task_id): + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + + try: + # transfer_client = get_transfer_client(client_id, refresh_token, access_token) + transfer_client = precheck(client_id, None, access_token, refresh_token) + except Exception as e: + logger.error(f'error while getting transfer client for client id {client_id}: {e}') + raise InternalServerError(msg='Error while authenticating globus client') + try: + result = transfer_client.get_task(task_id) + except Exception as e: + logger.error(f'error while getting transfer task with id {task_id}: {e}') + raise InternalServerError(msg='Error retrieving transfer task') + logger.info(f'Successful modify with client {client_id} of task {task_id}') + return utils.ok( + result=result.data, + msg='successfully retrieved transfer task' + ) + + + def delete(self, client_id, task_id): + access_token = request.args.get('access_token') + refresh_token = request.args.get('refresh_token') + + try: + # transfer_client = get_transfer_client(client_id, refresh_token, access_token) + transfer_client = precheck(client_id, None, access_token, refresh_token) + except Exception as e: + logger.error(f'error while getting transfer client for client id {client_id}: {e}') + raise AuthenticationError(msg='Error while authenticating globus client') + try: + result = transfer_client.cancel_task(task_id) + except Exception as e: + logger.error(f'error while canceling transfer task with id {task_id}: {e}') + raise AuthenticationError(msg='Error retrieving transfer task') + logger.info(f'Successful delete with client {client_id} of task {task_id}') + return utils.ok( + result=result.data, + msg='successfully canceled transfer task' + ) diff --git a/service/errors.py b/service/errors.py index ff1ba9e..0035a50 100644 --- a/service/errors.py +++ b/service/errors.py @@ -34,9 +34,11 @@ class GlobusConsentRequired(BaseTapisError): def __init__(self, msg="Endpoint requires consent", code=407): super().__init__(msg, code) +class GlobusUnauthorized(BaseTapisError): + def __init__(self, msg="Permission denied", code=407): + msg=f"You do not have permission to perform that operation on this endpoint:: {msg}" + super().__init__(msg, code) + class GlobusPathExists(BaseTapisError): def __init__(self, msg="A directory with given path already exists", code=409): super().__init__(msg, code) - - - diff --git a/service/tests.py b/service/tests.py index 2e8062d..7493dbb 100644 --- a/service/tests.py +++ b/service/tests.py @@ -40,19 +40,9 @@ def load_config(self): return self.config -def get_endpoint_test(): - base = Base() - try: - tc = get_transfer_client(base.cid, base.rt, base.at) - # print(f'have tc:: {tc}') - except Exception as e: - print(f'get tc fail! {e}') - - try: - info = tc.get_endpoint(base.gcp_eid) - print(info) - except Exception as e: - print(f'get endpoint fail! {e}') +def get_endpoint_test(epid): + info = get_collection_type(epid) + print(f'got ep info:: {info}') def ls_test(base, path): url = f'{base.base_url}/ops/{base.cid}/{base.gcp_eid}/{path}' @@ -100,6 +90,12 @@ def rm_xfer_test(base): base_path = os.path.expandvars('~') try: + # base tests + # try: + # get_endpoint_test(base.gcp_eid) + # except Exception as e: + # print(e) + # fails['base_test_1'] = e # ls tests try: ls_test(base, base_path) diff --git a/service/utils.py b/service/utils.py index 407e96f..d5712ee 100644 --- a/service/utils.py +++ b/service/utils.py @@ -2,6 +2,7 @@ from multiprocessing import AuthenticationError as PythonAuthenticationError from datetime import datetime, timedelta import json +import os ## globus import globus_sdk @@ -17,6 +18,28 @@ logger = get_logger(__name__) +def get_collection_type(endpoint_id): # TODO + ''' + Given endpoint id, return type of collection + Requires that we have a client ID and client secret in the /globus-proxy/env file + ''' + # _user = os.environ.get("") + # _pass = os.environ.get("") + client_id = '700bc50b-241c-4805-a4fe-6bd72e50062e' + client_secret = 'eHmt/LxxbQqua73tyyQX7G0zJpDXOMQ0oBP/ld+SrS0=' + auth_client = globus_sdk.ConfidentialAppAuthClient(client_id, client_secret) + token_response = auth_client.oauth2_client_credentials_tokens().by_resource_server + logger.debug(f'in get_collection_type, got token resp: {token_response}') + + scopes = "urn:globus:auth:scope:transfer.api.globus.org:all" + at = token_response["transfer.api.globus.org"]["access_token"] + logger.debug(f'got at: {at}') + cc_authorizer = globus_sdk.ClientCredentialsAuthorizer(auth_client, scopes) + tk_authorizer = globus_sdk.AccessTokenAuthorizer(at) + # create a new client + transfer_client = globus_sdk.TransferClient(authorizer=tk_authorizer) + transfer_client.get_endpoint(endpoint_id) + def activate_endpoint(tc, ep_id, username, password): ''' @@ -143,7 +166,7 @@ def format_path(path, default_dir=None): def handle_transfer_error(exception, endpoint_id=None, msg=None): '''Tanslates transfer api errors into the configured basetapiserrors in ./errors.py''' - # logger.debug(f'\nhandling transfer API error:: {exception.code}:: with message {exception.message}\n') + logger.critical(f'\nhandling transfer API error:: {exception.code}:: with message {exception.message}\n') error = InternalServerError(msg='Interal server error', code=500) if getattr(exception, "code", None) == None: logger.debug(f'exception {exception} has no code, therefore returning InternalServerError') @@ -160,6 +183,11 @@ def handle_transfer_error(exception, endpoint_id=None, msg=None): error = GlobusConsentRequired(msg=f'Endpoint {endpoint_id} requires additonal consent. Auth flow ust be manually re-run.', code=407) if exception.code == 'ExternalError.MkdirFailed.Exists': error = GlobusPathExists(msg=f'Directory with given path already exists.', code=409) + if exception.code == 'EndpointPermissionDenied': + error = GlobusUnauthorized(msg=e.http_reason) + if exception.code == 'EndpointError': + if exception.http_reason == 'Bad Gateway': + return return error def is_endpoint_activated(tc, ep):