Skip to content

Commit

Permalink
MPI_Comments_Renamings (trixi-framework#2059)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielDoehring authored Sep 4, 2024
1 parent 89f7b7d commit a2bf8e9
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 67 deletions.
2 changes: 2 additions & 0 deletions src/callbacks_step/amr.jl
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ function (amr_callback::AMRCallback)(u_ode::AbstractVector, mesh::TreeMesh,
@unpack to_refine, to_coarsen = amr_callback.amr_cache
empty!(to_refine)
empty!(to_coarsen)
# Note: This assumes that the entries of `lambda` are sorted with ascending cell ids
for element in eachindex(lambda)
controller_value = lambda[element]
if controller_value > 0
Expand Down Expand Up @@ -395,6 +396,7 @@ function (amr_callback::AMRCallback)(u_ode::AbstractVector, mesh::TreeMesh,
@unpack to_refine, to_coarsen = amr_callback.amr_cache
empty!(to_refine)
empty!(to_coarsen)
# Note: This assumes that the entries of `lambda` are sorted with ascending cell ids
for element in eachindex(lambda)
controller_value = lambda[element]
if controller_value > 0
Expand Down
30 changes: 17 additions & 13 deletions src/meshes/parallel_tree_mesh.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ function partition!(mesh::ParallelTreeMesh; allow_coarsening = true)
n_leaves_per_rank = OffsetArray(fill(div(length(leaves), mpi_nranks()),
mpi_nranks()),
0:(mpi_nranks() - 1))
for d in 0:(rem(length(leaves), mpi_nranks()) - 1)
n_leaves_per_rank[d] += 1
for rank in 0:(rem(length(leaves), mpi_nranks()) - 1)
n_leaves_per_rank[rank] += 1
end
@assert sum(n_leaves_per_rank) == length(leaves)

Expand All @@ -31,17 +31,20 @@ function partition!(mesh::ParallelTreeMesh; allow_coarsening = true)
mesh.n_cells_by_rank = similar(n_leaves_per_rank)

leaf_count = 0
# Assign first cell to rank 0 (employ depth-first indexing of cells)
mesh.first_cell_by_rank[0] = 1
# Iterate over all ranks
for d in 0:(mpi_nranks() - 1)
leaf_count += n_leaves_per_rank[d]
for rank in 0:(mpi_nranks() - 1)
leaf_count += n_leaves_per_rank[rank]
last_id = leaves[leaf_count]
parent_id = mesh.tree.parent_ids[last_id]

# Check if all children of the last parent are leaves
# If coarsening is allowed, we need to make sure that parents of leaves
# are on the same rank as the leaves when coarsened.
if allow_coarsening &&
# Check if all children of the last parent are leaves
all(id -> is_leaf(mesh.tree, id), @view mesh.tree.child_ids[:, parent_id]) &&
d < length(n_leaves_per_rank) - 1
rank < length(n_leaves_per_rank) - 1 # Make sure there is another rank

# To keep children of parent together if they are all leaves,
# all children are added to this rank
Expand All @@ -53,20 +56,21 @@ function partition!(mesh::ParallelTreeMesh; allow_coarsening = true)
additional_cells)
leaf_count += additional_leaves
# Add leaves to this rank, remove from next rank
n_leaves_per_rank[d] += additional_leaves
n_leaves_per_rank[d + 1] -= additional_leaves
n_leaves_per_rank[rank] += additional_leaves
n_leaves_per_rank[rank + 1] -= additional_leaves
end
end

@assert all(n -> n > 0, n_leaves_per_rank) "Too many ranks to properly partition the mesh!"

mesh.n_cells_by_rank[d] = last_id - mesh.first_cell_by_rank[d] + 1
mesh.tree.mpi_ranks[mesh.first_cell_by_rank[d]:last_id] .= d
mesh.n_cells_by_rank[rank] = last_id - mesh.first_cell_by_rank[rank] + 1
# Use depth-first indexing of cells again to assign also non leaf cells
mesh.tree.mpi_ranks[mesh.first_cell_by_rank[rank]:last_id] .= rank

# Set first cell of next rank
if d < length(n_leaves_per_rank) - 1
mesh.first_cell_by_rank[d + 1] = mesh.first_cell_by_rank[d] +
mesh.n_cells_by_rank[d]
if rank < length(n_leaves_per_rank) - 1 # Make sure there is another rank
mesh.first_cell_by_rank[rank + 1] = mesh.first_cell_by_rank[rank] +
mesh.n_cells_by_rank[rank]
end
end

Expand Down
65 changes: 34 additions & 31 deletions src/solvers/dgsem_p4est/dg_parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ function start_mpi_send!(mpi_cache::P4estMPICache, mesh, equations, dg, cache)
data_size = nvariables(equations) * nnodes(dg)^(ndims(mesh) - 1)
n_small_elements = 2^(ndims(mesh) - 1)

for d in 1:length(mpi_cache.mpi_neighbor_ranks)
send_buffer = mpi_cache.mpi_send_buffers[d]
for rank in 1:length(mpi_cache.mpi_neighbor_ranks)
send_buffer = mpi_cache.mpi_send_buffers[rank]

for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[d])
for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[rank])
first = (index - 1) * data_size + 1
last = (index - 1) * data_size + data_size
local_side = cache.mpi_interfaces.local_sides[interface]
Expand All @@ -62,14 +62,15 @@ function start_mpi_send!(mpi_cache::P4estMPICache, mesh, equations, dg, cache)

