2
2
#include < faabric/mpi/MpiWorld.h>
3
3
#include < faabric/mpi/mpi.pb.h>
4
4
#include < faabric/planner/PlannerClient.h>
5
+ #include < faabric/transport/PointToPointMessage.h>
5
6
#include < faabric/transport/macros.h>
6
7
#include < faabric/util/ExecGraph.h>
7
8
#include < faabric/util/batch.h>
@@ -60,14 +61,16 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
60
61
throw std::runtime_error (" Error serialising message" );
61
62
}
62
63
try {
63
- broker.sendMessage (
64
- thisRankMsg->groupid (),
65
- sendRank,
66
- recvRank,
67
- reinterpret_cast <const uint8_t *>(serialisedBuffer.data ()),
68
- serialisedBuffer.size (),
69
- dstHost,
70
- true );
64
+ // It is safe to send a pointer to a stack-allocated object
65
+ // because the broker will make an additional copy (and so will NNG!)
66
+ faabric::transport::PointToPointMessage msg (
67
+ { .groupId = thisRankMsg->groupid (),
68
+ .sendIdx = sendRank,
69
+ .recvIdx = recvRank,
70
+ .dataSize = serialisedBuffer.size (),
71
+ .dataPtr = (void *)serialisedBuffer.data () });
72
+
73
+ broker.sendMessage (msg, dstHost, true );
71
74
} catch (std::runtime_error& e) {
72
75
SPDLOG_ERROR (" {}:{}:{} Timed out with: MPI - send {} -> {}" ,
73
76
thisRankMsg->appid (),
@@ -82,10 +85,12 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
82
85
std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage (int sendRank,
83
86
int recvRank)
84
87
{
85
- std::vector<uint8_t > msg;
88
+ faabric::transport::PointToPointMessage msg (
89
+ { .groupId = thisRankMsg->groupid (),
90
+ .sendIdx = sendRank,
91
+ .recvIdx = recvRank });
86
92
try {
87
- msg =
88
- broker.recvMessage (thisRankMsg->groupid (), sendRank, recvRank, true );
93
+ broker.recvMessage (msg, true );
89
94
} catch (std::runtime_error& e) {
90
95
SPDLOG_ERROR (" {}:{}:{} Timed out with: MPI - recv (remote) {} -> {}" ,
91
96
thisRankMsg->appid (),
@@ -95,7 +100,12 @@ std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
95
100
recvRank);
96
101
throw e;
97
102
}
98
- PARSE_MSG (MPIMessage, msg.data (), msg.size ());
103
+
104
+ // Parsing into the protobuf makes a copy of the message, so we can
105
+ // free the heap pointer after
106
+ PARSE_MSG (MPIMessage, msg.dataPtr , msg.dataSize );
107
+ faabric::util::free (msg.dataPtr );
108
+
99
109
return std::make_shared<MPIMessage>(parsedMsg);
100
110
}
101
111
@@ -599,7 +609,10 @@ void MpiWorld::doRecv(std::shared_ptr<MPIMessage>& m,
599
609
// Assert message integrity
600
610
// Note - these checks won't happen in Release builds
601
611
if (m->messagetype () != messageType) {
602
- SPDLOG_ERROR (" Different message types (got: {}, expected: {})" ,
612
+ SPDLOG_ERROR (" {}:{}:{} Different message types (got: {}, expected: {})" ,
613
+ m->worldid (),
614
+ m->sender (),
615
+ m->destination (),
603
616
m->messagetype (),
604
617
messageType);
605
618
}
0 commit comments