-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add aws utils #187
base: master
Are you sure you want to change the base?
Add aws utils #187
Changes from 24 commits
b5cacaf
c28a842
5bd4f0d
97e952f
6f5070e
9e6bcbf
3b04c1d
85d0dc7
b4b6d49
69f20ec
a51f672
ec33b1b
8211ac9
6b0d92a
560bd0f
f0c461b
7f51861
94bbc81
819a73e
694a5d1
b94771d
4875658
191da5e
57f4c91
3c48aa4
782cb0c
535ab07
edd309c
e40114e
bccd7b7
673d6ab
308ee30
7e55a28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,261 @@ | ||
""" Functions to support general aws operations | ||
|
||
Function names should generally follow a convention to include a short string | ||
to indicate the service that they are designed to operate with eg. 's3' for AWS s3. | ||
""" | ||
import boto3 | ||
import logging | ||
import mimetypes | ||
|
||
from botocore.exceptions import ClientError | ||
# from .exceptions import () | ||
from .s3_utils import s3Utils | ||
|
||
########################### | ||
# Config | ||
########################### | ||
logging.basicConfig() | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def s3_bucket_head(*, bucket_name, s3=None):
    """ Fetch head metadata for a bucket, or None when it is missing/inaccessible.

    :param bucket_name: name of the bucket - string
    :param s3: AWS s3 client (defaults to s3Utils().s3 when omitted)
    :return: dict - head response or None
    """
    try:
        # NOTE(review): falling back to s3Utils() implicitly pulls in its setup; callers
        # that care should pass an explicit client.
        client = s3 or s3Utils().s3
        return client.head_bucket(Bucket=bucket_name)
    except ClientError:
        # bucket does not exist or we lack permission to see it
        return None
|
||
|
||
def s3_bucket_exists(*, bucket_name, s3=None):
    """ Report whether the named bucket exists.

    :param bucket_name: name of the bucket - string
    :param s3: AWS s3 client
    :return: boolean - True if exists, False if not
    """
    head_info = s3_bucket_head(bucket_name=bucket_name, s3=s3)
    return bool(head_info)
|
||
|
||
def s3_bucket_object_count(bucket_name, *, prefix=None, s3_resource=None):
    """ Number of objects in the given s3 bucket.

    S3 keys are flat: "folders" are only a display convenience, so by default this
    counts every object in the bucket regardless of any folder-like hierarchy.
    Pass ``prefix`` to count only the objects whose keys start with that string
    (e.g. a folder such as '2022-08-01/').

    NB: the default resource uses locally stored credentials; pass ``s3_resource``
    to supply an explicitly configured boto3 s3 service resource instead.

    :param bucket_name: name of the bucket - string
    :param prefix: (optional) only count objects whose keys share this prefix - string
    :param s3_resource: (optional) boto3 s3 service resource
    :return: int - number of objects counted
    """
    s3_resource = s3_resource or boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    # objects.filter/all page through the listing lazily, so we never hold the
    # whole object list in memory; we only count the entries.
    objects = bucket.objects.filter(Prefix=prefix) if prefix is not None else bucket.objects.all()
    return sum(1 for _ in objects)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this work recursively or will it only count folders at top level? Thinking about how it would count things in our files/wfoutput buckets for example which all have a folder based hierarchy. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unless I'm missing something s3 does not store objects with a hierarchy (folders are just a convenience in console display?) so this would count all objects in the given bucket regardless of folder. I expect if empty folders were created then these would be included in the count so maybe checking that 0 size objects are not included may be warranted? If I am correct about this then what may indeed be useful is to support passing in a prefix and only return a count of those objects that share said prefix - for example in the dcic account in the '4dn-dcic-open-data-transfer-logs' bucket if a parameter '2022-08-01' was provided then a count of the 32 object in that folder/with that shared prefix would be returned. |
||
|
||
|
||
def s3_object_head(*, object_key, bucket_name, s3=None):
    """ Fetch head metadata for an object in a bucket, or None when absent.

    :param object_key: key for the object - string
    :param bucket_name: name of the bucket to check for the object - string
    :param s3: AWS s3 client (defaults to s3Utils().s3 when omitted)
    :return: dict - head response or None
    """
    try:
        client = s3 or s3Utils().s3
        return client.head_object(Bucket=bucket_name, Key=object_key)
    except ClientError:
        # missing object (or no permission) reports as None rather than raising
        return None
