Add data quality step for parking_sensor_synapse using the Great Expectations library #641

Open: wants to merge 8 commits into main
@@ -1 +1,2 @@
opencensus-ext-azure==1.1.7
great_expectations
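
Note: great_expectations is added without a version pin. The notebook code below relies on BaseDataContext and add_or_update_expectation_suite, which were renamed or removed in later Great Expectations releases, so pinning to the 0.16 line would keep installs reproducible. A suggested pin (the exact range is an assumption, not part of the PR):

opencensus-ext-azure==1.1.7
great_expectations>=0.16,<0.17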
@@ -1,6 +1,7 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -47,6 +48,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -100,6 +102,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -165,6 +168,122 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# 4. Data Quality\n",
"The following uses the [Great Expectations](https://greatexpectations.io/) library. See [Great Expectation Docs](https://docs.greatexpectations.io/docs/) for more info.\n",
"\n",
"**Note**: for simplication purposes, the [Expectation Suite](https://docs.greatexpectations.io/docs/terms/expectation_suite) is created inline. Generally this should be created prior to data pipeline execution, and only loaded during runtime and executed against a data [Batch](https://docs.greatexpectations.io/docs/terms/batch/) via [Checkpoint](https://docs.greatexpectations.io/docs/terms/checkpoint/)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import os\n",
"from ruamel import yaml\n",
"from great_expectations.core.batch import RuntimeBatchRequest\n",
"from great_expectations.data_context import BaseDataContext\n",
"from great_expectations.data_context.types.base import (\n",
" DataContextConfig,\n",
" DatasourceConfig,\n",
" FilesystemStoreBackendDefaults,\n",
")\n",
"from pyspark.sql import SparkSession, Row\n",
"\n",
"# 0. Create mount point path for spark job\n",
"job_id = mssparkutils.env.getJobId()\n",
"root_directory=f\"/synfs/{job_id}/great_expectations\"\n",
"\n",
"\n",
"# 1. Configure DataContext\n",
"# https://docs.greatexpectations.io/docs/terms/data_context\n",
"data_context_config = DataContextConfig(\n",
" datasources={\n",
" \"parkingbay_data_source\": DatasourceConfig(\n",
" class_name=\"Datasource\",\n",
" execution_engine={\"class_name\": \"SparkDFExecutionEngine\"},\n",
" data_connectors={\n",
" \"parkingbay_data_connector\": {\n",
" \"module_name\": \"great_expectations.datasource.data_connector\",\n",
" \"class_name\": \"RuntimeDataConnector\",\n",
" \"batch_identifiers\": [\n",
" \"environment\",\n",
" \"pipeline_run_id\",\n",
" ],\n",
" }\n",
" }\n",
" )\n",
" },\n",
" store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=root_directory)\n",
")\n",
"context = BaseDataContext(project_config=data_context_config)\n",
"\n",
"\n",
"# 2. Create a BatchRequest based on parkingbay_sdf dataframe.\n",
"# https://docs.greatexpectations.io/docs/terms/batch\n",
"batch_request = RuntimeBatchRequest(\n",
" datasource_name=\"parkingbay_data_source\",\n",
" data_connector_name=\"parkingbay_data_connector\",\n",
" data_asset_name=\"paringbaydataaset\", # This can be anything that identifies this data_asset for you\n",
" batch_identifiers={\n",
" \"environment\": \"stage\",\n",
" \"pipeline_run_id\": \"pipeline_run_id\",\n",
" },\n",
" runtime_parameters={\"batch_data\": parkingbay_sdf}, # Your dataframe goes here\n",
")\n",
"\n",
"\n",
"# 3. Define Expecation Suite and corresponding Data Expectations\n",
"# https://docs.greatexpectations.io/docs/terms/expectation_suite\n",
"expectation_suite_name = \"parkingbay_data_exception_suite_basic\"\n",
"context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)\n",
"validator = context.get_validator(\n",
" batch_request=batch_request,\n",
" expectation_suite_name=expectation_suite_name,\n",
")\n",
"# Add Validatons to suite\n",
"# Check available expectations: validator.list_available_expectation_types()\n",
"# https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/expectations/index.html\n",
"# https://legacy.docs.greatexpectations.io/en/latest/reference/core_concepts/expectations/standard_arguments.html#meta\n",
"validator.expect_column_values_to_not_be_null(column=\"meter_id\")\n",
"validator.expect_column_values_to_not_be_null(column=\"marker_id\")\n",
"validator.expect_column_values_to_be_of_type(column=\"rd_seg_dsc\", type_=\"StringType\")\n",
"validator.expect_column_values_to_be_of_type(column=\"rd_seg_id\", type_=\"IntegerType\")\n",
"# validator.validate() # To run run validations without checkpoint\n",
"validator.save_expectation_suite(discard_failed_expectations=False)\n",
"\n",
"\n",
"# 4. Configure a checkpoint and run Expectation suite using checkpoint\n",
"# https://docs.greatexpectations.io/docs/terms/checkpoint\n",
"my_checkpoint_name = \"Parkingbay Data DQ\"\n",
"checkpoint_config = {\n",
" \"name\": my_checkpoint_name,\n",
" \"config_version\": 1.0,\n",
" \"class_name\": \"SimpleCheckpoint\",\n",
" \"run_name_template\": \"%Y%m%d-%H%M%S-my-run-name-template\",\n",
"}\n",
"my_checkpoint = context.test_yaml_config(yaml.dump(checkpoint_config))\n",
"context.add_or_update_checkpoint(**checkpoint_config)\n",
"# Run Checkpoint passing in expectation suite.\n",
"checkpoint_result = context.run_checkpoint(\n",
" checkpoint_name=my_checkpoint_name,\n",
" validations=[\n",
" {\n",
" \"batch_request\": batch_request,\n",
" \"expectation_suite_name\": expectation_suite_name,\n",
" }\n",
" ],\n",
")\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -174,7 +293,7 @@
}
},
"source": [
"# 4. Observability: Logging to Azure Application Insights using OpenCensus Library"
"# 5. Observability: Logging to Azure Application Insights using OpenCensus Library"
]
},
{
@@ -227,6 +346,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -236,7 +356,7 @@
}
},
"source": [
"# 5. Observability: Logging to Log Analytics workspace using log4j"
"# 6. Observability: Logging to Log Analytics workspace using log4j"
]
},
{
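As the note in the new markdown cell says, in production the Expectation Suite would be created ahead of time and only loaded at runtime. A minimal sketch of that runtime-only pattern, reusing the data_context_config and batch_request built in the cell above; it assumes the suite and checkpoint were previously saved under the same FilesystemStoreBackendDefaults root_directory (GE 0.16-era API):

from great_expectations.data_context import BaseDataContext

# Reuse the DataContext configuration from the cell above; the suite is loaded, not rebuilt.
context = BaseDataContext(project_config=data_context_config)
suite = context.get_expectation_suite("parkingbay_data_expectation_suite_basic")

# Run the pre-registered checkpoint against the current batch and fail fast on DQ errors.
checkpoint_result = context.run_checkpoint(
    checkpoint_name="Parkingbay Data DQ",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": suite.expectation_suite_name,
        }
    ],
)
if not checkpoint_result.success:
    raise ValueError("Data quality validation failed for parkingbay data")

Raising here stops the notebook, and hence the Synapse pipeline, before bad data is written downstream.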
@@ -1,6 +1,7 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -44,6 +45,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -101,6 +103,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -149,6 +152,122 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# 4. Data Quality\n",
"The following uses the [Great Expectations](https://greatexpectations.io/) library. See [Great Expectation Docs](https://docs.greatexpectations.io/docs/) for more info.\n",
"\n",
"**Note**: for simplication purposes, the [Expectation Suite](https://docs.greatexpectations.io/docs/terms/expectation_suite) is created inline. Generally this should be created prior to data pipeline execution, and only loaded during runtime and executed against a data [Batch](https://docs.greatexpectations.io/docs/terms/batch/) via [Checkpoint](https://docs.greatexpectations.io/docs/terms/checkpoint/)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"# import os\n",
"from ruamel import yaml\n",
"from great_expectations.core.batch import RuntimeBatchRequest\n",
"from great_expectations.data_context import BaseDataContext\n",
"from great_expectations.data_context.types.base import (\n",
" DataContextConfig,\n",
" DatasourceConfig,\n",
" FilesystemStoreBackendDefaults,\n",
")\n",
"from pyspark.sql import SparkSession, Row\n",
"\n",
"# 0. Create mount point path for spark job\n",
"job_id = mssparkutils.env.getJobId()\n",
"root_directory=f\"/synfs/{job_id}/great_expectations\"\n",
"\n",
"# 1. Configure DataContext\n",
"# https://docs.greatexpectations.io/docs/terms/data_context\n",
"data_context_config = DataContextConfig(\n",
" datasources={\n",
" \"transformed_data_source\": DatasourceConfig(\n",
" class_name=\"Datasource\",\n",
" execution_engine={\"class_name\": \"SparkDFExecutionEngine\"},\n",
" data_connectors={\n",
" \"transformed_data_connector\": {\n",
" \"module_name\": \"great_expectations.datasource.data_connector\",\n",
" \"class_name\": \"RuntimeDataConnector\",\n",
" \"batch_identifiers\": [\n",
" \"environment\",\n",
" \"pipeline_run_id\",\n",
" ],\n",
" }\n",
" }\n",
" )\n",
" },\n",
" store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=root_directory)\n",
")\n",
"context = BaseDataContext(project_config=data_context_config)\n",
"\n",
"# 2. Create a BatchRequest based on new_fact_parking dataframe.\n",
"# https://docs.greatexpectations.io/docs/terms/batch\n",
"batch_request = RuntimeBatchRequest(\n",
" datasource_name=\"transformed_data_source\",\n",
" data_connector_name=\"transformed_data_connector\",\n",
" data_asset_name=\"paringbaydataaset\", # This can be anything that identifies this data_asset for you\n",
" batch_identifiers={\n",
" \"environment\": \"stage\",\n",
" \"pipeline_run_id\": \"pipeline_run_id\",\n",
" },\n",
" runtime_parameters={\"batch_data\": new_fact_parking}, # Your dataframe goes here\n",
")\n",
"\n",
"\n",
"# 3. Define Expecation Suite and corresponding Data Expectations\n",
"# https://docs.greatexpectations.io/docs/terms/expectation_suite\n",
"expectation_suite_name = \"Transfomed_data_exception_suite_basic\"\n",
"context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)\n",
"validator = context.get_validator(\n",
" batch_request=batch_request,\n",
" expectation_suite_name=expectation_suite_name,\n",
")\n",
"# Add Validatons to suite\n",
"# Check available expectations: validator.list_available_expectation_types()\n",
"# https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/expectations/index.html\n",
"# https://legacy.docs.greatexpectations.io/en/latest/reference/core_concepts/expectations/standard_arguments.html#meta\n",
"validator.expect_column_values_to_not_be_null(column=\"status\")\n",
"validator.expect_column_values_to_be_of_type(column=\"status\", type_=\"StringType\")\n",
"validator.expect_column_values_to_not_be_null(column=\"dim_time_id\")\n",
"validator.expect_column_values_to_be_of_type(column=\"dim_time_id\", type_=\"IntegerType\")\n",
"validator.expect_column_values_to_not_be_null(column=\"dim_parking_bay_id\")\n",
"validator.expect_column_values_to_be_of_type(column=\"dim_parking_bay_id\", type_=\"StringType\")\n",
"#validator.validate() # To run run validations without checkpoint\n",
"validator.save_expectation_suite(discard_failed_expectations=False)\n",
"\n",
"\n",
"# 4. Configure a checkpoint and run Expectation suite using checkpoint\n",
"# https://docs.greatexpectations.io/docs/terms/checkpoint\n",
"my_checkpoint_name = \"Transformed Data\"\n",
"checkpoint_config = {\n",
" \"name\": my_checkpoint_name,\n",
" \"config_version\": 1.0,\n",
" \"class_name\": \"SimpleCheckpoint\",\n",
" \"run_name_template\": \"%Y%m%d-%H%M%S-my-run-name-template\",\n",
"}\n",
"my_checkpoint = context.test_yaml_config(yaml.dump(checkpoint_config,default_flow_style=False))\n",
"context.add_or_update_checkpoint(**checkpoint_config)\n",
"# Run Checkpoint passing in expectation suite\n",
"checkpoint_result = context.run_checkpoint(\n",
" checkpoint_name=my_checkpoint_name,\n",
" validations=[\n",
" {\n",
" \"batch_request\": batch_request,\n",
" \"expectation_suite_name\": expectation_suite_name,\n",
" }\n",
" ],\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -158,7 +277,7 @@
}
},
"source": [
"# 4. Observability: Logging to Azure Application Insights using OpenCensus Library"
"# 5. Observability: Logging to Azure Application Insights using OpenCensus Library"
]
},
{
@@ -211,6 +330,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"nteract": {
@@ -220,7 +340,7 @@
}
},
"source": [
"# 5. Observability: Logging to Log Analytics workspace using log4j"
"# 6. Observability: Logging to Log Analytics workspace using log4j"
]
},
{
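
Both notebooks renumber the observability sections to 5 and 6, and a natural follow-up is to surface the checkpoint outcome through that logging. A short sketch, assuming the OpenCensus logger from the observability cell is already configured with an AzureLogHandler and that checkpoint_result is still in scope (the logger name is illustrative):

import logging

# Illustrative name; in the notebook, reuse the logger configured in section 5.
logger = logging.getLogger("parking_sensor_dq")
logger.info("Great Expectations checkpoint finished: success=%s", checkpoint_result.success)
if not checkpoint_result.success:
    logger.error("Data quality validation failed for the transformed data batch")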