# Set send_buffer corresponding to mortar data to NaN and overwrite the parts where local
# data exists
interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[d]) * data_size
mortars_data_size = length(mpi_cache.mpi_neighbor_mortars[d]) *
interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[rank]) *
data_size
mortars_data_size = length(mpi_cache.mpi_neighbor_mortars[rank]) *
n_small_elements * 2 * data_size
# `NaN |> eltype(...)` ensures that the NaN's are of the appropriate floating point type
send_buffer[(interfaces_data_size + 1):(interfaces_data_size + mortars_data_size)] .= NaN |>
eltype(mpi_cache)

for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[d])
for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[rank])
index_base = interfaces_data_size +
(index - 1) * n_small_elements * 2 * data_size
indices = buffer_mortar_indices(mesh, index_base, data_size)
Expand All @@ -91,18 +92,18 @@ function start_mpi_send!(mpi_cache::P4estMPICache, mesh, equations, dg, cache)
end

# Start sending
for (index, d) in enumerate(mpi_cache.mpi_neighbor_ranks)
for (index, rank) in enumerate(mpi_cache.mpi_neighbor_ranks)
mpi_cache.mpi_send_requests[index] = MPI.Isend(mpi_cache.mpi_send_buffers[index],
d, mpi_rank(), mpi_comm())
rank, mpi_rank(), mpi_comm())
end

return nothing
end

function start_mpi_receive!(mpi_cache::P4estMPICache)
for (index, d) in enumerate(mpi_cache.mpi_neighbor_ranks)
for (index, rank) in enumerate(mpi_cache.mpi_neighbor_ranks)
mpi_cache.mpi_recv_requests[index] = MPI.Irecv!(mpi_cache.mpi_recv_buffers[index],
d, d, mpi_comm())
rank, rank, mpi_comm())
end

return nothing
Expand All @@ -118,11 +119,11 @@ function finish_mpi_receive!(mpi_cache::P4estMPICache, mesh, equations, dg, cach
n_positions = n_small_elements + 1

# Start receiving and unpack received data until all communication is finished
d = MPI.Waitany(mpi_cache.mpi_recv_requests)
while d !== nothing
recv_buffer = mpi_cache.mpi_recv_buffers[d]
data = MPI.Waitany(mpi_cache.mpi_recv_requests)
while data !== nothing
recv_buffer = mpi_cache.mpi_recv_buffers[data]

for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[d])
for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[data])
first = (index - 1) * data_size + 1
last = (index - 1) * data_size + data_size

