Skip to content

Commit

Permalink
feat: add support for collective parallel reads in fms2_io (NOAA-GFDL…
Browse files Browse the repository at this point in the history
  • Loading branch information
dkokron authored Mar 29, 2024
1 parent 1bb706c commit f5d9892
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 52 deletions.
104 changes: 74 additions & 30 deletions fms2_io/include/netcdf_read_data.inc
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,12 @@ subroutine netcdf_read_data_2d(fileobj, variable_name, buf, unlim_dim_level, &
endif
c(unlim_dim_index) = unlim_dim_level
endif
if (fileobj%is_root) then
if(fileobj%use_collective) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
! NetCDF does not have the ability to specify collective I/O at
! the file basis so we must activate at the variable level
err = nf90_var_par_access(fileobj%ncid, varid, nf90_collective)
call check_netcdf_code(err, append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
Expand All @@ -370,20 +374,38 @@ subroutine netcdf_read_data_2d(fileobj, variable_name, buf, unlim_dim_level, &
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_2d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
else
if (fileobj%is_root) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (integer(kind=i8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_2d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
endif
endif
end subroutine netcdf_read_data_2d
Expand Down Expand Up @@ -446,8 +468,12 @@ subroutine netcdf_read_data_3d(fileobj, variable_name, buf, unlim_dim_level, &
endif
c(unlim_dim_index) = unlim_dim_level
endif
if (fileobj%is_root) then
if(fileobj%use_collective) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
! NetCDF does not have the ability to specify collective I/O at
! the file basis so we must activate at the variable level
err = nf90_var_par_access(fileobj%ncid, varid, nf90_collective)
call check_netcdf_code(err, append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
Expand All @@ -462,20 +488,38 @@ subroutine netcdf_read_data_3d(fileobj, variable_name, buf, unlim_dim_level, &
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_3d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
else
if (fileobj%is_root) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (integer(kind=i8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_3d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
endif
endif
end subroutine netcdf_read_data_3d
Expand Down
82 changes: 61 additions & 21 deletions fms2_io/netcdf_io.F90
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ module netcdf_io_mod
character (len=20) :: time_name
type(dimension_information) :: bc_dimensions !<information about the current dimensions for regional
!! restart variables
logical :: use_collective = .false. !< Flag indicating if we should open the file for collective input
!! this should be set to .true. in the user application if they want
!! collective reads (put before open_file())
integer :: tile_comm=MPP_COMM_NULL !< MPI communicator used for collective reads.
!! To be replaced with a real communicator at user request

endtype FmsNetcdfFile_t

Expand Down Expand Up @@ -562,6 +567,8 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do

integer :: nc_format_param
integer :: err
integer :: netcdf4 !< Query the file for the _IsNetcdf4 global attribute in the event
!! that the open for collective reads fails
character(len=256) :: buf !< Filename with .res in the filename if it is a restart
character(len=256) :: buf2 !< Filename with the filename appendix if there is one
logical :: is_res
Expand Down Expand Up @@ -619,30 +626,30 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do
fileobj%is_root = mpp_pe() .eq. fileobj%io_root

fileobj%is_netcdf4 = .false.
!Open the file with netcdf if this rank is the I/O root.
if (fileobj%is_root) then
if (fms2_ncchksz == -1) call error("netcdf_file_open:: fms2_ncchksz not set, call fms2_io_init")
if (fms2_nc_format_param == -1) call error("netcdf_file_open:: fms2_nc_format_param not set, call fms2_io_init")

if (present(nc_format)) then
if (string_compare(nc_format, "64bit", .true.)) then
nc_format_param = nf90_64bit_offset
elseif (string_compare(nc_format, "classic", .true.)) then
nc_format_param = nf90_classic_model
elseif (string_compare(nc_format, "netcdf4", .true.)) then
fileobj%is_netcdf4 = .true.
nc_format_param = nf90_netcdf4
else
call error("unrecognized netcdf file format: '"//trim(nc_format)//"' for file:"//trim(fileobj%path)//&
&"Check your open_file call, the acceptable values are 64bit, classic, netcdf4")
endif
call string_copy(fileobj%nc_format, nc_format)
if (fms2_ncchksz == -1) call error("netcdf_file_open:: fms2_ncchksz not set, call fms2_io_init")
if (fms2_nc_format_param == -1) call error("netcdf_file_open:: fms2_nc_format_param not set, call fms2_io_init")

if (present(nc_format)) then
if (string_compare(nc_format, "64bit", .true.)) then
nc_format_param = nf90_64bit_offset
elseif (string_compare(nc_format, "classic", .true.)) then
nc_format_param = nf90_classic_model
elseif (string_compare(nc_format, "netcdf4", .true.)) then
fileobj%is_netcdf4 = .true.
nc_format_param = nf90_netcdf4
else
call string_copy(fileobj%nc_format, trim(fms2_nc_format))
nc_format_param = fms2_nc_format_param
fileobj%is_netcdf4 = fms2_is_netcdf4
call error("unrecognized netcdf file format: '"//trim(nc_format)//"' for file:"//trim(fileobj%path)//&
&"Check your open_file call, the acceptable values are 64bit, classic, netcdf4")
endif
call string_copy(fileobj%nc_format, nc_format)
else
call string_copy(fileobj%nc_format, trim(fms2_nc_format))
nc_format_param = fms2_nc_format_param
fileobj%is_netcdf4 = fms2_is_netcdf4
endif

!Open the file with netcdf if this rank is the I/O root.
if (fileobj%is_root .and. .not.(fileobj%use_collective)) then
if (string_compare(mode, "read", .true.)) then
err = nf90_open(trim(fileobj%path), nf90_nowrite, fileobj%ncid, chunksize=fms2_ncchksz)
elseif (string_compare(mode, "append", .true.)) then
Expand All @@ -656,6 +663,39 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do
&"Check your open_file call, the acceptable values are read, append, write, overwrite")
endif
call check_netcdf_code(err, "netcdf_file_open:"//trim(fileobj%path))
elseif(fileobj%use_collective .and. (fileobj%tile_comm /= MPP_COMM_NULL)) then
if(string_compare(mode, "read", .true.)) then
! Open the file for collective reads if the user requested that treatment in their application.
! NetCDF does not have the ability to specify collective I/O at the file basis
! so we must activate each variable in netcdf_read_data_2d() and netcdf_read_data_3d()
err = nf90_open(trim(fileobj%path), ior(NF90_NOWRITE, NF90_MPIIO), fileobj%ncid, &
comm=fileobj%tile_comm, info=MPP_INFO_NULL)
if(err /= nf90_noerr) then
err = nf90_open(trim(fileobj%path), nf90_nowrite, fileobj%ncid)
err = nf90_get_att(fileobj%ncid, nf90_global, "_IsNetcdf4", netcdf4)
err = nf90_close(fileobj%ncid)
if(netcdf4 /= 1) then
call mpp_error(NOTE,"netcdf_file_open: Open for collective read failed because the file is not &
netCDF-4 format."// &
" Falling back to parallel independent for file "// trim(fileobj%path))
endif
err = nf90_open(trim(fileobj%path), nf90_nowrite, fileobj%ncid, chunksize=fms2_ncchksz)
endif
elseif (string_compare(mode, "write", .true.)) then
call mpp_error(FATAL,"netcdf_file_open: Attempt to create a file for collective write"// &
" This feature is not implemented"// trim(fileobj%path))
!err = nf90_create(trim(fileobj%path), ior(nf90_noclobber, nc_format_param), fileobj%ncid, &
! comm=fileobj%tile_comm, info=MPP_INFO_NULL)
elseif (string_compare(mode,"overwrite",.true.)) then
call mpp_error(FATAL,"netcdf_file_open: Attempt to create a file for collective overwrite"// &
" This feature is not implemented"// trim(fileobj%path))
!err = nf90_create(trim(fileobj%path), ior(nf90_clobber, nc_format_param), fileobj%ncid, &
! comm=fileobj%tile_comm, info=MPP_INFO_NULL)
else
call error("unrecognized file mode: '"//trim(mode)//"' for file:"//trim(fileobj%path)//&
&"Check your open_file call, the acceptable values are read, append, write, overwrite")
endif
call check_netcdf_code(err, "netcdf_file_open:"//trim(fileobj%path))
else
fileobj%ncid = missing_ncid
endif
Expand Down
11 changes: 10 additions & 1 deletion mpp/mpp.F90
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ module mpp_mod
public :: COMM_TAG_9, COMM_TAG_10, COMM_TAG_11, COMM_TAG_12
public :: COMM_TAG_13, COMM_TAG_14, COMM_TAG_15, COMM_TAG_16
public :: COMM_TAG_17, COMM_TAG_18, COMM_TAG_19, COMM_TAG_20
public :: MPP_FILL_INT,MPP_FILL_DOUBLE,MPP_INFO_NULL
public :: MPP_FILL_INT,MPP_FILL_DOUBLE,MPP_INFO_NULL,MPP_COMM_NULL
public :: mpp_init_test_full_init, mpp_init_test_init_true_only, mpp_init_test_peset_allocated
public :: mpp_init_test_clocks_init, mpp_init_test_datatype_list_init, mpp_init_test_logfile_init
public :: mpp_init_test_read_namelist, mpp_init_test_etc_unit, mpp_init_test_requests_allocated
Expand Down Expand Up @@ -1334,6 +1334,15 @@ module mpp_mod
integer, parameter :: MPP_INFO_NULL = 469762048
#endif

!> MPP_COMM_NULL acts as an analagous mpp-macro for MPI_COMM_NULL to share with fms2_io NetCDF4
!! mpi-io. The default value for the no-mpi case comes from Intel MPI and MPICH. OpenMPI sets
!! a default value of '2'
#if defined(use_libMPI)
integer, parameter :: MPP_COMM_NULL = MPI_COMM_NULL
#else
integer, parameter :: MPP_COMM_NULL = 67108864
#endif

!***********************************************************************
! variables needed for subroutine read_input_nml (include/mpp_util.inc)
!
Expand Down

0 comments on commit f5d9892

Please sign in to comment.