Create the cloudevent-recorder module.
This module makes it easy to configure a recorder service that pulls events of particular types off of a
`cloudevent-broker`'s topics, rotates them into GCS, and sets up a BigQuery DTS job to import them into a
schematized table of those events.

Signed-off-by: Matt Moore <[email protected]>
mattmoor committed Dec 12, 2023
1 parent 68a8ebb commit 4941654
Showing 12 changed files with 963 additions and 15 deletions.
1 change: 1 addition & 0 deletions .github/workflows/documentation.yaml
@@ -13,6 +13,7 @@ jobs:
- authorize-private-service
- cloudevent-broker
- cloudevent-trigger
- cloudevent-recorder
- networking

steps:
89 changes: 89 additions & 0 deletions cloudevent-recorder/README.md
@@ -0,0 +1,89 @@
# `cloudevent-recorder`

This module provisions a regionalized cloudevents sink that consumes events of
particular types from each of the regional brokers and writes them into a
regional GCS bucket, from which a periodic BigQuery Data Transfer Service job
imports them into a BigQuery table schematized for that event type. The
intended usage of this module for recording events is something like this:

```hcl
// Create a network with several regional subnets
module "networking" {
  source = "chainguard-dev/glue/cloudrun//networking"

  name       = "my-networking"
  project_id = var.project_id
  regions    = [...]
}

// Create the Broker abstraction.
module "cloudevent-broker" {
  source = "chainguard-dev/glue/cloudrun//cloudevent-broker"

  name       = "my-broker"
  project_id = var.project_id
  regions    = module.networking.regional-networks
}

// Record cloudevents of type com.example.foo and com.example.bar
module "foo-emits-events" {
  source = "chainguard-dev/glue/cloudrun//cloudevent-recorder"

  name       = "my-recorder"
  project_id = var.project_id
  regions    = module.networking.regional-networks
  broker     = module.cloudevent-broker.broker

  retention-period = 30 // keep around 30 days' worth of event data

  types = {
    "com.example.foo" : file("${path.module}/foo.schema.json"),
    "com.example.bar" : file("${path.module}/bar.schema.json"),
  }
}
```
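
Each value in the `types` map is a BigQuery table schema (the same JSON string
that `google_bigquery_table.schema` accepts) for that event type. As a minimal,
purely illustrative sketch — the field names below are hypothetical and not
part of this module — the schema can also be built inline with `jsonencode`
instead of a separate `foo.schema.json` file:

```hcl
// Hypothetical schema for com.example.foo events; the field names here are
// illustrative only.
locals {
  foo_schema = jsonencode([
    { name = "id",       type = "STRING",    mode = "REQUIRED" },
    { name = "occurred", type = "TIMESTAMP", mode = "NULLABLE" },
    { name = "detail",   type = "STRING",    mode = "NULLABLE" },
  ])
}
```

A hand-maintained `foo.schema.json` file with the same JSON content works
identically, since `file()` simply returns that JSON as a string.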


<!-- BEGIN_TF_DOCS -->
## Requirements

No requirements.

## Providers

| Name | Version |
|------|---------|
| <a name="provider_cosign"></a> [cosign](#provider\_cosign) | n/a |
| <a name="provider_google"></a> [google](#provider\_google) | n/a |
| <a name="provider_ko"></a> [ko](#provider\_ko) | n/a |

## Modules

No modules.

## Resources

| Name | Type |
|------|------|
| [cosign_sign.this](https://registry.terraform.io/providers/chainguard-dev/cosign/latest/docs/resources/sign) | resource |
| [google_cloud_run_v2_service.this](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/cloud_run_v2_service) | resource |
| [google_pubsub_topic.this](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource |
| [google_pubsub_topic_iam_binding.ingress-publishes-events](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic_iam_binding) | resource |
| [google_service_account.this](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource |
| [ko_build.this](https://registry.terraform.io/providers/ko-build/ko/latest/docs/resources/build) | resource |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_name"></a> [name](#input\_name) | n/a | `string` | n/a | yes |
| <a name="input_project_id"></a> [project\_id](#input\_project\_id) | n/a | `string` | n/a | yes |
| <a name="input_regions"></a> [regions](#input\_regions) | A map from region names to a network and subnetwork. A pub/sub topic and ingress service (publishing to the respective topic) will be created in each region, with the ingress service configured to egress all traffic via the specified subnetwork. | <pre>map(object({<br> network = string<br> subnet = string<br> }))</pre> | n/a | yes |

## Outputs

| Name | Description |
|------|-------------|
| <a name="output_broker"></a> [broker](#output\_broker) | A map from each of the input region names to the name of the Broker topic in each region. These broker names are intended for use with the cloudevent-trigger module's broker input. |
| <a name="output_ingress"></a> [ingress](#output\_ingress) | An object holding the name of the ingress service, which can be used to authorize callers to publish cloud events. |
<!-- END_TF_DOCS -->
100 changes: 100 additions & 0 deletions cloudevent-recorder/bigquery.tf
@@ -0,0 +1,100 @@
// The BigQuery dataset that will hold the recorded cloudevents.
resource "google_bigquery_dataset" "this" {
  project    = var.project_id
  dataset_id = "cloudevents_${replace(var.name, "-", "_")}"
  location   = var.location

  default_partition_expiration_ms = (var.retention-period) * 24 * 60 * 60 * 1000
}

// A BigQuery table for each of the cloudevent types with the specified
// schema for that type.
resource "google_bigquery_table" "types" {
  for_each = var.types

  project    = var.project_id
  dataset_id = google_bigquery_dataset.this.dataset_id
  table_id   = replace(each.key, ".", "_")
  schema     = each.value

  require_partition_filter = false

  time_partitioning {
    type = "DAY"

    expiration_ms = (var.retention-period) * 24 * 60 * 60 * 1000
  }

  deletion_protection = var.deletion_protection
}

// Create an identity that will be used to run the BQ DTS job,
// which we will grant the necessary permissions to.
resource "google_service_account" "import-identity" {
  project      = var.project_id
  account_id   = "${var.name}-import"
  display_name = "BigQuery import identity"
}

// Grant the import identity permission to manipulate the dataset's tables.
resource "google_bigquery_table_iam_member" "import-writes-to-tables" {
  for_each = var.types

  project    = var.project_id
  dataset_id = google_bigquery_dataset.this.dataset_id
  table_id   = google_bigquery_table.types[each.key].table_id
  role       = "roles/bigquery.admin"
  member     = "serviceAccount:${google_service_account.import-identity.email}"
}

// Grant the import identity permission to read the event data from
// the regional GCS buckets.
resource "google_storage_bucket_iam_member" "import-reads-from-gcs-buckets" {
  for_each = var.regions

  bucket = google_storage_bucket.recorder[each.key].name
  role   = "roles/storage.objectViewer"
  member = "serviceAccount:${google_service_account.import-identity.email}"
}

// Grant the BQ DTS service account for this project permission to assume
// the identity we are assigning to the DTS job.
resource "google_service_account_iam_member" "bq-dts-assumes-import-identity" {
  service_account_id = google_service_account.import-identity.name
  role               = "roles/iam.serviceAccountShortTermTokenMinter"
  member             = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com"
}

// Only users that can "act as" the service account can set the service account on a transfer job.
resource "google_service_account_iam_member" "provisioner-acts-as-import-identity" {
  service_account_id = google_service_account.import-identity.name
  role               = "roles/iam.serviceAccountUser"
  member             = var.provisioner
}

// Create a BQ DTS job for each of the regions x types pulling from the appropriate buckets and paths.
resource "google_bigquery_data_transfer_config" "import-job" {
  for_each = local.regional-types

  depends_on = [google_service_account_iam_member.provisioner-acts-as-import-identity]

  project              = var.project_id
  display_name         = "${var.name}-${each.key}"
  location             = google_bigquery_dataset.this.location // These must be colocated
  service_account_name = google_service_account.import-identity.email
  disabled             = false

  data_source_id         = "google_cloud_storage"
  schedule               = "every 15 minutes"
  destination_dataset_id = google_bigquery_dataset.this.dataset_id

  // TODO(mattmoor): Bring back pubsub notification.
  # notification_pubsub_topic = google_pubsub_topic.bq_notification[each.key].id
  params = {
    data_path_template              = "gs://${google_storage_bucket.recorder[each.value.region].name}/${each.value.type}/*"
    destination_table_name_template = google_bigquery_table.types[each.value.type].table_id
    file_format                     = "JSON"
    max_bad_records                 = 0
    delete_source_files             = false
  }
}
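
// To make the wiring concrete: with a region of "us-central1" and a type of
// "com.example.foo" (hypothetical values, matching the README example), the
// transfer config above would poll
//   gs://my-recorder-us-central1-<suffix>/com.example.foo/*
// every 15 minutes and load whatever the recorder/logrotate pair has uploaded
// there into the com_example_foo table.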
34 changes: 34 additions & 0 deletions cloudevent-recorder/cmd/logrotate/main.go
@@ -0,0 +1,34 @@
/*
Copyright 2023 Chainguard, Inc.
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
	"log"
	"time"

	"github.com/chainguard-dev/terraform-cloudrun-glue/pkg/rotate"
	"github.com/kelseyhightower/envconfig"
	"knative.dev/pkg/signals"
)

type rotateConfig struct {
	Bucket        string        `envconfig:"BUCKET" required:"true"`
	FlushInterval time.Duration `envconfig:"FLUSH_INTERVAL" default:"3m"`
	LogPath       string        `envconfig:"LOG_PATH" required:"true"`
}

func main() {
	var rc rotateConfig
	if err := envconfig.Process("", &rc); err != nil {
		log.Fatalf("Error processing environment: %v", err)
	}

	// Periodically rotate the files accumulating under LOG_PATH into the
	// configured GCS bucket, flushing every FLUSH_INTERVAL.
	uploader := rotate.NewUploader(rc.LogPath, rc.Bucket, rc.FlushInterval)

	if err := uploader.Run(signals.NewContext()); err != nil {
		log.Fatalf("Failed to run the uploader: %v", err)
	}
}
46 changes: 46 additions & 0 deletions cloudevent-recorder/cmd/recorder/main.go
@@ -0,0 +1,46 @@
/*
Copyright 2023 Chainguard, Inc.
SPDX-License-Identifier: Apache-2.0
*/

package main

import (
	"context"
	"log"
	"os"
	"path/filepath"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
	"knative.dev/pkg/signals"
)

type envConfig struct {
	Port    int    `envconfig:"PORT" default:"8080" required:"true"`
	LogPath string `envconfig:"LOG_PATH" required:"true"`
}

func main() {
	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Panicf("failed to process env var: %s", err)
	}
	ctx := signals.NewContext()

	c, err := cloudevents.NewClientHTTP(cloudevents.WithPort(env.Port))
	if err != nil {
		log.Panicf("failed to create event client, %v", err)
	}
	// Write each received event's payload to ${LOG_PATH}/<event type>/<event ID>,
	// where the logrotate process picks it up for upload to GCS.
	if err := c.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) error {
		dir := filepath.Join(env.LogPath, event.Type())
		if err := os.MkdirAll(dir, 0755); err != nil {
			return err
		}

		filename := filepath.Join(dir, event.ID())
		return os.WriteFile(filename, event.Data(), 0600)
	}); err != nil {
		log.Panic(err)
	}
}
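
The recorder and logrotate binaries are meant to run side by side: the recorder
writes each event under `LOG_PATH`, and logrotate periodically flushes those
files into the GCS `BUCKET`. The Cloud Run service definition is not among the
files shown here, so the following is only a rough sketch (image references,
the `/logs` mount path, and the bucket value are assumptions, not values from
this commit) of how the two containers could share a volume on Cloud Run v2:

```hcl
resource "google_cloud_run_v2_service" "recorder-sketch" {
  name     = "my-recorder"
  location = "us-central1"

  template {
    // Both containers share an in-memory volume mounted at /logs.
    volumes {
      name = "logs"
      empty_dir {
        medium = "MEMORY"
      }
    }

    containers {
      image = "example.com/recorder:latest" // hypothetical image reference
      ports {
        container_port = 8080 // the recorder's cloudevents receiver
      }
      env {
        name  = "LOG_PATH"
        value = "/logs"
      }
      volume_mounts {
        name       = "logs"
        mount_path = "/logs"
      }
    }

    containers {
      image = "example.com/logrotate:latest" // hypothetical image reference
      env {
        name  = "BUCKET"
        value = "my-recorder-us-central1-abcd" // hypothetical; the module wires in the regional bucket
      }
      env {
        name  = "LOG_PATH"
        value = "/logs"
      }
      volume_mounts {
        name       = "logs"
        mount_path = "/logs"
      }
    }
  }
}
```

The `ko` and `cosign` providers declared in `main.tf` below are presumably what
build and sign these container images within the module.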
48 changes: 48 additions & 0 deletions cloudevent-recorder/main.tf
@@ -0,0 +1,48 @@
terraform {
  required_providers {
    ko     = { source = "ko-build/ko" }
    cosign = { source = "chainguard-dev/cosign" }
  }
}

data "google_project" "project" {
  project_id = var.project_id
}

locals {
  regional-types = merge([
    for region in keys(var.regions) : merge([
      for type in keys(var.types) : {
        "${region}-${type}" : {
          region = region
          type   = type
        }
      }
    ]...)
  ]...)
}
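
// For illustration (assuming two hypothetical regions and the two event types
// from the README example), local.regional-types evaluates to a flat map like:
//
//   {
//     "us-central1-com.example.foo" = { region = "us-central1", type = "com.example.foo" }
//     "us-central1-com.example.bar" = { region = "us-central1", type = "com.example.bar" }
//     "us-east1-com.example.foo"    = { region = "us-east1",    type = "com.example.foo" }
//     "us-east1-com.example.bar"    = { region = "us-east1",    type = "com.example.bar" }
//   }
//
// which is what bigquery.tf uses to create one DTS job per (region, type) pair.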

resource "random_id" "suffix" {
byte_length = 2
}

resource "google_storage_bucket" "recorder" {
for_each = var.regions

name = "${var.name}-${each.key}-${random_id.suffix.hex}"
project = var.project_id
location = each.key
force_destroy = !var.deletion_protection

uniform_bucket_level_access = true

lifecycle_rule {
action {
type = "Delete"
}
condition {
// 1 week + 1 day buffer
age = 8
}
}
}
