Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature automate data transformation #137

Merged
merged 13 commits into from
Nov 20, 2024
15 changes: 15 additions & 0 deletions data_transformation_plugins/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## Information about the folder
This folder is a part of the `automation pipeline using DAG`. It contains the functions which are essential for transforming the given dataset into COGs. There are python files for transforming every dataset which will be used as plugins into the pipeline.

## How this folder fits in the automation pipeline
These functions/files are created by the `developer` to transform a single data file to COG. Once the COGs are validated, these python scripts are pushed to the `GHGC-SMCE S3` These files are then fetched by the `SM2A DAG` to complete the transformation of entire dataset automatically.

## Naming convention for the transformation files in the folder
- `name of python file` - `collectionname_transformation.py`
`collectionname` refers to the STAC collection name of the dataset followed by the word `transformation`. Make sure the `collectionname` within the filename matches with the `collectionname` passed as a `parameter` to the DAG.

## Steps for running the pipeline
- Test convert a single netCDF file for a new dataset using the `sample_transformation.ipynb` notebook.
- Create a new `data transformation plugin` python file for the new dataset using the convention mentioned above.
- `push_to_s3.py` is not yet plugged into the `CI/CD pipeline` so after creating the plugin, run the python file in the terminal. Running the python file will only push the files that are not present on the S3 folder.
- At this point, the tasks from `ghgc-docs` are completed.
Empty file.
47 changes: 47 additions & 0 deletions data_transformation_plugins/ecco_darwin_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import xarray
import re

def ecco_darwin_transformation(file_obj, name, nodata):
SwordSaintLancelot marked this conversation as resolved.
Show resolved Hide resolved
"""Tranformation function for the ecco darwin dataset
SwordSaintLancelot marked this conversation as resolved.
Show resolved Hide resolved

Args:
file_obj (s3fs object): s3fs sile object for one file of the dataset
name (str): name of the file to be transformed
nodata (int): Nodata value as specified by the data provider

Returns:
dict: Dictionary with the COG name and its corresponding data array.
"""
var_data_netcdf = {}
xds = xarray.open_dataset(file_obj)
xds = xds.rename({"y": "latitude", "x": "longitude"})
xds = xds.assign_coords(longitude=((xds.longitude / 1440) * 360) - 180).sortby(
"longitude"
)
xds = xds.assign_coords(latitude=((xds.latitude / 721) * 180) - 90).sortby(
"latitude"
)

variables = list(xds.data_vars)[2:]

for var in variables:
filename = name.split("/")[-1]
filename_elements = re.split("[_ .]", filename)
data = xds[var]

data = data.reindex(latitude=list(reversed(data.latitude)))
data = data.where(data != nodata, -9999)
data.rio.set_spatial_dims("longitude", "latitude", inplace=True)
data.rio.write_crs("epsg:4326", inplace=True)
data.rio.write_nodata(-9999, inplace=True)

filename_elements.pop()
filename_elements[-1] = filename_elements[-2] + filename_elements[-1]
filename_elements.pop(-2)
# # insert date of generated COG into filename
cog_filename = "_".join(filename_elements)
# # add extension
cog_filename = f"{cog_filename}.tif"
var_data_netcdf[cog_filename] = data

return var_data_netcdf
SwordSaintLancelot marked this conversation as resolved.
Show resolved Hide resolved
SwordSaintLancelot marked this conversation as resolved.
Show resolved Hide resolved
38 changes: 38 additions & 0 deletions data_transformation_plugins/geos_oco2_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import xarray
import re

def geos_oco2_transformation(file_obj, name, nodata):
"""Tranformation function for the oco2 geos dataset

Args:
file_obj (s3fs object): s3fs sile object for one file of the dataset
name (str): name of the file to be transformed
nodata (int): Nodata value as specified by the data provider

Returns:
dict: Dictionary with the COG name and its corresponding data array.
"""
var_data_netcdf = {}
xds = xarray.open_dataset(file_obj)
xds = xds.assign_coords(lon=(((xds.lon + 180) % 360) - 180)).sortby("lon")
variable = [var for var in xds.data_vars]
for time_increment in range(0, len(xds.time)):
for var in variable:
filename = name.split("/ ")[-1]
filename_elements = re.split("[_ .]", filename)
data = getattr(xds.isel(time=time_increment), var)
data = data.isel(lat=slice(None, None, -1))
data = data.where(data != nodata, -9999)
data.rio.set_spatial_dims("lon", "lat", inplace=True)
data.rio.write_crs("epsg:4326", inplace=True)
data.rio.write_nodata(-9999, inplace=True)
# # insert date of generated COG into filename
filename_elements[-1] = filename_elements[-3]
filename_elements.insert(2, var)
filename_elements.pop(-3)
cog_filename = "_".join(filename_elements)
# # add extension
cog_filename = f"{cog_filename}.tif"
var_data_netcdf[cog_filename] = data

