Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Datawrangling macros #208

Merged
merged 32 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9613725
cleaner change
simone-silvestri Oct 31, 2024
e63b77c
bugfix
simone-silvestri Oct 31, 2024
4b43faf
correct comment
simone-silvestri Oct 31, 2024
b7dc6c1
introducing @onrank
simone-silvestri Oct 31, 2024
9e02051
make it compile
simone-silvestri Oct 31, 2024
15a79e3
import macros
simone-silvestri Oct 31, 2024
bbddd33
make it work
simone-silvestri Oct 31, 2024
d447b64
adding a @handshake
simone-silvestri Oct 31, 2024
603436c
just onrank missing
simone-silvestri Oct 31, 2024
427a47d
introducing @onrank
simone-silvestri Oct 31, 2024
b7352bc
will this work?
simone-silvestri Oct 31, 2024
0ece197
probably need this here
simone-silvestri Oct 31, 2024
613e16f
it probably should be a top level file
simone-silvestri Oct 31, 2024
a3aea9a
remove from DataWrangling
simone-silvestri Oct 31, 2024
5a4b75a
correct using here and there
simone-silvestri Oct 31, 2024
cdbf021
need the whole thing
simone-silvestri Oct 31, 2024
ebdc320
download everything at the beginning
simone-silvestri Oct 31, 2024
22ac3da
bug in download command
simone-silvestri Oct 31, 2024
6dde9be
correct old syntax
simone-silvestri Oct 31, 2024
aaf5ab9
using CFTime and Dates
simone-silvestri Oct 31, 2024
28c360a
better organized downloading
simone-silvestri Oct 31, 2024
f1d4e6e
donwload dataset does not change the arguments
simone-silvestri Oct 31, 2024
a682628
Merge branch 'main' into ss/distributed-datawrangling
navidcy Nov 5, 2024
2811f8e
Merge branch 'main' into ss/distributed-datawrangling
simone-silvestri Nov 6, 2024
f052be5
Merge branch 'main' into ss/distributed-datawrangling
simone-silvestri Nov 6, 2024
eeb7a65
Merge branch 'main' into ss/distributed-datawrangling
simone-silvestri Nov 8, 2024
c4c50da
add some tests
simone-silvestri Nov 8, 2024
16c11d0
Merge branch 'main' into ss/distributed-datawrangling
simone-silvestri Nov 11, 2024
6602fda
bugfix
simone-silvestri Nov 11, 2024
8b25115
Merge branch 'main' into ss/distributed-datawrangling
simone-silvestri Nov 11, 2024
ea642cd
Merge branch 'main' into ss/distributed-datawrangling
simone-silvestri Nov 12, 2024
b09348a
why does JRA55 fail?
simone-silvestri Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
ImageMorphology = "787d08f9-d448-5407-9aad-5290dd7ab264"
JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
KernelAbstractions = "63c18a36-062a-441e-b654-da1e3ab1ce7c"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
NCDatasets = "85f8d34a-cbdd-5861-8df4-14fed0d494ab"
Oceananigans = "9e8cae18-63c1-5223-a75c-80ca9d6e9a09"
OrthogonalSphericalShellGrids = "c2be9673-fb75-4747-82dc-aa2bb9f4aed0"
Expand All @@ -40,12 +41,12 @@ JLD2 = "0.4, 0.5"
KernelAbstractions = "0.9"
NCDatasets = "0.12, 0.13, 0.14"
Oceananigans = "0.93.1"
OrthogonalSphericalShellGrids = "0.1.2"
OrthogonalSphericalShellGrids = "0.1.2, 0.2"
Scratch = "1"
SeawaterPolynomials = "0.3.4"
StaticArrays = "1"
Statistics = "1.9"
SurfaceFluxes = "0.11"
SurfaceFluxes = "0.12"
Thermodynamics = "0.12"
julia = "1.9"

Expand Down
31 changes: 13 additions & 18 deletions src/Bathymetry.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ using Oceananigans.BoundaryConditions
using KernelAbstractions: @kernel, @index
using JLD2

using ClimaOcean

using NCDatasets
using Downloads
using Printf
Expand Down Expand Up @@ -87,26 +89,19 @@ function regrid_bathymetry(target_grid;
major_basins = Inf) # Allow an `Inf` number of ``lakes''

filepath = joinpath(dir, filename)

if isfile(filepath)
@info "Regridding bathymetry from existing file $filepath."
else
@info "Downloading bathymetry..."
if !ispath(dir)
@info "Making bathymetry directory $dir..."
mkdir(dir)
end

fileurl = joinpath(url, filename)

try
Downloads.download(fileurl, filepath; progress=download_progress, verbose=true)
catch
cmd = `wget --no-check-certificate -O $filepath $fileurl`
run(cmd)
fileurl = joinpath(url, filename)

