From 0e15a7466cd30fbac0740c7dd0b4c990356b5dc2 Mon Sep 17 00:00:00 2001 From: Dan Straw Date: Sun, 4 Feb 2024 16:20:20 +0000 Subject: [PATCH] Adding a pattern to copy an object between two S3 buckets using S3 Event Notification to SQS to EventBridge Pipes to Step Function resolves: #2086 --- s3-sqs-eventbridge-pipe-sfn-s3/README.md | 90 +++++++ .../s3-sqs-eventbridge-pipe-sfn-s3.json | 61 +++++ s3-sqs-eventbridge-pipe-sfn-s3/template.yaml | 239 ++++++++++++++++++ .../s3-sqs-eventbridge-pipe-sfn-s3.asl.json | 58 +++++ 4 files changed, 448 insertions(+) create mode 100644 s3-sqs-eventbridge-pipe-sfn-s3/README.md create mode 100644 s3-sqs-eventbridge-pipe-sfn-s3/s3-sqs-eventbridge-pipe-sfn-s3.json create mode 100644 s3-sqs-eventbridge-pipe-sfn-s3/template.yaml create mode 100644 s3-sqs-eventbridge-pipe-sfn-s3/workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json diff --git a/s3-sqs-eventbridge-pipe-sfn-s3/README.md b/s3-sqs-eventbridge-pipe-sfn-s3/README.md new file mode 100644 index 000000000..760d4be47 --- /dev/null +++ b/s3-sqs-eventbridge-pipe-sfn-s3/README.md @@ -0,0 +1,90 @@ +# Copying a file from S3 to S3 via Eventbridge Pipes & Step Function + +This pattern shows how to use S3 Event Notifications, queue them on SQS, and then use EventBridge Pipes to launch a +StepFunctions state machine and copy the file from the source S3 bucket to a destination. Modifying the state machine +would allow manipulation of the file. + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +2. Change directory to the pattern directory: + ``` + cd s3-sqs-eventbridge-pipe-sfn-s3 + ``` +3. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file: + ``` + sam deploy --guided + ``` +4. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you + can use `sam deploy` in future to use these defaults. + +5. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works + +When a file is created in the source S3 bucket, an +[S3 Event Notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html) fires with +[this structure](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html). +This is enqueued onto a standard Amazon SQS Queue. + +The EventBridge service polls the SQS Queue and invokes the EventBridge Pipe synchronously with an event that contains queue +messages. EventBridge reads messages in batches and invokes the pipe once for each batch. When the pipe successfully +processes a batch, EventBridge deletes its messages from the queue. +The structure of the batch is [described here](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html) + +The EventBridge Pipe then executes the +[AWS Step Functions Express Workflow](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html) +state machine +[described in s3-sqs-eventbridge-pipe-sfn-s3.asl.json](./workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json). +This loops through each message in the batch dequeued from SQS using a Map state, loops through each record from the S3 +Event Notification using another Map state, and then copies the file from the source S3 Bucket to the destination S3 +Bucket. In a real world scenario the state machine would be modified to manipulate the file as desired. + + +## Testing + +Provide steps to trigger the integration and show what should be observed if successful. + +1. Stream logs from StepFunctions LogGroup + +``` +sam logs --cw-log-group --tail +``` + +2. Put a file into the source bucket + +``` +aws s3api put-object --bucket --key --body +``` + +3. Observe the logs for the new execution. + +## Cleanup + +1. Delete the stack + + ```bash + sam delete + ``` + +---- +Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/s3-sqs-eventbridge-pipe-sfn-s3/s3-sqs-eventbridge-pipe-sfn-s3.json b/s3-sqs-eventbridge-pipe-sfn-s3/s3-sqs-eventbridge-pipe-sfn-s3.json new file mode 100644 index 000000000..2d482265e --- /dev/null +++ b/s3-sqs-eventbridge-pipe-sfn-s3/s3-sqs-eventbridge-pipe-sfn-s3.json @@ -0,0 +1,61 @@ +{ + "title": "Copy an object between two S3 buckets using S3 Event Notification to SQS to EventBridge Pipes to Step Function", + "description": "Create a Step Functions workflow to query Amazon Athena.", + "language": "", + "level": "200", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This sample project demonstrates how to copy an object between two S3 buckets and manipulate it in transit.", + "When an object is created in the source S3 bucket, an S3 Event Notification is fires and enqueued onto an SQS Queue.", + "The EventBridge service polls the SQS Queue and invokes an EventBridge pipe.", + "The EventBridge Pipe then executes an AWS Step Functions Express Workflow state machine.", + "This copies the file from the source S3 Bucket to the destination S3 Bucket.", + "In a real world scenario the state machine would be modified to manipulate the file as desired" + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/s3-sqs-eventbridge-pipe-sfn-s3", + "templateURL": "serverless-patterns/s3-sqs-eventbridge-pipe-sfn-s3", + "projectFolder": "s3-sqs-eventbridge-pipe-sfn-s3", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Configuring an S3 bucket for notifications", + "link": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html" + }, + { + "text": "Amazon Simple Queue Service as an EventBridge Pipe source", + "link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html" + } + ] + }, + "deploy": { + "text": [ + "sam deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Dan Straw", + "image": "https://avatars.githubusercontent.com/u/271028", + "bio": "Senior Solutions Architect at AWS", + "linkedin": "https://www.linkedin.com/in/danstraw/" + } + ] +} diff --git a/s3-sqs-eventbridge-pipe-sfn-s3/template.yaml b/s3-sqs-eventbridge-pipe-sfn-s3/template.yaml new file mode 100644 index 000000000..8f71705d8 --- /dev/null +++ b/s3-sqs-eventbridge-pipe-sfn-s3/template.yaml @@ -0,0 +1,239 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Serverless patterns - S3 to SQS to EventBridge Pipes to Step Function to S3 + +Parameters: + SourceBucketName: + Type: String + DestinationBucketName: + Type: String + +Resources: + ## Source S3 bucket + SourceBucket: + Type: AWS::S3::Bucket + DependsOn: + - NotificationQueuePolicy + Properties: + BucketName: !Ref SourceBucketName + NotificationConfiguration: + QueueConfigurations: + - Event: "s3:ObjectCreated:*" + Queue: !GetAtt NotificationQueue.Arn + + ## Destination S3 Bucket + DestinationBucket: + Type: AWS::S3::Bucket + Properties: + BucketName: !Ref DestinationBucketName + + ## SQS Queue + NotificationQueue: + Type: AWS::SQS::Queue + Properties: + RedrivePolicy: + deadLetterTargetArn: !GetAtt NotificationQueueDLQ.Arn + maxReceiveCount: 5 + + # DLQ for source + NotificationQueueDLQ: + Type: AWS::SQS::Queue + + ## Policy allowing S3 Event Notifications to be in SQS + NotificationQueuePolicy: + Type: AWS::SQS::QueuePolicy + Properties: + PolicyDocument: + Version: "2012-10-17" + Id: QueuePolicy + Statement: + - Sid: Allow-SendMessage-To-Queue-From-S3-Event-Notification + Effect: Allow + Principal: + Service: "s3.amazonaws.com" + Action: + - "sqs:SendMessage" + Resource: !GetAtt NotificationQueue.Arn + Condition: + ArnLike: + aws:SourceArn: !Join [ "",[ 'arn:aws:s3:::',!Ref SourceBucketName ] ] + StringEquals: + aws:SourceAccount: !Ref AWS::AccountId + Queues: + - Ref: NotificationQueue + + # Logs for EventBridge Pipe + EventBridgePipeLogGroup: + Type: AWS::Logs::LogGroup + Properties: + RetentionInDays: 7 + LogGroupName: /s3-sqs-eventbridge-pipe-sfn-s3/EventBridgePipe + + # Eventbridge Pipe Definition + S3FileCopy: + Type: AWS::Pipes::Pipe + Properties: + RoleArn: !GetAtt EventBridgePipesRole.Arn + Name: s3-sqs-eventbridge-pipe-sfn-s3 + DesiredState: RUNNING + Source: !GetAtt NotificationQueue.Arn + SourceParameters: + SqsQueueParameters: + BatchSize: 1 + LogConfiguration: + CloudwatchLogsLogDestination: + LogGroupArn: !GetAtt EventBridgePipeLogGroup.Arn + IncludeExecutionData: + - ALL + Level: TRACE + Target: !Ref TargetStateMachine + + ## Role for EventBridge Pipes to read from SQS and execute Step Function + EventBridgePipesRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - pipes.amazonaws.com + Action: + - sts:AssumeRole + Policies: + - PolicyName: CloudWatchLogs + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - 'logs:CreateLogDelivery' + - 'logs:GetLogDelivery' + - 'logs:UpdateLogDelivery' + - 'logs:DeleteLogDelivery' + - 'logs:ListLogDeliveries' + - 'logs:PutResourcePolicy' + - 'logs:DescribeResourcePolicies' + - 'logs:DescribeLogGroups' + Resource: '*' + - PolicyName: ReadSQS + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - 'sqs:ReceiveMessage' + - 'sqs:DeleteMessage' + - 'sqs:GetQueueAttributes' + Resource: !GetAtt NotificationQueue.Arn + - PolicyName: ExecuteSFN + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - 'states:StartExecution' + - 'states:StartSyncExecution' + Resource: !Ref TargetStateMachine + + # Logs for StepFunctions + TargetStateMachineLogGroup: + Type: AWS::Logs::LogGroup + Properties: + RetentionInDays: 7 + LogGroupName: /s3-sqs-eventbridge-pipe-sfn-s3/StateMachine + + # Execution Role for StepFunctions + TargetStateMachineRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - states.amazonaws.com + Action: + - sts:AssumeRole + Policies: + - PolicyName: CloudWatchLogs + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 'logs:CreateLogDelivery' + - 'logs:GetLogDelivery' + - 'logs:UpdateLogDelivery' + - 'logs:DeleteLogDelivery' + - 'logs:ListLogDeliveries' + - 'logs:PutResourcePolicy' + - 'logs:DescribeResourcePolicies' + - 'logs:DescribeLogGroups' + Resource: '*' + - PolicyName: SourceBucketList + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 's3:ListBucket' + Resource: !GetAtt SourceBucket.Arn + - PolicyName: SourceBucketReadOnly + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 's3:Get*' + - 's3:List*' + - 's3:Describe*' + Resource: !Join + - '' + - - !GetAtt SourceBucket.Arn + - /* + - PolicyName: DestinationBucketList + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 's3:ListBucket' + Resource: !GetAtt DestinationBucket.Arn + - PolicyName: DestinationBucketReadWrite + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 's3:*Object' + Resource: !Join + - '' + - - !GetAtt DestinationBucket.Arn + - /* + + # Step Function Definition + TargetStateMachine: + Type: AWS::Serverless::StateMachine + Properties: + Type: EXPRESS + DefinitionUri: workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json + DefinitionSubstitutions: + DestinationBucketName: !Ref DestinationBucketName + Logging: + Destinations: + - CloudWatchLogsLogGroup: + LogGroupArn: !GetAtt TargetStateMachineLogGroup.Arn + Level: ALL + IncludeExecutionData: true + Role: !GetAtt TargetStateMachineRole.Arn + +Outputs: + SourceBucketName: + Value: !Ref SourceBucketName + Description: S3 Bucket for object storage + DestinationBucketName: + Value: !Ref DestinationBucketName + Description: S3 destination Bucket for object storage \ No newline at end of file diff --git a/s3-sqs-eventbridge-pipe-sfn-s3/workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json b/s3-sqs-eventbridge-pipe-sfn-s3/workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json new file mode 100644 index 000000000..4c35e46fd --- /dev/null +++ b/s3-sqs-eventbridge-pipe-sfn-s3/workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json @@ -0,0 +1,58 @@ +{ + "Comment": "Copies file from source to destination bucket", + "StartAt": "Loop through each SQS Message", + "States": { + "Loop through each SQS Message": { + "Type": "Map", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Extract and decode S3 Event Notification JSON", + "States": { + "Extract and decode S3 Event Notification JSON": { + "Type": "Pass", + "Next": "Loop through each S3 Event Notification Record", + "Parameters": { + "Body.$": "States.StringToJson($.body)" + } + }, + "Loop through each S3 Event Notification Record": { + "Type": "Map", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Get S3 Object from Source Bucket", + "States": { + "Get S3 Object from Source Bucket": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$.s3.bucket.name", + "Key.$": "$.s3.object.key" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:getObject", + "Next": "Write S3 Object to Destination Bucket", + "ResultPath": "$.Object" + }, + "Write S3 Object to Destination Bucket": { + "Type": "Task", + "End": true, + "Parameters": { + "Body.$": "$.Object.Body", + "Bucket": "${DestinationBucketName}", + "Key.$": "$.s3.object.key" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:putObject" + } + } + }, + "ItemsPath": "$.Body.Records", + "End": true + } + } + }, + "End": true + } + } +} \ No newline at end of file