Expand All @@ -133,8 +134,9 @@ function finish_mpi_receive!(mpi_cache::P4estMPICache, mesh, equations, dg, cach
end
end

interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[d]) * data_size
for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[d])
interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[data]) *
data_size
for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[data])
index_base = interfaces_data_size +
(index - 1) * n_small_elements * 2 * data_size
indices = buffer_mortar_indices(mesh, index_base, data_size)
Expand All @@ -155,7 +157,7 @@ function finish_mpi_receive!(mpi_cache::P4estMPICache, mesh, equations, dg, cach
end
end

d = MPI.Waitany(mpi_cache.mpi_recv_requests)
data = MPI.Waitany(mpi_cache.mpi_recv_requests)
end

return nothing
Expand Down Expand Up @@ -311,10 +313,10 @@ function init_mpi_neighbor_connectivity(mpi_interfaces, mpi_mortars,
# For each neighbor rank, init connectivity data structures
mpi_neighbor_interfaces = Vector{Vector{Int}}(undef, length(mpi_neighbor_ranks))
mpi_neighbor_mortars = Vector{Vector{Int}}(undef, length(mpi_neighbor_ranks))
for (index, d) in enumerate(mpi_neighbor_ranks)
mpi_neighbor_interfaces[index] = interface_ids[findall(==(d),
for (index, rank) in enumerate(mpi_neighbor_ranks)
mpi_neighbor_interfaces[index] = interface_ids[findall(==(rank),
neighbor_ranks_interface)]
mpi_neighbor_mortars[index] = mortar_ids[findall(x -> (d in x),
mpi_neighbor_mortars[index] = mortar_ids[findall(x -> (rank in x),
neighbor_ranks_mortar)]
end

Expand Down Expand Up @@ -519,10 +521,10 @@ function exchange_normal_directions!(mpi_mortars, mpi_cache,
recv_requests = Vector{MPI.Request}(undef, length(mpi_neighbor_mortars))

# Fill send buffers
for d in 1:length(mpi_neighbor_ranks)
send_buffer = send_buffers[d]
for rank in 1:length(mpi_neighbor_ranks)
send_buffer = send_buffers[rank]

for (index, mortar) in enumerate(mpi_neighbor_mortars[d])
for (index, mortar) in enumerate(mpi_neighbor_mortars[rank])
index_base = (index - 1) * n_small_elements * data_size
indices = buffer_mortar_indices(mesh, index_base, data_size)
for position in mpi_mortars.local_neighbor_positions[mortar]
Expand All @@ -538,17 +540,18 @@ function exchange_normal_directions!(mpi_mortars, mpi_cache,
end

# Start data exchange
for (index, d) in enumerate(mpi_neighbor_ranks)
send_requests[index] = MPI.Isend(send_buffers[index], d, mpi_rank(), mpi_comm())
recv_requests[index] = MPI.Irecv!(recv_buffers[index], d, d, mpi_comm())
for (index, rank) in enumerate(mpi_neighbor_ranks)
send_requests[index] = MPI.Isend(send_buffers[index], rank, mpi_rank(),
mpi_comm())
recv_requests[index] = MPI.Irecv!(recv_buffers[index], rank, rank, mpi_comm())
end

# Unpack data from receive buffers
d = MPI.Waitany(recv_requests)
while d !== nothing
recv_buffer = recv_buffers[d]
data = MPI.Waitany(recv_requests)
while data !== nothing
recv_buffer = recv_buffers[data]

for (index, mortar) in enumerate(mpi_neighbor_mortars[d])
for (index, mortar) in enumerate(mpi_neighbor_mortars[data])
index_base = (index - 1) * n_small_elements * data_size
indices = buffer_mortar_indices(mesh, index_base, data_size)
for position in 1:n_small_elements
Expand All @@ -563,7 +566,7 @@ function exchange_normal_directions!(mpi_mortars, mpi_cache,
end
end

d = MPI.Waitany(recv_requests)
data = MPI.Waitany(recv_requests)
end

# Wait for communication to finish
Expand Down
6 changes: 3 additions & 3 deletions src/solvers/dgsem_t8code/dg_parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ function init_mpi_neighbor_connectivity(mpi_mesh_info, mesh::ParallelT8codeMesh)
# For each neighbor rank, init connectivity data structures
mpi_neighbor_interfaces = Vector{Vector{Int}}(undef, length(mpi_neighbor_ranks))
mpi_neighbor_mortars = Vector{Vector{Int}}(undef, length(mpi_neighbor_ranks))
for (index, d) in enumerate(mpi_neighbor_ranks)
mpi_neighbor_interfaces[index] = interface_ids[findall(==(d),
for (index, rank) in enumerate(mpi_neighbor_ranks)
mpi_neighbor_interfaces[index] = interface_ids[findall(==(rank),
neighbor_ranks_interface)]
mpi_neighbor_mortars[index] = mortar_ids[findall(x -> (d in x),
mpi_neighbor_mortars[index] = mortar_ids[findall(x -> (rank in x),
neighbor_ranks_mortar)]
end

Expand Down
6 changes: 6 additions & 0 deletions src/solvers/dgsem_tree/containers_2d.jl
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ end
# Container data structure (structure-of-arrays style) for DG MPI interfaces
mutable struct MPIInterfaceContainer2D{uEltype <: Real} <: AbstractContainer
u::Array{uEltype, 4} # [leftright, variables, i, interfaces]
# Note: `local_neighbor_ids` stores the MPI-local neighbors, but with globally valid index!
local_neighbor_ids::Vector{Int} # [interfaces]
orientations::Vector{Int} # [interfaces]
remote_sides::Vector{Int} # [interfaces]
Expand Down Expand Up @@ -907,6 +908,8 @@ function init_mpi_interfaces!(mpi_interfaces, elements, mesh::TreeMesh2D)

# Create interface between elements
count += 1
# Note: `local_neighbor_ids` stores the MPI-local neighbors,
# but with globally valid index!
mpi_interfaces.local_neighbor_ids[count] = element

if iseven(direction) # element is "left" of interface, remote cell is "right" of interface
Expand Down Expand Up @@ -941,6 +944,7 @@ end
mutable struct MPIL2MortarContainer2D{uEltype <: Real} <: AbstractContainer
u_upper::Array{uEltype, 4} # [leftright, variables, i, mortars]
u_lower::Array{uEltype, 4} # [leftright, variables, i, mortars]
# Note: `local_neighbor_ids` stores the MPI-local neighbors, but with globally valid index!
local_neighbor_ids::Vector{Vector{Int}} # [mortars][ids]
local_neighbor_positions::Vector{Vector{Int}} # [mortars][positions]
# Large sides: left -> 1, right -> 2
Expand Down Expand Up @@ -1214,6 +1218,8 @@ function init_mpi_mortars!(mpi_mortars, elements, mesh::TreeMesh2D)
# 3 -> large element
count += 1

# Note: `local_neighbor_ids` stores the MPI-local neighbors,
# but with globally valid index!
local_neighbor_ids = Vector{Int}()
local_neighbor_positions = Vector{Int}()
if is_own_cell(mesh.tree, lower_cell_id)
Expand Down
42 changes: 22 additions & 20 deletions src/solvers/dgsem_tree/dg_2d_parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ end

# TODO: MPI dimension agnostic
function start_mpi_receive!(mpi_cache::MPICache)
for (index, d) in enumerate(mpi_cache.mpi_neighbor_ranks)
for (index, rank) in enumerate(mpi_cache.mpi_neighbor_ranks)
mpi_cache.mpi_recv_requests[index] = MPI.Irecv!(mpi_cache.mpi_recv_buffers[index],
d, d, mpi_comm())
rank, rank, mpi_comm())
end

return nothing
Expand All @@ -60,10 +60,10 @@ end
function start_mpi_send!(mpi_cache::MPICache, mesh, equations, dg, cache)
data_size = nvariables(equations) * nnodes(dg)^(ndims(mesh) - 1)

for d in 1:length(mpi_cache.mpi_neighbor_ranks)
send_buffer = mpi_cache.mpi_send_buffers[d]
for rank in 1:length(mpi_cache.mpi_neighbor_ranks)
send_buffer = mpi_cache.mpi_send_buffers[rank]

for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[d])
for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[rank])
first = (index - 1) * data_size + 1
last = (index - 1) * data_size + data_size

Expand All @@ -78,11 +78,12 @@ function start_mpi_send!(mpi_cache::MPICache, mesh, equations, dg, cache)

# Each mortar has a total size of 4 * data_size, set everything to NaN first and overwrite the
# parts where local data exists
interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[d]) * data_size
mortars_data_size = length(mpi_cache.mpi_neighbor_mortars[d]) * 4 * data_size
interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[rank]) *
data_size
mortars_data_size = length(mpi_cache.mpi_neighbor_mortars[rank]) * 4 * data_size
send_buffer[(interfaces_data_size + 1):(interfaces_data_size + mortars_data_size)] .= NaN

for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[d])
for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[rank])
# First and last indices in the send buffer for mortar data obtained from local element
# in a given position
index_base = interfaces_data_size + (index - 1) * 4 * data_size
Expand Down Expand Up @@ -143,9 +144,9 @@ function start_mpi_send!(mpi_cache::MPICache, mesh, equations, dg, cache)
end

# Start sending
for (index, d) in enumerate(mpi_cache.mpi_neighbor_ranks)
for (index, rank) in enumerate(mpi_cache.mpi_neighbor_ranks)
mpi_cache.mpi_send_requests[index] = MPI.Isend(mpi_cache.mpi_send_buffers[index],
d, mpi_rank(), mpi_comm())
rank, mpi_rank(), mpi_comm())
end

return nothing
Expand All @@ -161,11 +162,11 @@ function finish_mpi_receive!(mpi_cache::MPICache, mesh, equations, dg, cache)
data_size = nvariables(equations) * nnodes(dg)^(ndims(mesh) - 1)

# Start receiving and unpack received data until all communication is finished
d = MPI.Waitany(mpi_cache.mpi_recv_requests)
while d !== nothing
recv_buffer = mpi_cache.mpi_recv_buffers[d]
data = MPI.Waitany(mpi_cache.mpi_recv_requests)
while data !== nothing
recv_buffer = mpi_cache.mpi_recv_buffers[data]

for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[d])
for (index, interface) in enumerate(mpi_cache.mpi_neighbor_interfaces[data])
first = (index - 1) * data_size + 1
last = (index - 1) * data_size + data_size

Expand All @@ -176,8 +177,9 @@ function finish_mpi_receive!(mpi_cache::MPICache, mesh, equations, dg, cache)
end
end

interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[d]) * data_size
for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[d])
interfaces_data_size = length(mpi_cache.mpi_neighbor_interfaces[data]) *
data_size
for (index, mortar) in enumerate(mpi_cache.mpi_neighbor_mortars[data])
# First and last indices in the receive buffer for mortar data obtained from remote element
# in a given position
index_base = interfaces_data_size + (index - 1) * 4 * data_size
Expand Down Expand Up @@ -230,7 +232,7 @@ function finish_mpi_receive!(mpi_cache::MPICache, mesh, equations, dg, cache)
end
end

d = MPI.Waitany(mpi_cache.mpi_recv_requests)
data = MPI.Waitany(mpi_cache.mpi_recv_requests)
end

return nothing
Expand Down Expand Up @@ -431,10 +433,10 @@ function init_mpi_neighbor_connectivity(elements, mpi_interfaces, mpi_mortars,
# For each neighbor rank, init connectivity data structures
mpi_neighbor_interfaces = Vector{Vector{Int}}(undef, length(mpi_neighbor_ranks))
mpi_neighbor_mortars = Vector{Vector{Int}}(undef, length(mpi_neighbor_ranks))
for (index, d) in enumerate(mpi_neighbor_ranks)
mpi_neighbor_interfaces[index] = interface_ids[findall(x -> (x == d),
for (index, rank) in enumerate(mpi_neighbor_ranks)
mpi_neighbor_interfaces[index] = interface_ids[findall(x -> (x == rank),
neighbor_ranks_interface)]
mpi_neighbor_mortars[index] = mortar_ids[findall(x -> (d in x),
mpi_neighbor_mortars[index] = mortar_ids[findall(x -> (rank in x),
neighbor_ranks_mortar)]
end

Expand Down

0 comments on commit a2bf8e9

Please sign in to comment.