Skip to content

Commit

Permalink
add AIRFLOW_VAR_ prefix to environment variables in data-platform blueprints (GoogleCloudPlatform#1651)
Browse files Browse the repository at this point in the history

* add AIRFLOW_VAR_ prefix to env vars in minimal blueprint

* update DAGs for env vars in minimal blueprint

* add AIRFLOW_VAR_ prefix to env vars in foundation blueprint

* update DAGs for env vars in foundation blueprint

* apply tf linting

---------

Co-authored-by: lcaggio <[email protected]>
Co-authored-by: Ludovico Magnocavallo <[email protected]>
  • Loading branch information
3 people authored Sep 8, 2023
1 parent e14789e commit 0e7cfc8
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# tfdoc:file:description Orchestration Cloud Composer definition.

locals {
env_variables = {
_env_variables = {
BQ_LOCATION = var.location
DATA_CAT_TAGS = try(jsonencode(module.common-datacatalog.tags), "{}")
DF_KMS_KEY = try(var.service_encryption_keys.dataflow, "")
Expand Down Expand Up @@ -48,6 +48,12 @@ locals {
TRF_SA_DF = module.transf-sa-df-0.email
TRF_SA_BQ = module.transf-sa-bq-0.email
}
env_variables = {
for k, v in merge(
try(var.composer_config.software_config.env_variables, null),
local._env_variables
) : "AIRFLOW_VAR_${k}" => v
}
}
module "orch-sa-cmp-0" {
source = "../../../modules/iam-service-account"
Expand All @@ -70,7 +76,7 @@ resource "google_composer_environment" "orch-cmp-0" {
software_config {
airflow_config_overrides = try(var.composer_config.software_config.airflow_config_overrides, null)
pypi_packages = try(var.composer_config.software_config.pypi_packages, null)
env_variables = merge(try(var.composer_config.software_config.env_variables, null), local.env_variables)
env_variables = local.env_variables
image_version = try(var.composer_config.software_config.image_version, null)
}
dynamic "workloads_config" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,52 @@
# Load The Dependencies
# --------------------------------------------------------------------------------

import csv
import datetime
import io
import json
import logging
import os

from airflow import models
from airflow.models.variable import Variable
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
DRP_PRJ = os.environ.get("DRP_PRJ")
DRP_BQ = os.environ.get("DRP_BQ")
DRP_GCS = os.environ.get("DRP_GCS")
DRP_PS = os.environ.get("DRP_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
BQ_LOCATION = Variable.get("BQ_LOCATION")
DATA_CAT_TAGS = Variable.get("DATA_CAT_TAGS", deserialize_json=True)
DWH_LAND_PRJ = Variable.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = Variable.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = Variable.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = Variable.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = Variable.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = Variable.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = Variable.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = Variable.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = Variable.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = Variable.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = Variable.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = Variable.get("DWH_PLG_GCS")
GCP_REGION = Variable.get("GCP_REGION")
DRP_PRJ = Variable.get("DRP_PRJ")
DRP_BQ = Variable.get("DRP_BQ")
DRP_GCS = Variable.get("DRP_GCS")
DRP_PS = Variable.get("DRP_PS")
LOD_PRJ = Variable.get("LOD_PRJ")
LOD_GCS_STAGING = Variable.get("LOD_GCS_STAGING")
LOD_NET_VPC = Variable.get("LOD_NET_VPC")
LOD_NET_SUBNET = Variable.get("LOD_NET_SUBNET")
LOD_SA_DF = Variable.get("LOD_SA_DF")
ORC_PRJ = Variable.get("ORC_PRJ")
ORC_GCS = Variable.get("ORC_GCS")
TRF_PRJ = Variable.get("TRF_PRJ")
TRF_GCS_STAGING = Variable.get("TRF_GCS_STAGING")
TRF_NET_VPC = Variable.get("TRF_NET_VPC")
TRF_NET_SUBNET = Variable.get("TRF_NET_SUBNET")
TRF_SA_DF = Variable.get("TRF_SA_DF")
TRF_SA_BQ = Variable.get("TRF_SA_BQ")
DF_KMS_KEY = Variable.get("DF_KMS_KEY", "")
DF_REGION = Variable.get("GCP_REGION")
DF_ZONE = Variable.get("GCP_REGION") + "-b"

# --------------------------------------------------------------------------------
# Set default arguments
Expand Down Expand Up @@ -106,12 +101,12 @@
'data_pipeline_dag',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
start = empty.EmptyOperator(
task_id='start',
trigger_rule='all_success'
)

end = dummy.DummyOperator(
end = empty.EmptyOperator(
task_id='end',
trigger_rule='all_success'
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,53 @@
# Load The Dependencies
# --------------------------------------------------------------------------------

import csv
import datetime
import io
import json
import logging
import os

from airflow import models
from airflow.models.variable import Variable
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.operators import dummy
from airflow.operators import empty
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
DRP_PRJ = os.environ.get("DRP_PRJ")
DRP_BQ = os.environ.get("DRP_BQ")
DRP_GCS = os.environ.get("DRP_GCS")
DRP_PS = os.environ.get("DRP_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
BQ_LOCATION = Variable.get("BQ_LOCATION")
DATA_CAT_TAGS = Variable.get("DATA_CAT_TAGS", deserialize_json=True)
DWH_LAND_PRJ = Variable.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = Variable.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = Variable.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = Variable.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = Variable.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = Variable.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = Variable.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = Variable.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = Variable.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = Variable.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = Variable.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = Variable.get("DWH_PLG_GCS")
GCP_REGION = Variable.get("GCP_REGION")
DRP_PRJ = Variable.get("DRP_PRJ")
DRP_BQ = Variable.get("DRP_BQ")
DRP_GCS = Variable.get("DRP_GCS")
DRP_PS = Variable.get("DRP_PS")
LOD_PRJ = Variable.get("LOD_PRJ")
LOD_GCS_STAGING = Variable.get("LOD_GCS_STAGING")
LOD_NET_VPC = Variable.get("LOD_NET_VPC")
LOD_NET_SUBNET = Variable.get("LOD_NET_SUBNET")
LOD_SA_DF = Variable.get("LOD_SA_DF")
ORC_PRJ = Variable.get("ORC_PRJ")
ORC_GCS = Variable.get("ORC_GCS")
TRF_PRJ = Variable.get("TRF_PRJ")
TRF_GCS_STAGING = Variable.get("TRF_GCS_STAGING")
TRF_NET_VPC = Variable.get("TRF_NET_VPC")
TRF_NET_SUBNET = Variable.get("TRF_NET_SUBNET")
TRF_SA_DF = Variable.get("TRF_SA_DF")
TRF_SA_BQ = Variable.get("TRF_SA_BQ")
DF_KMS_KEY = Variable.get("DF_KMS_KEY", "")
DF_REGION = Variable.get("GCP_REGION")
DF_ZONE = Variable.get("GCP_REGION") + "-b"

# --------------------------------------------------------------------------------
# Set default arguments
Expand Down Expand Up @@ -106,12 +102,12 @@
'data_pipeline_dc_tags_dag',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(
start = empty.EmptyOperator(
task_id='start',
trigger_rule='all_success'
)

end = dummy.DummyOperator(
end = empty.EmptyOperator(
task_id='end',
trigger_rule='all_success'
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,54 @@
# --------------------------------------------------------------------------------

import datetime
import json
import os
import time

from airflow import models
from airflow.operators import dummy
from airflow.models.variable import Variable
from airflow.operators import empty
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryUpsertTableOperator, BigQueryUpdateTableSchemaOperator
from airflow.utils.task_group import TaskGroup

# --------------------------------------------------------------------------------
# Set variables - Needed for the DEMO
# --------------------------------------------------------------------------------
BQ_LOCATION = os.environ.get("BQ_LOCATION")
DATA_CAT_TAGS = json.loads(os.environ.get("DATA_CAT_TAGS"))
DWH_LAND_PRJ = os.environ.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = os.environ.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = os.environ.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = os.environ.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = os.environ.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = os.environ.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = os.environ.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = os.environ.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = os.environ.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = os.environ.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = os.environ.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = os.environ.get("DWH_PLG_GCS")
GCP_REGION = os.environ.get("GCP_REGION")
DRP_PRJ = os.environ.get("DRP_PRJ")
DRP_BQ = os.environ.get("DRP_BQ")
DRP_GCS = os.environ.get("DRP_GCS")
DRP_PS = os.environ.get("DRP_PS")
LOD_PRJ = os.environ.get("LOD_PRJ")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
LOD_NET_VPC = os.environ.get("LOD_NET_VPC")
LOD_NET_SUBNET = os.environ.get("LOD_NET_SUBNET")
LOD_SA_DF = os.environ.get("LOD_SA_DF")
ORC_PRJ = os.environ.get("ORC_PRJ")
ORC_GCS = os.environ.get("ORC_GCS")
ORC_GCS_TMP_DF = os.environ.get("ORC_GCS_TMP_DF")
TRF_PRJ = os.environ.get("TRF_PRJ")
TRF_GCS_STAGING = os.environ.get("TRF_GCS_STAGING")
TRF_NET_VPC = os.environ.get("TRF_NET_VPC")
TRF_NET_SUBNET = os.environ.get("TRF_NET_SUBNET")
TRF_SA_DF = os.environ.get("TRF_SA_DF")
TRF_SA_BQ = os.environ.get("TRF_SA_BQ")
DF_KMS_KEY = os.environ.get("DF_KMS_KEY", "")
DF_REGION = os.environ.get("GCP_REGION")
DF_ZONE = os.environ.get("GCP_REGION") + "-b"
BQ_LOCATION = Variable.get("BQ_LOCATION")
DATA_CAT_TAGS = Variable.get("DATA_CAT_TAGS", deserialize_json=True)
DWH_LAND_PRJ = Variable.get("DWH_LAND_PRJ")
DWH_LAND_BQ_DATASET = Variable.get("DWH_LAND_BQ_DATASET")
DWH_LAND_GCS = Variable.get("DWH_LAND_GCS")
DWH_CURATED_PRJ = Variable.get("DWH_CURATED_PRJ")
DWH_CURATED_BQ_DATASET = Variable.get("DWH_CURATED_BQ_DATASET")
DWH_CURATED_GCS = Variable.get("DWH_CURATED_GCS")
DWH_CONFIDENTIAL_PRJ = Variable.get("DWH_CONFIDENTIAL_PRJ")
DWH_CONFIDENTIAL_BQ_DATASET = Variable.get("DWH_CONFIDENTIAL_BQ_DATASET")
DWH_CONFIDENTIAL_GCS = Variable.get("DWH_CONFIDENTIAL_GCS")
DWH_PLG_PRJ = Variable.get("DWH_PLG_PRJ")
DWH_PLG_BQ_DATASET = Variable.get("DWH_PLG_BQ_DATASET")
DWH_PLG_GCS = Variable.get("DWH_PLG_GCS")
GCP_REGION = Variable.get("GCP_REGION")
DRP_PRJ = Variable.get("DRP_PRJ")
DRP_BQ = Variable.get("DRP_BQ")
DRP_GCS = Variable.get("DRP_GCS")
DRP_PS = Variable.get("DRP_PS")
LOD_PRJ = Variable.get("LOD_PRJ")
LOD_GCS_STAGING = Variable.get("LOD_GCS_STAGING")
LOD_NET_VPC = Variable.get("LOD_NET_VPC")
LOD_NET_SUBNET = Variable.get("LOD_NET_SUBNET")
LOD_SA_DF = Variable.get("LOD_SA_DF")
ORC_PRJ = Variable.get("ORC_PRJ")
ORC_GCS = Variable.get("ORC_GCS")
ORC_GCS_TMP_DF = Variable.get("ORC_GCS_TMP_DF")
TRF_PRJ = Variable.get("TRF_PRJ")
TRF_GCS_STAGING = Variable.get("TRF_GCS_STAGING")
TRF_NET_VPC = Variable.get("TRF_NET_VPC")
TRF_NET_SUBNET = Variable.get("TRF_NET_SUBNET")
TRF_SA_DF = Variable.get("TRF_SA_DF")
TRF_SA_BQ = Variable.get("TRF_SA_BQ")
DF_KMS_KEY = Variable.get("DF_KMS_KEY", "")
DF_REGION = Variable.get("GCP_REGION")
DF_ZONE = Variable.get("GCP_REGION") + "-b"

# --------------------------------------------------------------------------------
# Set default arguments
Expand Down Expand Up @@ -104,9 +103,9 @@
with models.DAG('data_pipeline_dc_tags_dag_flex',
default_args=default_args,
schedule_interval=None) as dag:
start = dummy.DummyOperator(task_id='start', trigger_rule='all_success')
start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

end = dummy.DummyOperator(task_id='end', trigger_rule='all_success')
end = empty.EmptyOperator(task_id='end', trigger_rule='all_success')

# BigQuery tables are created here for demo purposes.
# Consider a dedicated pipeline or tool for a real-life scenario.
Expand Down
Loading

0 comments on commit 0e7cfc8

Please sign in to comment.