Fix chunking issues in sum_AMEL and reduce_damages #83
```diff
@@ -95,6 +95,46 @@ def _parse_projection_filesys(input_path, query="exists==True"):
     return df.query(query)


+def concatenate_damage_output(damage_dir, basename, save_path):
+    """Concatenate labor/energy damage output across batches.
+
+    Parameters
+    ----------
+    damage_dir str
+        Directory containing separate labor/energy damage output files by batches.
+    basename str
+        Prefix of the damage output filenames (ex. {basename}_batch0.zarr)
+    save_path str
+        Path to save concatenated file in .zarr format
+    """
+    paths = glob.glob(f"{damage_dir}/{basename}*")
+    data = xr.open_mfdataset(paths=paths, engine="zarr")
+
+    for v in data:
+        del data[v].encoding["chunks"]
+
+    chunkies = {
+        "batch": 15,
+        "rcp": 1,
+        "gcm": 1,
+        "model": 1,
+        "ssp": 1,
+        "region": -1,
+        "year": 10,
+    }
+
+    data = data.chunk(chunkies)
+
+    for v in list(data.coords.keys()):
+        if data.coords[v].dtype == object:
+            data.coords[v] = data.coords[v].astype("unicode")
+
+    for v in list(data.variables.keys()):
+        if data[v].dtype == object:
+            data[v] = data[v].astype("unicode")
+
+    data.to_zarr(save_path, mode="w")
+
+
 def calculate_labor_impacts(input_path, file_prefix, variable, val_type):
     """Calculate impacts for labor results.
```

Review comment on the coordinate `astype("unicode")` cast: might as well handle this in a unit test to add the coverage and avoid the warning.

Review comment on the variable `astype("unicode")` cast: same as above comment.
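A note on the `del data[v].encoding["chunks"]` loop: variables opened from zarr remember their on-disk chunk layout in `.encoding`, and `to_zarr` refuses to write when that stale encoding conflicts with a new dask chunking. A minimal sketch of the failure mode this avoids, using a throwaway dataset (names, paths, and shapes here are illustrative, not dscim's):

```python
import numpy as np
import xarray as xr

# Hypothetical dataset standing in for one damages batch on disk.
ds = xr.Dataset(
    {"damages": (("region", "year"), np.zeros((100, 20)))},
).chunk({"region": 50, "year": 20})
ds.to_zarr("batch0.zarr", mode="w")

reopened = xr.open_zarr("batch0.zarr")
# The on-disk chunk layout is remembered in .encoding; if we rechunk and
# write without clearing it, to_zarr can error on the chunk mismatch.
for v in reopened:
    del reopened[v].encoding["chunks"]

reopened = reopened.chunk({"region": -1, "year": 10})
reopened.to_zarr("rechunked.zarr", mode="w")
```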
```diff
@@ -371,7 +411,7 @@ def process_batch(g):
     batches = [ds for ds in batches if ds is not None]
     chunkies = {
         "rcp": 1,
-        "region": 24378,
+        "region": -1,
         "gcm": 1,
         "year": 10,
         "model": 1,
```
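The `"region": -1` entries used throughout replace hard-coded sizes like `24378`: in xarray/dask, a chunk size of -1 means one chunk spanning the whole dimension, so the spec no longer depends on how many regions the data happens to have. A quick illustration:

```python
import numpy as np
import xarray as xr

da_small = xr.DataArray(np.zeros((10, 4)), dims=("region", "year"))
# -1 collapses the dimension into a single chunk regardless of its length,
# so the same spec works for 10 regions or 24,378.
print(da_small.chunk({"region": -1, "year": 2}).chunks)  # ((10,), (2, 2))
```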
```diff
@@ -738,12 +778,21 @@ def prep(
     ).expand_dims({"gcm": [gcm]})

     damages = damages.chunk(
-        {"batch": 15, "ssp": 1, "model": 1, "rcp": 1, "gcm": 1, "year": 10}
+        {
+            "batch": 15,
+            "ssp": 1,
+            "model": 1,
+            "rcp": 1,
+            "gcm": 1,
+            "year": 10,
+            "region": -1,
+        }
     )
     damages.coords.update({"batch": [f"batch{i}" for i in damages.batch.values]})

     # convert to EPA VSL
     damages = damages * 0.90681089
     damages = damages.astype(np.float32)

     for v in list(damages.coords.keys()):
         if damages.coords[v].dtype == object:
```
```diff
@@ -790,6 +839,15 @@ def coastal_inputs(
         )
     else:
         d = d.sel(adapt_type=adapt_type, vsl_valuation=vsl_valuation, drop=True)
+    chunkies = {
+        "batch": 15,
+        "ssp": 1,
+        "model": 1,
+        "slr": 1,
+        "year": 10,
+        "region": -1,
+    }
+    d = d.chunk(chunkies)
     d.to_zarr(
         f"{path}/coastal_damages_{version}-{adapt_type}-{vsl_valuation}.zarr",
         consolidated=True,
```
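Since `reduce_damages` asserts that its input has a `batch` chunk size of 15, a quick post-write check on the coastal output (which carries `slr` instead of `rcp`/`gcm`) can confirm the new chunking took effect; a sketch, with a hypothetical path filled in for the formatted string:

```python
import xarray as xr

# Hypothetical path matching the coastal_damages_{version}-{adapt_type}-{vsl_valuation}.zarr pattern.
out = xr.open_zarr("coastal_damages_v0.20-uncertain-epa_scaled.zarr")
assert out.chunks["batch"][0] == 15
assert len(out.chunks["region"]) == 1  # "region": -1 -> one chunk over the dim
```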
```diff
@@ -102,6 +102,24 @@ def reduce_damages(
         xr.open_zarr(damages).chunks["batch"][0] == 15
     ), "'batch' dim on damages does not have chunksize of 15. Please rechunk."

+    if "coastal" not in sector:
+        chunkies = {
+            "rcp": 1,
+            "region": -1,
+            "gcm": 1,
+            "year": 10,
+            "model": 1,
+            "ssp": 1,
+        }
+    else:
+        chunkies = {
+            "region": -1,
+            "slr": 1,
+            "year": 10,
+            "model": 1,
+            "ssp": 1,
+        }
+
     ce_batch_dims = [i for i in gdppc.dims] + [
         i for i in ds.dims if i not in gdppc.dims and i != "batch"
     ]
```

Review comment on the coastal `chunkies` branch: please add to unit tests.
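Toward the reviewer's request, the branch could be covered by a small parametrized test; a sketch that assumes the chunk dicts get factored into a hypothetical `get_chunkies` helper mirroring the inline `if`/`else`:

```python
import pytest

# Hypothetical helper mirroring the inline branch in reduce_damages.
def get_chunkies(sector):
    if "coastal" not in sector:
        return {"rcp": 1, "region": -1, "gcm": 1, "year": 10, "model": 1, "ssp": 1}
    return {"region": -1, "slr": 1, "year": 10, "model": 1, "ssp": 1}

@pytest.mark.parametrize(
    "sector, expected_dims",
    [
        ("mortality", {"rcp", "region", "gcm", "year", "model", "ssp"}),
        ("coastal", {"region", "slr", "year", "model", "ssp"}),
    ],
)
def test_chunkies_per_sector(sector, expected_dims):
    chunkies = get_chunkies(sector)
    assert set(chunkies) == expected_dims
    assert chunkies["region"] == -1  # regions always in one chunk
```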
```diff
@@ -110,15 +128,14 @@ def reduce_damages(
         i for i in gdppc.region.values if i in ce_batch_coords["region"]
     ]
     ce_shapes = [len(ce_batch_coords[c]) for c in ce_batch_dims]
-    ce_chunks = [xr.open_zarr(damages).chunks[c][0] for c in ce_batch_dims]

     template = xr.DataArray(
-        da.empty(ce_shapes, chunks=ce_chunks),
+        da.empty(ce_shapes),
         dims=ce_batch_dims,
         coords=ce_batch_coords,
-    )
+    ).chunk(chunkies)

-    other = xr.open_zarr(damages)
+    other = xr.open_zarr(damages).chunk(chunkies)

     out = other.map_blocks(
         ce_from_chunk,
```
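The motivation for chunking both sides from the same dict: `map_blocks` requires the `template` to describe the output's dims, coords, and chunk structure exactly, so deriving both `template` and `other` from one `chunkies` spec (rather than inheriting `ce_chunks` from the on-disk layout) keeps them aligned by construction. A minimal sketch of the pattern with illustrative names (not dscim's actual `ce_from_chunk`):

```python
import dask.array as da
import numpy as np
import xarray as xr

chunkies = {"region": -1, "year": 10}
coords = {"region": np.arange(100), "year": np.arange(20)}

source = xr.Dataset(
    {"damages": (("region", "year"), np.random.rand(100, 20))}, coords=coords
).chunk(chunkies)

# Template must describe the result's dims, coords, and chunks exactly.
template = xr.DataArray(
    da.empty((100, 20)), dims=("region", "year"), coords=coords
).chunk(chunkies)

def per_chunk(ds):
    return ds["damages"] * 2  # stand-in for the real per-chunk reduction

out = source.map_blocks(per_chunk, template=template)
out.compute()
```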
```diff
@@ -205,7 +222,21 @@ def sum_AMEL(
     for sector in sectors:
         print(f"Opening {sector},{params[sector]['sector_path']}")
         ds = xr.open_zarr(params[sector]["sector_path"], consolidated=True)
-        ds = ds[params[sector][var]].rename(var)
+        ds = (
+            ds[params[sector][var]]
+            .rename(var)
+            .chunk(
+                {
+                    "batch": 15,
+                    "ssp": 1,
+                    "model": 1,
+                    "rcp": 1,
+                    "gcm": 1,
+                    "year": 10,
+                    "region": -1,
+                }
+            )
+        )
         ds = xr.where(np.isinf(ds), np.nan, ds)
         datasets.append(ds)
```

Review comment on the repeated chunk dictionary: seeing this dictionary of chunks repeated many times confirms that we should generalize at least a little bit - perhaps define a global chunkies and eventually put into a config. This can be done in a later PR.
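Along the lines of that suggestion, the repeated dicts could collapse into one module-level mapping filtered per dataset; a later-PR sketch with hypothetical names:

```python
# Hypothetical module-level defaults; sector-specific dims (slr vs. rcp/gcm)
# are handled by filtering on the dims each dataset actually has.
CHUNKIES = {
    "batch": 15,
    "ssp": 1,
    "model": 1,
    "rcp": 1,
    "gcm": 1,
    "slr": 1,
    "year": 10,
    "region": -1,
}

def chunk_for(ds, spec=CHUNKIES):
    """Chunk a dataset using only the entries of spec that match its dims."""
    return ds.chunk({k: v for k, v in spec.items() if k in ds.dims})
```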
Review comment on the `glob.glob` call in `concatenate_damage_output`: I usually prefer to explicitly create a list of filenames to open, in case there's extra data files or anything like that. Maybe that's handled in a data check later?
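One way to act on that preference would be to build the expected batch paths explicitly instead of globbing; a sketch, where the `n_batches` argument is hypothetical:

```python
import os
import xarray as xr

def open_damage_batches(damage_dir, basename, n_batches):
    # Build the expected batch paths explicitly rather than globbing,
    # so stray files in damage_dir are never picked up.
    paths = [f"{damage_dir}/{basename}_batch{i}.zarr" for i in range(n_batches)]
    missing = [p for p in paths if not os.path.exists(p)]
    if missing:
        raise FileNotFoundError(f"Missing damage batches: {missing}")
    return xr.open_mfdataset(paths=paths, engine="zarr")
```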