|
||
|
||
def s3_object_exists(*, object_key, bucket_name, s3=None):
    """ Report whether an object with the given key exists in the bucket.

    :param object_key: key for the object - string
    :param bucket_name: name of the bucket - string
    :param s3: AWS s3 client
    :return: boolean - True if exists, False if not
    """
    head_info = s3_object_head(object_key=object_key, bucket_name=bucket_name, s3=s3)
    return bool(head_info)
|
||
|
||
def s3_put_object(*, object_key, obj, bucket_name, acl=None, s3=None):
    """ Add an object to the given bucket.

    The content type is guessed from the object key's extension and falls back
    to 'binary/octet-stream' when it cannot be determined.

    NB: add specific upload functions that use this?
    TODO: optionally compute an md5sum and verify it to ensure a full upload?

    :param object_key: key for the object - string
    :param obj: object data - bytes or seekable file-like object
    :param bucket_name: name of the bucket to check for the object - string
    :param acl: The (optional) canned ACL to apply to the object.
    :param s3: AWS s3 client (defaults to s3Utils().s3)
    :return: dict - the put_object response (contains 'ETag') or None on ClientError
    """
    s3 = s3 or s3Utils().s3
    # try to guess content type from the key's extension
    content_type = mimetypes.guess_type(object_key)[0] or 'binary/octet-stream'
    # build kwargs once instead of duplicating the call per-ACL (only include
    # ACL when requested, since some buckets reject any ACL argument)
    put_kwargs = {
        'Bucket': bucket_name,
        'Key': object_key,
        'Body': obj,
        'ContentType': content_type,
    }
    if acl:
        put_kwargs['ACL'] = acl
    try:
        return s3.put_object(**put_kwargs)
    except ClientError:
        # NOTE(review): swallowing the error and returning None matches the other
        # helpers here; callers must check for None.
        return None
|
||
|
||
def delete_mark_s3_object(*, object_key, bucket_name, s3=None):
    """ Place a delete marker on an object in the given bucket.

    Versioning must be enabled on the bucket; otherwise a RuntimeError is
    raised (and, not being a ClientError, it propagates to the caller).

    :param object_key: key for the object - string
    :param bucket_name: name of the bucket - string
    :param s3: AWS s3 client
    :return: dict - delete_object response (includes the delete marker's
             versionId) or None on ClientError
    """
    client = s3 or s3Utils().s3
    try:
        status = client.get_bucket_versioning(Bucket=bucket_name).get('Status')
        if status != 'Enabled':
            raise RuntimeError(f"versioning is disabled on {bucket_name} - cannot delete mark {object_key}")
        return client.delete_object(Bucket=bucket_name, Key=object_key)
    except ClientError:
        return None
|
||
|
||
def delete_s3_object_version(*, object_key, bucket_name, version_id=None, s3=None):
    """ Delete one version of an object, or the object itself in an unversioned bucket.

    With a version enabled bucket a real version_id must be supplied or a
    ValueError is raised (which propagates - it is not a ClientError).
    A version_id of 'null' (or None) means a plain delete; 'null' is the
    version_id reported for deletes from version disabled buckets.
    NB: This is currently agnostic as to whether the object exists or not.

    :param object_key: key for the object - string
    :param bucket_name: name of the bucket - string
    :param version_id: version id for version to delete - string
    :param s3: AWS s3 client
    :return: string - versionId of the deleted version, or None on error
    """
    s3 = s3 or s3Utils().s3
    try:
        versioning = s3.get_bucket_versioning(Bucket=bucket_name).get('Status')
    except (ClientError, AttributeError) as exc:
        logger.error(str(exc))
        return None

    try:
        if not version_id or version_id == 'null':
            # plain delete - valid for unversioned buckets ('null' version id)
            logger.info(f"Deleting object {object_key} from version disabled {bucket_name}")
            response = s3.delete_object(Bucket=bucket_name, Key=object_key)
        elif versioning == 'Enabled':
            logger.info(f"Deleting version {version_id} of object {object_key} from version enabled {bucket_name}")
            response = s3.delete_object(Bucket=bucket_name, Key=object_key, VersionId=version_id)
        else:
            # a concrete version_id against a bucket that is not version enabled
            raise ValueError(f"Incompatible arguments: versioning={versioning!r}, version_id={version_id!r}")
    except ClientError as exc:
        logger.error(str(exc))
        return None

    status_code = response.get('ResponseMetadata').get('HTTPStatusCode')
    if status_code != 204:
        # NOTE(review): other failure paths above log and return None; keep that
        # convention here rather than raising.
        logger.info(f"Unexpected response status - {response}")
        return None
    # 204: the object/version is no longer in the bucket (or maybe never was)
    if 'VersionId' in response:
        return response.get('VersionId')
    return 'null'  # 'null' marks a delete from a version disabled bucket
