docs(examples): Update Kinesis Time Travel Example (#163)
* docs(examples): Update Kinesis Time Travel Example

* docs: Add Sample Data

* docs: Update Shard Count
jshlbrd authored Apr 29, 2024
1 parent b463156 commit 6ca3ae8
Showing 10 changed files with 62 additions and 122 deletions.
@@ -1,10 +1,12 @@
local sub = import '../../../../../../build/config/substation.libsonnet';

{
is_process: [
sub.cnd.str.eq({ obj: { src: 'event.category' }, value: 'process' }),
sub.cnd.str.eq({ obj: { src: 'event.type' }, value: 'start' }),
],
kv_store: sub.kv_store.aws_dynamodb({
table_name: 'substation',
attributes: { partition_key: 'PK', sort_key: 'SK', ttl: 'TTL', value: 'cache' },
}),
field: 'context',
field_exists: sub.cnd.num.len.gt({ obj: { src: $.field }, value: 0 }),
}
@@ -1,18 +1,16 @@
// Puts process metadata into the KV store.
local sub = import '../../../../../../../build/config/substation.libsonnet';
local const = import '../const.libsonnet';

{
// The concurrency is set to 1 to ensure that the KV store is not updated in parallel.
concurrency: 1,
transforms: [
// If the field exists, then put the value into the KV store. If the data stream is
// at risk of write-heavy activity, then consider first querying the KV store to see
// if the value already exists and only writing if it does not.
// If the event is a process, then store the process metadata in the KV store
// indexed by the PID. The data is stored in the KV store for 90 days.
sub.pattern.tf.conditional(
condition=sub.cnd.all(const.field_exists),
// The ttl_offset is low for the purposes of this example. It should be set to a
// value that is appropriate for the data stream (usually hours or days).
transform=sub.tf.enrich.kv_store.set({ obj: { src: 'ip', trg: const.field }, ttl_offset: '30s', kv_store: const.kv_store }),
condition=sub.cnd.all(const.is_process),
transform=sub.tf.enrich.kv_store.set({ obj: { src: 'process.pid', trg: 'process' }, prefix: 'process', ttl_offset: std.format('%dh', 24 * 90), kv_store: const.kv_store, close_kv_store: false }),
),
],
}
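
A side note on the `ttl_offset` above: Jsonnet computes the duration string when the config is compiled. A minimal standalone sketch (hypothetical file name, evaluated with the `jsonnet` CLI) showing what the expression produces:

// ttl.jsonnet, run with: jsonnet ttl.jsonnet
{
  // 24 hours/day * 90 days = 2160 hours.
  ttl_offset: std.format('%dh', 24 * 90),  // evaluates to "2160h"
}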

This file was deleted.

@@ -0,0 +1,28 @@
// All values in the KV store were put there by the `enrichment` function.
local sub = import '../../../../../../../build/config/substation.libsonnet';
local const = import '../const.libsonnet';

{
concurrency: 2,
transforms: [
// process.*
//
// This is only applied to non-process events.
sub.pattern.tf.conditional(
condition=sub.cnd.none(const.is_process),
transform=sub.tf.enrich.kv_store.get({ obj: { src: 'process.pid', trg: 'process' }, prefix: 'process', kv_store: const.kv_store }),
),
// process.parent.*
sub.pattern.tf.conditional(
condition=sub.cnd.num.len.gt({ obj: { src: 'process.parent.pid' }, value: 0 }),
transform=sub.tf.enrich.kv_store.get({ obj: { src: 'process.parent.pid', trg: 'process.parent' }, prefix: 'process', kv_store: const.kv_store }),
),
// process.parent.parent.*
sub.pattern.tf.conditional(
condition=sub.cnd.num.len.gt({ obj: { src: 'process.parent.parent.pid' }, value: 0 }),
transform=sub.tf.enrich.kv_store.get({ obj: { src: 'process.parent.parent.pid', trg: 'process.parent.parent' }, prefix: 'process', kv_store: const.kv_store }),
),
// Print the results.
sub.tf.send.stdout(),
],
}
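
Given the sample records in data.jsonl below, the three lookups above rebuild the full process ancestry on the non-process event. An illustrative result for the network connection event (hand-assembled from the sample data, not captured from a live run; `server`, `@timestamp`, and `start` fields omitted for brevity):

{
  "event": {"category": "network", "type": "connection"},
  "process": {
    "command_line": "/Applications/Spotify.app/Contents/MacOS/Spotify",
    "name": "Spotify",
    "pid": "d3a6c0b9d3751559f206e12fb1b8f226",
    "parent": {
      "command_line": "/usr/libexec/runningboardd",
      "name": "runningboardd",
      "pid": "8faae8aa27f9b4faff6fd98e60201e3d",
      "parent": {
        "command_line": "/sbin/launchd",
        "name": "launchd",
        "pid": "f23e8b548d2e5e1ef3e122a9c5e08a63"
      }
    }
  }
}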
4 changes: 4 additions & 0 deletions examples/terraform/aws/kinesis/time_travel/data.jsonl
@@ -0,0 +1,4 @@
{"event":{"category":"network","type":"connection"},"process":{"name":"Spotify","pid":"d3a6c0b9d3751559f206e12fb1b8f226"},"server":{"ip":"35.186.224.39","port":443},"@timestamp":"2024-03-29T04:02:38.470000Z"}
{"event":{"category":"process","type":"start"},"process":{"command_line":"/sbin/launchd","name":"launchd","pid":"f23e8b548d2e5e1ef3e122a9c5e08a63","start":"2024-03-13T16:17:45.000000Z","parent":{"pid":"b745f7a7c3a98ac5f087be7420e6e3f9"}}}
{"event":{"category":"process","type":"start"},"process":{"command_line":"/usr/libexec/runningboardd","name":"runningboardd","pid":"8faae8aa27f9b4faff6fd98e60201e3d","start":"2024-03-13T16:17:49.000000Z","parent":{"pid":"f23e8b548d2e5e1ef3e122a9c5e08a63"}}}
{"event":{"category":"process","type":"start"},"process":{"command_line":"/Applications/Spotify.app/Contents/MacOS/Spotify","name":"Spotify","pid":"d3a6c0b9d3751559f206e12fb1b8f226","start":"2024-03-13T16:29:17.000000Z","parent":{"pid":"8faae8aa27f9b4faff6fd98e60201e3d"}}}
2 changes: 2 additions & 0 deletions examples/terraform/aws/kinesis/time_travel/post_deploy.sh
@@ -0,0 +1,2 @@
sleep 5
AWS_DEFAULT_REGION=$AWS_REGION python3 ../build/scripts/aws/kinesis/put_records.py substation terraform/aws/kinesis/time_travel/data.jsonl --print-response
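
A usage sketch for the script above (the working directory is not stated in the diff; the relative paths inside the script suggest it is run from the examples directory, and the region value is an assumption):

cd examples
AWS_REGION=us-east-1 sh terraform/aws/kinesis/time_travel/post_deploy.sh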
81 changes: 6 additions & 75 deletions examples/terraform/aws/kinesis/time_travel/terraform/_resources.tf
@@ -1,68 +1,15 @@
data "aws_caller_identity" "caller" {}

# KMS encryption key that is shared by all Substation infrastructure
module "kms" {
source = "../../../../../../build/terraform/aws/kms"

config = {
name = "alias/substation"
policy = data.aws_iam_policy_document.kms.json
}
}

# This policy is required to support encrypted SNS topics.
# More information: https://repost.aws/knowledge-center/cloudwatch-receive-sns-for-alarm-trigger
data "aws_iam_policy_document" "kms" {
# Allows CloudWatch to access encrypted SNS topic.
statement {
sid = "CloudWatch"
effect = "Allow"
actions = [
"kms:Decrypt",
"kms:GenerateDataKey"
]

principals {
type = "Service"
identifiers = ["cloudwatch.amazonaws.com"]
}

resources = ["*"]
}

# Default key policy for KMS.
# https://docs.aws.amazon.com/kms/latest/developerguide/determining-access-key-policy.html
statement {
sid = "KMS"
effect = "Allow"
actions = [
"kms:*",
]

principals {
type = "AWS"
identifiers = ["arn:aws:iam::${data.aws_caller_identity.caller.account_id}:root"]
}

resources = ["*"]
}
}

module "appconfig" {
source = "../../../../../../build/terraform/aws/appconfig"

config = {
name = "substation"
environments = [{
name = "example"
}]
name = "substation"
environments = [{ name = "example" }]
}
}

# Repository for the core Substation application.
module "ecr" {
source = "../../../../../../build/terraform/aws/ecr"
kms = module.kms

config = {
name = "substation"
@@ -73,7 +20,6 @@ module "ecr" {
# Repository for the autoscaling application.
module "ecr_autoscale" {
source = "../../../../../../build/terraform/aws/ecr"
kms = module.kms

config = {
name = "autoscale"
@@ -83,45 +29,30 @@ module "ecr_autoscale" {

# SNS topic for Kinesis Data Stream autoscaling alarms.
resource "aws_sns_topic" "autoscaling_topic" {
name = "autoscale"
kms_master_key_id = module.kms.id
}

# API Gateway that sends data to Kinesis.
module "gateway" {
source = "../../../../../../build/terraform/aws/api_gateway/kinesis_data_stream"
# Always required for the Kinesis Data Stream integration.
kinesis_data_stream = module.kinesis

config = {
name = "gateway"
}
name = "autoscale"
}

# Kinesis Data Stream that stores data sent from pipeline sources.
module "kinesis" {
source = "../../../../../../build/terraform/aws/kinesis_data_stream"
kms = module.kms

config = {
name = "substation"
autoscaling_topic = aws_sns_topic.autoscaling_topic.arn
shards = 1
}

access = [
# Autoscales the stream.
module.lambda_autoscaling.role.name,
# Consumes data from the stream.
module.lambda_enrichment.role.name,
module.lambda_subscriber.role.name,
# Publishes data to the stream.
module.gateway.role.name,
module.lambda_transform.role.name,
]
}

module "dynamodb" {
source = "../../../../../../build/terraform/aws/dynamodb"
kms = module.kms

config = {
name = "substation"
@@ -143,6 +74,6 @@ module "dynamodb" {

access = [
module.lambda_enrichment.role.name,
module.lambda_subscriber.role.name,
module.lambda_transform.role.name,
]
}
@@ -10,11 +10,6 @@ module "lambda_autoscaling" {
image_uri = "${module.ecr_autoscale.url}:v1.2.0"
image_arm = true
}

depends_on = [
module.appconfig.name,
module.ecr_autoscale.url,
]
}

resource "aws_sns_topic_subscription" "autoscaling_subscription" {
@@ -14,11 +14,6 @@ module "lambda_enrichment" {
"SUBSTATION_DEBUG" : true
}
}

depends_on = [
module.appconfig.name,
module.ecr.url,
]
}

resource "aws_lambda_event_source_mapping" "lambda_enrichment" {
@@ -27,5 +22,8 @@ resource "aws_lambda_event_source_mapping" "lambda_enrichment" {
maximum_batching_window_in_seconds = 5
batch_size = 100
parallelization_factor = 1
starting_position = "LATEST"
# In this example, the mapping starts from the beginning of the stream
# ("TRIM_HORIZON"); in a production environment, you may want to start
# from the end of the stream ("LATEST") to avoid processing old data.
starting_position = "TRIM_HORIZON"
}
@@ -1,31 +1,29 @@
module "lambda_subscriber" {
module "lambda_transform" {
source = "../../../../../../build/terraform/aws/lambda"
appconfig = module.appconfig

config = {
name = "subscriber"
name = "transform"
description = "Substation node that reads from Kinesis with a delay to support enrichment"
image_uri = "${module.ecr.url}:v1.2.0"
image_arm = true

env = {
"SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/subscriber"
"SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/transform"
"SUBSTATION_LAMBDA_HANDLER" : "AWS_KINESIS_DATA_STREAM"
"SUBSTATION_DEBUG" : true
}
}

depends_on = [
module.appconfig.name,
module.ecr.url,
]
}

resource "aws_lambda_event_source_mapping" "lambda_subscriber" {
resource "aws_lambda_event_source_mapping" "lambda_transform" {
event_source_arn = module.kinesis.arn
function_name = module.lambda_subscriber.arn
function_name = module.lambda_transform.arn
maximum_batching_window_in_seconds = 15
batch_size = 100
parallelization_factor = 1
starting_position = "LATEST"
# In this example, the mapping starts from the beginning of the stream
# ("TRIM_HORIZON"); in a production environment, you may want to start
# from the end of the stream ("LATEST") to avoid processing old data.
starting_position = "TRIM_HORIZON"
}
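
For readers who want to see the difference between the two starting positions, a minimal sketch with the standard AWS CLI (stream name from this example; the shard ID is assumed):

# Iterate from the oldest available record (mirrors TRIM_HORIZON above).
aws kinesis get-shard-iterator \
  --stream-name substation \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON

# Iterate only over records written after the iterator is created.
aws kinesis get-shard-iterator \
  --stream-name substation \
  --shard-id shardId-000000000000 \
  --shard-iterator-type LATEST

The returned ShardIterator can then be passed to `aws kinesis get-records --shard-iterator <value>`.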
