
[ISSUE] GX not automatically updating the config while deploying in AWS Glue into S3 Bucket #10902

Open
tegarbratasena opened this issue Jan 30, 2025 · 1 comment
Labels
feature-request feature request

Comments

@tegarbratasena

Describe the bug
I ran into an issue when deploying GX to our AWS S3 bucket using AWS Glue. I find it hard to set up the YAML configuration for S3, so I have to build the configuration manually with this code:

# Imports assumed by this snippet (same as in the full script below)
import boto3
import yaml
import great_expectations as gx
from great_expectations.data_context.types.base import DataContextConfig

s3_client = boto3.client('s3')

# Function to update the YAML config manually
def update_yml_config(context, s3_client, bucket_name='my-bucket', key='great-expectations/great_expectations.yml'):
    config_dict = context.variables.config.to_json_dict()
    
    if 'data_context_id' in config_dict:
        config_dict['data_context_id'] = str(config_dict['data_context_id'])
    
    if 'fluent_datasources' in config_dict:
        for ds in config_dict['fluent_datasources'].values():
            if 'connection_string' in ds:
                ds['connection_string'] = "secret|${aws_secrets_manager_arn}|redshift_connection_string"
    
    config_yaml = yaml.dump(config_dict, default_flow_style=False)
    s3_client.put_object(
        Bucket=bucket_name,
        Key=key,
        Body=config_yaml,
        ContentType="text/yaml"
    )

# Get default context
context = gx.get_context()
config_dict = context.variables.config

# Updating config
config_dict["stores"] = {
    "expectations_store": {
        "class_name": "ExpectationsStore",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "my-bucket",
            "prefix": "great-expectations/expectations/"
        }
    },
    "validation_results_store": {
        "class_name": "ValidationResultsStore",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "my-bucket",
            "prefix": "great-expectations/validations/"
        }
    },
    "checkpoint_store": {
        "class_name": "CheckpointStore",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "my-bucket",
            "prefix": "great-expectations/checkpoints/"
        }
    },
    "validation_definition_store": {
        "class_name": "ValidationDefinitionStore",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "my-bucket",
            "prefix": "great-expectations/validation_definitions/"
        }
    }
}

# Update context config to point to config_variables
config_dict["config_variables_file_path"] = "great-expectations/uncommitted/config_variables.yml"

# Creating config variables yaml
config_variables = {
    "my_aws_creds": "secret|arn:aws:secretsmanager|redshift_connection_string"
}

# Create data docs site
site_config = {
    "class_name": "SiteBuilder",
    "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    "store_backend": {
        "class_name": "TupleS3StoreBackend",
        "bucket": "my-bucket",
        "prefix": "great-expectations/data_docs/",
    },
}
context.variables.config.data_docs_sites = {
    "local_site": site_config,
    "s3_site": site_config
}

s3_client.put_object(
    Bucket='my-bucket',
    Key='great-expectations/uncommitted/config_variables.yml',
    Body=yaml.dump(config_variables),
    ContentType="text/yaml"
)

# Dump config to yaml files
context_config = context.config
config_dict = context_config.to_dict()
config_yaml = yaml.dump(config_dict, default_flow_style=False)
s3_client.put_object(
  Bucket='my-bucket',
  Key='great-expectations/great_expectations.yml',
  Body=config_yaml,
  ContentType="text/yaml"
)
update_yml_config(context, s3_client)

# Read the config from S3. (The dump above doesn't automatically generate the data_context_id; however, running the code below and saving the config YAML again generates it automatically.)
response = s3_client.get_object(
    Bucket='my-bucket',
    Key='great-expectations/great_expectations.yml'
)
config_yaml = response['Body'].read().decode('utf-8')
config_dict = yaml.safe_load(config_yaml)
context_config = DataContextConfig(**config_dict)
context = gx.get_context(project_config=context_config)
update_yml_config(context, s3_client)

# Load config_variables.yml into the Data Context
config_vars_response = s3_client.get_object(
    Bucket="my-bucket",
    Key="great-expectations/uncommitted/config_variables.yml"
)
config_vars = yaml.safe_load(config_vars_response["Body"].read())
context.config_variables.update(config_vars)  # Inject variables

This successfully stores the YAML file at that path, along with the folder structure and the config variables. To load the config, I have to read the YAML files manually, both the GX config and the config variables, like this:

response = s3_client.get_object(
    Bucket='my-bucket',
    Key='great-expectations/great_expectations.yml'
)
config_yaml = response['Body'].read().decode('utf-8')
config_dict = yaml.safe_load(config_yaml)
context_config = DataContextConfig(**config_dict)
context = gx.get_context(project_config=context_config)
# update_yml_config(context, s3_client)

# Load config_variables.yml into the Data Context
config_vars_response = s3_client.get_object(
    Bucket="my-bucket",
    Key="great-expectations/uncommitted/config_variables.yml"
)
config_vars = yaml.safe_load(config_vars_response["Body"].read())
context.config_variables.update(config_vars)  # Inject variables

I once tried to run this code:

root_dir = 's3://my-bucket/great-expectations/'
context = gx.get_context(context_root_dir=root_dir)
print(context)

But the output shows that GX reads the config from the tmp files instead of the project folder, as in this log:

{
  "checkpoint_store_name": "checkpoint_store",
  "config_version": 4,
  "data_docs_sites": {
    "local_site": {
      "class_name": "SiteBuilder",
      "show_how_to_buttons": true,
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "/tmp/tmp3scyd_fn"
      },
      "site_index_builder": {
        "class_name": "DefaultSiteIndexBuilder"
      }
    }
  },
  "expectations_store_name": "expectations_store",
  "fluent_datasources": {},
  "stores": {
    "expectations_store": {
      "class_name": "ExpectationsStore",
      "store_backend": {
        "class_name": "InMemoryStoreBackend"
      }
    },
    "validation_results_store": {
      "class_name": "ValidationResultsStore",
      "store_backend": {
        "class_name": "InMemoryStoreBackend"
      }
    },
    "checkpoint_store": {
      "class_name": "CheckpointStore",
      "store_backend": {
        "class_name": "InMemoryStoreBackend"
      }
    },
    "validation_definition_store": {
      "class_name": "ValidationDefinitionStore",
      "store_backend": {
    "class_name": "InMemoryStoreBackend"
}
    }
  },
  "validation_results_store_name": "validation_results_store"
}

I hope deploying GX can be made simpler in the future. Thank you

Environment:

  • Great Expectations Version: v1.3.3
  • Data Source: Redshift DB
  • Cloud environment: AWS Glue, S3 Bucket

Additional context
Here's the full code I need to execute to run the data validation in AWS Glue after setting up the config:

import sys
from datetime import datetime
from botocore.exceptions import ClientError
import os
import boto3,time,json
import yaml
import great_expectations as gx
from great_expectations.data_context.types.base import DataContextConfig, S3StoreBackendDefaults
from great_expectations.datasource.fluent import SQLDatasource
from great_expectations.checkpoint import (
    UpdateDataDocsAction
)

s3_client = boto3.client('s3')

def get_redshift_credentials():
    # Get config from S3
    s3 = boto3.client("s3", region_name="ap-southeast-1")
    config = json.loads(s3.get_object(
        Bucket="my-bucket",
        Key="config/prod-rms-id.json"
    )["Body"].read())["RMS_ID"][0]

    # Get credentials from Secrets Manager
    secrets = json.loads(
        boto3.client('secretsmanager', region_name="ap-southeast-1")
        .get_secret_value(SecretId=config["secret_manager_name_redshift"])
        ["SecretString"]
    )

    return {
        'host': config["redshift_host"],
        'database': config["redshift_db_name"],
        'port': 5439,
        'username': secrets["username"],
        'password': secrets["password"]
    }

# Function to update the YAML config manually
def update_yml_config(context, s3_client, bucket_name='my-bucket', key='great-expectations/great_expectations.yml'):
    config_dict = context.variables.config.to_json_dict()
    
    if 'data_context_id' in config_dict:
        config_dict['data_context_id'] = str(config_dict['data_context_id'])
    
    if 'fluent_datasources' in config_dict:
        for ds in config_dict['fluent_datasources'].values():
            if 'connection_string' in ds:
                ds['connection_string'] = "secret|${aws_secrets_manager_arn}|redshift_connection_string"
    
    config_yaml = yaml.dump(config_dict, default_flow_style=False)
    s3_client.put_object(
        Bucket=bucket_name,
        Key=key,
        Body=config_yaml,
        ContentType="text/yaml"
    )

# Create a list of Actions for the Checkpoint to perform
action_list = [
    # This Action updates the Data Docs static website with the Validation
    #   Results after the Checkpoint is run.
    UpdateDataDocsAction(
        name="update_all_data_docs",
    ),
]


# # Get default context
# context = gx.get_context()
# config_dict = context.variables.config

# # Updating config
# config_dict["stores"] = {
#     "expectations_store": {
#         "class_name": "ExpectationsStore",
#         "store_backend": {
#             "class_name": "TupleS3StoreBackend",
#             "bucket": "my-bucket",
#             "prefix": "great-expectations/expectations/"
#         }
#     },
#     "validation_results_store": {
#         "class_name": "ValidationResultsStore",
#         "store_backend": {
#             "class_name": "TupleS3StoreBackend",
#             "bucket": "my-bucket",
#             "prefix": "great-expectations/validations/"
#         }
#     },
#     "checkpoint_store": {
#         "class_name": "CheckpointStore",
#         "store_backend": {
#             "class_name": "TupleS3StoreBackend",
#             "bucket": "my-bucket",
#             "prefix": "great-expectations/checkpoints/"
#         }
#     },
#     "validation_definition_store": {
#         "class_name": "ValidationDefinitionStore",
#         "store_backend": {
#             "class_name": "TupleS3StoreBackend",
#             "bucket": "my-bucket",
#             "prefix": "great-expectations/validation_definitions/"
#         }
#     }
# }

# # # Update context config to point to config_variables
# config_dict["config_variables_file_path"] = "great-expectations/uncommitted/config_variables.yml"

# # Creating config variables yaml
# config_variables = {
#     "my_aws_creds": "secret|arn:aws:secretsmanager|redshift_connection_string"
# }

# # Create data docs site
# site_config = {
#     "class_name": "SiteBuilder",
#     "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
#     "store_backend": {
#         "class_name": "TupleS3StoreBackend",
#         "bucket": "my-bucket",
#         "prefix": "great-expectations/data_docs/",
#     },
# }
# context.variables.config.data_docs_sites = {
#     "local_site": site_config,
#     "s3_site": site_config
# }

# s3_client.put_object(
#     Bucket='my-bucket',
#     Key='great-expectations/uncommitted/config_variables.yml',
#     Body=yaml.dump(config_variables),
#     ContentType="text/yaml"
# )

# # Dump config to yaml files
# context_config = context.config
# config_dict = context_config.to_dict()
# config_yaml = yaml.dump(config_dict, default_flow_style=False)
# s3_client.put_object(
#   Bucket='my-bucket',
#   Key='great-expectations/great_expectations.yml',
#   Body=config_yaml,
#   ContentType="text/yaml"
# )
# update_yml_config(context, s3_client)

# Read the config from S3. (The dump above doesn't automatically generate the data_context_id; however, running the code below and saving the config YAML again generates it automatically.)
response = s3_client.get_object(
    Bucket='my-bucket',
    Key='great-expectations/great_expectations.yml'
)
config_yaml = response['Body'].read().decode('utf-8')
config_dict = yaml.safe_load(config_yaml)
context_config = DataContextConfig(**config_dict)
context = gx.get_context(project_config=context_config)
# update_yml_config(context, s3_client)

# Load config_variables.yml into the Data Context
config_vars_response = s3_client.get_object(
    Bucket="my-bucket",
    Key="great-expectations/uncommitted/config_variables.yml"
)
config_vars = yaml.safe_load(config_vars_response["Body"].read())
context.config_variables.update(config_vars)  # Inject variables

# Get secrets from AWS Secrets Manager
creds = get_redshift_credentials()
redshift_host, database, port = creds['host'], creds['database'], creds['port']
username, password = creds['username'], creds['password']

# Build the connection string and add the table assets; this needs to run on every execution.
datasource_name = "my_db"
connection_string = f"redshift+redshift_connector://{username}:{password}@{redshift_host}:{port}/{database}"
redshift_datasource = context.data_sources.add_sql(
    name=datasource_name,
    connection_string=connection_string,
)

# Add table assets in a loop
tables = [
    "inbound_shipment",
    "sku",
    "batch"
]
table_assets = []
for table in tables:
    asset = redshift_datasource.add_table_asset(
        name=table,
        table_name=table,
        schema_name="public"
    )
    table_assets.append(asset)
    print(f"Added asset: {table}")
update_yml_config(context, s3_client)

# # Create expectation suite (cannot duplicate)
# suite_name = "inbound_shipment_expectation"
# suite = gx.ExpectationSuite(name=suite_name)
# suite = context.suites.add(suite)
# # Adding expectation
# suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="shipment_date"))
# suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="expected_kolis_count"))
# suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column='id'))

# Run this before checkpoint
datasource = context.data_sources.get("my_db")
data_asset = datasource.get_asset("inbound_shipment")
batch_definition = data_asset.add_batch_definition_whole_table("inbound_shipment_batch")
expectation_suite = context.suites.get("inbound_shipment_expectation")

# Create a validation definition (cannot duplicate)
# validation_definition = context.validation_definitions.add(
#     gx.ValidationDefinition(
#         name="inbound_shipment_validation",
#         data=batch_definition,
#         suite=expectation_suite
#     )
# )

# # Create a list of one or more Validation Definitions for the Checkpoint to run
# validation_definitions = [
#     context.validation_definitions.get("inbound_shipment_validation")
# ]

# # Create the Checkpoint (cannot duplicate)
# checkpoint_name = "inbound_shipment_checkpoints"
# checkpoint = gx.Checkpoint(
#     name=checkpoint_name,
#     validation_definitions=validation_definitions,
#     actions=action_list,
#     result_format={"result_format": "COMPLETE"},
# )
# # Save the Checkpoint to the Data Context
# context.checkpoints.add(checkpoint)

# Run checkpoint
checkpoint = context.checkpoints.get("inbound_shipment_checkpoints")
validation_results = checkpoint.run()

Here's my config for reference:

analytics_enabled: null
checkpoint_store_name: checkpoint_store
config_variables_file_path: great-expectations/uncommitted/config_variables.yml
config_version: 4
data_context_id: 9c0a5f7a-d6de-4888-a830-1b7831348037
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
    store_backend:
      bucket: my-bucket
      class_name: TupleS3StoreBackend
      prefix: great-expectations/data_docs/
  s3_site:
    class_name: SiteBuilder
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
    store_backend:
      bucket: my-bucket
      class_name: TupleS3StoreBackend
      prefix: great-expectations/data_docs/
expectations_store_name: expectations_store
fluent_datasources:
  my_db:
    assets:
    - batch_metadata: {}
      id: 0a0962e9-ff0d-4c08-b46c-b83dae193889
      name: inbound_shipment
      schema_name: public
      table_name: inbound_shipment
      type: table
    - batch_metadata: {}
      id: 0ae17b3c-1292-430f-899d-7634c17d9b7e
      name: sku
      schema_name: public
      table_name: sku
      type: table
    - batch_metadata: {}
      id: 14c05845-9876-448b-9c71-ed6ffbd187e0
      name: batch
      schema_name: public
      table_name: batch
      type: table
    connection_string: secret|${aws_secrets_manager_arn}|redshift_connection_string
    id: ac816895-ab3a-4546-aadb-e39a03a79545
    name: my_db
    type: sql
plugins_directory: null
progress_bars: null
stores:
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      bucket: my-bucket
      class_name: TupleS3StoreBackend
      prefix: great-expectations/checkpoints/
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      bucket: my-bucket
      class_name: TupleS3StoreBackend
      prefix: great-expectations/expectations/
  validation_definition_store:
    class_name: ValidationDefinitionStore
    store_backend:
      bucket: my-bucket
      class_name: TupleS3StoreBackend
      prefix: great-expectations/validation_definitions/
  validation_results_store:
    class_name: ValidationResultsStore
    store_backend:
      bucket: my-bucket
      class_name: TupleS3StoreBackend
      prefix: great-expectations/validations/
validation_results_store_name: validation_results_store
@adeola-ak adeola-ak moved this from To Do to In progress in GX Core Issues Board Feb 3, 2025
@adeola-ak adeola-ak added the feature-request feature request label Feb 3, 2025
@adeola-ak
Contributor

hi there,

thanks for your detailed report. This behavior is expected—GX requires a local project structure and does not currently support using an S3 path as the full project directory. By default, we initialize in a temporary directory (e.g., /tmp/tmp3scyd_fn). However, we do support configuring individual stores (e.g., expectations_store, validation_results_store, checkpoint_store) to use S3 by specifying an s3:// path in the store_backend configuration.

I understand that this setup adds complexity, but I believe your current approach is the best one given our existing structure. I'll share your feedback about streamlining deployments with my team, though I can't guarantee any immediate changes. Please check back on this issue for further updates.
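For example, a minimal sketch of that per-store S3 setup using the S3StoreBackendDefaults helper (which the script above already imports); the bucket name is a placeholder:

import great_expectations as gx
from great_expectations.data_context.types.base import (
    DataContextConfig,
    S3StoreBackendDefaults,
)

# Point the expectations, validation results, and checkpoint stores (plus the
# data docs site) at TupleS3StoreBackend under one bucket ("my-bucket" is a placeholder).
project_config = DataContextConfig(
    store_backend_defaults=S3StoreBackendDefaults(default_bucket_name="my-bucket"),
)
context = gx.get_context(project_config=project_config)

The project scaffolding still initializes locally (in a temporary directory when no file context exists), but store reads and writes go to S3.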
