Create the cloudevent-recorder module.

This module makes it easy to configure a recorder service that pulls events of
particular types off of a `cloudevent-broker`'s topics and rotates them into GCS,
and sets up a BigQuery DTS job to import them into a schematized table for each of
those event types.

Signed-off-by: Matt Moore <[email protected]>
mattmoor committed Dec 12, 2023
1 parent 68a8ebb commit 15b9f7a
Showing 12 changed files with 982 additions and 15 deletions.
1 change: 1 addition & 0 deletions .github/workflows/documentation.yaml
@@ -13,6 +13,7 @@ jobs:
- authorize-private-service
- cloudevent-broker
- cloudevent-trigger
- cloudevent-recorder
- networking

steps:
108 changes: 108 additions & 0 deletions cloudevent-recorder/README.md
@@ -0,0 +1,108 @@
# `cloudevent-recorder`

This module provisions a regionalized cloudevents sink that consumes events of
particular types from each of the regional brokers and writes them into a
regional GCS bucket, from which a periodic BigQuery Data Transfer Service job
imports them into a BigQuery table schematized for that event type. The
intended usage of this module for recording events is something like this:

```hcl
// Create a network with several regional subnets
module "networking" {
  source = "chainguard-dev/glue/cloudrun//networking"

  name       = "my-networking"
  project_id = var.project_id
  regions    = [...]
}

// Create the Broker abstraction.
module "cloudevent-broker" {
  source = "chainguard-dev/glue/cloudrun//cloudevent-broker"

  name       = "my-broker"
  project_id = var.project_id
  regions    = module.networking.regional-networks
}

// Record cloudevents of type com.example.foo and com.example.bar
module "foo-emits-events" {
  source = "chainguard-dev/glue/cloudrun//cloudevent-recorder"

  name       = "my-recorder"
  project_id = var.project_id
  regions    = module.networking.regional-networks
  broker     = module.cloudevent-broker.broker

  retention-period = 30 // keep around 30 days worth of event data

  types = {
    "com.example.foo": file("${path.module}/foo.schema.json"),
    "com.example.bar": file("${path.module}/bar.schema.json"),
  }
}
```
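
Each value in the `types` map is a BigQuery table schema (the standard JSON
array of field definitions) describing the JSON payload of that event type.
As a minimal, purely illustrative sketch (the field names below are
hypothetical and not taken from this commit), an equivalent schema can also be
built inline with `jsonencode`:

```hcl
// Hypothetical schema for com.example.foo events; the fields must match the
// JSON body that your service emits for this event type.
locals {
  foo_schema = jsonencode([
    { name = "id", type = "STRING", mode = "REQUIRED" },
    { name = "occurred_at", type = "TIMESTAMP", mode = "NULLABLE" },
    { name = "actor", type = "STRING", mode = "NULLABLE" },
  ])
}
```

Such a local can then be passed as the value for `"com.example.foo"` in place
of the `file(...)` call above.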

<!-- BEGIN_TF_DOCS -->
## Requirements

No requirements.

## Providers

| Name | Version |
|------|---------|
| <a name="provider_cosign"></a> [cosign](#provider\_cosign) | n/a |
| <a name="provider_google"></a> [google](#provider\_google) | n/a |
| <a name="provider_google-beta"></a> [google-beta](#provider\_google-beta) | n/a |
| <a name="provider_ko"></a> [ko](#provider\_ko) | n/a |
| <a name="provider_random"></a> [random](#provider\_random) | n/a |

## Modules

| Name | Source | Version |
|------|--------|---------|
| <a name="module_triggers"></a> [triggers](#module\_triggers) | ../cloudevent-trigger | n/a |

## Resources

| Name | Type |
|------|------|
| [cosign_sign.logrotate-image](https://registry.terraform.io/providers/chainguard-dev/cosign/latest/docs/resources/sign) | resource |
| [cosign_sign.recorder-image](https://registry.terraform.io/providers/chainguard-dev/cosign/latest/docs/resources/sign) | resource |
| [google-beta_google_cloud_run_v2_service.recorder-service](https://registry.terraform.io/providers/hashicorp/google-beta/latest/docs/resources/google_cloud_run_v2_service) | resource |
| [google_bigquery_data_transfer_config.import-job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_data_transfer_config) | resource |
| [google_bigquery_dataset.this](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_dataset) | resource |
| [google_bigquery_table.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_table) | resource |
| [google_bigquery_table_iam_member.import-writes-to-tables](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_table_iam_member) | resource |
| [google_service_account.import-identity](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource |
| [google_service_account.recorder](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource |
| [google_service_account_iam_member.bq-dts-assumes-import-identity](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account_iam_member) | resource |
| [google_service_account_iam_member.provisioner-acts-as-import-identity](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account_iam_member) | resource |
| [google_storage_bucket.recorder](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket) | resource |
| [google_storage_bucket_iam_member.import-reads-from-gcs-buckets](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_member) | resource |
| [google_storage_bucket_iam_member.recorder-writes-to-gcs-buckets](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_member) | resource |
| [ko_build.logrotate-image](https://registry.terraform.io/providers/ko-build/ko/latest/docs/resources/build) | resource |
| [ko_build.recorder-image](https://registry.terraform.io/providers/ko-build/ko/latest/docs/resources/build) | resource |
| [random_id.suffix](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) | resource |
| [random_id.trigger-suffix](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) | resource |
| [google_project.project](https://registry.terraform.io/providers/hashicorp/google/latest/docs/data-sources/project) | data source |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_broker"></a> [broker](#input\_broker) | A map from each of the input region names to the name of the Broker topic in that region. | `map(string)` | n/a | yes |
| <a name="input_deletion_protection"></a> [deletion\_protection](#input\_deletion\_protection) | Whether to enable deletion protection on data resources. | `bool` | `true` | no |
| <a name="input_location"></a> [location](#input\_location) | The location to create the BigQuery dataset in, and in which to run the data transfer jobs from GCS. | `string` | `"US"` | no |
| <a name="input_name"></a> [name](#input\_name) | n/a | `string` | n/a | yes |
| <a name="input_project_id"></a> [project\_id](#input\_project\_id) | n/a | `string` | n/a | yes |
| <a name="input_provisioner"></a> [provisioner](#input\_provisioner) | The identity as which this module will be applied (so it may be granted permission to 'act as' the DTS service account). This should be in the form expected by an IAM subject (e.g. user:sally@example.com) | `string` | n/a | yes |
| <a name="input_regions"></a> [regions](#input\_regions) | A map from region names to a network and subnetwork. A recorder service and cloud storage bucket (into which the service writes events) will be created in each region. | <pre>map(object({<br> network = string<br> subnet = string<br> }))</pre> | n/a | yes |
| <a name="input_retention-period"></a> [retention-period](#input\_retention-period) | The number of days to retain data in BigQuery. | `number` | n/a | yes |
| <a name="input_types"></a> [types](#input\_types) | A map from cloudevent types to the BigQuery schema associated with them. | `map(string)` | n/a | yes |

## Outputs

No outputs.
<!-- END_TF_DOCS -->
100 changes: 100 additions & 0 deletions cloudevent-recorder/bigquery.tf
@@ -0,0 +1,100 @@
// The BigQuery dataset that will hold the recorded cloudevents.
resource "google_bigquery_dataset" "this" {
  project    = var.project_id
  dataset_id = "cloudevents_${replace(var.name, "-", "_")}"
  location   = var.location

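  // Convert the retention period from days to milliseconds.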
  default_partition_expiration_ms = (var.retention-period) * 24 * 60 * 60 * 1000
}

// A BigQuery table for each of the cloudevent types with the specified
// schema for that type.
resource "google_bigquery_table" "types" {
  for_each = var.types

  project    = var.project_id
  dataset_id = google_bigquery_dataset.this.dataset_id
  table_id   = replace(each.key, ".", "_")
  schema     = each.value

  require_partition_filter = false

  time_partitioning {
    type = "DAY"

    expiration_ms = (var.retention-period) * 24 * 60 * 60 * 1000
  }

  deletion_protection = var.deletion_protection
}

// Create an identity that will be used to run the BQ DTS job,
// which we will grant the necessary permissions to.
resource "google_service_account" "import-identity" {
  project      = var.project_id
  account_id   = "${var.name}-import"
  display_name = "BigQuery import identity"
}

// Grant the import identity permission to manipulate the dataset's tables.
resource "google_bigquery_table_iam_member" "import-writes-to-tables" {
  for_each = var.types

  project    = var.project_id
  dataset_id = google_bigquery_dataset.this.dataset_id
  table_id   = google_bigquery_table.types[each.key].table_id
  role       = "roles/bigquery.admin"
  member     = "serviceAccount:${google_service_account.import-identity.email}"
}

// Grant the import identity permission to read the event data from
// the regional GCS buckets.
resource "google_storage_bucket_iam_member" "import-reads-from-gcs-buckets" {
  for_each = var.regions

  bucket = google_storage_bucket.recorder[each.key].name
  role   = "roles/storage.objectViewer"
  member = "serviceAccount:${google_service_account.import-identity.email}"
}

// Grant the BQ DTS service account for this project permission to assume
// the identity we are assigning to the DTS job.
resource "google_service_account_iam_member" "bq-dts-assumes-import-identity" {
  service_account_id = google_service_account.import-identity.name
  role               = "roles/iam.serviceAccountShortTermTokenMinter"
  member             = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com"
}

// Only users that can "act as" the service account can set the service account on a transfer job.
resource "google_service_account_iam_member" "provisioner-acts-as-import-identity" {
  service_account_id = google_service_account.import-identity.name
  role               = "roles/iam.serviceAccountUser"
  member             = var.provisioner
}

// Create a BQ DTS job for each of the regions x types pulling from the appropriate buckets and paths.
resource "google_bigquery_data_transfer_config" "import-job" {
  for_each = local.regional-types

  depends_on = [google_service_account_iam_member.provisioner-acts-as-import-identity]

  project              = var.project_id
  display_name         = "${var.name}-${each.key}"
  location             = google_bigquery_dataset.this.location // These must be colocated
  service_account_name = google_service_account.import-identity.email
  disabled             = false

  data_source_id         = "google_cloud_storage"
  schedule               = "every 15 minutes"
  destination_dataset_id = google_bigquery_dataset.this.dataset_id

  // TODO(mattmoor): Bring back pubsub notification.
  # notification_pubsub_topic = google_pubsub_topic.bq_notification[each.key].id
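
  // For illustration: with name "my-recorder", region "us-east1", and type
  // "com.example.foo", data_path_template resolves to roughly
  // "gs://my-recorder-us-east1-<suffix>/com.example.foo/*".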
  params = {
    data_path_template              = "gs://${google_storage_bucket.recorder[each.value.region].name}/${each.value.type}/*"
    destination_table_name_template = google_bigquery_table.types[each.value.type].table_id
    file_format                     = "JSON"
    max_bad_records                 = 0
    delete_source_files             = false
  }
}
34 changes: 34 additions & 0 deletions cloudevent-recorder/cmd/logrotate/main.go
@@ -0,0 +1,34 @@
/*
Copyright 2023 Chainguard, Inc.
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"log"
"time"

"github.com/chainguard-dev/terraform-cloudrun-glue/pkg/rotate"
"github.com/kelseyhightower/envconfig"
"knative.dev/pkg/signals"
)

type rotateConfig struct {
	Bucket        string        `envconfig:"BUCKET" required:"true"`
	FlushInterval time.Duration `envconfig:"FLUSH_INTERVAL" default:"3m"`
	LogPath       string        `envconfig:"LOG_PATH" required:"true"`
}

func main() {
	var rc rotateConfig
	if err := envconfig.Process("", &rc); err != nil {
		log.Fatalf("Error processing environment: %v", err)
	}

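	// The uploader is expected to flush files written under LOG_PATH to the GCS
	// bucket on the configured interval (the "rotates them into GCS" step from
	// the commit description).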
	uploader := rotate.NewUploader(rc.LogPath, rc.Bucket, rc.FlushInterval)

	if err := uploader.Run(signals.NewContext()); err != nil {
		log.Fatalf("Failed to run the uploader: %v", err)
	}
}
46 changes: 46 additions & 0 deletions cloudevent-recorder/cmd/recorder/main.go
@@ -0,0 +1,46 @@
/*
Copyright 2023 Chainguard, Inc.
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
"context"
"log"
"os"
"path/filepath"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
"knative.dev/pkg/signals"
)

type envConfig struct {
	Port    int    `envconfig:"PORT" default:"8080" required:"true"`
	LogPath string `envconfig:"LOG_PATH" required:"true"`
}

func main() {
	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Panicf("failed to process env var: %s", err)
	}
	ctx := signals.NewContext()

	c, err := cloudevents.NewClientHTTP(cloudevents.WithPort(env.Port))
	if err != nil {
		log.Panicf("failed to create event client, %v", err)
	}
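	// Write each received event's payload to LOG_PATH/<event type>/<event ID>.
	// The logrotate binary (see cmd/logrotate) is expected to periodically
	// upload these files to the regional GCS bucket for import by BigQuery DTS.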
	if err := c.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) error {
		dir := filepath.Join(env.LogPath, event.Type())
		if err := os.MkdirAll(dir, 0755); err != nil {
			return err
		}

		filename := filepath.Join(dir, event.ID())
		return os.WriteFile(filename, event.Data(), 0600)
	}); err != nil {
		log.Panic(err)
	}
}
48 changes: 48 additions & 0 deletions cloudevent-recorder/main.tf
@@ -0,0 +1,48 @@
terraform {
  required_providers {
    ko     = { source = "ko-build/ko" }
    cosign = { source = "chainguard-dev/cosign" }
  }
}

data "google_project" "project" {
  project_id = var.project_id
}

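// local.regional-types is the cross product of regions and event types. As an
// illustrative example, regions ["us-east1", "us-west1"] and a single type
// "com.example.foo" yield:
//   "us-east1-com.example.foo" = { region = "us-east1", type = "com.example.foo" }
//   "us-west1-com.example.foo" = { region = "us-west1", type = "com.example.foo" }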
locals {
  regional-types = merge([
    for region in keys(var.regions) : merge([
      for type in keys(var.types) : {
        "${region}-${type}" : {
          region = region
          type   = type
        }
      }
    ]...)
  ]...)
}

resource "random_id" "suffix" {
  byte_length = 2
}

resource "google_storage_bucket" "recorder" {
  for_each = var.regions

  name          = "${var.name}-${each.key}-${random_id.suffix.hex}"
  project       = var.project_id
  location      = each.key
  force_destroy = !var.deletion_protection

  uniform_bucket_level_access = true

  lifecycle_rule {
    action {
      type = "Delete"
    }
    condition {
      // 1 week + 1 day buffer
      age = 8
    }
  }
}
