diff --git a/.github/workflows/documentation.yaml b/.github/workflows/documentation.yaml index 3041b949..69cb075e 100644 --- a/.github/workflows/documentation.yaml +++ b/.github/workflows/documentation.yaml @@ -13,6 +13,7 @@ jobs: - authorize-private-service - cloudevent-broker - cloudevent-trigger + - cloudevent-recorder - networking steps: diff --git a/cloudevent-recorder/README.md b/cloudevent-recorder/README.md new file mode 100644 index 00000000..756ab9f4 --- /dev/null +++ b/cloudevent-recorder/README.md @@ -0,0 +1,108 @@ +# `cloudevent-recorder` + +This module provisions a regionalized cloudevents sink that consumes events of +particular types from each of the regional brokers, and writes them into a +regional GCS bucket where a periodic BigQuery Data-Transfer Service Job will +pull events from into a BigQuery table schematized for that event type. The +intended usage of this module for publishing events is something like this: + +```hcl +// Create a network with several regional subnets +module "networking" { + source = "chainguard-dev/glue/cloudrun//networking" + + name = "my-networking" + project_id = var.project_id + regions = [...] +} + +// Create the Broker abstraction. +module "cloudevent-broker" { + source = "chainguard-dev/glue/cloudrun//cloudevent-broker" + + name = "my-broker" + project_id = var.project_id + regions = module.networking.regional-networks +} + +// Record cloudevents of type com.example.foo and com.example.bar +module "foo-emits-events" { + source = "chainguard-dev/glue/cloudrun//cloudevent-recorder" + + name = "my-recorder" + project_id = var.project_id + regions = module.networking.regional-networks + broker = module.cloudevent-broker.broker + + retention-period = 30 // keep around 30 days worth of event data + + types = { + "com.example.foo": file("${path.module}/foo.schema.json"), + "com.example.bar": file("${path.module}/bar.schema.json"), + } +} +``` + + +## Requirements + +No requirements. 
+ +## Providers + +| Name | Version | +|------|---------| +| [cosign](#provider\_cosign) | n/a | +| [google](#provider\_google) | n/a | +| [google-beta](#provider\_google-beta) | n/a | +| [ko](#provider\_ko) | n/a | +| [random](#provider\_random) | n/a | + +## Modules + +| Name | Source | Version | +|------|--------|---------| +| [triggers](#module\_triggers) | ../cloudevent-trigger | n/a | + +## Resources + +| Name | Type | +|------|------| +| [cosign_sign.logrotate-image](https://registry.terraform.io/providers/chainguard-dev/cosign/latest/docs/resources/sign) | resource | +| [cosign_sign.recorder-image](https://registry.terraform.io/providers/chainguard-dev/cosign/latest/docs/resources/sign) | resource | +| [google-beta_google_cloud_run_v2_service.recorder-service](https://registry.terraform.io/providers/hashicorp/google-beta/latest/docs/resources/google_cloud_run_v2_service) | resource | +| [google_bigquery_data_transfer_config.import-job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_data_transfer_config) | resource | +| [google_bigquery_dataset.this](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_dataset) | resource | +| [google_bigquery_table.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_table) | resource | +| [google_bigquery_table_iam_member.import-writes-to-tables](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_table_iam_member) | resource | +| [google_service_account.import-identity](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource | +| [google_service_account.recorder](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource | +| 
[google_service_account_iam_member.bq-dts-assumes-import-identity](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account_iam_member) | resource | +| [google_service_account_iam_member.provisioner-acts-as-import-identity](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account_iam_member) | resource | +| [google_storage_bucket.recorder](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket) | resource | +| [google_storage_bucket_iam_member.import-reads-from-gcs-buckets](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_member) | resource | +| [google_storage_bucket_iam_member.recorder-writes-to-gcs-buckets](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_member) | resource | +| [ko_build.logrotate-image](https://registry.terraform.io/providers/ko-build/ko/latest/docs/resources/build) | resource | +| [ko_build.recorder-image](https://registry.terraform.io/providers/ko-build/ko/latest/docs/resources/build) | resource | +| [random_id.suffix](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) | resource | +| [random_id.trigger-suffix](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) | resource | +| [google_project.project](https://registry.terraform.io/providers/hashicorp/google/latest/docs/data-sources/project) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [broker](#input\_broker) | A map from each of the input region names to the name of the Broker topic in that region. | `map(string)` | n/a | yes | +| [deletion\_protection](#input\_deletion\_protection) | Whether to enable deletion protection on data resources. 
| `bool` | `true` | no | +| [location](#input\_location) | The location to create the BigQuery dataset in, and in which to run the data transfer jobs from GCS. | `string` | `"US"` | no | +| [name](#input\_name) | n/a | `string` | n/a | yes | +| [project\_id](#input\_project\_id) | n/a | `string` | n/a | yes | +| [provisioner](#input\_provisioner) | The identity as which this module will be applied (so it may be granted permission to 'act as' the DTS service account). This should be in the form expected by an IAM subject (e.g. user:sally@example.com) | `string` | n/a | yes | +| [regions](#input\_regions) | A map from region names to a network and subnetwork. A recorder service and cloud storage bucket (into which the service writes events) will be created in each region. |
map(object({
network = string
subnet = string
}))
| n/a | yes | +| [retention-period](#input\_retention-period) | The number of days to retain data in BigQuery. | `number` | n/a | yes | +| [types](#input\_types) | A map from cloudevent types to the BigQuery schema associated with them. | `map(string)` | n/a | yes | + +## Outputs + +No outputs. + diff --git a/cloudevent-recorder/bigquery.tf b/cloudevent-recorder/bigquery.tf new file mode 100644 index 00000000..714024ed --- /dev/null +++ b/cloudevent-recorder/bigquery.tf @@ -0,0 +1,100 @@ +// The BigQuery dataset that will hold the recorded cloudevents. +resource "google_bigquery_dataset" "this" { + project = var.project_id + dataset_id = "cloudevents_${replace(var.name, "-", "_")}" + location = var.location + + default_partition_expiration_ms = (var.retention-period) * 24 * 60 * 60 * 1000 +} + +// A BigQuery table for each of the cloudevent types with the specified +// schema for that type. +resource "google_bigquery_table" "types" { + for_each = var.types + + project = var.project_id + dataset_id = google_bigquery_dataset.this.dataset_id + table_id = replace(each.key, ".", "_") + schema = each.value + + require_partition_filter = false + + time_partitioning { + type = "DAY" + + expiration_ms = (var.retention-period) * 24 * 60 * 60 * 1000 + } + + deletion_protection = var.deletion_protection +} + +// Create an identity that will be used to run the BQ DTS job, +// which we will grant the necessary permissions to. +resource "google_service_account" "import-identity" { + project = var.project_id + account_id = "${var.name}-import" + display_name = "BigQuery import identity" +} + +// Grant the import identity permission to manipulate the dataset's tables. 
+resource "google_bigquery_table_iam_member" "import-writes-to-tables" { + for_each = var.types + + project = var.project_id + dataset_id = google_bigquery_dataset.this.dataset_id + table_id = google_bigquery_table.types[each.key].table_id + role = "roles/bigquery.admin" + member = "serviceAccount:${google_service_account.import-identity.email}" +} + +// Grant the import identity permission to read the event data from +// the regional GCS buckets. +resource "google_storage_bucket_iam_member" "import-reads-from-gcs-buckets" { + for_each = var.regions + + bucket = google_storage_bucket.recorder[each.key].name + role = "roles/storage.objectViewer" + member = "serviceAccount:${google_service_account.import-identity.email}" +} + +// Grant the BQ DTS service account for this project permission to assume +// the identity we are assigning to the DTS job. +resource "google_service_account_iam_member" "bq-dts-assumes-import-identity" { + service_account_id = google_service_account.import-identity.name + role = "roles/iam.serviceAccountShortTermTokenMinter" + member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com" +} + +// Only users that can "act as" the service account can set the service account on a transfer job. +resource "google_service_account_iam_member" "provisioner-acts-as-import-identity" { + service_account_id = google_service_account.import-identity.name + role = "roles/iam.serviceAccountUser" + member = var.provisioner +} + +// Create a BQ DTS job for each of the regions x types pulling from the appropriate buckets and paths. 
+resource "google_bigquery_data_transfer_config" "import-job" { + for_each = local.regional-types + + depends_on = [google_service_account_iam_member.provisioner-acts-as-import-identity] + + project = var.project_id + display_name = "${var.name}-${each.key}" + location = google_bigquery_dataset.this.location // These must be colocated + service_account_name = google_service_account.import-identity.email + disabled = false + + data_source_id = "google_cloud_storage" + schedule = "every 15 minutes" + destination_dataset_id = google_bigquery_dataset.this.dataset_id + + // TODO(mattmoor): Bring back pubsub notification. + # notification_pubsub_topic = google_pubsub_topic.bq_notification[each.key].id + params = { + data_path_template = "gs://${google_storage_bucket.recorder[each.value.region].name}/${each.value.type}/*" + destination_table_name_template = google_bigquery_table.types[each.value.type].table_id + file_format = "JSON" + max_bad_records = 0 + delete_source_files = false + } +} diff --git a/cloudevent-recorder/cmd/logrotate/main.go b/cloudevent-recorder/cmd/logrotate/main.go new file mode 100644 index 00000000..5771052a --- /dev/null +++ b/cloudevent-recorder/cmd/logrotate/main.go @@ -0,0 +1,34 @@ +/* +Copyright 2023 Chainguard, Inc. 
+SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "log" + "time" + + "github.com/chainguard-dev/terraform-cloudrun-glue/pkg/rotate" + "github.com/kelseyhightower/envconfig" + "knative.dev/pkg/signals" +) + +type rotateConfig struct { + Bucket string `envconfig:"BUCKET" required:"true"` + FlushInterval time.Duration `envconfig:"FLUSH_INTERVAL" default:"3m"` + LogPath string `envconfig:"LOG_PATH" required:"true"` +} + +func main() { + var rc rotateConfig + if err := envconfig.Process("", &rc); err != nil { + log.Fatalf("Error processing environment: %v", err) + } + + uploader := rotate.NewUploader(rc.LogPath, rc.Bucket, rc.FlushInterval) + + if err := uploader.Run(signals.NewContext()); err != nil { + log.Fatalf("Failed to run the uploader: %v", err) + } +} diff --git a/cloudevent-recorder/cmd/recorder/main.go b/cloudevent-recorder/cmd/recorder/main.go new file mode 100644 index 00000000..d801afd3 --- /dev/null +++ b/cloudevent-recorder/cmd/recorder/main.go @@ -0,0 +1,46 @@ +/* +Copyright 2023 Chainguard, Inc. 
+SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + "os" + "path/filepath" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/kelseyhightower/envconfig" + "knative.dev/pkg/signals" +) + +type envConfig struct { + Port int `envconfig:"PORT" default:"8080" required:"true"` + LogPath string `envconfig:"LOG_PATH" required:"true"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Panicf("failed to process env var: %s", err) + } + ctx := signals.NewContext() + + c, err := cloudevents.NewClientHTTP(cloudevents.WithPort(env.Port)) + if err != nil { + log.Panicf("failed to create event client, %v", err) + } + if err := c.StartReceiver(ctx, func(ctx context.Context, event cloudevents.Event) error { + dir := filepath.Join(env.LogPath, event.Type()) + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + + filename := filepath.Join(dir, event.ID()) + return os.WriteFile(filename, event.Data(), 0600) + }); err != nil { + log.Panic(err) + } +} diff --git a/cloudevent-recorder/main.tf b/cloudevent-recorder/main.tf new file mode 100644 index 00000000..bd57c480 --- /dev/null +++ b/cloudevent-recorder/main.tf @@ -0,0 +1,48 @@ +terraform { + required_providers { + ko = { source = "ko-build/ko" } + cosign = { source = "chainguard-dev/cosign" } + } +} + +data "google_project" "project" { + project_id = var.project_id +} + +locals { + regional-types = merge([ + for region in keys(var.regions) : merge([ + for type in keys(var.types) : { + "${region}-${type}" : { + region = region + type = type + } + } + ]...) + ]...) 
+} + +resource "random_id" "suffix" { + byte_length = 2 +} + +resource "google_storage_bucket" "recorder" { + for_each = var.regions + + name = "${var.name}-${each.key}-${random_id.suffix.hex}" + project = var.project_id + location = each.key + force_destroy = !var.deletion_protection + + uniform_bucket_level_access = true + + lifecycle_rule { + action { + type = "Delete" + } + condition { + // 1 week + 1 day buffer + age = 8 + } + } +} diff --git a/cloudevent-recorder/recorder.tf b/cloudevent-recorder/recorder.tf new file mode 100644 index 00000000..a2785626 --- /dev/null +++ b/cloudevent-recorder/recorder.tf @@ -0,0 +1,122 @@ +// Create an identity as which the recorder service will run. +resource "google_service_account" "recorder" { + project = var.project_id + + account_id = var.name + display_name = "Cloudevents recorder" + description = "Dedicated service account for our recorder service." +} + +// Grant the recorder service account permission to write to the regional GCS buckets. 
+resource "google_storage_bucket_iam_member" "recorder-writes-to-gcs-buckets" { + for_each = var.regions + + bucket = google_storage_bucket.recorder[each.key].name + role = "roles/storage.admin" + member = "serviceAccount:${google_service_account.recorder.email}" +} + +resource "ko_build" "recorder-image" { + base_image = "cgr.dev/chainguard/static:latest-glibc" + importpath = "./cmd/recorder" + working_dir = path.module +} + +resource "cosign_sign" "recorder-image" { + image = ko_build.recorder-image.image_ref +} + +resource "ko_build" "logrotate-image" { + base_image = "cgr.dev/chainguard/static:latest-glibc" + importpath = "./cmd/logrotate" + working_dir = path.module +} + +resource "cosign_sign" "logrotate-image" { + image = ko_build.logrotate-image.image_ref +} + +resource "google_cloud_run_v2_service" "recorder-service" { + for_each = var.regions + + provider = google-beta # For empty_dir + project = var.project_id + name = var.name + location = each.key + // This service should only be called by our Pub/Sub + // subscription, so flag it as internal only. 
+ ingress = "INGRESS_TRAFFIC_INTERNAL_ONLY" + + launch_stage = "BETA" + + template { + vpc_access { + network_interfaces { + network = each.value.network + subnetwork = each.value.subnet + } + egress = "ALL_TRAFFIC" // This should not egress + } + + service_account = google_service_account.recorder.email + containers { + image = cosign_sign.recorder-image.signed_ref + + ports { + container_port = 8080 + } + + env { + name = "LOG_PATH" + value = "/logs" + } + volume_mounts { + name = "logs" + mount_path = "/logs" + } + } + containers { + image = cosign_sign.logrotate-image.signed_ref + + env { + name = "BUCKET" + value = google_storage_bucket.recorder[each.key].url + } + env { + name = "LOG_PATH" + value = "/logs" + } + volume_mounts { + name = "logs" + mount_path = "/logs" + } + } + volumes { + name = "logs" + empty_dir {} + } + } +} + +resource "random_id" "trigger-suffix" { + for_each = local.regional-types + byte_length = 2 +} + +// Create a trigger for each region x type that sends events to the recorder service. +module "triggers" { + for_each = local.regional-types + + source = "../cloudevent-trigger" + + name = "${var.name}-${random_id.trigger-suffix[each.key].hex}" + project_id = var.project_id + broker = var.broker[each.value.region] + filter = { "type" : each.value.type } + + depends_on = [google_cloud_run_v2_service.recorder-service] + private-service = { + region = each.value.region + name = google_cloud_run_v2_service.recorder-service[each.value.region].name + } +} diff --git a/cloudevent-recorder/variables.tf b/cloudevent-recorder/variables.tf new file mode 100644 index 00000000..6dd3f1da --- /dev/null +++ b/cloudevent-recorder/variables.tf @@ -0,0 +1,45 @@ +variable "project_id" { + type = string +} + +variable "name" { + type = string +} + +variable "location" { + default = "US" + description = "The location to create the BigQuery dataset in, and in which to run the data transfer jobs from GCS." 
+} + +variable "provisioner" { + type = string + description = "The identity as which this module will be applied (so it may be granted permission to 'act as' the DTS service account). This should be in the form expected by an IAM subject (e.g. user:sally@example.com)" +} + +variable "retention-period" { + type = number + description = "The number of days to retain data in BigQuery." +} + +variable "deletion_protection" { + default = true + description = "Whether to enable deletion protection on data resources." +} + +variable "regions" { + description = "A map from region names to a network and subnetwork. A recorder service and cloud storage bucket (into which the service writes events) will be created in each region." + type = map(object({ + network = string + subnet = string + })) +} + +variable "broker" { + type = map(string) + description = "A map from each of the input region names to the name of the Broker topic in that region." +} + +variable "types" { + type = map(string) + description = "A map from cloudevent types to the BigQuery schema associated with them." 
+} diff --git a/go.mod b/go.mod index 2a668680..b92897e8 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/cloudevents/sdk-go/v2 v2.14.0 github.com/google/go-cmp v0.6.0 github.com/kelseyhightower/envconfig v1.4.0 + gocloud.dev v0.35.0 + golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb golang.org/x/oauth2 v0.15.0 google.golang.org/api v0.152.0 knative.dev/pkg v0.0.0-20231204120332-9386ad6703ee @@ -17,17 +19,19 @@ require ( cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.5 // indirect + cloud.google.com/go/storage v1.35.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/uuid v1.4.0 // indirect + github.com/google/wire v0.5.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - go.uber.org/multierr v1.10.0 // indirect + go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.16.0 // indirect golang.org/x/net v0.19.0 // indirect @@ -35,9 +39,10 @@ require ( golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f // indirect + 
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/go.sum b/go.sum index 7fb5cf4d..9fcdef47 100644 --- a/go.sum +++ b/go.sum @@ -11,7 +11,49 @@ cloud.google.com/go/kms v1.15.5 h1:pj1sRfut2eRbD9pFRjNnPNg/CzJPuQAzUujMIM1vVeM= cloud.google.com/go/kms v1.15.5/go.mod h1:cU2H5jnp6G2TDpUGZyqTCoy1n16fbubHZjmVXSMtwDI= cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g= cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc= +cloud.google.com/go/storage v1.35.1 h1:B59ahL//eDfx2IIKFBeT5Atm9wnNmj3+8xG/W4WB//w= +cloud.google.com/go/storage v1.35.1/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.48.3 h1:btYjT+opVFxUbRz+qSCjJe07cdX82BHmMX/FXYmoL7g= +github.com/aws/aws-sdk-go v1.48.3/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go-v2 v1.23.1 h1:qXaFsOOMA+HsZtX8WoCa+gJnbyW7qyFFBlPqvTSzbaI= +github.com/aws/aws-sdk-go-v2 v1.23.1/go.mod h1:i1XDttT4rnf6vxc9AuskLc6s7XBee8rlLilKlc03uAA= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 h1:ZY3108YtBNq96jNZTICHxN1gSBSbnvIdYwwqnvCV4Mc= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1/go.mod h1:t8PYl/6LzdAqsU4/9tz28V/kU+asFePvpOMkdul0gEQ= +github.com/aws/aws-sdk-go-v2/config v1.25.5 h1:UGKm9hpQS2hoK8CEJ1BzAW8NbUpvwDJJ4lyqXSzu8bk= +github.com/aws/aws-sdk-go-v2/config v1.25.5/go.mod h1:Bf4gDvy4ZcFIK0rqDu1wp9wrubNba2DojiPB2rt6nvI= +github.com/aws/aws-sdk-go-v2/credentials v1.16.4 h1:i7UQYYDSJrtc30RSwJwfBKwLFNnBTiICqAJ0pPdum8E= +github.com/aws/aws-sdk-go-v2/credentials v1.16.4/go.mod h1:Kdh/okh+//vQ/AjEt81CjvkTo64+/zIE4OewP7RpfXk= 
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5 h1:KehRNiVzIfAcj6gw98zotVbb/K67taJE0fkfgM6vzqU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5/go.mod h1:VhnExhw6uXy9QzetvpXDolo1/hjhx4u9qukBGkuUwjs= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.2 h1:3q7vcLhq6JXqTLPpPuDJgw3f+DFqd4p+BWL2DlplRPc= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.2/go.mod h1:9aqZoo/OeMBK/Nf3wzQzTlM92u7Bip256GHpY0oQbX4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4 h1:LAm3Ycm9HJfbSCd5I+wqC2S9Ej7FPrgr5CQoOljJZcE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4/go.mod h1:xEhvbJcyUf/31yfGSQBe01fukXwXJ0gxDp7rLfymWE0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.4 h1:4GV0kKZzUxiWxSVpn/9gwR0g21NF1Jsyduzo9rHgC/Q= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.4/go.mod h1:dYvTNAggxDZy6y1AF7YDwXsPuHFy/VNEpEI/2dWK9IU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1 h1:uR9lXYjdPX0xY+NhvaJ4dD8rpSRz5VY81ccIIoNG+lw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.4 h1:40Q4X5ebZruRtknEZH/bg91sT5pR853F7/1X9QRbI54= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.4/go.mod h1:u77N7eEECzUv7F0xl2gcfK/vzc8wcjWobpy+DcrLJ5E= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 h1:rpkF4n0CyFcrJUG/rNNohoTmhtWlFTRI4BsZOh9PvLs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1/go.mod h1:l9ymW25HOqymeU2m1gbUQ3rUIsTwKs8gYHXkqDQUhiI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.4 h1:6DRKQc+9cChgzL5gplRGusI5dBGeiEod4m/pmGbcX48= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.4/go.mod h1:s8ORvrW4g4v7IvYKIAoBg17w3GQ+XuwXDXYrQ5SkzU0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.4 h1:rdovz3rEu0vZKbzoMYPTehp0E8veoE9AyfzqCr5Eeao= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.4/go.mod h1:aYCGNjyUCUelhofxlZyj63srdxWUSsBSGg5l6MCuXuE= 
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.4 h1:o3DcfCxGDIT20pTbVKVhp3vWXOj/VvgazNJvumWeYW0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.4/go.mod h1:Uy0KVOxuTK2ne+/PKQ+VvEeWmjMMksE17k/2RK/r5oM= +github.com/aws/aws-sdk-go-v2/service/s3 v1.44.0 h1:FJTWR2nP1ddLIbk4n7Glw8wGbeWGHaViUwADPzE/EBo= +github.com/aws/aws-sdk-go-v2/service/s3 v1.44.0/go.mod h1:dqJ5JBL0clzgHriH35Amx3LRFY6wNIPUX7QO/BerSBo= +github.com/aws/aws-sdk-go-v2/service/sso v1.17.3 h1:CdsSOGlFF3Pn+koXOIpTtvX7st0IuGsZ8kJqcWMlX54= +github.com/aws/aws-sdk-go-v2/service/sso v1.17.3/go.mod h1:oA6VjNsLll2eVuUoF2D+CMyORgNzPEW/3PyUdq6WQjI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.1 h1:cbRqFTVnJV+KRpwFl76GJdIZJKKCdTPnjUZ7uWh3pIU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.1/go.mod h1:hHL974p5auvXlZPIjJTblXJpbkfK4klBczlsEaMCGVY= +github.com/aws/aws-sdk-go-v2/service/sts v1.25.4 h1:yEvZ4neOQ/KpUqyR+X0ycUTW/kVRNR4nDZ38wStHGAA= +github.com/aws/aws-sdk-go-v2/service/sts v1.25.4/go.mod h1:feTnm2Tk/pJxdX+eooEsxvlvTWBvDm6CasRZ+JOs2IY= +github.com/aws/smithy-go v1.17.0 h1:wWJD7LX6PBV6etBUwO0zElG0nWN9rUhp0WdYeHSHAaI= +github.com/aws/smithy-go v1.17.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= @@ -30,7 +72,6 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= 
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -40,6 +81,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -51,16 +93,27 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-replayers/grpcreplay v1.1.0 h1:S5+I3zYyZ+GQz68OfbURDdt/+cSMqCK1wrvNx7WBzTE= +github.com/google/go-replayers/grpcreplay v1.1.0/go.mod h1:qzAvJ8/wi57zq7gWqaE6AwLM6miiXUQwP1S+I9icmhk= +github.com/google/go-replayers/httpreplay v1.2.0 h1:VM1wEyyjaoU53BwrOnaf9VhAyQQEEioJvFYxYcLRKzk= +github.com/google/go-replayers/httpreplay v1.2.0/go.mod h1:WahEFFZZ7a1P4VM1qEeHy+tME4bwyqPcwWbNlUI1Mcg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= +github.com/google/martian/v3 v3.3.2/go.mod 
h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= +github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8= +github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= @@ -83,29 +136,38 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/yuin/goldmark v1.4.13/go.mod 
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +gocloud.dev v0.35.0 h1:x/Gtt5OJdT4j+ir1AXAIXb7bBnFawXAAaJptCUGk3HU= +gocloud.dev v0.35.0/go.mod h1:wbyF+BhfdtLWyUtVEWRW13hFLb1vXnV2ovEhYGQe3ck= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8= +golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= 
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -114,17 +176,25 @@ golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCA golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text 
v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= @@ -133,21 +203,27 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/api v0.152.0 h1:t0r1vPnfMc260S2Ci+en7kfCZaLOPs5KI0sVV/6jZrY= google.golang.org/api v0.152.0/go.mod h1:3qNJX5eOmhiWYc67jRA/3GsDw97UFb5ivv7Y2PrriAY= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.8 
h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= -google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= -google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 h1:JpwMPBpFN3uKhdaekDpiNlImDdkUAyiJ6ez/uxGaUSo= -google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:0xJLfVdJqpAPl8tDg1ujOCGzx6LFLttXT5NhllGOXY4= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f h1:Vn+VyHU5guc9KjB5KrjI2q0wCOWEOIh0OEsleqakHJg= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f/go.mod h1:nWSwAFPb+qfNJXsoeO3Io7zf4tMSfN8EA8RlDA04GhY= +google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f h1:2yNACc1O40tTnrsbk9Cv6oxiW8pxI/pXj0wRtdlYmgY= +google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f/go.mod h1:Uy9bTZJqmfrw2rIBxgGLnamc78euZULUBrLZ9XTITKI= google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/pkg/rotate/blob.go b/pkg/rotate/blob.go new file mode 100644 index 00000000..554ad819 --- /dev/null +++ b/pkg/rotate/blob.go @@ -0,0 +1,178 @@ +/* +Copyright 2023 Chainguard, 
Inc. +SPDX-License-Identifier: Apache-2.0 +*/ + +package rotate + +import ( + "bufio" + "context" + "fmt" + "io" + "io/fs" + "log" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "gocloud.dev/blob" + + // Add gcsblob support that we need to support gs:// prefixes + _ "gocloud.dev/blob/gcsblob" +) + +type Uploader interface { + Run(ctx context.Context) error +} + +func NewUploader(source, bucket string, flushInterval time.Duration) Uploader { + return &uploader{ + source: source, + bucket: bucket, + flushInterval: flushInterval, + } +} + +type uploader struct { + source string + bucket string + flushInterval time.Duration +} + +func (u *uploader) Run(ctx context.Context) error { + log.Printf("Uploading combined logs from %s to %s every %g minutes", u.source, u.bucket, u.flushInterval.Minutes()) + + done := false + + for { + // This must be Background since we need to be able to upload even + // after receiving SIGTERM. + bgCtx := context.Background() + bucket, err := blob.OpenBucket(bgCtx, u.bucket) + if err != nil { + return err + } + defer bucket.Close() + + fileName := strconv.FormatInt(time.Now().UnixNano(), 10) + + fileMap := make(map[string][]string) + processed := 0 + + if err := filepath.WalkDir(u.source, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + // Skip non-regular files. 
+ if !d.Type().IsRegular() { + return nil + } + relPath, err := filepath.Rel(u.source, path) + if err != nil { + return err + } + dir, base := filepath.Split(relPath) + if _, ok := fileMap[dir]; !ok { + fileMap[dir] = []string{base} + } else { + fileMap[dir] = append(fileMap[dir], base) + } + + return nil + }); err != nil { + return err + } + + for dir, files := range fileMap { + // Setup the GCS object with the filename to write to + writer, err := bucket.NewWriter(bgCtx, filepath.Join(dir, fileName), nil) + if err != nil { + return err + } + + for _, f := range files { + if err := u.BufferWriteToBucket(writer, filepath.Join(u.source, dir, f)); err != nil { + return fmt.Errorf("failed to upload file to blobstore: %s, %w", filepath.Join(dir, fileName), err) + } + processed++ + } + + if err := writer.Close(); err != nil { + return fmt.Errorf("failed to close blob file: %s %w", fileName, err) + } + + for _, f := range files { + path := filepath.Join(u.source, dir, f) + if err := os.Remove(path); err != nil { + return fmt.Errorf("failed to delete file: %s %w", path, err) + } + } + } + + if processed > 0 { + log.Printf("Processed %d files to blobstore", processed) + } + if done { + log.Printf("Exiting flush Run loop") + return nil + } + select { + case <-time.After(u.flushInterval): + case <-ctx.Done(): + log.Printf("Flushing one more time") + done = true + } + } +} + +func Upload(ctx context.Context, fr io.Reader, bucket, fileName string) error { + b, err := blob.OpenBucket(ctx, bucket) + if err != nil { + return err + } + defer b.Close() + // Setup the blob with the filename to write to + writer, err := b.NewWriter(ctx, fileName, nil) + if err != nil { + return err + } + n, err := writer.ReadFrom(fr) + if err != nil { + return err + } + fmt.Printf("Wrote %d bytes\n", n) + if err := writer.Close(); err != nil { + return fmt.Errorf("failed to close blob file %w", err) + } + return nil +} + +func (u *uploader) BufferWriteToBucket(writer *blob.Writer, src string) (err 
error) { + f, err := os.Open(src) + if err != nil { + return err + } + + defer func() { + ferr := f.Close() + if ferr != nil { + err = fmt.Errorf("failed to close source file: %s %w", src, err) + } + }() + + s := bufio.NewScanner(f) + + for s.Scan() { + line := strings.TrimSpace(s.Text()) + if len(line) == 0 { + continue + } + if _, err := writer.Write([]byte(line + "\n")); err != nil { + return err + } + } + return nil +} diff --git a/pkg/rotate/blob_test.go b/pkg/rotate/blob_test.go new file mode 100644 index 00000000..511d0ec2 --- /dev/null +++ b/pkg/rotate/blob_test.go @@ -0,0 +1,204 @@ +/* +Copyright 2023 Chainguard, Inc. +SPDX-License-Identifier: Apache-2.0 +*/ + +package rotate + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "gocloud.dev/blob" + _ "gocloud.dev/blob/fileblob" + "golang.org/x/exp/maps" +) + +var ( + wantBeforeBlobs = map[string]string{ + "unit/test/0": "UNIT TEST: 0\n", + "unit-test-1": "UNIT TEST: 1\n", + "unit/test/2": "UNIT TEST: 2\n", + "unit-test-3": "UNIT TEST: 3\n", + "unit/test/4": "UNIT TEST: 4\n", + } + + wantBeforeCombineBlobs = []string{ + "UNIT TEST: 1\nUNIT TEST: 3\n", + "UNIT TEST: 0\nUNIT TEST: 2\nUNIT TEST: 4\n", + } + + wantAfterCombineBlobs = []string{ + "LAST UT\n", + "UNIT TEST: 1\nUNIT TEST: 3\n", + "UNIT TEST: 0\nUNIT TEST: 2\nUNIT TEST: 4\n", + } +) + +func TestBlobUploader(t *testing.T) { + dir := t.TempDir() + blobDir := t.TempDir() + + cancelCtx, cancel := context.WithCancel(context.Background()) + bucketName := "file://" + blobDir + bucket, err := blob.OpenBucket(cancelCtx, bucketName) + if err != nil { + t.Fatalf("Failed to create a bucket: %v", err) + } + defer os.RemoveAll(dir) // clean up + + uploader := NewUploader(dir, bucketName, 1*time.Minute) + + // Create a few files there to be uploaded + for i := 0; i < 5; i++ { + var filename string + if i%2 == 0 { + filename = 
fmt.Sprintf("%s/unit/test/%d", dir, i)
		} else {
			filename = fmt.Sprintf("%s/unit-test-%d", dir, i)
		}
		if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
			t.Errorf("MkdirAll() = %v", err)
		}
		contents := fmt.Sprintf("UNIT TEST: %d", i)
		err := os.WriteFile(filename, []byte(contents), 0600)
		if err != nil {
			t.Errorf("Failed to write file %d: %v\n", i, err)
		}
	}
	// Also drop an empty file and a whitespace-only file; neither should
	// contribute any lines to the combined blobs.
	for i, b := range []string{"", "\n"} {
		x := i + 5
		var filename string
		if i%2 == 0 {
			filename = fmt.Sprintf("%s/unit/test/%d", dir, x)
		} else {
			filename = fmt.Sprintf("%s/unit-test-%d", dir, x)
		}
		err := os.WriteFile(filename, []byte(b), 0600)
		if err != nil {
			t.Errorf("Failed to write file %d: %v\n", x, err)
		}
	}

	// Start the uploader
	// NOTE(review): Run's error return is discarded; failures surface only
	// indirectly through the blob assertions below.
	go uploader.Run(cancelCtx)

	// Give a little time for uploads, then make sure we have all the files
	// there.
	// NOTE(review): sleep-based synchronization — potentially flaky on a
	// slow machine.
	time.Sleep(3 * time.Second)
	blobsBefore, err := getFiles(cancelCtx, bucket)
	if err != nil {
		t.Errorf("Failed to read files from blobstore: %v", err)
	}
	// Blob names are nanosecond timestamps, so compare contents order-free.
	less := func(a, b string) bool { return a < b }
	if !cmp.Equal(maps.Values(blobsBefore), wantBeforeCombineBlobs, cmpopts.SortSlices(less)) {
		t.Errorf("Did not get expected blobs '%s'\n%v\n%v", cmp.Diff(wantBeforeCombineBlobs, maps.Values(blobsBefore), cmpopts.SortSlices(less)), wantBeforeCombineBlobs, maps.Values(blobsBefore))
	}

	// Then write one more file and trigger cancel, so this too should be now
	// written there.
	filename := fmt.Sprintf("%s/last", dir)
	err = os.WriteFile(filename, []byte("LAST UT"), 0600)
	if err != nil {
		t.Errorf("Failed to write: %v", err)
	}

	// Now one more check, make sure that the file does not get
	// immediately uploaded. So check that the files have not been
	// uploaded. Then trigger shutdown and ensure the file then gets uploaded
	// as 'not per schedule', but aggressive flush during shutdown.
	blobsAfter, err := getFiles(cancelCtx, bucket)
	if err != nil {
		t.Errorf("Failed to read files from blobstore: %v", err)
	}
	if !cmp.Equal(maps.Values(blobsAfter), wantBeforeCombineBlobs, cmpopts.SortSlices(less)) {
		t.Errorf("Did not get expected blobs '%s'\n%v\n%v", cmp.Diff(wantBeforeCombineBlobs, maps.Values(blobsAfter), cmpopts.SortSlices(less)), wantBeforeCombineBlobs, maps.Values(blobsAfter))
	}

	// Shutdown: the uploader should perform one final flush on cancel.
	cancel()
	time.Sleep(3 * time.Second)
	// NOTE(review): cancelCtx is already cancelled here; this relies on the
	// fileblob driver not enforcing context cancellation — confirm.
	blobsAfter, err = getFiles(cancelCtx, bucket)
	if err != nil {
		t.Errorf("Failed to read files from blobstore: %v", err)
	}
	if !cmp.Equal(maps.Values(blobsAfter), wantAfterCombineBlobs, cmpopts.SortSlices(less)) {
		t.Errorf("Did not get expected blobs '%s'\n%v\n%v", cmp.Diff(wantAfterCombineBlobs, maps.Values(blobsAfter), cmpopts.SortSlices(less)), wantAfterCombineBlobs, maps.Values(blobsAfter))
	}
}