|
||
|
||
def delete_s3_object_completely(*, object_key, bucket_name, s3=None):
    """ Delete all the versions (and delete markers) of an object in the given bucket.

    :param object_key: key for the object - string
    :param bucket_name: name of the bucket - string
    :param s3: AWS s3 client (defaults to s3Utils().s3, matching the other helpers)
    :return: boolean - True if all expected versions were deleted
    """
    s3 = s3 or s3Utils().s3
    expected_cnt = None
    deleted_cnt = 0
    # get_bucket_versioning never reports Status == 'Disabled': the key is simply
    # absent for a bucket that never had versioning enabled, and 'Suspended' for
    # one where it was turned off. Treat anything else as unversioned.
    if s3.get_bucket_versioning(Bucket=bucket_name).get('Status') not in ('Enabled', 'Suspended'):
        expected_cnt = 1
        if delete_s3_object_version(object_key=object_key, bucket_name=bucket_name, s3=s3):
            deleted_cnt += 1
    else:
        ver_res = s3.list_object_versions(Bucket=bucket_name, Prefix=object_key)
        if ver_res.get('ResponseMetadata').get('HTTPStatusCode') == 200:
            # IsTruncated is a top-level key of the ListObjectVersions response,
            # not part of ResponseMetadata
            if ver_res.get('IsTruncated'):
                logger.warning(f"Too many versions of {object_key} in {bucket_name} - incomplete delete")
            versions = ver_res.get('Versions', [])
            versions.extend(ver_res.get('DeleteMarkers', []))
            # Prefix matching is not exact - it also returns sibling keys that merely
            # share the prefix (e.g. 'a/b.txt' when deleting 'a/b'); keep only exact
            # matches so we never delete versions of other objects.
            versions = [v for v in versions if v.get('Key') == object_key]
            expected_cnt = len(versions)
            for version in versions:
                version_id = version.get('VersionId')
                res = delete_s3_object_version(object_key=object_key, bucket_name=bucket_name,
                                               version_id=version_id, s3=s3)
                if not res:
                    logger.warning(f"Problem with delete of {object_key} - version id {version_id} from {bucket_name}")
                else:
                    deleted_cnt += 1
    if expected_cnt:
        if expected_cnt == deleted_cnt:
            logger.info(f"Deleted {deleted_cnt} versions of {object_key} from {bucket_name}")
            return True
        else:
            logger.warning(f"Expected to delete {expected_cnt} and DELETED {deleted_cnt}")
            return False
    return False
|
||
# def read_s3_object(): | ||
# pass | ||
# response = self.s3.get_object(Bucket=self.outfile_bucket, Key=filename) | ||
# logger.info(str(response)) | ||
# return response['Body'].read() | ||
# | ||
# def s3_read_dir(self, prefix): | ||
# return self.s3.list_objects(Bucket=self.outfile_bucket, Prefix=prefix) | ||
# | ||
# def s3_delete_dir(self, prefix): | ||
# # one query get list of all the files we want to delete | ||
# obj_list = self.s3.list_objects(Bucket=self.outfile_bucket, Prefix=prefix) | ||
# files = obj_list.get('Contents', []) | ||
# | ||
# # morph file list into format that boto3 wants | ||
# delete_keys = {'Objects': [{'Key': k} | ||
# for k in [obj['Key'] | ||
# for obj in files]]} | ||
# | ||
# # second query deletes all the files, NOTE: Max 1000 files | ||
# if delete_keys['Objects']: | ||
# self.s3.delete_objects(Bucket=self.outfile_bucket, Delete=delete_keys) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
[tool.poetry] | ||
name = "dcicutils" | ||
version = "5.2.1" | ||
version = "5.2.1b" | ||
description = "Utility package for interacting with the 4DN Data Portal and other 4DN resources" | ||
authors = ["4DN-DCIC Team <[email protected]>"] | ||
license = "MIT" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would reorder arguments here so
s3
comes first, and rename its3_client