From acccb2f9fa4ed052a5cf7ec2a58db4ac0cf6d761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Fri, 29 Mar 2024 13:31:36 +0100 Subject: [PATCH] Add streaming BigQuery Loader modules v2 --- README.md | 42 +++--- main.tf | 204 +++++++++--------------------- outputs.tf | 32 +++-- templates/bq-mutator.sh.tmpl | 43 ------- templates/bq-repeater.sh.tmpl | 20 --- templates/bq-streamloader.sh.tmpl | 17 --- templates/config.json.tmpl | 71 ++++++----- templates/iglu_resolver.json.tmpl | 4 +- templates/startup-script.sh.tmpl | 36 +++++- variables.tf | 79 ++++++------ 10 files changed, 214 insertions(+), 334 deletions(-) delete mode 100644 templates/bq-mutator.sh.tmpl delete mode 100644 templates/bq-repeater.sh.tmpl delete mode 100644 templates/bq-streamloader.sh.tmpl diff --git a/README.md b/README.md index 35bd873..940b84e 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ # terraform-google-bigquery-loader-pubsub-ce -A Terraform module which deploys the requisite micro-services for loading BigQuery on Google running on top of Compute Engine. If you want to use a custom image for this deployment you will need to ensure it is based on top of Ubuntu 20.04. +A Terraform module which deploys the BigQuery Loader application on Google running on top of Compute Engine. If you want to use a custom image for this deployment you will need to ensure it is based on top of Ubuntu 20.04. ## Telemetry @@ -20,13 +20,7 @@ For details on what information is collected please see this module: https://git ## Usage -This module will deploy three seperate instance groups: - -1. `mutator`: Attempts to create the events table if it doesn't exist and then listens for new `types` to update the table with as custom events and entities are tracked -2. `repeater`: Events that were sent with custom `events` and `entities` that have not yet been added to the events table will be re-tried later by the repeater -3. `streamloader`: Core application which pulls data from an Enriched events topic and loads into BigQuery - -The mutator is deployed as a `singleton` instance but both the `repeater` and `streamloader` can be scaled horizontally if higher throughput is needed. +The BigQuery Loader reads data from a Snowplow Enriched output PubSub topic and writes in realtime to BigQuery events table. ```hcl # NOTE: Needs to be fed by the enrich module with valid Snowplow Events @@ -49,12 +43,6 @@ resource "google_bigquery_dataset" "pipeline_db" { location = var.region } -resource "google_storage_bucket" "dead_letter_bucket" { - name = "bq-loader-dead-letter" - location = var.region - force_destroy = true -} - module "bigquery_loader_pubsub" { source = "snowplow-devops/bigquery-loader-pubsub-ce/google" @@ -69,7 +57,6 @@ module "bigquery_loader_pubsub" { input_topic_name = module.enriched_topic.name bad_rows_topic_name = module.bad_rows_topic.name - gcs_dead_letter_bucket_name = google_storage_bucket.dead_letter_bucket.name bigquery_dataset_id = google_bigquery_dataset.pipeline_db.dataset_id ssh_key_pairs = [] @@ -115,19 +102,14 @@ module "bigquery_loader_pubsub" { | [google_bigquery_dataset_iam_member.dataset_bigquery_data_editor_binding](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_dataset_iam_member) | resource | | [google_compute_firewall.egress](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource | | [google_compute_firewall.ingress_ssh](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource | +| [google_compute_firewall.ingress_health_check](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/compute_firewall) | resource | | [google_project_iam_member.sa_bigquery_data_editor](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource | | [google_project_iam_member.sa_logging_log_writer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource | | [google_project_iam_member.sa_pubsub_publisher](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource | | [google_project_iam_member.sa_pubsub_subscriber](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource | | [google_project_iam_member.sa_pubsub_viewer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource | -| [google_project_iam_member.sa_storage_object_viewer](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/project_iam_member) | resource | -| [google_pubsub_subscription.failed_inserts](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource | | [google_pubsub_subscription.input](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource | -| [google_pubsub_subscription.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_subscription) | resource | -| [google_pubsub_topic.failed_inserts](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource | -| [google_pubsub_topic.types](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/pubsub_topic) | resource | | [google_service_account.sa](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/service_account) | resource | -| [google_storage_bucket_iam_binding.dead_letter_storage_object_admin_binding](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket_iam_binding) | resource | ## Inputs @@ -147,31 +129,37 @@ module "bigquery_loader_pubsub" { | [bigquery\_partition\_column](#input\_bigquery\_partition\_column) | The partition column to use in the dataset | `string` | `"collector_tstamp"` | no | | [bigquery\_require\_partition\_filter](#input\_bigquery\_require\_partition\_filter) | Whether to require a filter on the partition column in all queries | `bool` | `true` | no | | [bigquery\_table\_id](#input\_bigquery\_table\_id) | The ID of the table within a dataset to load data into (will be created if it doesn't exist) | `string` | `"events"` | no | +| [bigquery\_service\_account\_json\_b64](#input\_bigquery\_service\_account\_json\_b64) | Custom credentials (as base64 encoded service account key) instead of default service account assigned to the loader's compute group | `string` | `""` | no | | [custom\_iglu\_resolvers](#input\_custom\_iglu\_resolvers) | The custom Iglu Resolvers that will be used by the loader to resolve and validate events |
list(object({
name = string
priority = number
uri = string
api_key = string
vendor_prefixes = list(string)
}))
| `[]` | no | | [default\_iglu\_resolvers](#input\_default\_iglu\_resolvers) | The default Iglu Resolvers that will be used by the loader to resolve and validate events |
list(object({
name = string
priority = number
uri = string
api_key = string
vendor_prefixes = list(string)
}))
|
[
{
"api_key": "",
"name": "Iglu Central",
"priority": 10,
"uri": "http://iglucentral.com",
"vendor_prefixes": []
},
{
"api_key": "",
"name": "Iglu Central - Mirror 01",
"priority": 20,
"uri": "http://mirror01.iglucentral.com",
"vendor_prefixes": []
}
]
| no | -| [gcp\_logs\_enabled](#input\_gcp\_logs\_enabled) | Whether application logs should be reported to GCP Logging | `bool` | `true` | no | +| [iglu\_cache\_size](#input\_iglu\_cache\_size) | The size of cache used by Iglu Resolvers | `number` | `500` | no | +| [iglu\_cache\_ttl\_seconds](#input\_iglu\_cache\_ttl\_seconds) | Duration in seconds, how long should entries be kept in Iglu Resolvers cache before they expire | `number` | `600` | no | | [java\_opts](#input\_java\_opts) | Custom JAVA Options | `string` | `"-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"` | no | | [labels](#input\_labels) | The labels to append to this resource | `map(string)` | `{}` | no | -| [machine\_type\_mutator](#input\_machine\_type\_mutator) | The machine type to use | `string` | `"e2-small"` | no | -| [machine\_type\_repeater](#input\_machine\_type\_repeater) | The machine type to use | `string` | `"e2-small"` | no | -| [machine\_type\_streamloader](#input\_machine\_type\_streamloader) | The machine type to use | `string` | `"e2-small"` | no | +| [machine\_type](#input\_machine\_type) | The machine type to use | `string` | `"e2-small"` | no | | [ssh\_block\_project\_keys](#input\_ssh\_block\_project\_keys) | Whether to block project wide SSH keys | `bool` | `true` | no | | [ssh\_ip\_allowlist](#input\_ssh\_ip\_allowlist) | The list of CIDR ranges to allow SSH traffic from | `list(any)` |
[
"0.0.0.0/0"
]
| no | | [ssh\_key\_pairs](#input\_ssh\_key\_pairs) | The list of SSH key-pairs to add to the servers |
list(object({
user_name = string
public_key = string
}))
| `[]` | no | | [subnetwork](#input\_subnetwork) | The name of the sub-network to deploy within; if populated will override the 'network' setting | `string` | `""` | no | -| [target\_size\_repeater](#input\_target\_size\_repeater) | The number of servers to deploy | `number` | `1` | no | -| [target\_size\_streamloader](#input\_target\_size\_streamloader) | The number of servers to deploy | `number` | `1` | no | +| [target\_size](#input\_target\_size) | The number of servers to deploy | `number` | `1` | no | | [telemetry\_enabled](#input\_telemetry\_enabled) | Whether or not to send telemetry information back to Snowplow Analytics Ltd | `bool` | `true` | no | | [ubuntu\_20\_04\_source\_image](#input\_ubuntu\_20\_04\_source\_image) | The source image to use which must be based of of Ubuntu 20.04; by default the latest community version is used | `string` | `""` | no | | [user\_provided\_id](#input\_user\_provided\_id) | An optional unique identifier to identify the telemetry events emitted by this stack | `string` | `""` | no | +| [webhook\_collector](#input\_webhook\_collector) | Collector address used to gather monitoring alerts | `string` | `""` | no | +| [skip\_schemas](#input\_skip\_schemas) | The list of schema keys which should be skipped (not loaded) to the warehouse | `list(string)` | `[]` | no | +| [healthcheck\_enabled](#input\_healthcheck\_enabled) | Whether or not to enable health check probe for GCP instance group | `bool` | `true` | no | ## Outputs | Name | Description | |------|-------------| +| [health\_check\_id](#output\_health\_check\_id) | Identifier for the health check on the instance group | +| [health\_check\_self\_link](#output\_health\_check\_self\_link) | The URL for the health check on the instance group | | [instance\_group\_url](#output\_instance\_group\_url) | The full URL of the instance group created by the manager | | [manager\_id](#output\_manager\_id) | Identifier for the instance group manager | | [manager\_self\_link](#output\_manager\_self\_link) | The URL for the instance group manager | +| [named\_port\_http](#output\_named\_port\_http) | The name of the port exposed by the instance group | +| [named\_port\_value](#output\_named\_port\_value) | The named port value (e.g. 8080) | # Copyright and license diff --git a/main.tf b/main.tf index 27b6c1d..28d072e 100644 --- a/main.tf +++ b/main.tf @@ -2,8 +2,9 @@ locals { module_name = "bigquery-loader-pubsub-ce" module_version = "0.3.0" - app_name = "snowplow-bigquery-loader" - app_version = var.app_version + app_name = "snowplow-bigquery-loader" + app_version = var.app_version + ingress_port = 8000 local_labels = { name = var.name @@ -32,10 +33,6 @@ module "telemetry" { app_version = local.app_version module_name = local.module_name module_version = local.module_version - - app_name_override_1 = "snowplow-bigquery-mutator" - app_name_override_2 = "snowplow-bigquery-repeater" - app_name_override_3 = "snowplow-bigquery-streamloader" } # --- IAM: Service Account setup @@ -75,20 +72,6 @@ resource "google_project_iam_member" "sa_bigquery_data_editor" { member = "serviceAccount:${google_service_account.sa.email}" } -resource "google_project_iam_member" "sa_storage_object_viewer" { - project = var.project_id - role = "roles/storage.objectViewer" - member = "serviceAccount:${google_service_account.sa.email}" -} - -resource "google_storage_bucket_iam_binding" "dead_letter_storage_object_admin_binding" { - bucket = var.gcs_dead_letter_bucket_name - role = "roles/storage.objectAdmin" - members = [ - "serviceAccount:${google_service_account.sa.email}" - ] -} - resource "google_bigquery_dataset_iam_member" "dataset_bigquery_data_editor_binding" { project = var.project_id dataset_id = var.bigquery_dataset_id @@ -98,6 +81,21 @@ resource "google_bigquery_dataset_iam_member" "dataset_bigquery_data_editor_bind # --- CE: Firewall rules +resource "google_compute_firewall" "ingress_health_check" { + count = var.healthcheck_enabled == true ? 1 : 0 + name = "${var.name}-traffic-in" + + network = var.network + target_tags = [var.name] + + allow { + protocol = "tcp" + ports = ["${local.ingress_port}"] + } + + source_ranges = ["130.211.0.0/22", "35.191.0.0/16"] +} + resource "google_compute_firewall" "ingress_ssh" { name = "${var.name}-ssh-in" @@ -134,16 +132,6 @@ resource "google_compute_firewall" "egress" { # --- PubSub: Topics and subscriptions -resource "google_pubsub_topic" "types" { - name = "${var.name}-types-topic" - labels = local.labels -} - -resource "google_pubsub_topic" "failed_inserts" { - name = "${var.name}-failed-inserts-topic" - labels = local.labels -} - resource "google_pubsub_subscription" "input" { name = "${var.name}-input" topic = var.input_topic_name @@ -155,28 +143,6 @@ resource "google_pubsub_subscription" "input" { labels = local.labels } -resource "google_pubsub_subscription" "types" { - name = "${var.name}-types" - topic = google_pubsub_topic.types.name - - expiration_policy { - ttl = "" - } - - labels = local.labels -} - -resource "google_pubsub_subscription" "failed_inserts" { - name = "${var.name}-failed-inserts" - topic = google_pubsub_topic.failed_inserts.name - - expiration_policy { - ttl = "" - } - - labels = local.labels -} - # --- CE: Instance group setup locals { @@ -218,115 +184,69 @@ locals { local.resolvers_closed ]) - iglu_resolver = templatefile("${path.module}/templates/iglu_resolver.json.tmpl", { resolvers = jsonencode(local.resolvers) }) - - config = templatefile("${path.module}/templates/config.json.tmpl", { - project_id = var.project_id - - # config: loader - input_subscription_name = google_pubsub_subscription.input.name - dataset_id = var.bigquery_dataset_id - table_id = var.bigquery_table_id - bad_rows_topic_name = var.bad_rows_topic_name - types_topic_name = google_pubsub_topic.types.name - failed_inserts_topic_name = google_pubsub_topic.failed_inserts.name - - # config: mutator - types_sub_name = google_pubsub_subscription.types.name + iglu_resolver = templatefile("${path.module}/templates/iglu_resolver.json.tmpl", { + resolvers = jsonencode(local.resolvers) + cache_size = var.iglu_cache_size + cache_ttl = var.iglu_cache_ttl_seconds + }) - # config: repeater - failed_inserts_sub_name = google_pubsub_subscription.failed_inserts.name - gcs_dead_letter_bucket_name = var.gcs_dead_letter_bucket_name + hocon = templatefile("${path.module}/templates/config.json.tmpl", { + project_id = var.project_id + input_subscription_id = google_pubsub_subscription.input.id + dataset_id = var.bigquery_dataset_id + table_id = var.bigquery_table_id + bad_rows_topic_id = var.bad_rows_topic_id + bigquery_service_account_json_b64 = var.bigquery_service_account_json_b64 + + skip_schemas = jsonencode(var.skip_schemas) + webhook_collector = var.webhook_collector + tags = jsonencode(var.labels) + + telemetry_disable = !var.telemetry_enabled + telemetry_collector_uri = join("", module.telemetry.*.collector_uri) + telemetry_collector_port = 443 + telemetry_secure = true + telemetry_user_provided_id = var.user_provided_id + telemetry_auto_gen_id = join("", module.telemetry.*.auto_generated_id) + telemetry_module_name = local.module_name + telemetry_module_version = local.module_version }) - config_base64 = base64encode(local.config) - iglu_resolver_base64 = base64encode(local.iglu_resolver) - - applications = { - mutator = { - metadata_startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", { - application_script = templatefile("${path.module}/templates/bq-mutator.sh.tmpl", { - accept_limited_use_license = var.accept_limited_use_license - - version = local.app_version - require_partition_filter = var.bigquery_require_partition_filter - partition_column = var.bigquery_partition_column - config_base64 = local.config_base64 - iglu_resolver_base64 = local.iglu_resolver_base64 - gcp_logs_enabled = var.gcp_logs_enabled - java_opts = var.java_opts - }) - telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data_1) - }) - - machine_type = var.machine_type_mutator - target_size = 1 - } - - repeater = { - metadata_startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", { - application_script = templatefile("${path.module}/templates/bq-repeater.sh.tmpl", { - accept_limited_use_license = var.accept_limited_use_license - - version = local.app_version - config_base64 = local.config_base64 - iglu_resolver_base64 = local.iglu_resolver_base64 - gcp_logs_enabled = var.gcp_logs_enabled - java_opts = var.java_opts - }) - telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data_2) - }) - - machine_type = var.machine_type_repeater - target_size = var.target_size_repeater - } - - streamloader = { - metadata_startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", { - application_script = templatefile("${path.module}/templates/bq-streamloader.sh.tmpl", { - accept_limited_use_license = var.accept_limited_use_license - - version = local.app_version - config_base64 = local.config_base64 - iglu_resolver_base64 = local.iglu_resolver_base64 - gcp_logs_enabled = var.gcp_logs_enabled - java_opts = var.java_opts - }) - telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data_3) - }) - - machine_type = var.machine_type_streamloader - target_size = var.target_size_streamloader - } - } + startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", { + version = local.app_version + config_b64 = base64encode(local.hocon) + iglu_config_b64 = base64encode(local.iglu_resolver) + accept_limited_use_license = var.accept_limited_use_license + bigquery_service_account_json_b64 = base64decode(var.bigquery_service_account_json_b64) + telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data) + gcp_logs_enabled = var.gcp_logs_enabled + java_opts = var.java_opts + }) } module "service" { source = "snowplow-devops/service-ce/google" version = "0.1.0" - for_each = local.applications - - user_supplied_script = each.value.metadata_startup_script - name = "${var.name}-${each.key}" + user_supplied_script = local.startup_script + name = var.name instance_group_version_name = "${local.app_name}-${local.app_version}" - labels = merge( - local.labels, - { - app_project_name = each.key - } - ) + labels = local.labels region = var.region network = var.network subnetwork = var.subnetwork ubuntu_20_04_source_image = var.ubuntu_20_04_source_image - machine_type = each.value.machine_type - target_size = each.value.target_size + machine_type = var.machine_type + target_size = var.target_size ssh_block_project_keys = var.ssh_block_project_keys ssh_key_pairs = var.ssh_key_pairs service_account_email = google_service_account.sa.email associate_public_ip_address = var.associate_public_ip_address + + named_port_http = var.healthcheck_enabled == true ? "http" : "" + ingress_port = var.healthcheck_enabled == true ? local.ingress_port : -1 + health_check_path = var.healthcheck_enabled == true ? "/" : "" } diff --git a/outputs.tf b/outputs.tf index d41db11..28fef49 100644 --- a/outputs.tf +++ b/outputs.tf @@ -1,20 +1,34 @@ +output "named_port_http" { + value = module.service.named_port_http + description = "The name of the port exposed by the instance group" +} + +output "named_port_value" { + value = module.service.named_port_value + description = "The named port value (e.g. 8080)" +} + output "manager_id" { - value = { - for k, v in module.service : k => v.manager_id - } + value = module.service.manager_id description = "Identifier for the instance group manager" } output "manager_self_link" { - value = { - for k, v in module.service : k => v.manager_self_link - } + value = module.service.manager_self_link description = "The URL for the instance group manager" } output "instance_group_url" { - value = { - for k, v in module.service : k => v.instance_group_url - } + value = module.service.instance_group_url description = "The full URL of the instance group created by the manager" } + +output "health_check_id" { + value = module.service.health_check_id + description = "Identifier for the health check on the instance group" +} + +output "health_check_self_link" { + value = module.service.health_check_self_link + description = "The URL for the health check on the instance group" +} diff --git a/templates/bq-mutator.sh.tmpl b/templates/bq-mutator.sh.tmpl deleted file mode 100644 index 8665672..0000000 --- a/templates/bq-mutator.sh.tmpl +++ /dev/null @@ -1,43 +0,0 @@ -# 1. Bootstrap BigQuery table with required tables -set +e -sudo docker run \ - --name mutator-create \ - --network host \ -%{ if gcp_logs_enabled ~} - --log-driver gcplogs \ -%{ else ~} - --log-opt max-size=10m \ - --log-opt max-file=5 \ -%{ endif ~} - --env ACCEPT_LIMITED_USE_LICENSE=${accept_limited_use_license} \ - snowplow/snowplow-bigquery-mutator:${version} \ - create \ -%{ if require_partition_filter ~} - --requirePartitionFilter \ -%{ endif ~} -%{ if partition_column != "" ~} - --partitionColumn=${partition_column} \ -%{ endif ~} - --config ${config_base64} \ - --resolver ${iglu_resolver_base64} -set -e - -# 2. Listen for changes and mutate BigQuery table accordingly -sudo docker run \ - -d \ - --name mutator-listen \ - --restart always \ - --network host \ - --memory=$(get_application_memory_mb)m \ -%{ if gcp_logs_enabled ~} - --log-driver gcplogs \ -%{ else ~} - --log-opt max-size=10m \ - --log-opt max-file=5 \ -%{ endif ~} - --env JDK_JAVA_OPTIONS='${java_opts}' \ - --env ACCEPT_LIMITED_USE_LICENSE=${accept_limited_use_license} \ - snowplow/snowplow-bigquery-mutator:${version} \ - listen \ - --config ${config_base64} \ - --resolver ${iglu_resolver_base64} diff --git a/templates/bq-repeater.sh.tmpl b/templates/bq-repeater.sh.tmpl deleted file mode 100644 index c9e92df..0000000 --- a/templates/bq-repeater.sh.tmpl +++ /dev/null @@ -1,20 +0,0 @@ -sudo docker run \ - -d \ - --name repeater \ - --restart always \ - --network host \ - --memory=$(get_application_memory_mb)m \ -%{ if gcp_logs_enabled ~} - --log-driver gcplogs \ -%{ else ~} - --log-opt max-size=10m \ - --log-opt max-file=5 \ -%{ endif ~} - --env JDK_JAVA_OPTIONS='${java_opts}' \ - --env ACCEPT_LIMITED_USE_LICENSE=${accept_limited_use_license} \ - snowplow/snowplow-bigquery-repeater:${version} \ - --config ${config_base64} \ - --resolver ${iglu_resolver_base64} \ - --bufferSize=20 \ - --timeout=20 \ - --backoffPeriod=900 diff --git a/templates/bq-streamloader.sh.tmpl b/templates/bq-streamloader.sh.tmpl deleted file mode 100644 index 8532f81..0000000 --- a/templates/bq-streamloader.sh.tmpl +++ /dev/null @@ -1,17 +0,0 @@ -sudo docker run \ - -d \ - --name streamloader \ - --restart always \ - --network host \ - --memory=$(get_application_memory_mb)m \ -%{ if gcp_logs_enabled ~} - --log-driver gcplogs \ -%{ else ~} - --log-opt max-size=10m \ - --log-opt max-file=5 \ -%{ endif ~} - --env JDK_JAVA_OPTIONS='${java_opts}' \ - --env ACCEPT_LIMITED_USE_LICENSE=${accept_limited_use_license} \ - snowplow/snowplow-bigquery-streamloader:${version} \ - --config ${config_base64} \ - --resolver ${iglu_resolver_base64} diff --git a/templates/config.json.tmpl b/templates/config.json.tmpl index 0f561b9..31a7256 100644 --- a/templates/config.json.tmpl +++ b/templates/config.json.tmpl @@ -1,45 +1,48 @@ { - "projectId": ${project_id} + "input": { + "subscription": ${input_subscription_id} + } + + "output": { + + "good": { + "project": ${project_id} + "dataset": ${dataset_id} + "table": ${table_id} - "loader": { - "input": { - "subscription": ${input_subscription_name} + %{ if bigquery_service_account_json_b64 != "" ~} + "credentials": $${BIGQUERY_SERVICE_ACCOUNT_JSON} + %{ endif ~} } - "output": { - "good": { - "datasetId": ${dataset_id} - "tableId": ${table_id} - } - "bad": { - "topic": ${bad_rows_topic_name} - } - "types": { - "topic": ${types_topic_name} - } - "failedInserts": { - "topic": ${failed_inserts_topic_name} - } + + "bad": { + "topic": ${bad_rows_topic_id} } + } - "mutator": { - "input": { - "subscription": ${types_sub_name} - } - "output": { - "good": $${loader.output.good} + "skipSchemas": ${skip_schemas} + +%{ if webhook_collector != "" ~} + "monitoring": { + "webhook": { + "endpoint": "${webhook_collector}", + "tags": ${tags} } } +%{ endif ~} - "repeater": { - "input": { - "subscription": ${failed_inserts_sub_name} - } - "output": { - "good": $${loader.output.good} - "deadLetters": { - "bucket": "gs://${gcs_dead_letter_bucket_name}" - } - } + "telemetry": { + "disable": ${telemetry_disable} + "interval": 15 minutes + "method": "POST" + "collectorUri": "${telemetry_collector_uri}" + "collectorPort": ${telemetry_collector_port} + "secure": ${telemetry_secure} + "userProvidedId": "${telemetry_user_provided_id}" + "autoGeneratedId": "${telemetry_auto_gen_id}" + "moduleName": "${telemetry_module_name}" + "moduleVersion": "${telemetry_module_version}" + "instanceId": $${INSTANCE_ID} } } diff --git a/templates/iglu_resolver.json.tmpl b/templates/iglu_resolver.json.tmpl index f266ad8..cc7980f 100644 --- a/templates/iglu_resolver.json.tmpl +++ b/templates/iglu_resolver.json.tmpl @@ -1,8 +1,8 @@ { "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-2", "data": { - "cacheSize": 500, - "cacheTtl": 600, + "cacheSize": ${cache_size}, + "cacheTtl": ${cache_ttl}, "repositories": ${resolvers} } } diff --git a/templates/startup-script.sh.tmpl b/templates/startup-script.sh.tmpl index 357ddd3..de0345a 100644 --- a/templates/startup-script.sh.tmpl +++ b/templates/startup-script.sh.tmpl @@ -1,2 +1,34 @@ -${application_script} -${telemetry_script} +readonly CONFIG_DIR=/opt/snowplow/config + +sudo mkdir -p $${CONFIG_DIR} + +sudo base64 --decode << EOF > $${CONFIG_DIR}/loader.hocon +${config_b64} +EOF + +sudo base64 --decode << EOF > $${CONFIG_DIR}/iglu.hocon +${iglu_config_b64} +EOF + +sudo docker run \ + -d \ + --name bigquery-loader \ + --restart always \ + --network host \ + --memory=$(get_application_memory_mb)m \ +%{ if gcp_logs_enabled ~} + --log-driver gcplogs \ +%{ else ~} + --log-opt max-size=10m \ + --log-opt max-file=5 \ +%{ endif ~} + --mount type=bind,source=$${CONFIG_DIR},target=/snowplow/config \ + --env JDK_JAVA_OPTIONS='${java_opts}' \ + --env ACCEPT_LIMITED_USE_LICENSE=${accept_limited_use_license} \ +%{ if bigquery_service_account_json_b64 != "" ~} + --env BIGQUERY_SERVICE_ACCOUNT_JSON='${bigquery_service_account_json_b64}' \ +%{ endif ~} + --env INSTANCE_ID=$(get_instance_id) \ + snowplow/bigquery-loader-pubsub:${version} \ + --config /snowplow/config/loader.hocon \ + --iglu-config /snowplow/config/iglu.hocon diff --git a/variables.tf b/variables.tf index f15b05c..1064317 100644 --- a/variables.tf +++ b/variables.tf @@ -14,10 +14,11 @@ variable "name" { type = string } +//TODO still RC, wait for proper release variable "app_version" { description = "App version to use. This variable facilitates dev flow, the modules may not work with anything other than the default value." type = string - default = "1.7.0" + default = "2.0.0-rc10" } variable "project_id" { @@ -94,31 +95,13 @@ variable "java_opts" { # --- Configuration options -variable "machine_type_mutator" { +variable "machine_type" { description = "The machine type to use" type = string default = "e2-small" } -variable "machine_type_repeater" { - description = "The machine type to use" - type = string - default = "e2-small" -} - -variable "target_size_repeater" { - description = "The number of servers to deploy" - default = 1 - type = number -} - -variable "machine_type_streamloader" { - description = "The machine type to use" - type = string - default = "e2-small" -} - -variable "target_size_streamloader" { +variable "target_size" { description = "The number of servers to deploy" default = 1 type = number @@ -129,13 +112,8 @@ variable "input_topic_name" { type = string } -variable "bad_rows_topic_name" { - description = "The name of the output topic for all bad data" - type = string -} - -variable "gcs_dead_letter_bucket_name" { - description = "The name of the GCS bucket to dump unloadable events into" +variable "bad_rows_topic_id" { + description = "The id of the output topic for all bad data" type = string } @@ -150,18 +128,11 @@ variable "bigquery_table_id" { type = string } -variable "bigquery_require_partition_filter" { - description = "Whether to require a filter on the partition column in all queries" - default = true - type = bool -} - -variable "bigquery_partition_column" { - description = "The partition column to use in the dataset" - default = "collector_tstamp" +variable "bigquery_service_account_json_b64" { + description = "Custom credentials (as base64 encoded service account key) instead of default service account assigned to the loader's compute group" + default = "" type = string } - # --- Iglu Resolver variable "default_iglu_resolvers" { @@ -203,6 +174,18 @@ variable "custom_iglu_resolvers" { })) } +variable "iglu_cache_size" { + description = "The size of cache used by Iglu Resolvers" + type = number + default = 500 +} + +variable "iglu_cache_ttl_seconds" { + description = "Duration in seconds, how long should entries be kept in Iglu Resolvers cache before they expire" + type = number + default = 600 +} + # --- Telemetry variable "telemetry_enabled" { @@ -216,3 +199,23 @@ variable "user_provided_id" { type = string default = "" } + +# --- Webhook monitoring + +variable "webhook_collector" { + description = "Collector address used to gather monitoring alerts" + type = string + default = "" +} + +variable "skip_schemas" { + description = "The list of schema keys which should be skipped (not loaded) to the warehouse" + type = list(string) + default = [] +} + +variable "healthcheck_enabled" { + description = "Whether or not to enable health check probe for GCP instance group" + type = bool + default = true +}