PI objectives for the transformation pipeline #236

Merged
merged 42 commits into from
Dec 11, 2024
Commits
b6b4b5e
added the statistics calculations in pipeline
Oct 16, 2024
42e88d7
generating JSON file for the stats and uploading it to S3
Oct 16, 2024
a5dcb51
using temp file for JSON
Oct 17, 2024
f98add0
testing JSON update for multiple datasets
Oct 18, 2024
5678be5
tested the creation of JSON file for different datasets
Oct 18, 2024
188f96e
feat:port event-driven vector automation to terraform
ividito Oct 17, 2024
a2e3bad
fix: feature flag for vector automation lambda
ividito Oct 17, 2024
12ba68c
fix: adjust log group count
ividito Oct 17, 2024
f532a6f
fix tf vector automation vars
smohiudd Oct 18, 2024
89d5251
update vars in event bridge tf
smohiudd Oct 18, 2024
9c636e0
add new line
smohiudd Oct 18, 2024
47b0707
fix tag filter
smohiudd Oct 18, 2024
8169ab6
update vector subnet ref
smohiudd Oct 18, 2024
66cfb79
fix vector subnet ids
smohiudd Oct 18, 2024
c449cb9
conditional vector subnets
smohiudd Oct 21, 2024
0df795e
WIP: dynamic ingest mapping for continuous tracing
ividito Jul 19, 2024
bfce3d5
feat: use taskflow for concurrent task runs
ividito Jul 31, 2024
31a3e84
clean up debug code
ividito Jul 31, 2024
a811fe2
Bugfix vector ingest
ividito Aug 1, 2024
3c034ae
add generic vector pipeline
smohiudd Aug 20, 2024
4a5dc02
remove process generic vector pipeline
smohiudd Aug 20, 2024
de8b69d
Update dags/veda_data_pipeline/veda_generic_vector_pipeline.py
ividito Aug 27, 2024
d28432d
Restructure dataset get_files output, add retries to some tasks to av…
ividito Sep 9, 2024
26feccf
Changes to make vector ingest work in shared VPC environment
ividito Oct 4, 2024
18470d0
feat: disable default api gateway endpoint for workflows api
botanical Oct 21, 2024
919e04b
feat: add variable to configure disabling default apigw endpoint
botanical Oct 21, 2024
81f94de
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
7485696
Update infrastructure/terraform.tfvars.tmpl
botanical Oct 23, 2024
75fef32
Using pythonoperators instead of ECS operator
amarouane-ABDELHAK Oct 24, 2024
6f080a7
Switching to pythonOperator
amarouane-ABDELHAK Oct 24, 2024
dafb213
tested the creation of JSON file for different datasets
Oct 18, 2024
884c296
optimizing the automation
Oct 28, 2024
05cd27d
Merge branch 'dev' into feature/optimize_automated_cog_transformation
Oct 29, 2024
8d64aaa
testing the optimized approach for automation
Oct 29, 2024
3ada005
successfully tested the automation for gridded population dataset
Oct 29, 2024
a64099a
updated readme for the folder and resolved the failure bug
Nov 4, 2024
534e571
Adding aut ingest
amarouane-ABDELHAK Dec 11, 2024
da43490
Fixing automation DAG
amarouane-ABDELHAK Dec 11, 2024
ac3d475
Fixing automation DAG
amarouane-ABDELHAK Dec 11, 2024
b584575
Fixing automation DAG
amarouane-ABDELHAK Dec 11, 2024
487f225
Add slack notification libraries
amarouane-ABDELHAK Dec 11, 2024
82f1109
Add slack notification libraries
amarouane-ABDELHAK Dec 11, 2024
5 changes: 5 additions & 0 deletions dags/automated_transformation/README.md
@@ -0,0 +1,5 @@
## Information about the folder
This folder contains the `automation DAG`, which transforms a given dataset into COGs (Cloud Optimized GeoTIFFs).
The folder consists of two files:
- `automation_dag.py` - Defines the DAG architecture.
- `transformation_pipeline.py` - Contains the functions that transform a given netCDF file into a COG. It checks whether the `transformation plugins` exist at the configured location (`S3 bucket` or plugin URL), fetches the plugin function, and uses it for the transformation. Statistics for the `netCDF` and `COG` files are also calculated, stored in a `JSON` file, and pushed to `S3` along with the `transformed COG`. An illustrative sketch of such a plugin follows below.
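
A transformation plugin is expected to expose a function named `<collection_name>_transformation` that takes an open file object, the source file name, and the nodata value, and returns a mapping of COG file names to data arrays (this contract is inferred from `transformation_pipeline.py`). A minimal sketch, assuming rioxarray-enabled arrays and a simple netCDF layout; the variable handling, CRS, and engine choice are illustrative assumptions, not part of this PR:

```python
# Illustrative sketch: data_transformation_plugins/gpw_transformation.py
import os

import rioxarray  # noqa: F401  # registers the .rio accessor used by the pipeline
import xarray


def gpw_transformation(file_obj, name, nodata):
    """Return a mapping of COG file names to data arrays for one netCDF file."""
    # Engine selection may be needed depending on the netCDF flavor
    ds = xarray.open_dataset(file_obj)
    var_data_netcdf = {}
    base = os.path.splitext(os.path.basename(name))[0]
    for var in ds.data_vars:
        data = ds[var]
        # Tag CRS and nodata so data.rio.to_raster() can write a valid COG downstream
        data = data.rio.write_crs("EPSG:4326").rio.write_nodata(nodata)
        var_data_netcdf[f"{base}_{var}.tif"] = data
    return var_data_netcdf
```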
128 changes: 128 additions & 0 deletions dags/automated_transformation/automation_dag.py
@@ -0,0 +1,128 @@
from __future__ import annotations

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from slack_notifications import slack_fail_alert

DAG_ID = "automate-cog-transformation"

# Custom validation function


dag_run_config = {
"data_acquisition_method": Param(
"s3", enum=["s3"]
),  # other protocols (HTTP, SFTP, ...) to be added later
"plugins_uri": Param(
"https://raw.githubusercontent.com/US-GHG-Center/ghgc-docs/refs/heads/main/",
type="string",
),
"raw_data_bucket": "ghgc-data-store-develop",
"raw_data_prefix": Param(
"delivery/gpw",
type="string",
pattern="^[^/].*[^/]$",
),
"dest_data_bucket": "ghgc-data-store-develop",
"data_prefix": Param("transformed_cogs", type="string", pattern="^[^/].*[^/]$"),
"collection_name": "gpw",
"nodata": Param(-9999, type="number"),
"ext": Param(".tif", type="string", pattern="^\\..*$"),
}

with DAG(
dag_id=DAG_ID,
schedule=None,
catchup=False,
tags=["Transformation", "Report"],
params=dag_run_config,
on_failure_callback=slack_fail_alert,
) as dag:
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

@task
def check_function_exists(ti):
from dags.automated_transformation.transformation_pipeline import (
download_python_file,
)

config = ti.dag_run.conf
folder_name = "data_transformation_plugins"
file_name = f'{config.get("collection_name")}_transformation.py'
try:
plugin_url = f"{config['plugins_uri'].strip('/')}/{folder_name}/{file_name}"
download_python_file(uri=plugin_url, check_exist=True)
return f"The {file_name} exists in {folder_name} in this URL {plugin_url}."
except Exception as e:
raise Exception(f"Error checking file existence: {e}")

@task
def discover_files(ti):
from dags.automated_transformation.transformation_pipeline import (
get_all_s3_keys,
)

config = ti.dag_run.conf.copy()
bucket = config.get("raw_data_bucket")
model_name = config.get("raw_data_prefix")
ext = config.get("ext")  # e.g. ".nc" for netCDF inputs
generated_list = get_all_s3_keys(bucket, model_name, ext)
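# Batch the discovered keys so that at most roughly 900 mapped process_files tasks are created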
chunk_size = int(len(generated_list) / 900) + 1
return [
generated_list[i : i + chunk_size]
for i in range(0, len(generated_list), chunk_size)
]

@task(max_active_tis_per_dag=1)
def process_files(file_url, **kwargs):
dag_run = kwargs.get("dag_run")
from dags.automated_transformation.transformation_pipeline import transform_cog

config = dag_run.conf.copy()
raw_bucket_name = config.get("raw_data_bucket")
dest_data_bucket = config.get("dest_data_bucket")
data_prefix = config.get("data_prefix")
nodata = config.get("nodata")
collection_name = config.get("collection_name")
print(f"The file I am processing is {file_url}")
print("len of files", len(file_url))
folder_name = "data_transformation_plugins"
file_name = f"{collection_name}_transformation.py"
plugin_url = f"{config['plugins_uri'].strip('/')}/{folder_name}/{file_name}"

file_status = transform_cog(
file_url,
plugin_url=plugin_url,
nodata=nodata,
raw_data_bucket=raw_bucket_name,
dest_data_bucket=dest_data_bucket,
data_prefix=data_prefix,
collection_name=collection_name,
)
return file_status

@task
def generate_report(reports, **kwargs):
dag_run = kwargs.get("dag_run")
collection_name = dag_run.conf.get("collection_name")
count, failed_files = 0, []
for report in reports:
if "failed" in report.values():
failed_files.append(report)
elif "success" in report.values():
count += 1

if failed_files:
raise Exception(f"Error generating COG file {failed_files}")
return {
"collection": collection_name,
"successes": count,
"failures": failed_files,
}

urls = start >> check_function_exists() >> discover_files()
report_data = process_files.expand(file_url=urls)
generate_report(reports=report_data) >> end
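
For reference, a run configuration for this DAG might look like the sketch below; the values mirror the defaults declared in `dag_run_config` (with `ext` set to `.nc` for netCDF inputs) and should be adjusted per dataset.

```python
# Example dag_run.conf for "automate-cog-transformation"
# (values mirror the dag_run_config defaults; adjust per dataset)
example_conf = {
    "data_acquisition_method": "s3",
    "plugins_uri": "https://raw.githubusercontent.com/US-GHG-Center/ghgc-docs/refs/heads/main/",
    "raw_data_bucket": "ghgc-data-store-develop",
    "raw_data_prefix": "delivery/gpw",
    "dest_data_bucket": "ghgc-data-store-develop",
    "data_prefix": "transformed_cogs",
    "collection_name": "gpw",
    "nodata": -9999,
    "ext": ".nc",  # ".tif" is the declared default; ".nc" targets the raw netCDF files
}
```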
233 changes: 233 additions & 0 deletions dags/automated_transformation/transformation_pipeline.py
@@ -0,0 +1,233 @@
import importlib.util
import json
import os
import tempfile

import boto3
import numpy as np
import rasterio
import requests
import s3fs


def get_all_s3_keys(bucket, model_name, ext) -> list:
"""Function fetches all the s3 keys from the given bucket and model name.

Args:
bucket (str): Name of the bucket from where we want to fetch the data
model_name (str): Dataset name/folder name where the data is stored
ext (str): extension of the file that is to be fetched.

Returns:
list : List of all the keys that match the given criteria
"""
session = boto3.session.Session()
s3_client = session.client("s3")
keys = []

kwargs = {"Bucket": bucket, "Prefix": f"{model_name}"}
try:
while True:
resp = s3_client.list_objects_v2(**kwargs)
print("response is ", resp)
for obj in resp["Contents"]:
if obj["Key"].endswith(ext) and "historical" not in obj["Key"]:
keys.append(obj["Key"])

try:
kwargs["ContinuationToken"] = resp["NextContinuationToken"]
except KeyError:
break
except Exception as ex:
raise Exception(f"Error returned is {ex}")

print(f"Discovered {len(keys)}")
return keys


def download_python_file_from_s3(bucket_name, s3_key):
"""
Downloads a Python file from an S3 bucket and returns a temporary file path.

Parameters:
- bucket_name (str): The name of the S3 bucket.
- s3_key (str): The key (path) to the file in the S3 bucket.

Returns:
- str: Path to the temporary file.
"""
s3 = boto3.client("s3")

# Create a temporary file
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".py")
temp_file.close() # Close the file so it can be written to by boto3

# Download the S3 file to the temporary file location
s3.download_file(bucket_name, s3_key, temp_file.name)
print(
f"Downloaded {s3_key} from bucket {bucket_name} to temporary file {temp_file.name}"
)

return temp_file.name


def download_python_file(uri: str, check_exist=False):
if uri.startswith("s3://"):
# Remove the 's3://' prefix
s3_path = uri[5:]
# Split into bucket and key
parts = s3_path.split("/", 1)
bucket_name, key = parts
return download_python_file_from_s3(bucket_name=bucket_name, s3_key=key)
return download_python_file_from_github(url=uri, check_exist=check_exist)


def download_python_file_from_github(url, check_exist=False):
try:
# Send a GET request to the URL
response = requests.get(url)
response.raise_for_status() # Raise an error for HTTP errors
if check_exist:
return True

# Extract the file name from the URL
file_name = os.path.basename(url)

# Create a temporary directory and file with the same name
temp_dir = tempfile.gettempdir()
temp_file_path = os.path.join(temp_dir, file_name)

# Write the content to the temporary file
with open(temp_file_path, "wb") as temp_file:
temp_file.write(response.content)

print(f"File downloaded to: {temp_file_path}")
return temp_file_path

except requests.exceptions.RequestException as e:
print(f"Error downloading the file: {e}")
return None


def load_function_from_file(file_path, function_name):
"""
Dynamically loads a function from a Python file.

Parameters:
- file_path (str): Path to the Python file.
- function_name (str): Name of the function to load.

Returns:
- function: The loaded function object.
"""
# Load the module from the file path
spec = importlib.util.spec_from_file_location("dynamic_module", file_path)
dynamic_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(dynamic_module)

# Return the function
return getattr(dynamic_module, function_name)


def transform_cog(
name_list,
nodata,
raw_data_bucket,
dest_data_bucket,
data_prefix,
collection_name,
plugin_url,
):
"""This function calls the plugins (dataset specific transformation functions) and
generalizes the transformation of dataset to COGs.

Args:
plugin_url:
name_list (str): List of the files to be transformed
nodata (str): Nodata value as mentioned by the data provider
raw_data_bucket (str): Name of the bucket where the raw data resides
dest_data_bucket (str): Name of the bucket where we want to store the tranformed cogs
raw_data_prefix (str): Folder where the netCDF files are stored in the bucket
collection_name (str): Name of the collection that would be used for the dataset

Returns:
dict: Status and name of the file that is transformed
"""

session = boto3.session.Session()
s3_client = session.client("s3")
json_dict = {}
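# The plugin must expose a function named <collection_name>_transformation (dashes replaced by underscores)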
function_name = f'{collection_name.replace("-", "_")}_transformation'
temp_file_path = download_python_file(plugin_url)
for name in name_list:
url = f"s3://{raw_data_bucket}/{name}"
fs = s3fs.S3FileSystem()
print("the url is", url)
with fs.open(url, mode="rb") as file_obj:
try:
transform_func = load_function_from_file(temp_file_path, function_name)
var_data_netcdf = transform_func(file_obj, name, nodata)

for cog_filename, data in var_data_netcdf.items():
# generate COG
min_value_netcdf = data.min().item()
max_value_netcdf = data.max().item()
std_value_netcdf = data.std().item()
mean_value_netcdf = data.mean().item()
COG_PROFILE = {"driver": "COG", "compress": "DEFLATE"}
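# Write each array as a COG to a temporary file, upload it to S3, then compute statistics from the written raster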
with tempfile.NamedTemporaryFile() as temp_file:
data.rio.to_raster(temp_file.name, **COG_PROFILE)
s3_client.upload_file(
Filename=temp_file.name,
Bucket=dest_data_bucket,
Key=f"{data_prefix}/{collection_name}/{cog_filename}",
)
raster_data = rasterio.open(temp_file.name).read()
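# Mask the nodata value (hard-coded to -9999 here) before computing COG statistics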
raster_data[raster_data == -9999] = np.nan
min_value_cog = np.nanmin(raster_data)
max_value_cog = np.nanmax(raster_data)
mean_value_cog = np.nanmean(raster_data)
std_value_cog = np.nanstd(raster_data)
json_dict.update(
{
"original_file_url": name,
"transformed_filename": cog_filename,
"transformed_cog_s3uri": f"s3://{dest_data_bucket}/{data_prefix}/{cog_filename}",
"minimum_value_cog": f"{min_value_cog:.4f}",
"maximum_value_cog": f"{max_value_cog:.4f}",
"std_value_cog": f"{std_value_cog:.4f}",
"mean_value_cog": f"{mean_value_cog:.4f}",
"minimum_value_netcdf": f"{min_value_netcdf:.4f}",
"maximum_value_netcdf": f"{max_value_netcdf:.4f}",
"std_value_netcdf": f"{std_value_netcdf:.4f}",
"mean_value_netcdf": f"{mean_value_netcdf:.4f}",
}
)
with tempfile.NamedTemporaryFile() as json_temp:
with open(json_temp.name, "w") as fp:
json.dump(json_dict, fp, indent=4)
print("JSON dictionary is ", json_dict)

# Upload the file to the specified S3 bucket and folder
s3_client.upload_file(
Filename=json_temp.name,
Bucket=dest_data_bucket,
Key=f"{data_prefix}/{collection_name}/{cog_filename[:-4]}.json",
ExtraArgs={"ContentType": "application/json"},
)
status = {
"transformed_filename": cog_filename,
"statistics_file": f"{cog_filename.split('.')[0]}.json",
"s3uri": f"s3://{dest_data_bucket}/{data_prefix}/{collection_name}/{cog_filename}",
"status": "success",
}

except Exception as ex:
status = {
"transformed_filename": name,
"status": "failed",
"reason": f"Error: {ex}",
}
# Remove the downloaded plugin only after the whole batch has been processed;
# deleting it inside the loop would break plugin loading for subsequent files
os.remove(temp_file_path)
return status
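
For reference, each statistics JSON uploaded next to a COG has the shape sketched below; the values are placeholders, not real pipeline output.

```python
# Shape of the per-file statistics JSON written by transform_cog (placeholder values)
example_stats = {
    "original_file_url": "delivery/gpw/example.nc",
    "transformed_filename": "example_population.tif",
    "transformed_cog_s3uri": "s3://ghgc-data-store-develop/transformed_cogs/gpw/example_population.tif",
    "minimum_value_cog": "0.0000",
    "maximum_value_cog": "1.0000",
    "std_value_cog": "0.2500",
    "mean_value_cog": "0.5000",
    "minimum_value_netcdf": "0.0000",
    "maximum_value_netcdf": "1.0000",
    "std_value_netcdf": "0.2500",
    "mean_value_netcdf": "0.5000",
}
```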