diff --git a/hamlet/s3support/extensions/s3_inventory_copy_batch/extension.ftl b/hamlet/s3support/extensions/s3_inventory_copy_batch/extension.ftl
index 6157d17..d8ef2ad 100644
--- a/hamlet/s3support/extensions/s3_inventory_copy_batch/extension.ftl
+++ b/hamlet/s3support/extensions/s3_inventory_copy_batch/extension.ftl
@@ -37,7 +37,8 @@

     [@Settings
         [
-            "DESTINATION_BUCKET_NAME"
+            "DESTINATION_BUCKET_NAME",
+            "MOVE_OBJECTS"
         ]
     /]

diff --git a/hamlet/s3support/extensions/s3_inventory_copy_event/extension.ftl b/hamlet/s3support/extensions/s3_inventory_copy_event/extension.ftl
index 87dc461..8ff9ad5 100644
--- a/hamlet/s3support/extensions/s3_inventory_copy_event/extension.ftl
+++ b/hamlet/s3support/extensions/s3_inventory_copy_event/extension.ftl
@@ -24,6 +24,7 @@
             deploymentFramework=CLOUD_FORMATION_DEPLOYMENT_FRAMEWORK
         /]

+    [#local s3BatchRoleId = formatResourceId(AWS_IAM_ROLE_RESOURCE_TYPE, occurrence.Core.Id, "s3", "batchoperations") ]
     [#local s3BatchPolicies = getLinkTargetsOutboundRoles(_context.Links) ]

@@ -52,6 +53,14 @@
         )
     /]

+    [@Policy
+        getPolicyStatement(
+            [
+                "s3:CreateJob"
+            ]
+        )
+    /]
+
     [@Settings
         [
             "S3_BATCH_JOB_LAMBDA_ARN",
diff --git a/hamlet/s3support/modules/s3_inventory_copy/module.ftl b/hamlet/s3support/modules/s3_inventory_copy/module.ftl
index 45b652a..e7cbaad 100644
--- a/hamlet/s3support/modules/s3_inventory_copy/module.ftl
+++ b/hamlet/s3support/modules/s3_inventory_copy/module.ftl
@@ -23,6 +23,12 @@
             "Types" : STRING_TYPE,
             "Default" : "default"
         },
+        {
+            "Names" : "moveObjects",
+            "Description" : "Once the file has been copied, remove the original",
+            "Types" : BOOLEAN_TYPE,
+            "Default" : false
+        },
         {
             "Names" : "s3KeyPrefix",
             "Description" : "Creates a key prefix based on the deployment context",
@@ -103,6 +109,7 @@
         instance
         sourceBucketLink
         destinationBucketLink
+        moveObjects
         s3KeyPrefix
         s3KeySuffix
         s3InventoryPrefix
@@ -149,7 +156,8 @@
                         "Prefix" : s3KeyPrefix,
                         "Suffix" : s3KeySuffix
                     }
-                }
+                },
+                "MOVE_OBJECTS" : moveObjects
             }
         }
     ]
@@ -181,6 +189,11 @@
             "s3event": {
                 "Handler": "src/lambda.s3event_lambda_handler",
                 "Extensions": [ "_noenv", "_s3_inventory_copy_event" ],
+                "Environment" : {
+                    "Json" : {
+                        "Escaped" : false
+                    }
+                },
                 "Links" : {
                     "S3_BATCH_JOB_LAMBDA" : {
                         "Tier" : tier,
diff --git a/s3-inventory-copy/src/lambda.py b/s3-inventory-copy/src/lambda.py
index a3dd28b..d0f33bb 100644
--- a/s3-inventory-copy/src/lambda.py
+++ b/s3-inventory-copy/src/lambda.py
@@ -1,8 +1,11 @@
 import os
 import boto3
 import urllib
+import logging

 from botocore.exceptions import ClientError

+logger = logging.getLogger()
+logger.setLevel(logging.INFO)

 def s3event_lambda_handler(event, context):
     '''
@@ -25,34 +28,51 @@ def s3event_lambda_handler(event, context):

         # Only trigger a new batch operation when we have a new checksum
         if key.endswith('manifest.checksum') and event == 'ObjectCreated:Put':
+            logger.info('manifest checksum file - submitting batch job')

             manifest_key = os.path.splitext(key)[0] + '.json'
-            manifest_arn = bucket_arn + os.path.abspath(manifest_key)
-
-            manifest_details = s3Client.head_object(
-                Bucket=bucket,
-                Key=manifestKey
-            )
-
-            manifest_etag = manifest_details.ETag
-
-            job_response = s3ControlClient.create_job(
-                Operation={
-                    'LambdaInvoke' : {
-                        'FunctionArn' : lambda_arn
-                    }
-                },
-                Manifest={
-                    'Spec' : {
-                        'Format' : 'S3InventoryReport_CSV_20161130',
+            manifest_arn = f'{bucket_arn}/{manifest_key}'
+
+            try:
+                manifest_details = s3Client.head_object(
+                    Bucket=bucket,
+                    Key=manifest_key
+                )
+                manifest_etag = manifest_details['ETag']
+
+            except Exception as e:
+                logger.fatal(str(e))
+                raise e
+
+            try:
+                job_response = s3ControlClient.create_job(
+                    AccountId=boto3.client('sts').get_caller_identity()['Account'],
+                    ConfirmationRequired=False,
+                    Operation={
+                        'LambdaInvoke' : {
+                            'FunctionArn' : lambda_arn
+                        }
                     },
-                    'Location' : {
-                        'ObjectArn' : manifest_arn,
-                        'ETag' : manifest_etag
+                    Manifest={
+                        'Spec' : {
+                            'Format' : 'S3InventoryReport_CSV_20161130',
+                        },
+                        'Location' : {
+                            'ObjectArn' : manifest_arn,
+                            'ETag' : manifest_etag
+                        },
                     },
-                    'Description' : f'Rename using file suffix - {manifest_key}',
-                    'Priority' : batch_priority,
-                    'RoleArn' : batch_role_arn
-                }
-            )
+                    Report={
+                        'Enabled' : False
+                    },
+                    RoleArn=batch_role_arn,
+                    Description=f'Rename using file suffix - {manifest_key}',
+                    Priority=batch_priority,
+                )
+                logger.info(f'batch job id: {job_response["JobId"]}')
+
+            except Exception as e:
+                logger.fatal(str(e))
+                raise e


 def s3batch_lambda_handler(event, context):
@@ -89,15 +109,27 @@
         resultString = None

         # Construct New Key
-        newKey = rename_key(s3Key)
-        newBucket = os.environ.get('DESTINAION_BUCKET_NAME')
+        new_key = rename_key(s3Key)
+        new_bucket = os.environ.get('DESTINATION_BUCKET_NAME')

         # Copy Object to New Bucket
         response = s3Client.copy_object(
             CopySource = copySrc,
-            Bucket = newBucket,
-            Key = newKey
+            Bucket = new_bucket,
+            Key = new_key
         )
+        logger.info(f'Copying file from {copySrc} -> s3://{new_bucket}/{new_key}')
+
+        # Delete the original object if move objects is enabled
+        if str(os.environ.get('MOVE_OBJECTS')).lower() == 'true' and response is not None:
+
+            # Avoid removing the original when it was copied to the same location
+            if s3Bucket != new_bucket and s3Key != new_key:
+                response = s3Client.delete_object(
+                    Bucket=s3Bucket,
+                    Key=s3Key,
+                )
+                logger.info(f'removing file s3://{s3Bucket}/{s3Key}')

         # Mark as succeeded
         resultCode = 'Succeeded'
@@ -114,10 +146,12 @@
         else:
            resultCode = 'PermanentFailure'
            resultString = f'{errorCode}: {errorMessage}'
+           logger.fatal(str(e))
     except Exception as e:
         # Catch all exceptions to permanently fail the task
         resultCode = 'PermanentFailure'
         resultString = f'Exception: {e}'
+        logger.fatal(str(e))
     finally:
         results.append({
             'taskId': taskId,
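Note on the plumbing above: once `s3event_lambda_handler` has created the job, S3 Batch Operations invokes `s3batch_lambda_handler` once per manifest entry and expects a per-task result back. A minimal sketch of that exchange, based on the documented S3 Batch Operations Lambda invocation schema; all values below are illustrative placeholders, not taken from this change:

```python
# Rough shape of the event S3 Batch Operations sends to s3batch_lambda_handler.
example_event = {
    'invocationSchemaVersion': '1.0',
    'invocationId': 'example-invocation-id',
    'job': {'id': 'example-job-id'},
    'tasks': [{
        'taskId': 'example-task-id',
        's3BucketArn': 'arn:aws:s3:::example-source-bucket',
        's3Key': 'inbound/report.csv.upload',
        's3VersionId': None,
    }],
}

# Rough shape of the response the handler assembles in its finally block:
# one result per task, echoing the invocation id.
example_response = {
    'invocationSchemaVersion': '1.0',
    'treatMissingKeysAs': 'PermanentFailure',
    'invocationId': example_event['invocationId'],
    'results': [{
        'taskId': 'example-task-id',
        'resultCode': 'Succeeded',  # or 'TemporaryFailure' / 'PermanentFailure'
        'resultString': '',
    }],
}
```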
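The new `moveObjects` flag reaches the batch lambda as the `MOVE_OBJECTS` environment variable, so it arrives as a string rather than a boolean; that is why the handler compares against `'true'` and only deletes the source once the copy has landed in a different bucket and under a different key. A small self-contained sketch of that decision, assuming the same environment variable (the helper name is hypothetical and not part of this change):

```python
import os

def should_delete_source(src_bucket, src_key, dst_bucket, dst_key):
    # Mirrors the handler's check: MOVE_OBJECTS must be enabled and the copy
    # must not point back at the same bucket/key before the original is removed.
    move_enabled = str(os.environ.get('MOVE_OBJECTS')).lower() == 'true'
    return move_enabled and src_bucket != dst_bucket and src_key != dst_key

# With MOVE_OBJECTS=true a renamed key copied to another bucket is removed,
# while a copy back onto itself is left alone.
os.environ['MOVE_OBJECTS'] = 'true'
assert should_delete_source('src-bucket', 'a.csv.upload', 'dst-bucket', 'a.csv')
assert not should_delete_source('src-bucket', 'a.csv', 'src-bucket', 'a.csv')
```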