Skip to content

Commit

Permalink
netcdf preprocess: Parallelized workflow to crop original IMS netcdf …
Browse files Browse the repository at this point in the history
…files to centered window at variable window size with luigi
  • Loading branch information
JuLieAlgebra committed Mar 12, 2024
1 parent d4861a6 commit 4d113a2
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 11 deletions.
4 changes: 4 additions & 0 deletions config/preprocess_netcdf.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[CropFiles]
input_dir = "ims_1km"
output_dir = "ims_netcdf_1km_cropped_4_000_000km_window"
window_size = 4000
80 changes: 80 additions & 0 deletions icedyno/preprocess/crop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
To run this as a Luigi DAG locally:
``pixi run python icedyno/preprocess/crop.py``
You may have to enable toml support with luigi by setting an variable in your terminal, like ``export LUIGI_CONFIG_PARSER=toml``
"""
import glob
import os
import pathlib

import luigi
import xarray as xr


class CropFiles(luigi.Task):
"""
Crop IMS and MASIE NetCDF files from the center of their grids (where x, y == 1/2*sie.shape) based on input window_size.
"""

input_dir = luigi.Parameter()
output_dir = luigi.Parameter()

window_size = luigi.IntParameter(default=4000)
year = luigi.IntParameter()

def output(self) -> luigi.LocalTarget:
return luigi.LocalTarget(
os.path.join("data", self.output_dir, f"_SUCCESS_{self.year}")
)

def run(self) -> None:
year_output_dir = os.path.join("data", self.output_dir, str(self.year))
if not os.path.exists(year_output_dir):
os.makedirs(year_output_dir)

input_cdf_files = glob.glob(
os.path.join("data", self.input_dir, str(self.year), "*.nc")
)

for cdf_filepath in input_cdf_files:
output_filename = (
os.path.join(year_output_dir, pathlib.Path(cdf_filepath).stem)
+ f"_grid{self.window_size}.nc"
)
if os.path.exists(output_filename):
print(cdf_filepath, "already on disk, skipping...")

# Open the original NetCDF file
ds = xr.open_dataset(cdf_filepath, engine="h5netcdf")

x_coord = ds["x"].shape[0] // 2
y_coord = ds["y"].shape[0] // 2
window = self.window_size * 1000

cropped_ds = ds.sel(
x=slice(x_coord - window, x_coord + window),
y=slice(y_coord - window, y_coord + window),
)

# Write the cropped data to a new NetCDF file
cropped_ds.to_netcdf(output_filename, engine="h5netcdf")


if __name__ == "__main__":
os.environ["LUIGI_CONFIG_PARSER"] = "toml"

config_path = os.path.join("config", "preprocess_netcdf.toml")

config = luigi.configuration.get_config(parser="toml")
config.read(config_path)

luigi.configuration.add_config_path(config_path)

## Change acording to your number of cores
n_workers = 10
years = range(2014, 2025)

tasks = [CropFiles(year=year) for year in years]
luigi.build(tasks, workers=n_workers, local_scheduler=True)
Loading

0 comments on commit 4d113a2

Please sign in to comment.