return var_data_netcdf
37 changes: 37 additions & 0 deletions data_transformation_plugins/gosat_ch4_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import xarray
import re

def gosat_ch4_transformation(file_obj, name, nodata):
SwordSaintLancelot marked this conversation as resolved.
Show resolved Hide resolved
"""Tranformation function for the ecco darwin dataset

Args:
file_obj (s3fs object): s3fs sile object for one file of the dataset
name (str): name of the file to be transformed
nodata (int): Nodata value as specified by the data provider

Returns:
dict: Dictionary with the COG name and its corresponding data array.
"""
var_data_netcdf = {}
ds = xarray.open_dataset(file_obj)
variable = [var for var in ds.data_vars]

for var in variable:
filename = name.split("/")[-1]
filename_elements = re.split("[_ .]", filename)
data = ds[var]
filename_elements.pop()
filename_elements.insert(2, var)
cog_filename = "_".join(filename_elements)
# # add extension
cog_filename = f"{cog_filename}.tif"

data = data.reindex(lat=list(reversed(data.lat)))
data = data.where(data != -9999, -9999)
data.rio.write_nodata(-9999, inplace=True)

data.rio.set_spatial_dims("lon", "lat")
data.rio.write_crs("epsg:4326", inplace=True)
var_data_netcdf[cog_filename] = data

return var_data_netcdf
34 changes: 34 additions & 0 deletions data_transformation_plugins/gpw_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import xarray
import re

def gpw_transformation(file_obj, name, nodata):
"""Tranformation function for the gridded population dataset

Args:
file_obj (s3fs object): s3fs sile object for one file of the dataset
name (str): name of the file to be transformed
nodata (int): Nodata value as specified by the data provider

Returns:
dict: Dictionary with the COG name and its corresponding data array.
"""

var_data_netcdf = {}
xds = xarray.open_dataarray(file_obj, engine="rasterio")

filename = name.split("/")[-1]
filename_elements = re.split("[_ .]", filename)
# # insert date of generated COG into filename
filename_elements.pop()
filename_elements.append(filename_elements[-3])
xds = xds.where(xds != nodata, -9999)
xds.rio.set_spatial_dims("x", "y", inplace=True)
xds.rio.write_crs("epsg:4326", inplace=True)
xds.rio.write_nodata(-9999, inplace=True)

cog_filename = "_".join(filename_elements)
# # add extension
cog_filename = f"{cog_filename}.tif"
var_data_netcdf[cog_filename] = xds

return var_data_netcdf
47 changes: 47 additions & 0 deletions data_transformation_plugins/push_to_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import boto3
import os

import boto3
import os

def upload_files_to_s3(folder_path, bucket_name, s3_folder, exclude_files):
"""
Uploads all files in a folder to a specified S3 folder only if the file does not already exist in S3,
and excluding specified files.

Parameters:
- folder_path (str): Path to the local folder containing files to upload.
- bucket_name (str): Name of the S3 bucket.
- s3_folder (str): Destination folder path in the S3 bucket.
- exclude_files (list): List of files to exclude from uploading.
"""
# Initialize S3 client
s3 = boto3.client('s3')

# Loop through files in the local folder
for file_name in os.listdir(folder_path):
file_path = os.path.join(folder_path, file_name)

# Check if it's a file and not in the exclude list
if os.path.isfile(file_path) and file_name not in exclude_files:
s3_key = os.path.join(s3_folder, file_name)

try:
# Check if the file already exists in S3
s3.head_object(Bucket=bucket_name, Key=s3_key)
print(f"Skipped {file_name} (already exists in S3)")
except s3.exceptions.ClientError as e:
# If the file does not exist, upload it
if e.response['Error']['Code'] == '404':
try:
s3.upload_file(file_path, bucket_name, s3_key)
print(f"Uploaded {file_name} to {s3_key}")
except Exception as upload_error:
print(f"Error uploading {file_name}: {upload_error}")
else:
print(f"Error checking existence of {file_name}: {e}")

# Example usage:
# upload_folder_to_s3("path/to/local/folder", "my-s3-bucket", "my/s3/folder", ["exclude1.ext", "exclude2.ext"])
if __name__ == "__main__":
upload_files_to_s3("data_transformation_plugins", "ghgc-data-store-develop", "data_transformation_plugins", ["__init__.py", "push_to_s3.py", "README.md", "sample_transformation.ipynb"])
Loading
Loading