Skip to content

Commit ba0d691

Browse files
committed
transport(ptp): #388
1 parent 5b7667e commit ba0d691

17 files changed

+591
-239
lines changed

include/faabric/transport/PointToPointBroker.h

+6-15
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <faabric/batch-scheduler/SchedulingDecision.h>
44
#include <faabric/transport/PointToPointClient.h>
5+
#include <faabric/transport/PointToPointMessage.h>
56
#include <faabric/util/config.h>
67
#include <faabric/util/locks.h>
78

@@ -120,27 +121,16 @@ class PointToPointBroker
120121

121122
void updateHostForIdx(int groupId, int groupIdx, std::string newHost);
122123

123-
void sendMessage(int groupId,
124-
int sendIdx,
125-
int recvIdx,
126-
const uint8_t* buffer,
127-
size_t bufferSize,
124+
void sendMessage(const PointToPointMessage& msg,
128125
std::string hostHint,
129126
bool mustOrderMsg = false);
130127

131-
void sendMessage(int groupId,
132-
int sendIdx,
133-
int recvIdx,
134-
const uint8_t* buffer,
135-
size_t bufferSize,
128+
void sendMessage(const PointToPointMessage& msg,
136129
bool mustOrderMsg = false,
137130
int sequenceNum = NO_SEQUENCE_NUM,
138131
std::string hostHint = "");
139132

140-
std::vector<uint8_t> recvMessage(int groupId,
141-
int sendIdx,
142-
int recvIdx,
143-
bool mustOrderMsg = false);
133+
void recvMessage(PointToPointMessage& msg, bool mustOrderMsg = false);
144134

145135
void clearGroup(int groupId);
146136

@@ -163,7 +153,8 @@ class PointToPointBroker
163153

164154
std::shared_ptr<faabric::util::FlagWaiter> getGroupFlag(int groupId);
165155

166-
Message doRecvMessage(int groupId, int sendIdx, int recvIdx);
156+
// Returns the message response code and the sequence number
157+
std::pair<MessageResponseCode, int> doRecvMessage(PointToPointMessage& msg);
167158

168159
void initSequenceCounters(int groupId);
169160

include/faabric/transport/PointToPointClient.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@
33
#include <faabric/proto/faabric.pb.h>
44
#include <faabric/transport/MessageEndpointClient.h>
55
#include <faabric/transport/PointToPointCall.h>
6+
#include <faabric/transport/PointToPointMessage.h>
67

78
namespace faabric::transport {
89

910
std::vector<std::pair<std::string, faabric::PointToPointMappings>>
1011
getSentMappings();
1112

12-
std::vector<std::pair<std::string, faabric::PointToPointMessage>>
13+
std::vector<std::pair<std::string, PointToPointMessage>>
1314
getSentPointToPointMessages();
1415

1516
std::vector<std::tuple<std::string,
1617
faabric::transport::PointToPointCall,
17-
faabric::PointToPointMessage>>
18+
PointToPointMessage>>
1819
getSentLockMessages();
1920

2021
void clearSentMessages();
@@ -26,7 +27,7 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient
2627

2728
void sendMappings(faabric::PointToPointMappings& mappings);
2829

29-
void sendMessage(faabric::PointToPointMessage& msg,
30+
void sendMessage(const PointToPointMessage& msg,
3031
int sequenceNum = NO_SEQUENCE_NUM);
3132

3233
void groupLock(int appId,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <span>
5+
6+
namespace faabric::transport {
7+
8+
/* Simple fixed-size C-struct to capture the state of a PTP message moving
9+
* through Faabric.
10+
*
11+
* We require fixed-size, and no unique pointers to be able to use
12+
* high-throughput ring-buffers to send the messages around. This also means
13+
* that we manually malloc/free the data pointer. The message size is:
14+
* 4 * int32_t = 4 * 4 bytes = 16 bytes
15+
* 1 * size_t = 1 * 8 bytes = 8 bytes
16+
* 1 * void* = 1 * 8 bytes = 8 bytes
17+
* total = 32 bytes = 4 * 8 so the struct is naturally 8 byte-aligned
18+
*/
19+
struct PointToPointMessage
20+
{
21+
int32_t appId;
22+
int32_t groupId;
23+
int32_t sendIdx;
24+
int32_t recvIdx;
25+
size_t dataSize;
26+
void* dataPtr;
27+
};
28+
static_assert((sizeof(PointToPointMessage) % 8) == 0,
29+
"PTP message mus be 8-aligned!");
30+
31+
// The wire format for a PTP message is very simple: the fixed-size struct,
32+
// followed by dataSize bytes containing the payload.
33+
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg);
34+
35+
// This parsing function mallocs space for the message payload. This is to
36+
// keep the PTP message at fixed-size, and be able to efficiently move it
37+
// around in-memory queues
38+
void parsePtpMsg(std::span<const uint8_t> bytes, PointToPointMessage* msg);
39+
40+
// Alternative signature for parsing PTP messages for when the caller can
41+
// provide an already-allocated buffer to write into
42+
void parsePtpMsg(std::span<const uint8_t> bytes,
43+
PointToPointMessage* msg,
44+
std::span<uint8_t> preAllocBuffer);
45+
}

src/mpi/MpiWorld.cpp

+18-12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <faabric/mpi/MpiMessage.h>
33
#include <faabric/mpi/MpiWorld.h>
44
#include <faabric/planner/PlannerClient.h>
5+
#include <faabric/transport/PointToPointMessage.h>
56
#include <faabric/transport/macros.h>
67
#include <faabric/util/ExecGraph.h>
78
#include <faabric/util/batch.h>
@@ -59,14 +60,16 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
5960
serializeMpiMsg(serialisedBuffer, msg);
6061

6162
try {
62-
broker.sendMessage(
63-
thisRankMsg->groupid(),
64-
sendRank,
65-
recvRank,
66-
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
67-
serialisedBuffer.size(),
68-
dstHost,
69-
true);
63+
// It is safe to send a pointer to a stack-allocated object
64+
// because the broker will make an additional copy (and so will NNG!)
65+
faabric::transport::PointToPointMessage msg(
66+
{ .groupId = thisRankMsg->groupid(),
67+
.sendIdx = sendRank,
68+
.recvIdx = recvRank,
69+
.dataSize = serialisedBuffer.size(),
70+
.dataPtr = (void*)serialisedBuffer.data() });
71+
72+
broker.sendMessage(msg, dstHost, true);
7073
} catch (std::runtime_error& e) {
7174
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
7275
thisRankMsg->appid(),
@@ -80,10 +83,12 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
8083

8184
MpiMessage MpiWorld::recvRemoteMpiMessage(int sendRank, int recvRank)
8285
{
83-
std::vector<uint8_t> msg;
86+
faabric::transport::PointToPointMessage msg(
87+
{ .groupId = thisRankMsg->groupid(),
88+
.sendIdx = sendRank,
89+
.recvIdx = recvRank });
8490
try {
85-
msg =
86-
broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
91+
broker.recvMessage(msg, true);
8792
} catch (std::runtime_error& e) {
8893
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - recv (remote) {} -> {}",
8994
thisRankMsg->appid(),
@@ -96,7 +101,8 @@ MpiMessage MpiWorld::recvRemoteMpiMessage(int sendRank, int recvRank)
96101

97102
// TODO(mpi-opt): make sure we minimze copies here
98103
MpiMessage parsedMsg;
99-
parseMpiMsg(msg, &parsedMsg);
104+
std::vector<uint8_t> msgBytes((uint8_t*) msg.dataPtr, (uint8_t*) msg.dataPtr + msg.dataSize);
105+
parseMpiMsg(msgBytes, &parsedMsg);
100106

101107
return parsedMsg;
102108
}

src/proto/faabric.proto

-8
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,6 @@ message StateAppendedResponse {
199199
// POINT-TO-POINT
200200
// ---------------------------------------------
201201

202-
message PointToPointMessage {
203-
int32 appId = 1;
204-
int32 groupId = 2;
205-
int32 sendIdx = 3;
206-
int32 recvIdx = 4;
207-
bytes data = 5;
208-
}
209-
210202
message PointToPointMappings {
211203
int32 appId = 1;
212204
int32 groupId = 2;

src/scheduler/Scheduler.cpp

+23-3
Original file line numberDiff line numberDiff line change
@@ -459,12 +459,32 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg,
459459
auto groupIdxs = broker.getIdxsRegisteredForGroup(groupId);
460460
groupIdxs.erase(0);
461461
for (const auto& recvIdx : groupIdxs) {
462-
broker.sendMessage(
463-
groupId, 0, recvIdx, BYTES_CONST(&newGroupId), sizeof(int));
462+
// It is safe to send a pointer to the stack, because the
463+
// transport layer will perform an additional copy of the PTP
464+
// message to put it in the message body
465+
// TODO(no-inproc): this may not be true once we move the inproc
466+
// sockets to in-memory queues
467+
faabric::transport::PointToPointMessage msg(
468+
{ .groupId = groupId,
469+
.sendIdx = 0,
470+
.recvIdx = recvIdx,
471+
.dataSize = sizeof(int),
472+
.dataPtr = &newGroupId });
473+
474+
broker.sendMessage(msg);
464475
}
465476
} else if (overwriteNewGroupId == 0) {
466-
std::vector<uint8_t> bytes = broker.recvMessage(groupId, 0, groupIdx);
477+
faabric::transport::PointToPointMessage msg(
478+
{ .groupId = groupId, .sendIdx = 0, .recvIdx = groupIdx });
479+
// TODO(no-order): when we remove the need to order ptp messages we
480+
// should be able to call recv giving it a pre-allocated buffer,
481+
// avoiding the hassle of malloc-ing and free-ing
482+
broker.recvMessage(msg);
483+
std::vector<uint8_t> bytes((uint8_t*)msg.dataPtr,
484+
(uint8_t*)msg.dataPtr + msg.dataSize);
467485
newGroupId = faabric::util::bytesToInt(bytes);
486+
// The previous call makes a copy, so safe to free now
487+
faabric::util::free(msg.dataPtr);
468488
} else {
469489
// In some settings, like tests, we already know the new group id, so
470490
// we can set it here (and in fact, we need to do so when faking two

src/transport/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ faabric_lib(transport
99
MessageEndpointServer.cpp
1010
PointToPointBroker.cpp
1111
PointToPointClient.cpp
12+
PointToPointMessage.cpp
1213
PointToPointServer.cpp
1314
)
1415

src/transport/MessageEndpointClient.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void MessageEndpointClient::asyncSend(int header,
3636
sequenceNum);
3737
}
3838

39+
// TODO: consider making an iovec-style scatter/gather alternative signature
3940
void MessageEndpointClient::asyncSend(int header,
4041
const uint8_t* buffer,
4142
size_t bufferSize,

0 commit comments

Comments
 (0)