@root begin # perform all this only on rank 0, aka the "root" rank
if !isfile(filepath)
try
Downloads.download(fileurl, filepath; progress=download_progress, verbose=true)
catch
cmd = `wget --no-check-certificate -O $filepath $fileurl`
@root run(cmd)
end
end
end

dataset = Dataset(filepath)

FT = eltype(target_grid)
Expand Down
8 changes: 7 additions & 1 deletion src/ClimaOcean.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ export
JRA55_field_time_series,
ECCO_field, ECCOMetadata,
ocean_simulation,
initialize!
initialize!,
@root,
@onrank,
@distribute,
@handshake

using Oceananigans
using Oceananigans.Operators: ℑxyᶠᶜᵃ, ℑxyᶜᶠᵃ
Expand All @@ -27,6 +31,8 @@ using DataDeps
using Oceananigans.OutputReaders: GPUAdaptedFieldTimeSeries, FieldTimeSeries
using Oceananigans.Grids: node

include("distributed_utils.jl")

const SomeKindOfFieldTimeSeries = Union{FieldTimeSeries,
GPUAdaptedFieldTimeSeries}

Expand Down
1 change: 1 addition & 0 deletions src/DataWrangling/DataWrangling.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module DataWrangling
using Oceananigans
using Downloads
using Printf
using Downloads

using Oceananigans.Architectures: architecture, on_architecture
using Oceananigans.Grids: node
Expand Down
46 changes: 12 additions & 34 deletions src/DataWrangling/ECCO/ECCO.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export ECCOMetadata, ECCO_field, ECCO_mask, adjusted_ECCO_tracers, initialize!
export ECCO2Monthly, ECCO4Monthly, ECCO2Daily
export ECCO_restoring_forcing

using ClimaOcean
using ClimaOcean.DataWrangling
using ClimaOcean.DataWrangling: inpaint_mask!
using ClimaOcean.InitialConditions: three_dimensional_regrid!, interpolate!

Expand Down Expand Up @@ -135,7 +137,7 @@ function ECCO_field(metadata::ECCOMetadata;
architecture = CPU(),
horizontal_halo = (3, 3))

download_dataset!(metadata)
download_dataset(metadata)
path = metadata_path(metadata)
ds = Dataset(path)

