Merge remote-tracking branch 'origin/s3_patcher' into iblsort
oliche committed Oct 24, 2024
2 parents 1d11edc + ac7fb6d commit 2204702
Showing 3 changed files with 89 additions and 3 deletions.
34 changes: 33 additions & 1 deletion ibllib/oneibl/data_handlers.py
@@ -21,7 +21,7 @@
from iblutil.util import flatten, ensure_list

from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
-from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH
+from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher


_logger = logging.getLogger(__name__)
@@ -747,6 +747,38 @@ def cleanUp(self):
        os.unlink(file)


class RemoteEC2DataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on a remote compute node. Missing input data are
        downloaded over HTTP using ONE.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self):
        """
        Download the data required to run the task using ONE.
        """
        df = super().getData()
        self.one._check_filesystem(df)

    def uploadData(self, outputs, version, **kwargs):
        """
        Upload and register the output data of a completed task via the S3 patcher.

        :param outputs: output files from the task to register
        :param version: ibllib version
        :return: output info of the registered datasets
        """
        versions = super().uploadData(outputs, version)
        s3_patcher = S3Patcher(one=self.one)
        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
                                        versions=versions, **kwargs)


class RemoteHttpDataHandler(DataHandler):
def __init__(self, session_path, signature, one=None):
"""
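To show how the new handler slots in, here is a minimal usage sketch; it assumes a configured ONE instance, and the eid and empty signature below are illustrative placeholders, not part of this commit.

from one.api import ONE
from ibllib.oneibl.data_handlers import RemoteEC2DataHandler

one = ONE()
session_path = one.eid2path('4b00df29-3769-43be-bb40-128b1cba6d35')  # placeholder eid
signature = {'input_files': [], 'output_files': []}  # real tasks declare their file signatures here

handler = RemoteEC2DataHandler(session_path, signature, one=one)
handler.setUp()  # downloads any missing input data over HTTP via ONE
# ... run the task, collecting its output files into `outputs` ...
# handler.uploadData(outputs, version)  # registers the outputs through S3Patcher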
56 changes: 54 additions & 2 deletions ibllib/oneibl/patcher.py
@@ -34,13 +34,13 @@
import globus_sdk
import iblutil.io.params as iopar
from iblutil.util import ensure_list
-from one.alf.files import get_session_path, add_uuid_string
+from one.alf.files import get_session_path, add_uuid_string, full_path_parts
from one.alf.spec import is_uuid_string, is_uuid
from one import params
from one.webclient import AlyxClient
from one.converters import path_from_dataset
from one.remote import globus
-from one.remote.aws import url2uri
+from one.remote.aws import url2uri, get_s3_from_alyx

from ibllib.oneibl.registration import register_dataset

@@ -633,3 +633,55 @@ def _scp(self, local_path, remote_path, dry=True):
    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server")


class S3Patcher(Patcher):

    def __init__(self, one=None):
        assert one
        super().__init__(one=one)
        self.s3_repo = 's3_patcher'
        self.s3_path = 'patcher'

        # Instantiate the boto3 S3 connection
        self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo)

    def check_datasets(self, file_list):
        # Check which datasets already exist on Alyx; these are only patched when force=True.
        exists = []
        for file in file_list:
            collection = full_path_parts(file, as_dict=True)['collection']
            dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name,
                                      collection=collection, clobber=True)
            if len(dset) > 0:
                exists.append(file)

        return exists

    def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs):

        exists = self.check_datasets(file_list)
        if len(exists) > 0 and not force:
            _logger.error(f'Files: {", ".join([f.name for f in exists])} already exist; '
                          f'set force=True to patch them anyway')
            return

        response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs)
        # TODO ideally the flatiron file record would be left untouched when registering this dataset;
        #  this requires changing alyx.data.register_view
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            # Mark the flatiron file record as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response

    def _scp(self, local_path, remote_path, dry=True):

        aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT))
        _logger.info(f'Transferring file {local_path} to {aws_remote_path}')
        self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path)))

        return 0, ''

    def _rm(self, *args, **kwargs):
        raise PermissionError("This Patcher does not have admin permissions to remove data.")
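
A minimal sketch of driving the patcher directly, assuming an Alyx data repository named 's3_patcher' is configured for the instance; the file path and version string below are placeholders.

from pathlib import Path
from one.api import ONE
from ibllib.oneibl.patcher import S3Patcher

one = ONE()
patcher = S3Patcher(one=one)
file_list = [Path('/mnt/s0/Data/Subjects/KS005/2024-10-24/001/alf/spikes.times.npy')]  # placeholder
# check_datasets makes this a no-op for already-registered datasets unless force=True
patcher.patch_dataset(file_list, created_by=one.alyx.user, versions=['x.y.z'], force=False)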
2 changes: 2 additions & 0 deletions ibllib/pipes/tasks.py
@@ -528,6 +528,8 @@ def get_data_handler(self, location=None):
            dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
        elif location == 'popeye':
            dhandler = data_handlers.PopeyeDataHandler(self, self.session_path, self.signature, one=self.one)
        elif location == 'ec2':
            dhandler = data_handlers.RemoteEC2DataHandler(self.session_path, self.signature, one=self.one)
        else:
            raise ValueError(f'Unknown location "{location}"')
        return dhandler
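With this dispatch in place, a task launched on an EC2 worker can request its handler by location; MyTask below stands in for any ibllib task subclass and is not part of this commit.

task = MyTask(session_path, one=one)
handler = task.get_data_handler(location='ec2')  # returns a RemoteEC2DataHandler
handler.setUp()  # fetch any missing inputs via ONE before the task runs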
