
Commit

Delete S3 files
k1o0 committed Sep 11, 2023
1 parent d97ccd1 commit 64e72f3
Showing 1 changed file with 54 additions and 13 deletions.
67 changes: 54 additions & 13 deletions ibllib/oneibl/patcher.py
@@ -2,7 +2,9 @@
 import ftplib
 from pathlib import Path, PurePosixPath, WindowsPath
 from collections import defaultdict
-from itertools import groupby, starmap
+from itertools import starmap
+from subprocess import Popen, PIPE, STDOUT
+from urllib.parse import urlparse
 import subprocess
 import logging
 from getpass import getpass
@@ -32,6 +34,13 @@
 SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')


+def url2uri(data_path):
+    parsed = urlparse(data_path)
+    assert parsed.netloc and parsed.scheme and parsed.path
+    bucket_name = parsed.netloc.split('.')[0]
+    return f's3://{bucket_name}{parsed.path}'
+
+
 def _run_command(cmd, dry=True):
     _logger.info(cmd)
     if dry:
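The new url2uri helper converts an HTTP(S) URL to an S3 object into the s3:// URI form the AWS CLI expects, taking the bucket name from the first dot-separated component of the host. A minimal usage sketch (the bucket and key below are made up for illustration):

    from urllib.parse import urlparse

    def url2uri(data_path):
        parsed = urlparse(data_path)
        assert parsed.netloc and parsed.scheme and parsed.path
        bucket_name = parsed.netloc.split('.')[0]
        return f's3://{bucket_name}{parsed.path}'

    # Hypothetical virtual-hosted-style URL, not taken from the commit
    print(url2uri('https://my-bucket.s3.amazonaws.com/data/session/file.npy'))
    # prints: s3://my-bucket/data/session/file.npy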
@@ -387,23 +396,55 @@ def delete_dataset(self, dataset, dry=False):
         else:
             did = dataset['url'].split('/')[-1]

-        files_by_repo = defaultdict(list)  # uuid.UUID -> [pathlib.PurePosixPath]
+        def is_aws(repository_name):
+            return repository_name.startswith('aws_')
+
+        files_by_repo = defaultdict(list)  # str -> [pathlib.PurePosixPath]
+        s3_files = []
         file_records = filter(lambda x: x['exists'], dataset['file_records'])
-        for repo, record in groupby(file_records, lambda x: x['data_repository']):
-            if not record['globus_id']:
-                raise NotImplementedError
-            if repo not in self.endpoints:
-                self.add_endpoint(repo, alyx=self.alyx)
-            filepath = PurePosixPath(record['relative_path'])
-            if 'flatiron' in repo:
-                filepath = add_uuid_string(filepath, did)
-            files_by_repo[repo].append(filepath)
+        for record in file_records:
+            repo = self.repo_from_alyx(record['data_repository'], self.alyx)
+            # Handle S3 files
+            if not repo['globus_endpoint_id'] or repo['repository_type'] != 'Fileserver':
+                if is_aws(repo['name']):
+                    s3_files.append(url2uri(record['data_url']))
+                    files_by_repo[repo['name']].append(PurePosixPath(record['relative_path']))
+                else:
+                    _logger.error('Unable to delete from %s', repo['name'])
+            else:
+                # Handle Globus files
+                if repo['name'] not in self.endpoints:
+                    self.add_endpoint(repo['name'], alyx=self.alyx)
+                filepath = PurePosixPath(record['relative_path'])
+                if 'flatiron' in repo['name']:
+                    filepath = add_uuid_string(filepath, did)
+                files_by_repo[repo['name']].append(filepath)
+
+        # Remove S3 files
+        if s3_files:
+            cmd = ['aws', 's3', 'rm', *s3_files, '--profile', 'ibladmin']
+            if dry:
+                cmd.append('--dryrun')
+            if _logger.level > logging.DEBUG:
+                log_function = _logger.error
+                cmd.append('--only-show-errors')  # Suppress verbose output
+            else:
+                log_function = _logger.debug
+                cmd.append('--no-progress')  # Suppress progress info, estimated time, etc.
+            _logger.debug(' '.join(cmd))
+            process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
+            with process.stdout:
+                for line in iter(process.stdout.readline, b''):
+                    log_function(line.decode().strip())
+            assert process.wait() == 0
+
         if dry:
             return [], files_by_repo

-        # Delete the files
-        task_ids = list(starmap(self.delete_data, files_by_repo.items()))
+        # Remove Globus files
+        globus_files_map = filter(lambda x: not is_aws(x[0]), files_by_repo.items())
+        task_ids = list(starmap(self.delete_data, map(reversed, globus_files_map)))

         # Delete the dataset from Alyx
         self.alyx.rest('datasets', 'delete', id=did)
         return task_ids, files_by_repo
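The S3 branch above shells out to the AWS CLI and streams its output into the logger one line at a time, so errors and progress appear as they happen rather than after the process exits. A self-contained sketch of that streaming pattern, with a harmless placeholder command standing in for aws s3 rm:

    import logging
    from subprocess import Popen, PIPE, STDOUT

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('example')

    # Placeholder command; the commit runs ['aws', 's3', 'rm', ...] instead
    process = Popen(['echo', 'delete: s3://my-bucket/file.npy'], stdout=PIPE, stderr=STDOUT)
    with process.stdout:
        for line in iter(process.stdout.readline, b''):  # read until EOF
            logger.debug(line.decode().strip())
    assert process.wait() == 0  # fail loudly on a non-zero exit status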
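Note that dry=True is honoured in two ways: the --dryrun flag makes the AWS CLI report what it would delete without removing anything, and the early return [], files_by_repo skips both the Globus deletions and the Alyx REST call. For a dataset with a single AWS file record, the assembled command list would look roughly like this (bucket and key are hypothetical):

    cmd = ['aws', 's3', 'rm', 's3://my-bucket/data/session/file.npy',
           '--profile', 'ibladmin', '--dryrun', '--only-show-errors']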
