Skip to content

Commit

Permalink
comments on mpi communicator
Browse files Browse the repository at this point in the history
  • Loading branch information
paulheinr committed Nov 8, 2024
1 parent 6ccc396 commit ba1acfa
Showing 1 changed file with 69 additions and 63 deletions.
132 changes: 69 additions & 63 deletions src/simulation/messaging/communication/communicators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use tracing::{debug, info, instrument, span, Level};

use crate::simulation::wire_types::messages::{SimMessage, SyncMessage, TravelTimesMessage};

pub trait Message {
fn serialize(&self) -> Vec<u8>;
fn deserialize(data: &[u8]) -> Self;
fn to(&self) -> u32;
fn from(&self) -> u32;
}
// pub trait Message {
// fn serialize(&self) -> Vec<u8>;
// fn deserialize(data: &[u8]) -> Self;
// fn to(&self) -> u32;
// fn from(&self) -> u32;
// }

pub trait SimCommunicator {
fn send_receive_vehicles<F>(
Expand All @@ -36,13 +36,13 @@ pub trait SimCommunicator {
travel_times: HashMap<u64, u32>,
) -> Vec<TravelTimesMessage>;

fn isend_request<M>(&mut self, message: M)
where
M: Message;

fn irecv_request<M>(&mut self) -> Vec<M>
where
M: Message;
// fn isend_request<M>(&mut self, message: M)
// where
// M: Message;
//
// fn irecv_request<M>(&mut self) -> Vec<M>
// where
// M: Message;

fn barrier(&self);

Expand Down Expand Up @@ -72,19 +72,19 @@ impl SimCommunicator for DummySimCommunicator {
vec![TravelTimesMessage::from(travel_times)]
}

fn isend_request<M>(&mut self, message: M)
where
M: Message,
{
todo!()
}

fn irecv_request<M>(&mut self) -> Vec<M>
where
M: Message,
{
todo!()
}
// fn isend_request<M>(&mut self, message: M)
// where
// M: Message,
// {
// todo!()
// }
//
// fn irecv_request<M>(&mut self) -> Vec<M>
// where
// M: Message,
// {
// todo!()
// }

fn barrier(&self) {
info!("Barrier was called on DummySimCommunicator, which doesn't do anything.")
Expand Down Expand Up @@ -214,19 +214,19 @@ impl SimCommunicator for ChannelSimCommunicator {
result
}

fn isend_request<M>(&mut self, message: M)
where
M: Message,
{
todo!()
}

fn irecv_request<M>(&mut self) -> Vec<M>
where
M: Message,
{
todo!()
}
// fn isend_request<M>(&mut self, message: M)
// where
// M: Message,
// {
// todo!()
// }
//
// fn irecv_request<M>(&mut self) -> Vec<M>
// where
// M: Message,
// {
// todo!()
// }

fn barrier(&self) {
self.barrier.wait();
Expand All @@ -237,13 +237,19 @@ impl SimCommunicator for ChannelSimCommunicator {
}
}

pub struct MpiSimCommunicator<'data, 'scope, 'send_buffer>
/// We need the send buffer to allow asynchronous communication. More on that can be found here:
/// https://github.com/rsmpi/rsmpi/discussions/190
///
/// To put it in a nutshell: Since we don't know when the message is actually sent, we need to store the requests in this struct.
/// Since the requests depend on the buffer, the buffer lifetime (`'send_buffer`) must outlive the request lifetime (`'request`).
/// Thus, the buffer is created before starting the simulation in the controller.
pub struct MpiSimCommunicator<'data, 'request, 'send_buffer>
where
'send_buffer: 'scope,
'send_buffer: 'request,
{
pub mpi_communicator: SystemCommunicator,
pub scope: &'data LocalScope<'scope>,
pub requests: &'data mut RequestCollection<'scope, Vec<u8>>,
pub scope: &'data LocalScope<'request>,
pub requests: &'data mut RequestCollection<'request, Vec<u8>>,
pub send_buffer: &'send_buffer Vec<OnceCell<Vec<u8>>>,
pub send_count: u64,
}
Expand Down Expand Up @@ -349,26 +355,26 @@ where
messages
}

fn isend_request<M>(&mut self, message: M)
where
M: Message,
{
let vec = self.send_buffer[self.send_count as usize].get_or_init(|| message.serialize());
let req = self
.mpi_communicator
.process_at_rank(message.to() as Rank)
.immediate_send(self.scope, vec);
self.requests.add(req);
self.send_count += 1;
}

fn irecv_request<M>(&mut self) -> Vec<M>
where
M: Message,
{
let (encoded_msg, _status) = self.mpi_communicator.any_process().receive_vec();
vec![M::deserialize(&encoded_msg)]
}
// fn isend_request<M>(&mut self, message: M)
// where
// M: Message,
// {
// let vec = self.send_buffer[self.send_count as usize].get_or_init(|| message.serialize());
// let req = self
// .mpi_communicator
// .process_at_rank(message.to() as Rank)
// .immediate_send(self.scope, vec);
// self.requests.add(req);
// self.send_count += 1;
// }
//
// fn irecv_request<M>(&mut self) -> Vec<M>
// where
// M: Message,
// {
// let (encoded_msg, _status) = self.mpi_communicator.any_process().receive_vec();
// vec![M::deserialize(&encoded_msg)]
// }

fn barrier(&self) {
self.mpi_communicator.barrier();
Expand Down

0 comments on commit ba1acfa

Please sign in to comment.