// TestBlobUploaderNoop checks that Run on an empty source directory starts
// and exits cleanly when its context expires.
func TestBlobUploaderNoop(t *testing.T) {
	dir := t.TempDir()
	defer os.RemoveAll(dir) // clean up
	blobDir := t.TempDir()

	bucketName := "file://" + blobDir

	uploader := NewUploader(dir, bucketName, 1*time.Minute)

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*80)
	defer cancel()
	// NOTE(review): Run's error return is ignored; a failure here would
	// pass silently.
	uploader.Run(ctx)
}

// TestBlobUpload exercises the one-shot Upload helper: each payload lands as
// its own object with contents unchanged.
func TestBlobUpload(t *testing.T) {
	blobDir := t.TempDir()

	ctx := context.Background()
	bucketName := "file://" + blobDir
	bucket, err := blob.OpenBucket(ctx, bucketName)
	if err != nil {
		t.Fatalf("Failed to create a bucket: %v", err)
	}

	// Upload the files
	for i := 0; i < 5; i++ {
		var filename string
		if i%2 == 0 {
			filename = fmt.Sprintf("unit/test/%d", i)
		} else {
			filename = fmt.Sprintf("unit-test-%d", i)
		}
		contents := fmt.Sprintf("UNIT TEST: %d\n", i)
		buf := bytes.NewBuffer([]byte(contents))
		if err := Upload(ctx, buf, bucketName, filename); err != nil {
			t.Errorf("Failed to upload file %d: %v\n", i, err)
		}
	}

	blobsBefore, err := getFiles(ctx, bucket)
	if err != nil {
		t.Errorf("Failed to read files from blobstore: 
%v", err)
	}
	if !cmp.Equal(blobsBefore, wantBeforeBlobs) {
		t.Errorf("Did not get expected blobs %s", cmp.Diff(wantBeforeBlobs, blobsBefore))
	}
}

// getFiles returns a key -> contents map of every object currently stored in
// bucket. On error it returns whatever entries were read so far together with
// the error, so callers that merely t.Errorf can still inspect the partial
// listing.
func getFiles(ctx context.Context, bucket *blob.Bucket) (map[string]string, error) {
	found := map[string]string{}
	it := bucket.List(nil)
	for {
		obj, err := it.Next(ctx)
		switch {
		case errors.Is(err, io.EOF):
			// Listing exhausted — done.
			return found, nil
		case err != nil:
			return found, err
		}
		data, err := bucket.ReadAll(ctx, obj.Key)
		if err != nil {
			return found, err
		}
		found[obj.Key] = string(data)
	}
}