From 4413109b360161fab967a4b68ee24bc5cc3bb15a Mon Sep 17 00:00:00 2001 From: kevinpricethesecond Date: Mon, 5 Feb 2024 08:39:57 -0600 Subject: [PATCH] huge refactor, better everything --- CHANGELOG.md | 3 +- service/api.py | 7 +- service/controllers.py | 551 ------------------------------- service/errors.py | 9 +- service/resources/openapi_v3.yml | 6 + service/tests.py | 189 +++++------ service/utils.py | 257 +++++++------- 7 files changed, 240 insertions(+), 782 deletions(-) delete mode 100644 service/controllers.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bacbcd4..65764b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,11 @@ Breaking Changes: - auth flow has been reworked to allow for v5 endpoints - users will need to refresh their auth tokens New features: - - created functional tests - unit tests were inadequate for certain functions + - created functional tests - unit tests were inadequate for certain functions. These must be run from inside the container - better handling of personal connect endpoints - initial support for additional consent auth flow - initial support for consent management + - more descriptive response codes for several endpoints Bug fixes: - catch consent required errors instead of returning a 500 code diff --git a/service/api.py b/service/api.py index 8bb8cd6..ba18b0f 100644 --- a/service/api.py +++ b/service/api.py @@ -8,7 +8,8 @@ # from service.controllers import AuthURLResource, TokensResource # from service.controllers import * -from controllers import * +from service.controllers_old import * +from controllers.ops import OpsResource as _OpsResource # from service import app app = Flask(__name__) app.secret_key = os.urandom(16) @@ -32,13 +33,13 @@ def __init__(self, url_map, *items): api.add_resource(AuthURLResource, '/v3/globus-proxy/auth/url/') api.add_resource(TokensResource, '/v3/globus-proxy/auth/tokens///') api.add_resource(CheckTokensResource, '/v3/globus-proxy/auth/check_tokens/') -api.add_resource(OpsResource, '/v3/globus-proxy/ops///') +# api.add_resource(OpsResource, '/v3/globus-proxy/ops///') +api.add_resource(_OpsResource, '/v3/globus-proxy/ops///') # transfer resourced are separated due to inconsistent url pattern api.add_resource(TransferResource, '/v3/globus-proxy/transfers/') api.add_resource(ModifyTransferResource, '/v3/globus-proxy/transfers//') - # Health checks api.add_resource(ReadyResource, '/v3/globus-proxy/ready') api.add_resource(HealthcheckResource, '/v3/globus-proxy/healthcheck') diff --git a/service/controllers.py b/service/controllers.py deleted file mode 100644 index eb779e7..0000000 --- a/service/controllers.py +++ /dev/null @@ -1,551 +0,0 @@ -from ast import Pass -from distutils.log import error -import json -from os import access -import os -import traceback -import time - -from functools import partial -from typing import List - -from flask import Flask, current_app, request -from flask_restful import Resource - -from tapisservice.tapisflask import utils -from tapisservice.tapisflask.resources import ReadyResource -from tapisservice.logs import get_logger -from tapisservice.errors import AuthenticationError, BaseTapisError - -import globus_sdk -from globus_sdk import TransferAPIError -from globus_sdk.experimental.auth_requirements_error import GlobusAuthRequirementsError - -from utils import check_tokens, get_transfer_client, precheck, transfer, is_endpoint_activated, autoactivate_endpoint, is_endpoint_connected, start_auth_flow -from errors import PathNotFoundError, InternalServerError, GlobusError - -from multiprocessing import AuthenticationError as PythonAuthenticationError -from globus_sdk.scopes import TransferScopes - -logger = get_logger(__name__) -app = Flask(__name__) - - -class HealthcheckResource(ReadyResource): - pass - -class AuthURLResource(Resource): - """ - Return authorization URL given client Id - """ - - def get(self, client_id): - # TODO: a call to get_identities needs to be authenticated - # there might not be a way to figure out if the client_id - # is valid or not without letting the user go to the url and find out - try: - client = globus_sdk.NativeAppAuthClient(client_id) - except Exception as e: - msg = f'Encountered exception while initializing globus_sdk for client {client_id}\n\t{e}' - logger.error(msg) - raise InternalServerError(msg=msg) - session_client = client.oauth2_start_flow(refresh_tokens=True, requested_scopes=[TransferScopes.all]) - authorize_url = client.oauth2_get_authorize_url() - # authorize_url = start_auth_flow(client) - logger.debug(f"successfully got auth url for client {client_id}") - return utils.ok( - result = {"url": authorize_url, "session_id": session_client.verifier}, - msg = f'Please go to the URL and login' - ) - -class TokensResource(Resource): - """ - exchange client_id, session_id, & auth code for access and refresh tokens - """ - - def get(self, client_id, session_id, auth_code): - - try: - client = globus_sdk.NativeAppAuthClient(client_id) - session_client = client.oauth2_start_flow(verifier=session_id) - except Exception as e: - msg = f'Encountered exception while initializing globus_sdk with client id:: {client_id}\n\t{e}\n\t' - logger.error(msg) - raise InternalServerError(msg=msg) - - - # get access / refresh tokens - try: - token_response = client.oauth2_exchange_code_for_tokens(auth_code).by_resource_server - except globus_sdk.services.auth.errors.AuthAPIError: - msg = f'Invalid auth code given for client {client_id}' - logger.error(msg) - raise AuthenticationError(msg=msg, code=401) - else: - try: - access_token = token_response['transfer.api.globus.org']['access_token'] - refresh_token = token_response['transfer.api.globus.org']['refresh_token'] - except KeyError as e: - msg = f'Could not parse tokens from response:: {e}' - logger.error(msg) - raise InternalServerError(msg=msg) - - return utils.ok( - result = {"access_token": access_token, "refresh_token": refresh_token}, - msg = "successfully authorized globus client" - ) - -class CheckTokensResource(Resource): - """ - check validity of auth / refresh tokens and exchange if needed - """ - - def get(self, endpoint_id): - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - - if access_token is None or refresh_token is None: - return utils.error( - msg = 'Access token and refresh token must be provided as query parameters' - ) - - # check access token - # check refresh token - # if access token is valid but refresh is not, return access token - # if access token is invalid but refresh token is valid, get new token - # if access token and refresh token is invalid, error - - ac_token = '' - rf_token = '' - - ### 1/23/2024 oauth2_validate_token has been deprecated - # check if given tokens are valid - # client = globus_sdk.NativeAppAuthClient(endpoint_id) - # try: - # ac_response = client.oauth2_validate_token(access_token) - # rf_response = client.oauth2_validate_token(refresh_token) - # except: - # logger.debug('Unknown error checking validity of tokens') - # return utils.error( - # msg = 'Unable to check validity of given tokens' - # ) - # else: - # logger.debug(f'ac:: {ac_response}') - # logger.debug(f'rf:: {rf_response}') - # ac_token = access_token if ac_response['active'] == True else None - # rf_token = refresh_token if rf_response['active'] == True else None - - # if either token is none, get new tokens - if ac_token is None or rf_token is None: - try: - refresh_response = client.oauth2_refresh_token(str(refresh_token)) - except: - logger.debug('Unknown error generating new tokens. Please try again later.') - else: - logger.debug(f'have token response:: {refresh_response}') - return utils.ok( - msg = 'Successfully refreshed tokens', - result = {"access_token": refresh_response['access_token'], "refresh_token": rf_token} - ) - - return utils.ok( - result = {"access_token": ac_token, "refresh_token": rf_token}, - msg = 'Successfully validated tokens' - ) - - -class OpsResource(Resource): - - def handle_transfer_error(self, exception, endpoint_id=None): - '''Tanslates transfer api errors into the configured basetapiserrors in ./errors.py''' - error = InternalServerError(msg='Interal server error', code=500) - if getattr(exception, "code", None) == None: - return error - if exception.code == "AuthenticationFailed": - error = AuthenticationError(msg='Could not authenticate transfer client', code=401) - elif exception.code == "ClientError.NotFound": - error = PathNotFoundError(msg='Path does not exist on given endpoint', code=404) - elif exception.code == "ExternalError.DirListingFailed.GCDisconnected": - error = GlobusError(msg=f'Error connecting to endpoint {endpoint_id}. Please activate endpoint manually', code=407) - return error - - def retry( - self, - fn: partial, - attempts=0, - max_attempts=1, - reraise: List[Exception] = [] - ): - try: - result = fn() - if result.http_status != 200: - raise Exception(f"Expected http status code 200 | Recieved {str(result.http_status)}") - return result - except Exception as e: - if type(e) in reraise: - logger.debug(f"Re-raising exception | {e}") - raise e - logger.warn(f"Error while retrying: {e}") - attempts += 1 - if attempts == max_attempts: - logger.error(f"Error: Max retries reached | Last Error: {e}") - raise e - - return self.retry(fn, attempts=attempts, max_attempts=max_attempts, reraise=reraise) - - - def format_path(self, path, default_dir=None): - ''' - Force absoulte paths for now, due to Globus not ahndling /~/ the same way on all systems - if a user provides a relative path, it will instead be returned as an INCORRECT abs path. - ''' - logger.info(f'building path with path {path} and default {default_dir} ') - - return f"/{path.rstrip('/').lstrip('/')}" - - # ls - def do_ls(self, transfer_client, endpoint_id, path, filter_, query_dict): - logger.debug(f'in do_ls with {endpoint_id} {path} {filter_} {query_dict}') - result = None - try: - result = transfer_client.operation_ls( - endpoint_id=(endpoint_id), - path=path, - show_hidden=False, - filter=filter_, - query_params=query_dict if query_dict != {} else None - ) - except globus_sdk.TransferAPIError as e: - # if the error is something other than consent_required, reraise it - if not e.info.consent_required: - # FIXME: do consent auth flow - raise e - raise self.handle_transfer_error(e) - except Exception as e: - logger.error(f'exception while doing ls:: {e}') - raise e - - return result - - def get(self, client_id, endpoint_id, path): - # parse args & perform precheck - path = self.format_path(path) - logger.debug(f'beginning ls with client:: {client_id} & path:: {path}') - transfer_client = None - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - query_dict = { - "limit": request.args.get('limit'), - "offset": request.args.get('offset') - } - filter_ = request.args.get('filter') - - if not access_token or not refresh_token: - msg='Could not parse token from request parameters. Please provide valid token.' - logger.error(f'error parsing tokens from args for client {client_id}') - raise AuthenticationError(msg=msg, code=401) - - try: - transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) - except PythonAuthenticationError: - logger.error(f'Invalid token given for client {client_id}') - msg='Access token invalid. Please provide valid token.' - raise AuthenticationError(msg=msg, code=401) - - # perform ls op - logger.debug(f"performing ls with client {client_id} on endpoint {endpoint_id} and path {path}") - - list_files_op = partial( - self.do_ls, - transfer_client=transfer_client, - endpoint_id=endpoint_id, - path=path, - filter_=filter_, - query_dict=query_dict - ) - try: - result = self.retry(list_files_op, max_attempts=5, reraise=[TransferAPIError]) - except TransferAPIError as e: - logger.debug(f'encountered {e.code} while checking {path}') - if e.code == 'ExternalError.DirListingFailed.NotDirectory': - print('got not directory error - trying parent dir') - # requested object is not a dir - assume single file instead - - parent_dir, filename = os.path.split(path) - # If parent dir is empty string, assume it's root - parent_dir = parent_dir or "/" - logger.debug(f'trying single file filter "name:{filename}" on parent dir "{parent_dir}" / filename "{filename}"') - list_files_op = partial( - self.do_ls, - transfer_client=transfer_client, - endpoint_id=endpoint_id, - path=parent_dir, - filter_=f'name:{filename}', - query_dict=query_dict - ) - - result = self.retry(list_files_op, max_attempts=5) - - return utils.ok( - msg='Successfully listed files', - result=result.data, - metadata={'access_token': access_token} - ) - raise self.handle_transfer_error(e) - except Exception as e: - logger.error(f'this should not print. You got:: {e}') - raise self.handle_transfer_error(e) - # TODO: if the tokens get refreshed live, we could have a race condition - # figure out a way to test if refreshed access tokens will cause a call to fail - # especially for concurrent ops - # logger.debug(f'result is {result}') - logger.info(f'Successfully listed files on path:: {path} for client {client_id}') - return utils.ok( - msg='Successfully listed files', - result=result.data, - metadata={'access_token': access_token} - ) - - # mkdir - def do_mkdir(self, transfer_client, endpoint_id, path): - result = transfer_client.operation_mkdir( - endpoint_id=endpoint_id, - path=path - ) - return result - - def post(self, client_id, endpoint_id, path): - # parse args and perform precheck - logger.debug(f'in mkdir, have path:: {path}') - transfer_client = None - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - - if not access_token or not refresh_token: - logger.error(f'error parsing args for client {client_id}') - raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') - - try: - transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) - except PythonAuthenticationError: - logger.error(f'Invalid token given for client {client_id}') - raise AuthenticationError(msg='Given tokens are not valid. Try again with active auth tokens') - except TransferAPIError as e: - logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') - logger.error(f' tb:: {traceback.print_tb}') - # api errors come through as specific codes, each one can be handled separately - raise self.handle_transfer_error(e) - except Exception as e: - logger.error(f'exception performing mkdir for client {client_id} at path {path}:: {e}') - raise InternalServerError(msg=f"Unkown error performing mkdir at path {path}. Please check request syntax and try again.") - - # perform mkdir op - try: - result = self.do_mkdir(transfer_client, endpoint_id, path) - except TransferAPIError as e: - logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') - # api errors come through as specific codes, each one can be handled separately - raise self.handle_transfer_error(e) - # TODO: handle intermediate folder creation? e.g. path/to/thing instead of path/ - except Exception as e: - logger.error(f'exception while performing mkdir operation for client {client_id} at path {path}:: {e}') - raise InternalServerError(msg=f'Unknown error while performing mkdir operation at path {path}') - - logger.info(f'successfull mkdir for client {client_id} at path {path}') - return utils.ok( - msg=f'Successfully created directory at {path}', - result=result.data, - metadata={'access_token': access_token} - ) - - # delete - def do_delete(self, transfer_client, delete_task): - result = transfer_client.submit_delete(delete_task) - return result - - def delete(self, client_id, endpoint_id, path): - # parse args and perform precheck - logger.debug(f'in delete, have path:: {path}') - transfer_client = None - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - recurse = request.args.get('recurse', False) - - if not access_token or not refresh_token: - logger.error('error parsing args') - raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') - - try: - transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) - except PythonAuthenticationError: - logger.error(f'Invalid token given for client {client_id}') - raise AuthenticationError(msg='Given tokens are not valid. Try again with active auth tokens') - except Exception as e: - logger.error(f'exception while performing delete operation for client {client_id} at path {path}:: {e}') - raise InternalServerError() - # TODO: handle more exceptions? - - - # perform delete - try: - delete_task = globus_sdk.DeleteData( - transfer_client=transfer_client, - endpoint=endpoint_id, - recursive=bool(recurse) - ) - delete_task.add_item(path) - logger.debug(f'have delete task:: {delete_task}') - # result = transfer_client.submit_delete(delete_task) - result = self.do_delete(transfer_client, delete_task) - except TransferAPIError as e: - logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') - # api errors come through as specific codes, each one can be handled separately - raise self.handle_transfer_error(e) - except Exception as e: - # TODO: make sure path exists on endpoint - logger.error(f'exception while performing delete operation for client {client_id} at path {path}:: {e}') - raise InternalServerError() - logger.info(f'Successful delete for client {client_id} at path {path}') - return utils.ok( - result=result.data, - msg='Success' - ) - - - # rename - def do_rename(self, transfer_client, endpoint_id, oldpath, newpath): - result = transfer_client.operation_rename(endpoint_id, oldpath=path, newpath=dest) - return result - - def put(self, client_id, endpoint_id, path): - # parse args and perform precheck - transfer_client = None - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - dest = request.json.get('destination') - - if not access_token or not refresh_token: - logger.error('error parsing args') - raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') - - try: - transfer_client = precheck(client_id, endpoint_id, access_token, refresh_token) - except PythonAuthenticationError: - logger.error(f'Invalid token given for client {client_id}') - raise AuthenticationError(msg='Given tokens are not valid. Try again with active auth tokens') - except Exception as e: - return utils.error(f'exception while performing rename operation for client {client_id} at path {path}:: {e}') - # TODO: handle more exceptions? - - # perform rename - try: - # result = transfer_client.operation_rename(endpoint_id, oldpath=path, newpath=dest) - result = self.do_rename(transfer_client, endpoint_id, path, dest) - except TransferAPIError as e: - logger.error(f'transfer api error for client {client_id}: {e}, code:: {e.code}') - # api errors come through as specific codes, each one can be handled separately - raise self.handle_transfer_error(e) - except Exception as e: - logger.error(f'exception in rename with client {client_id}, endpoint {endpoint_id} and path {path}:: {e}') - raise InternalServerError(msg='Unknown error while performing rename') - logger.info(f'Successful rename for client {client_id} at path {path}') - return utils.ok( - msg=f'Success', - result = result.data - ) - - -class TransferResource(Resource): - def post(self, client_id): - logger.info(f'Beginning creation of transfer task for client: {client_id} with args: {request.args} and json: {request.json}') - logger.debug(f'request headers:: {request.headers}') - - ## parse args - try: - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - src = request.json.get('source_endpoint') - dest = request.json.get('destination_endpoint') - items = request.json.get('transfer_items') - except Exception as e: - logger.error(f'Error parsing args for request:: {e}') - - logger.debug(f'have setup args \n{access_token}\n{refresh_token}\n{src}\n{dest}\n{items}') - - if not access_token or not refresh_token: - logger.error('error parsing args') - raise AuthenticationError(msg='Exception while parsing request parameters. Please check your request syntax and try again') - - ## get xfer client - transfer_client = None - try: - transfer_client = precheck(client_id, [src, dest], access_token, refresh_token) - logger.debug(f'have tc:: {transfer_client}') - except GlobusError as e: - raise e - except Exception as e: - logger.error(f'failed to authenticate transfer client :: {e}') - raise InternalServerError(msg='Failed to authenticate transfer client') - - ## perform request - result = (transfer(transfer_client, src, dest, items)) - if "File Transfer Failed" in result: - logger.error(f'File transfer failed due to {result}') - raise GlobusError(msg='File transfer failed') - - logger.info(f'Successful transfer for client {client_id} from {src} to {dest}') - return utils.ok( - result=result.data, - msg=f'Success' - ) - -class ModifyTransferResource(Resource): - # TODO: - # Make sure tokens are valid - # - def get(self, client_id, task_id): - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - - try: - # transfer_client = get_transfer_client(client_id, refresh_token, access_token) - transfer_client = precheck(client_id, None, access_token, refresh_token) - except Exception as e: - logger.error(f'error while getting transfer client for client id {client_id}: {e}') - raise InternalServerError(msg='Error while authenticating globus client') - try: - result = transfer_client.get_task(task_id) - except Exception as e: - logger.error(f'error while getting transfer task with id {task_id}: {e}') - raise InternalServerError(msg='Error retrieving transfer task') - logger.info(f'Successful modify with client {client_id} of task {task_id}') - return utils.ok( - result=result.data, - msg='successfully retrieved transfer task' - ) - - - def delete(self, client_id, task_id): - access_token = request.args.get('access_token') - refresh_token = request.args.get('refresh_token') - - try: - # transfer_client = get_transfer_client(client_id, refresh_token, access_token) - transfer_client = precheck(client_id, None, access_token, refresh_token) - except Exception as e: - logger.error(f'error while getting transfer client for client id {client_id}: {e}') - raise AuthenticationError(msg='Error while authenticating globus client') - try: - result = transfer_client.cancel_task(task_id) - except Exception as e: - logger.error(f'error while canceling transfer task with id {task_id}: {e}') - raise AuthenticationError(msg='Error retrieving transfer task') - logger.info(f'Successful delete with client {client_id} of task {task_id}') - return utils.ok( - result=result.data, - msg='successfully canceled transfer task' - ) - - - - diff --git a/service/errors.py b/service/errors.py index 8a9f351..ff1ba9e 100644 --- a/service/errors.py +++ b/service/errors.py @@ -19,23 +19,24 @@ class InternalServerError(BaseTapisError): """Internal server error""" def __init__(self, msg="Internal Server Error", code=500): super().__init__(msg, code) - pass class PathNotFoundError(BaseTapisError): """Given path is not found on the endpoint""" def __init__(self, msg="Given path is not found on the endpoint", code=404): super().__init__(msg, code) - pass class GlobusError(BaseTapisError): """General error with the Globus SDK""" def __init__(self, msg="Uncaught Globus error", code=407): super().__init__(msg, code) - pass class GlobusConsentRequired(BaseTapisError): def __init__(self, msg="Endpoint requires consent", code=407): super().__init__(msg, code) - pass + +class GlobusPathExists(BaseTapisError): + def __init__(self, msg="A directory with given path already exists", code=409): + super().__init__(msg, code) + diff --git a/service/resources/openapi_v3.yml b/service/resources/openapi_v3.yml index 6d21c25..ebb08ea 100644 --- a/service/resources/openapi_v3.yml +++ b/service/resources/openapi_v3.yml @@ -509,6 +509,12 @@ paths: application/json: schema: $ref: '#/components/schemas/RespEndpointArray' + '409': + description: Directory already exists. + content: + application/json: + schema: + $ref: '#/components/schemas/RespBasic' '500': description: Server error. content: diff --git a/service/tests.py b/service/tests.py index da1f4a5..1db7496 100644 --- a/service/tests.py +++ b/service/tests.py @@ -3,6 +3,8 @@ from datetime import datetime import json import requests +from typing import List +from functools import partial # tapis from tapisservice.tapisflask import utils @@ -37,68 +39,8 @@ def load_config(self): self.base_url = self.config["base_url"] return self.config -def transfer_test(): - base = Base() - - data = { - "source_endpoint": base.source_eid, - "destination_endpoint": base.gcp_eid, - "transfer_items": [ - - { - "source_path": "/1M.dat", - "destination_path": "/home/kprice/tmp/globus_xfer/1M.dat", - "recursive": "true" - } - ] - } - query = { - "access_token": base.at, - "refresh_token": base.rt - } - headers = { - "Content-Type": "application/json" - } - - try: - response = requests.post(f"{base.base_url}/transfers/{base.cid}", json=data, params=query, headers=headers) - except Exception as e: - print(f'exception calling globus-proxy:: {e}') - print(response.status_code) - try: - assert response.status_code == 200 - except AssertionError as e: - print(f'assertion failed. Status code is {response.status_code}.\n\t{response.text}') - - task_id = response.json()['result']['task_id'] - logger.debug(f'Submitted transfer job with client {base.cid} and task id {task_id}') - - # poll for status of transfer - exit = False - runs = 0 - while not exit: - response = requests.get(f'{base.base_url}/transfers/{base.cid}/{task_id}?access_token={base.at}&refresh_token={base.rt}') - if not response.ok: - continue ## this means we got some other bogus code so just try again - logger.debug(f'got response:: {response.text}') - response = response.json() - status = None - try: - status = response['result']['status'] - except Exception as e: - print(f'IDK bro {e}') - exit = True - - if status == 'SUCCEEDED': - logger.info('polling completed. Transfer success') - exit = True - elif runs == 10: - exit = True - logger.info(f'polling ran 10 times but didn\'t get success state. Bummer.') - - -def endpoint_test(): +def get_endpoint_test(): base = Base() try: tc = get_transfer_client(base.cid, base.rt, base.at) @@ -112,67 +54,92 @@ def endpoint_test(): except Exception as e: print(f'get endpoint fail! {e}') - -def ls_test(): - logger.info(f' ~~~ listing "~" ~~~') - base = Base() - try: - tc = get_transfer_client(base.cid, base.rt, base.at) - # print(f'have tc:: {tc}') - except Exception as e: - print(f'get tc fail! {e}') - - url = f'{base.base_url}/ops/{base.cid}/{base.gcp_eid}/~' +def ls_test(base, path): + url = f'{base.base_url}/ops/{base.cid}/{base.gcp_eid}/{path}' query = {"access_token": base.at, "refresh_token": base.rt} - logger.debug(f'have url:: {url} and params {query}') response = requests.get(url, params=query) - logger.debug(response) - logger.debug(response.text) - logger.info(' ~~~ done ~~~\n') + if response.status_code != 200: + raise Exception(f'{response.status_code}:: {response.text}') -def ls_test_empty(): - logger.info(f' ~~~ listing "" ~~~') - base = Base() - try: - tc = get_transfer_client(base.cid, base.rt, base.at) - # print(f'have tc:: {tc}') - except Exception as e: - print(f'get tc fail! {e}') +def rm_test(base, path): + pass - url = f'{base.base_url}/ops/{base.cid}/{base.gcp_eid}/' +def mv_test(base, src, dest): + url = f'{base.base_url}/ops/{base.cid}/{base.gcp_eid}/{src}' + body = {"destination": f'\"{dest}\"'} query = {"access_token": base.at, "refresh_token": base.rt} - logger.debug(f'have url:: {url} and params {query}') - response = requests.get(url, params=query) - logger.debug(response) - logger.debug(response.text) - logger.info(' ~~~ done ~~~\n') - -def ls_test_file(): - path = 'home/kprice/gpsettings.json' - logger.info(f' ~~~ listing "{path}" ~~~') - base = Base() - try: - tc = get_transfer_client(base.cid, base.rt, base.at) - # print(f'have tc:: {tc}') - except Exception as e: - print(f'get tc fail! {e}') + logger.debug(f'trying mv with src {src}, and dest {dest}') + response = requests.post(url, data=body, params=query) + if response.status_code != 200: + raise Exception(f'{response.status_code}:: {response.text}') +def mkdir_test(base, path): url = f'{base.base_url}/ops/{base.cid}/{base.gcp_eid}/{path}' + body = {} query = {"access_token": base.at, "refresh_token": base.rt} - logger.debug(f'have url:: {url} and params {query}') - response = requests.get(url, params=query) - logger.debug(response) - logger.debug(response.text) - logger.info(' ~~~ done ~~~\n') + response = requests.post(url, data=body, params=query) + if response.status_code != 200: + raise Exception(f'{response.status_code}:: {response.text}') - +def make_xfer_test(base): + pass + +def get_xfer_test(base): + pass + +def rm_xfer_test(base): + pass if __name__ == '__main__': - # endpoint_test() - # transfer_test() - ls_test() - ls_test_empty() - ls_test_file() + base = Base() + fails = {} + base_path = os.path.expandvars('~') + + try: + # ls tests + try: + ls_test(base, base_path) + except Exception as e: + print(e) + fails['ls_test_1'] = e + try: + ls_test(base, f'{base_path}/test') + except Exception as e: + print(e) + fails['ls_test_2'] = e + try: + ls_test(base, f'{base_path}/test.py') + except Exception as e: + print(e) + fails['ls_test_3'] = e + # mkdir tests + try: + mkdir_test(base, f'{base_path}/mkdirtest') + except Exception as e: + print(e) + fails['mkdir_test_1'] = e + # rename tests + try: + mv_test(base, f'{base_path}/mkdirtest', f'{base_path}/mkdirtest2') + except Exception as e: + print(e) + fails['mv_test_1'] = e + # rm tests + try: + rm_test(base, f'{base_path}/mkdirtest2') + except Exception as e: + print(e) + fails['rm_test_1'] = e + + + except Exception as e: + print(f'Unknown error running tests:: {e}') + exit(1) + + if len(fails) > 0: + print(f'One or more tests failed::\n{fails}') + else: + print('All tests successful') diff --git a/service/utils.py b/service/utils.py index 2753477..a332d6e 100644 --- a/service/utils.py +++ b/service/utils.py @@ -9,30 +9,56 @@ ## tapis from tapisservice.logs import get_logger from tapisservice.tapisflask import utils +from tapisservice.errors import AuthenticationError ## local from errors import * logger = get_logger(__name__) -def get_transfer_client(client_id, refresh_token, access_token): - client = globus_sdk.NativeAppAuthClient(client_id) - # check_token(client_id, refresh_token, access_token) - tomorrow = datetime.today() + timedelta(days=1) - expires_at = tomorrow.timestamp() - authorizer = globus_sdk.RefreshTokenAuthorizer( - refresh_token=refresh_token, - auth_client=client, - access_token=access_token, - expires_at=expires_at - ) - transfer_client = globus_sdk.TransferClient(authorizer=authorizer) - return transfer_client -def start_auth_flow(client, scopes=TransferScopes.all): - client.oauth2_start_flow(refresh_tokens=True, requested_scopes=scopes) - authorize_url = client.oauth2_get_authorize_url() - return authorize_url +def activate_endpoint(tc, ep_id, username, password): + ''' + ... with username and password + ''' + activation_req = tc.endpoint_get_activation_requirements(ep_id).data + for data in activation_req["DATA"]: + if data["type"] == "myproxy": + if data["name"] == "username": + data["value"] = username + if data["name"] == "passphrase": + data["value"] = password + try: + tr = tc.endpoint_activate(ep_id, activation_req) + except Exception as e: + print(e) + print("Endpoint requires manual activation, please open " + "the following URL in a browser to activate the " + "endpoint: \n") + print("https://app.globus.org/file-manager?origin_id=%s" % ep_id) + input("Press ENTER after activating the endpoint:") + r = tc.endpoint_autoactivate(ep_id, if_expires_in=3600) + print(r) + return r + return tr + +def autoactivate_endpoint(transfer_client, endpoint_id): + try: + logger.info(f'Trying to autoactivate endpoint {endpoint_id}') + result = transfer_client.endpoint_autoactivate(endpoint_id) + logger.debug(f'have res:: {result}') + except PythonAuthenticationError as e: + logger.error(f'Endpoint activation failed due to invalid token. Endpoint {endpoint_id} must be manuallty activated') + raise PythonAuthenticationError() + except Exception as e: + logger.error(f'Unknown exception activating endpoint with id {endpoint_id} :: {e}') + # raise InternalServerError() + raise GlobusError(msg=f"Endpoint activation failed due to unknown error. Endpoint {endpoint_id} must be manuallty activated") + finally: + if result['code'] == "AutoActivationFailed": + logger.error(f'Endpoint activation failed. Endpoint {endpoint_id} must be manuallty activated') + raise GlobusError(msg=f'Endpoint activation failed. Activate endpoint {endpoint_id} by going to https://app.globus.org/file-manager?origin_id={endpoint_id}') + # TODO: spawn thread that waits for activation? def check_consent_required(client, target): ''' @@ -47,7 +73,6 @@ def check_consent_required(client, target): consent_required_scopes.extend(e.info.consent_required.required_scopes) return consent_required_scopes - def check_tokens(client_id, refresh_token, access_token): ''' *** oauth2_validate_token has been deprecated *** @@ -77,6 +102,20 @@ def check_tokens(client_id, refresh_token, access_token): return access_token, refresh_token +def get_transfer_client(client_id, refresh_token, access_token): + client = globus_sdk.NativeAppAuthClient(client_id) + # check_token(client_id, refresh_token, access_token) + tomorrow = datetime.today() + timedelta(days=1) + expires_at = tomorrow.timestamp() + authorizer = globus_sdk.RefreshTokenAuthorizer( + refresh_token=refresh_token, + auth_client=client, + access_token=access_token, + expires_at=expires_at + ) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) + return transfer_client + def get_valid_token(client_id, refresh_token): ''' Utility function that takes a client id and refresh token @@ -91,70 +130,31 @@ def get_valid_token(client_id, refresh_token): response = client.oauth2_refresh_token(refresh_token) return response['transfer.api.globus.org']['access_token'] -# stolen from neid code - -def ls_endpoint(tc, ep_id, path="~"): - ls = tc.operation_ls(ep_id, path=path) - return ls - - -def transfer(tc, source_endpoint_id, dest_endpoint_id, files='', dirs='', label='Tapisv3', sync_level="size",verify_checksum=False): - ''' - # verify_checksum for now. if there's a bottleneck, reconsider - ''' - ''' - start globus file transfer - files is a list of dictionaries with the format {source: "/source/path/file.txt", dest: "/dest/path/file.txt"} - dirs is a list of dictionaries with the format {source: "/source/path/dir", dest: "/dest/path/dir"} - ''' - tdata = globus_sdk.TransferData(tc, source_endpoint_id, dest_endpoint_id, label=label, sync_level=sync_level, verify_checksum=verify_checksum) - for file in files: - try: - # print('before ls ep init') - ls_endpoint(tc, dest_endpoint_id, path=file['destination_path'].rsplit('/', 1)[0]) - # logger.debug('after ls endpoint init') - except Exception as e: - print(file) - return ('File Transfer Failed: {}'.format(e)) - # logger.debug('before add item') - tdata.add_item(file['source_path'], file['destination_path']) - # logger.debug('after add item') - for dir in dirs: - try: - - ls_endpoint(tc, dest_endpoint_id, _path=dir['source_path']) - except Exception as e: - logger.debug('Problem with remote directory: {}'.format(dir['source_path'])) - return ('Dir Transfer Failed: {}'.format(e)) - tdata.add_item(dir['source_path'], dir['destination_path'], recursive=True) - transfer_result = tc.submit_transfer(tdata) - return transfer_result - -def activate_endpoint(tc, ep_id, username, password): - ''' - ... with username and password - ''' - activation_req = tc.endpoint_get_activation_requirements(ep_id).data - for data in activation_req["DATA"]: - if data["type"] == "myproxy": - if data["name"] == "username": - data["value"] = username - if data["name"] == "passphrase": - data["value"] = password - try: - tr = tc.endpoint_activate(ep_id, activation_req) - except Exception as e: - print(e) - print("Endpoint requires manual activation, please open " - "the following URL in a browser to activate the " - "endpoint: \n") - print("https://app.globus.org/file-manager?origin_id=%s" % ep_id) - input("Press ENTER after activating the endpoint:") - r = tc.endpoint_autoactivate(ep_id, if_expires_in=3600) - print(r) - return r - return tr +def format_path(path, default_dir=None): + ''' + Force absoulte paths for now, due to Globus not ahndling /~/ the same way on all systems + if a user provides a relative path, it will instead be returned as an INCORRECT abs path. + ''' + logger.info(f'building path with path {path} and default {default_dir} ') + + return f"/{path.rstrip('/').lstrip('/')}" +def handle_transfer_error(exception, endpoint_id=None, msg=None): + '''Tanslates transfer api errors into the configured basetapiserrors in ./errors.py''' + error = InternalServerError(msg='Interal server error', code=500) + if getattr(exception, "code", None) == None: + return error + if exception.code == "AuthenticationFailed": + error = AuthenticationError(msg='Could not authenticate transfer client', code=401) + if exception.code == "ClientError.NotFound": + error = PathNotFoundError(msg='Path does not exist on given endpoint', code=404) + if exception.code == "ExternalError.DirListingFailed.GCDisconnected": + error = GlobusError(msg=f'Error connecting to endpoint {endpoint_id}. Please activate endpoint manually', code=407) + if exception.code == 'ConsentRequired': + error = GlobusConsentRequired(msg=f'Endpoint {endpoint_id} requires additonal consent. Auth flow ust be manually re-run.') + if exception.code == 'ExternalError.MkdirFailed.Exists': + error = GlobusPathExists(msg=f'Directory with given path already exists.') + return error def is_endpoint_activated(tc, ep): ''' @@ -164,6 +164,28 @@ def is_endpoint_activated(tc, ep): endpoint = tc.get_endpoint(ep) return endpoint['activated'] +def is_endpoint_connected(transfer_client, endpoint_id): + logger.debug(f'in is_endpoint_connected with tc: {transfer_client}, ep: {endpoint_id}') + try: + result = transfer_client.get_endpoint(endpoint_id) + except Exception as e: + logger.error(f'Unknown exception getting endpoint with id {endpoint_id}:: {e}') + finally: + try: + connected = result["DATA"][0]["is_connected"] + except Exception as e: + logger.error(f'Unknown exception getting endpoint connection status with id: {endpoint_id}: {e}') + logger.debug(f'endpoint connection status:: {connected}') + # return connected + return True + +def ls_endpoint(tc, ep_id, path="~"): + ls = tc.operation_ls(ep_id, path=path) + return ls + +def parse_args(): + pass + def precheck(client_id, endpoints, access_token, refresh_token): ''' Performs several precheck opertations such as @@ -172,13 +194,19 @@ def precheck(client_id, endpoints, access_token, refresh_token): returns authenticated transfer client ''' + # make sure tokens are provided + if not access_token or not refresh_token: + msg='Could not parse token from request parameters. Please provide valid token.' + logger.error(f'error parsing tokens from args for client {client_id}') + raise AuthenticationError(msg=msg, code=401) + # check token validity try: access_token, refresh_token = check_tokens(client_id, refresh_token, access_token) except PythonAuthenticationError: # refresh token is invalid, must redo auth process logger.error(f'exception while validating tokens:: {e}') - raise PythonAuthenticationError(msg='Error while validating tokens. Please redo the Oauth2 process for this client_id') + raise AuthenticationError(msg='Could not validate tokens. Please redo the Oauth2 process for this client_id') # TODO: handle more exceptions, figure out how to make them nice for the calling function # get transfer client @@ -213,40 +241,45 @@ def precheck(client_id, endpoints, access_token, refresh_token): if not is_endpoint_activated(transfer_client, endpoint_id): logger.debug(f'ep {endpoint_id} is not active') autoactivate_endpoint(transfer_client, endpoint_id) + return transfer_client - -def autoactivate_endpoint(transfer_client, endpoint_id): - try: - logger.info(f'Trying to autoactivate endpoint {endpoint_id}') - result = transfer_client.endpoint_autoactivate(endpoint_id) - logger.debug(f'have res:: {result}') - except PythonAuthenticationError as e: - logger.error(f'Endpoint activation failed due to invalid token. Endpoint {endpoint_id} must be manuallty activated') - raise PythonAuthenticationError() - except Exception as e: - logger.error(f'Unknown exception activating endpoint with id {endpoint_id} :: {e}') - # raise InternalServerError() - raise GlobusError(msg=f"Endpoint activation failed due to unknown error. Endpoint {endpoint_id} must be manuallty activated") - finally: - if result['code'] == "AutoActivationFailed": - logger.error(f'Endpoint activation failed. Endpoint {endpoint_id} must be manuallty activated') - raise GlobusError(msg=f'Endpoint activation failed. Activate endpoint {endpoint_id} by going to https://app.globus.org/file-manager?origin_id={endpoint_id}') - # TODO: spawn thread that waits for activation? -def is_endpoint_connected(transfer_client, endpoint_id): - logger.debug(f'in is_endpoint_connected with tc: {transfer_client}, ep: {endpoint_id}') - try: - result = transfer_client.get_endpoint(endpoint_id) - except Exception as e: - logger.error(f'Unknown exception getting endpoint with id {endpoint_id}:: {e}') - finally: +def start_auth_flow(client, scopes=TransferScopes.all): + client.oauth2_start_flow(refresh_tokens=True, requested_scopes=scopes) + authorize_url = client.oauth2_get_authorize_url() + return authorize_url + +def transfer(tc, source_endpoint_id, dest_endpoint_id, files='', dirs='', label='Tapisv3', sync_level="size",verify_checksum=False): + ''' + # verify_checksum for now. if there's a bottleneck, reconsider + ''' + ''' + start globus file transfer + files is a list of dictionaries with the format {source: "/source/path/file.txt", dest: "/dest/path/file.txt"} + dirs is a list of dictionaries with the format {source: "/source/path/dir", dest: "/dest/path/dir"} + ''' + tdata = globus_sdk.TransferData(tc, source_endpoint_id, dest_endpoint_id, label=label, sync_level=sync_level, verify_checksum=verify_checksum) + for file in files: try: - connected = result["DATA"][0]["is_connected"] + # print('before ls ep init') + ls_endpoint(tc, dest_endpoint_id, path=file['destination_path'].rsplit('/', 1)[0]) + # logger.debug('after ls endpoint init') except Exception as e: - logger.error(f'Unknown exception getting endpoint connection status with id: {endpoint_id}: {e}') - logger.debug(f'endpoint connection status:: {connected}') - # return connected - return True + print(file) + return ('File Transfer Failed: {}'.format(e)) + # logger.debug('before add item') + tdata.add_item(file['source_path'], file['destination_path']) + # logger.debug('after add item') + for dir in dirs: + try: + + ls_endpoint(tc, dest_endpoint_id, _path=dir['source_path']) + except Exception as e: + logger.debug('Problem with remote directory: {}'.format(dir['source_path'])) + return ('Dir Transfer Failed: {}'.format(e)) + tdata.add_item(dir['source_path'], dir['destination_path'], recursive=True) + transfer_result = tc.submit_transfer(tdata) + return transfer_result