feat(transform): Add Send for AWS Lambda (#153)
* feat(transform): Add Send for AWS Lambda

* docs(examples): Add S3 Retry on Failure
jshlbrd authored Apr 3, 2024
1 parent f9b52fa commit c517ea5
Showing 11 changed files with 440 additions and 8 deletions.
12 changes: 12 additions & 0 deletions build/config/substation.libsonnet
@@ -702,6 +702,18 @@
type: 'send_aws_kinesis_data_stream',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(s))),
},
lambda(settings={}): {
local default = {
batch: $.config.batch,
auxiliary_transforms: null,
aws: $.config.aws,
retry: $.config.retry,
function_name: null,
},

type: 'send_aws_lambda',
settings: std.mergePatch(default, $.helpers.abbv(settings)),
},
s3(settings={}): {
local default = {
batch: $.config.batch,
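The new `lambda()` helper above is referenced in pipeline configs as `sub.tf.send.aws.lambda(settings={ function_name: 'node' })`, which is exactly how the retrier configuration later in this commit invokes the node function.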
24 changes: 24 additions & 0 deletions examples/terraform/aws/README.md
@@ -289,6 +289,30 @@ flowchart LR
sendS3y --> bucket
```

## Retry on Failure

Deploys a data pipeline that reads data from an S3 bucket and automatically retries failed events using an SQS queue as a [failure destination](https://aws.amazon.com/blogs/compute/introducing-aws-lambda-destinations/). This example will retry forever until the error is resolved.

```mermaid
flowchart LR
%% resources
bucket([S3 Bucket])
queue([SQS Queue])
%% connections
bucket --> handler
N -.-> queue
queue --> R
rTransforms --> handler
subgraph N["Substation Node"]
handler[[Handler]] --> transforms[Transforms]
end
subgraph R["Substation Retrier"]
rHandler[[Handler]] --> rTransforms[Transforms]
end
```

## SNS

Deploys a data pipeline that reads data from an S3 bucket via an SNS topic.
@@ -0,0 +1,13 @@
// This config generates an error to engage the retry on failure feature.
// The pipeline will retry forever until the error is resolved. Change the
// transform to `sub.tf.send.stdout()` to resolve the error and print the logs
// from S3.
local sub = import '../../../../../../../build/config/substation.libsonnet';

{
concurrency: 1,
transforms: [
sub.tf.util.err(settings={ message: 'simulating error to trigger retries' }),
// sub.tf.send.stdout(),
],
}
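Because the node's transform always raises an error while this config is active, every S3 event ends up in the retry loop; the SQS queue's 30-second delivery delay (set in `_resources.tf`) spaces out each retry until the config is changed to `sub.tf.send.stdout()`.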
@@ -0,0 +1,17 @@
// This config transforms the failure record sent by the node Lambda function
// so that it becomes a new request. The new request bypasses S3 and is sent
// directly to the Lambda function.
//
// Additional information is available in the payload and can be used to make
// decisions about the new request or notify external systems about the failure.
local sub = import '../../../../../../../build/config/substation.libsonnet';

{
concurrency: 1,
transforms: [
// If needed, then use other information from the failure record to
// decide what to do or notify external systems about the failure.
sub.tf.obj.cp(settings={ object: { source_key: 'requestPayload' } }),
sub.tf.send.aws.lambda(settings={ function_name: 'node' }),
],
}
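For reference, the failure record that the node function sends to the SQS queue wraps the original request under `requestPayload`, which is the field the config above copies. Below is a minimal Go sketch of that shape; field names other than `requestPayload` follow the AWS Lambda destinations documentation and are shown only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// invocationRecord models the failure record that Lambda delivers to an
// on-failure destination. Only requestPayload is used by the retrier config;
// the other fields follow the documented invocation record shape.
type invocationRecord struct {
	Version        string `json:"version"`
	Timestamp      string `json:"timestamp"`
	RequestContext struct {
		RequestID              string `json:"requestId"`
		FunctionARN            string `json:"functionArn"`
		Condition              string `json:"condition"`
		ApproximateInvokeCount int    `json:"approximateInvokeCount"`
	} `json:"requestContext"`
	RequestPayload  json.RawMessage `json:"requestPayload"`
	ResponsePayload json.RawMessage `json:"responsePayload"`
}

func main() {
	// A trimmed example record; real records carry the original S3 event
	// notification under requestPayload.
	raw := []byte(`{
		"version": "1.0",
		"requestContext": {"condition": "RetriesExhausted", "approximateInvokeCount": 1},
		"requestPayload": {"Records": []}
	}`)

	var rec invocationRecord
	if err := json.Unmarshal(raw, &rec); err != nil {
		panic(err)
	}

	// This is the value the retrier copies to the root of the event before
	// re-invoking the node function.
	fmt.Printf("condition=%s payload=%s\n", rec.RequestContext.Condition, rec.RequestPayload)
}
```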
@@ -0,0 +1,4 @@
provider "aws" {
# profile = "default"
region = "us-east-1"
}
57 changes: 57 additions & 0 deletions examples/terraform/aws/s3/retry_on_failure/terraform/_resources.tf
@@ -0,0 +1,57 @@
module "appconfig" {
source = "../../../../../../build/terraform/aws/appconfig"

config = {
name = "substation"
environments = [{name = "example"}]
}
}

module "ecr" {
source = "../../../../../../build/terraform/aws/ecr"

config = {
name = "substation"
force_delete = true
}
}

resource "random_uuid" "s3" {}

# Monolithic S3 bucket used to store all data.
module "s3" {
source = "../../../../../../build/terraform/aws/s3"

config = {
# Bucket name is randomized to avoid collisions.
name = "${random_uuid.s3.result}-substation"
}

access = [
# Reads objects from the bucket.
module.lambda_node.role.name,
]
}

module "sqs" {
source = "../../../../../../build/terraform/aws/sqs"

config = {
name = "substation_retries"

# Delay for 30 seconds to allow the pipeline to recover.
delay = 30

# The visibility timeout should be at least 6x the timeout of any consuming
# Lambda function, plus the maximum batching window. Refer to the Lambda
# documentation for more information:
# https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
timeout = 60
}

access = [
# Sends messages to the queue.
module.lambda_retrier.role.name,
# Receives messages from the queue.
module.lambda_node.role.name,
]
}
@@ -0,0 +1,86 @@
module "lambda_node" {
source = "../../../../../../build/terraform/aws/lambda"
appconfig = module.appconfig

config = {
name = "node"
description = "Substation node that reads data from S3. The node will retry forever if it fails."
image_uri = "${module.ecr.url}:latest"
image_arm = true

env = {
"SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/node"
"SUBSTATION_LAMBDA_HANDLER" : "AWS_S3"
"SUBSTATION_DEBUG" : true
}
}

# The retrier Lambda must be able to invoke this
# Lambda function to retry failed S3 events.
access = [
module.lambda_retrier.role.name,
]
}

resource "aws_lambda_permission" "node" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = module.lambda_node.name
principal = "s3.amazonaws.com"
source_arn = module.s3.arn
}

resource "aws_s3_bucket_notification" "node" {
bucket = module.s3.id

lambda_function {
lambda_function_arn = module.lambda_node.arn
events = ["s3:ObjectCreated:*"]
}

depends_on = [
aws_lambda_permission.node
]
}

# Configures the Lambda function to send failed events to the SQS queue.
resource "aws_lambda_function_event_invoke_config" "node" {
function_name = module.lambda_node.name

# This example disables Lambda's built-in retries so that failed events go
# straight to the SQS destination and the retrier controls all retries.
maximum_retry_attempts = 0

destination_config {
on_failure {
destination = module.sqs.arn
}
}
}

module "lambda_retrier" {
source = "../../../../../../build/terraform/aws/lambda"
appconfig = module.appconfig

config = {
name = "retrier"
description = "Substation node that receives events from the retry queue and invokes the original Lambda function."
image_uri = "${module.ecr.url}:latest"
image_arm = true

# Together with the 30-second batching window configured below, this timeout
# must fit within the queue's 60-second visibility timeout.
timeout = 5

env = {
"SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/retrier"
"SUBSTATION_LAMBDA_HANDLER" : "AWS_SQS"
"SUBSTATION_DEBUG" : true
}
}
}

resource "aws_lambda_event_source_mapping" "retrier" {
event_source_arn = module.sqs.arn
function_name = module.lambda_retrier.arn
maximum_batching_window_in_seconds = 30
batch_size = 10
}
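With the retrier's 5-second function timeout and the 30-second maximum batching window configured here, the queue's 60-second visibility timeout meets the guideline from `_resources.tf`: 6 × 5 s + 30 s = 60 s.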
18 changes: 17 additions & 1 deletion internal/aws/lambda/lambda.go
@@ -50,10 +50,26 @@ func (a *API) Invoke(ctx aws.Context, function string, payload []byte) (resp *lambda.InvokeOutput, err error) {
InvocationType: aws.String("RequestResponse"),
Payload: payload,
})

if err != nil {
return nil, fmt.Errorf("invoke function %s: %v", function, err)
}

return resp, nil
}

// InvokeAsync is a convenience wrapper for asynchronously invoking a Lambda function.
func (a *API) InvokeAsync(ctx aws.Context, function string, payload []byte) (resp *lambda.InvokeOutput, err error) {
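// Detach cancellation so the invocation request is not cut short when the caller's context is canceled or times out.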
ctx = context.WithoutCancel(ctx)
resp, err = a.Client.InvokeWithContext(
ctx,
&lambda.InvokeInput{
FunctionName: aws.String(function),
InvocationType: aws.String("Event"),
Payload: payload,
})
if err != nil {
return nil, fmt.Errorf("invoke_async function %s: %v", function, err)
}

return resp, nil
}
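For context, here is a minimal, self-contained sketch of the same asynchronous call made directly against aws-sdk-go v1; the function name and payload are placeholders, and inside Substation the new `InvokeAsync` wrapper issues this call after detaching the context's cancellation.

```go
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/lambda"
)

func main() {
	// Standard aws-sdk-go v1 session and Lambda client; region and credentials
	// come from the environment.
	sess := session.Must(session.NewSession())
	client := lambda.New(sess)

	// An "Event" invocation is fire-and-forget: Lambda queues the request and
	// returns 202 Accepted without waiting for the function to run. The
	// function name "node" is taken from the retrier example above and is
	// otherwise a placeholder.
	resp, err := client.InvokeWithContext(context.Background(), &lambda.InvokeInput{
		FunctionName:   aws.String("node"),
		InvocationType: aws.String("Event"),
		Payload:        []byte(`{"hello":"world"}`),
	})
	if err != nil {
		panic(err)
	}

	fmt.Println("status:", aws.Int64Value(resp.StatusCode)) // expect 202
}
```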
44 changes: 37 additions & 7 deletions internal/aws/lambda/lambda_test.go
@@ -1,7 +1,6 @@
package lambda

import (
"bytes"
"context"
"testing"

@@ -20,16 +19,16 @@ func (m mockedInvoke) InvokeWithContext(ctx aws.Context, input *lambda.InvokeInp
return &m.Resp, nil
}

func TestPutItem(t *testing.T) {
func TestInvoke(t *testing.T) {
tests := []struct {
resp lambda.InvokeOutput
expected []byte
expected int64
}{
{
resp: lambda.InvokeOutput{
Payload: []byte("foo"),
StatusCode: aws.Int64(200),
},
expected: []byte("foo"),
expected: 200,
},
}

@@ -45,8 +44,39 @@ func TestPutItem(t *testing.T) {
t.Fatalf("%d, unexpected error", err)
}

if c := bytes.Compare(resp.Payload, test.expected); c != 0 {
t.Errorf("expected %+v, got %s", resp.Payload, test.expected)
if *resp.StatusCode != test.expected {
t.Errorf("expected %+v, got %d", resp.Payload, test.expected)
}
}
}

func TestInvokeAsync(t *testing.T) {
tests := []struct {
resp lambda.InvokeOutput
expected int64
}{
{
resp: lambda.InvokeOutput{
StatusCode: aws.Int64(202),
},
expected: 202,
},
}

ctx := context.TODO()

for _, test := range tests {
a := API{
mockedInvoke{Resp: test.resp},
}

resp, err := a.InvokeAsync(ctx, "", nil)
if err != nil {
t.Fatalf("%d, unexpected error", err)
}

if *resp.StatusCode != test.expected {
t.Errorf("expected %+v, got %d", resp.Payload, test.expected)
}
}
}