diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py
index ba713babb..632866775 100644
--- a/ibllib/oneibl/data_handlers.py
+++ b/ibllib/oneibl/data_handlers.py
@@ -21,7 +21,7 @@
 from iblutil.util import flatten, ensure_list
 
 from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
-from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH
+from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher
 
 _logger = logging.getLogger(__name__)
 
@@ -747,6 +747,38 @@ def cleanUp(self):
             os.unlink(file)
 
 
+class RemoteEC2DataHandler(DataHandler):
+    def __init__(self, session_path, signature, one=None):
+        """
+        Data handler for running tasks on a remote compute node. Will download missing data via HTTP using ONE
+
+        :param session_path: path to session
+        :param signature: input and output file signatures
+        :param one: ONE instance
+        """
+        super().__init__(session_path, signature, one=one)
+
+    def setUp(self):
+        """
+        Function to download necessary data to run tasks using ONE
+        :return:
+        """
+        df = super().getData()
+        self.one._check_filesystem(df)
+
+    def uploadData(self, outputs, version, **kwargs):
+        """
+        Function to upload and register data of completed task via S3 patcher
+        :param outputs: output files from task to register
+        :param version: ibllib version
+        :return: output info of registered datasets
+        """
+        versions = super().uploadData(outputs, version)
+        s3_patcher = S3Patcher(one=self.one)
+        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
+                                        versions=versions, **kwargs)
+
+
 class RemoteHttpDataHandler(DataHandler):
     def __init__(self, session_path, signature, one=None):
         """
diff --git a/ibllib/oneibl/patcher.py b/ibllib/oneibl/patcher.py
index 3738d7bcf..22f682df4 100644
--- a/ibllib/oneibl/patcher.py
+++ b/ibllib/oneibl/patcher.py
@@ -34,13 +34,13 @@
 import globus_sdk
 import iblutil.io.params as iopar
 from iblutil.util import ensure_list
-from one.alf.files import get_session_path, add_uuid_string
+from one.alf.files import get_session_path, add_uuid_string, full_path_parts
 from one.alf.spec import is_uuid_string, is_uuid
 from one import params
 from one.webclient import AlyxClient
 from one.converters import path_from_dataset
 from one.remote import globus
-from one.remote.aws import url2uri
+from one.remote.aws import url2uri, get_s3_from_alyx
 
 from ibllib.oneibl.registration import register_dataset
 
@@ -633,3 +633,55 @@ def _scp(self, local_path, remote_path, dry=True):
     def _rm(self, flatiron_path, dry=True):
         raise PermissionError("This Patcher does not have admin permissions to remove data "
                               "from the FlatIron server")
+
+
+class S3Patcher(Patcher):
+
+    def __init__(self, one=None):
+        assert one
+        super().__init__(one=one)
+        self.s3_repo = 's3_patcher'
+        self.s3_path = 'patcher'
+
+        # Instantiate boto3 connection from the Alyx repository credentials
+        self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo)
+
+    def check_datasets(self, file_list):
+        # Check whether the datasets already exist on Alyx; if they do, we only patch when force=True
+        exists = []
+        for file in file_list:
+            collection = full_path_parts(file, as_dict=True)['collection']
+            dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name,
+                                      collection=collection, clobber=True)
+            if len(dset) > 0:
+                exists.append(file)
+
+        return exists
+
+    def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs):
+        # Refuse to overwrite datasets that are already registered unless force=True
+        exists = self.check_datasets(file_list)
+        if len(exists) > 0 and not force:
+            _logger.error(f'Files: {", ".join([f.name for f in exists])} already exist, set force=True to overwrite')
+            return
+
+        response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs)
+        # TODO in an ideal case the FlatIron file record won't be altered when we register this dataset. This
+        #  requires changing alyx.data.register_view
+        for ds in response:
+            frs = ds['file_records']
+            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
+            # Set exists=False on the FlatIron file record so the S3 copy is the one served
+            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
+                               data={'exists': False})
+
+    def _scp(self, local_path, remote_path, dry=True):
+        # Map the FlatIron path onto the patcher prefix of the S3 bucket and upload
+        aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT))
+        _logger.info(f'Transferring file {local_path} to {aws_remote_path}')
+        self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path)))
+
+        return 0, ''
+
+    def _rm(self, *args, **kwargs):
+        raise PermissionError("This Patcher does not have admin permissions to remove data.")
diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py
index 505092e6c..d592d5c29 100644
--- a/ibllib/pipes/tasks.py
+++ b/ibllib/pipes/tasks.py
@@ -528,6 +528,8 @@ def get_data_handler(self, location=None):
             dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
         elif location == 'popeye':
             dhandler = data_handlers.PopeyeDataHandler(self, self.session_path, self.signature, one=self.one)
+        elif location == 'ec2':
+            dhandler = data_handlers.RemoteEC2DataHandler(self.session_path, self.signature, one=self.one)
         else:
             raise ValueError(f'Unknown location "{location}"')
         return dhandler
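Usage sketch (illustrative, not part of the diff): the snippet below shows how the pieces above are expected to fit together, assuming the Alyx database has a data repository named 's3_patcher' configured with valid AWS credentials. The session path, dataset names and signature contents are placeholders; only RemoteEC2DataHandler, S3Patcher and the 'ec2' location come from this change.

from pathlib import Path
import ibllib
from one.api import ONE
from ibllib.oneibl import data_handlers

one = ONE()
session_path = Path('/mnt/data/cortexlab/Subjects/KS023/2019-12-10/001')  # placeholder

# Signature in the {'input_files': [(name, collection, required), ...], ...}
# form used by DataHandler; the datasets named here are illustrative only.
signature = {'input_files': [('spikes.times.npy', 'alf', True)],
             'output_files': [('clusters.metrics.pqt', 'alf', True)]}

handler = data_handlers.RemoteEC2DataHandler(session_path, signature, one=one)
handler.setUp()  # download any missing input data over HTTP via ONE

# ... the task runs and produces its output files ...
outputs = [session_path.joinpath('alf', 'clusters.metrics.pqt')]
# Register the outputs on Alyx and push them to S3; existing datasets are not
# overwritten unless force=True is passed through to patch_dataset.
handler.uploadData(outputs, ibllib.__version__)

Within a pipeline, the same handler is selected implicitly by constructing the task with location='ec2', which routes through the new branch in Task.get_data_handler().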