Expand Down Expand Up @@ -216,48 +218,24 @@ function inpainted_ECCO_field(metadata::ECCOMetadata;
inpaint_mask!(f, mask; maxiter)

fill_halo_regions!(f)

return f
end

inpainted_ECCO_field(variable_name::Symbol; kw...) = inpainted_ECCO_field(ECCOMetadata(variable_name); kw...)

function set!(field::DistributedField, ECCO_metadata::ECCOMetadata; kw...)
function set!(field::Field, ecco_metadata::ECCOMetadata; kw...)

# Fields initialized from ECCO
grid = field.grid
arch = architecture(grid)
child_arch = child_architecture(arch)

f_ECCO = if arch.local_rank == 0 # Make sure we read/write the file using only one core
mask = ECCO_mask(ECCO_metadata, child_arch)
inpainted_ECCO_field(ECCO_metadata; mask, architecture = child_arch, kw...)
else
empty_ECCO_field(ECCO_metadata; architecture = child_arch)
end
arch = child_architecture(grid)
mask = ECCO_mask(ecco_metadata, arch)

barrier!(arch)
f = inpainted_ECCO_field(ecco_metadata; mask,
architecture = arch,
kw...)

# Distribute ECCO field to all workers
parent(f_ECCO) .= all_reduce(+, parent(f_ECCO), arch)

f_grid = Field(location(ECCO_metadata), grid)
interpolate!(f_grid, f_ECCO)
set!(field, f_grid)

return field
end

function set!(field::Field, ECCO_metadata::ECCOMetadata; kw...)

# Fields initialized from ECCO
grid = field.grid
arch = architecture(grid)
mask = ECCO_mask(ECCO_metadata, arch)

f = inpainted_ECCO_field(ECCO_metadata; mask, architecture=arch, kw...)
f_grid = Field(location(ECCO_metadata), grid)
interpolate!(f_grid, f)
set!(field, f_grid)
interpolate!(field, f)

return field
end
Expand Down
22 changes: 14 additions & 8 deletions src/DataWrangling/ECCO/ECCO_metadata.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using CFTime
using Dates
using ClimaOcean.DataWrangling

import Dates: year, month, day

using Base: @propagate_inbounds

Expand Down Expand Up @@ -96,6 +99,9 @@ all_ECCO_dates(::ECCO4Monthly) = DateTimeProlepticGregorian(1992, 1, 1) : Month(
all_ECCO_dates(::ECCO2Monthly) = DateTimeProlepticGregorian(1992, 1, 1) : Month(1) : DateTimeProlepticGregorian(2023, 12, 1)
all_ECCO_dates(::ECCO2Daily) = DateTimeProlepticGregorian(1992, 1, 4) : Day(1) : DateTimeProlepticGregorian(2023, 12, 31)

# File names of metadata containing multiple dates
metadata_filename(metadata) = [metadata_filename(metadatum) for metadatum in metadata]

# File name generation specific to each Dataset version
function metadata_filename(metadata::ECCOMetadata{<:AbstractCFDateTime, <:ECCO4Monthly})
shortname = short_name(metadata)
Expand Down Expand Up @@ -173,7 +179,7 @@ urls(::ECCOMetadata{<:Any, <:ECCO2Daily}) = "https://ecco.jpl.nasa.gov/drive/f
urls(::ECCOMetadata{<:Any, <:ECCO4Monthly}) = "https://ecco.jpl.nasa.gov/drive/files/Version4/Release4/interp_monthly/"

"""
download_dataset!(metadata::ECCOMetadata; url = urls(metadata))
download_dataset(metadata::ECCOMetadata; url = urls(metadata))

Download the dataset specified by `ECCOMetadata`. If `ECCOMetadata.dates` is a single date,
the dataset is downloaded directly. If `ECCOMetadata.dates` is a vector of dates, each date
Expand All @@ -187,30 +193,30 @@ ECCO_USERNAME=myuser ECCO_PASSWORD=mypasswrd julia
# Arguments
- `metadata::ECCOMetadata`: The metadata specifying the dataset to be downloaded.
"""
function download_dataset!(metadata::ECCOMetadata; url = urls(metadata))
function download_dataset(metadata::ECCOMetadata; url = urls(metadata))
username = get(ENV, "ECCO_USERNAME", nothing)
password = get(ENV, "ECCO_PASSWORD", nothing)
dir = metadata.dir

for metadatum in metadata
filename = metadata_filename(metadatum)
@distribute for metadatum in metadata # Distribute the download among ranks if MPI is initialized

fileurl = metadata_url(url, metadatum)
filepath = metadata_path(metadatum)

if !isfile(filepath)
instructions_msg = "\n See ClimaOcean.jl/src/ECCO/README.md for instructions."
if isnothing(username)
msg = "Could not find the ECCO_PASSWORD environment variable. \
See ClimaOcean.jl/src/ECCO/README.md for instructions on obtaining \
and setting your ECCO_USERNAME and ECCO_PASSWORD."
and setting your ECCO_USERNAME and ECCO_PASSWORD." * instructions_msg
throw(ArgumentError(msg))
elseif isnothing(password)
msg = "Could not find the ECCO_PASSWORD environment variable. \
See ClimaOcean.jl/src/ECCO/README.md for instructions on obtaining \
and setting your ECCO_USERNAME and ECCO_PASSWORD."
and setting your ECCO_USERNAME and ECCO_PASSWORD." * instructions_msg
throw(ArgumentError(msg))
end

fileurl = metadata_url(url, metadatum)

cmd = `wget --http-user=$(username) --http-passwd=$(password) --directory-prefix=$dir $fileurl`
run(cmd)
end
Expand Down
2 changes: 1 addition & 1 deletion src/DataWrangling/ECCO/ECCO_restoring.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ function ECCO_field_time_series(metadata::ECCOMetadata;
grid = nothing)

# Making sure all the required individual files are downloaded
download_dataset!(metadata)
download_dataset(metadata)

# ECCO data is too chunky to allow other backends
backend = ECCONetCDFBackend(time_indices_in_memory, metadata;
Expand Down
11 changes: 8 additions & 3 deletions src/DataWrangling/JRA55.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module JRA55

using Oceananigans
using Oceananigans.Units

using Oceananigans.Architectures: arch_array
using Oceananigans.DistributedComputations
using Oceananigans.DistributedComputations: child_architecture
Expand All @@ -11,6 +11,9 @@ using Oceananigans.Grids: λnodes, φnodes, on_architecture
using Oceananigans.Fields: interpolate!
using Oceananigans.OutputReaders: Cyclical, TotallyInMemory, AbstractInMemoryBackend, FlavorOfFTS, time_indices

using ClimaOcean
using ClimaOcean.DataWrangling: download_progress

using ClimaOcean.OceanSeaIceModels:
PrescribedAtmosphere,
TwoBandDownwellingRadiation
Expand Down Expand Up @@ -370,8 +373,10 @@ function JRA55_field_time_series(variable_name;
fts_name = field_time_series_short_names[variable_name]

# Note, we don't re-use existing jld2 files.
isfile(filepath) || download(url, filepath)
isfile(jld2_filepath) && rm(jld2_filepath)
@root begin
isfile(filepath) || download(url, filepath)
isfile(jld2_filepath) && rm(jld2_filepath)
end

# Determine default time indices
if totally_in_memory
Expand Down
Loading
Loading