Skip to content

Commit 57d56fc

Browse files
committed
transport(ptp): move from protobuf to fixed-size c-struct
1 parent 29d1322 commit 57d56fc

14 files changed

+259
-158
lines changed

include/faabric/transport/PointToPointBroker.h

+3-10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <faabric/batch-scheduler/SchedulingDecision.h>
44
#include <faabric/transport/PointToPointClient.h>
5+
#include <faabric/transport/PointToPointMessage.h>
56
#include <faabric/util/config.h>
67
#include <faabric/util/locks.h>
78

@@ -120,19 +121,11 @@ class PointToPointBroker
120121

121122
void updateHostForIdx(int groupId, int groupIdx, std::string newHost);
122123

123-
void sendMessage(int groupId,
124-
int sendIdx,
125-
int recvIdx,
126-
const uint8_t* buffer,
127-
size_t bufferSize,
124+
void sendMessage(const PointToPointMessage& msg,
128125
std::string hostHint,
129126
bool mustOrderMsg = false);
130127

131-
void sendMessage(int groupId,
132-
int sendIdx,
133-
int recvIdx,
134-
const uint8_t* buffer,
135-
size_t bufferSize,
128+
void sendMessage(const PointToPointMessage& msg,
136129
bool mustOrderMsg = false,
137130
int sequenceNum = NO_SEQUENCE_NUM,
138131
std::string hostHint = "");

include/faabric/transport/PointToPointClient.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@
33
#include <faabric/proto/faabric.pb.h>
44
#include <faabric/transport/MessageEndpointClient.h>
55
#include <faabric/transport/PointToPointCall.h>
6+
#include <faabric/transport/PointToPointMessage.h>
67

78
namespace faabric::transport {
89

910
std::vector<std::pair<std::string, faabric::PointToPointMappings>>
1011
getSentMappings();
1112

12-
std::vector<std::pair<std::string, faabric::PointToPointMessage>>
13+
std::vector<std::pair<std::string, PointToPointMessage>>
1314
getSentPointToPointMessages();
1415

1516
std::vector<std::tuple<std::string,
1617
faabric::transport::PointToPointCall,
17-
faabric::PointToPointMessage>>
18+
PointToPointMessage>>
1819
getSentLockMessages();
1920

2021
void clearSentMessages();
@@ -26,7 +27,7 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient
2627

2728
void sendMappings(faabric::PointToPointMappings& mappings);
2829

29-
void sendMessage(faabric::PointToPointMessage& msg,
30+
void sendMessage(const PointToPointMessage& msg,
3031
int sequenceNum = NO_SEQUENCE_NUM);
3132

3233
void groupLock(int appId,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <span>
5+
6+
namespace faabric::transport {
7+
8+
/* Simple fixed-size C-struct to capture the state of a PTP message moving
9+
* through Faabric.
10+
*
11+
* We require fixed-size, and no unique pointers to be able to use
12+
* high-throughput ring-buffers to send the messages around. This also means
13+
* that we manually malloc/free the data pointer. The message size is:
14+
* 4 * int32_t = 4 * 4 bytes = 16 bytes
15+
* 1 * size_t = 1 * 8 bytes = 8 bytes
16+
* 1 * void* = 1 * 8 bytes = 8 bytes
17+
* total = 32 bytes = 4 * 8 so the struct is naturally 8 byte-aligned
18+
*/
19+
struct PointToPointMessage
20+
{
21+
int32_t appId;
22+
int32_t groupId;
23+
int32_t sendIdx;
24+
int32_t recvIdx;
25+
size_t dataSize;
26+
void* dataPtr;
27+
};
28+
static_assert((sizeof(PointToPointMessage) % 8) == 0,
29+
"PTP message mus be 8-aligned!");
30+
31+
// The wire format for a PTP message is very simple: the fixed-size struct,
32+
// followed by dataSize bytes containing the payload.
33+
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg);
34+
35+
void parsePtpMsg(std::span<const uint8_t> buffer, PointToPointMessage* msg);
36+
}

src/mpi/MpiWorld.cpp

+11-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <faabric/mpi/MpiWorld.h>
33
#include <faabric/mpi/mpi.pb.h>
44
#include <faabric/planner/PlannerClient.h>
5+
#include <faabric/transport/PointToPointMessage.h>
56
#include <faabric/transport/macros.h>
67
#include <faabric/util/ExecGraph.h>
78
#include <faabric/util/batch.h>
@@ -60,14 +61,16 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
6061
throw std::runtime_error("Error serialising message");
6162
}
6263
try {
63-
broker.sendMessage(
64-
thisRankMsg->groupid(),
65-
sendRank,
66-
recvRank,
67-
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
68-
serialisedBuffer.size(),
69-
dstHost,
70-
true);
64+
// It is safe to send a pointer to a stack-allocated object
65+
// because the broker will make an additional copy (and so will NNG!)
66+
faabric::transport::PointToPointMessage msg(
67+
{ .groupId = thisRankMsg->groupid(),
68+
.sendIdx = sendRank,
69+
.recvIdx = recvRank,
70+
.dataSize = serialisedBuffer.size(),
71+
.dataPtr = (void*)serialisedBuffer.data() });
72+
73+
broker.sendMessage(msg, dstHost, true);
7174
} catch (std::runtime_error& e) {
7275
SPDLOG_ERROR("{}:{}:{} Timed out with: MPI - send {} -> {}",
7376
thisRankMsg->appid(),

src/proto/faabric.proto

-8
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,6 @@ message StateAppendedResponse {
199199
// POINT-TO-POINT
200200
// ---------------------------------------------
201201

202-
message PointToPointMessage {
203-
int32 appId = 1;
204-
int32 groupId = 2;
205-
int32 sendIdx = 3;
206-
int32 recvIdx = 4;
207-
bytes data = 5;
208-
}
209-
210202
message PointToPointMappings {
211203
int32 appId = 1;
212204
int32 groupId = 2;

src/scheduler/Scheduler.cpp

+13-2
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,19 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg,
456456
auto groupIdxs = broker.getIdxsRegisteredForGroup(groupId);
457457
groupIdxs.erase(0);
458458
for (const auto& recvIdx : groupIdxs) {
459-
broker.sendMessage(
460-
groupId, 0, recvIdx, BYTES_CONST(&newGroupId), sizeof(int));
459+
// It is safe to send a pointer to the stack, because the
460+
// transport layer will perform an additional copy of the PTP
461+
// message to put it in the message body
462+
// TODO(no-inproc): this may not be true once we move the inproc
463+
// sockets to in-memory queues
464+
faabric::transport::PointToPointMessage msg(
465+
{ .groupId = groupId,
466+
.sendIdx = 0,
467+
.recvIdx = recvIdx,
468+
.dataSize = sizeof(int),
469+
.dataPtr = &newGroupId });
470+
471+
broker.sendMessage(msg);
461472
}
462473
} else if (overwriteNewGroupId == 0) {
463474
std::vector<uint8_t> bytes = broker.recvMessage(groupId, 0, groupIdx);

src/transport/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ faabric_lib(transport
99
MessageEndpointServer.cpp
1010
PointToPointBroker.cpp
1111
PointToPointClient.cpp
12+
PointToPointMessage.cpp
1213
PointToPointServer.cpp
1314
)
1415

src/transport/MessageEndpointClient.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void MessageEndpointClient::asyncSend(int header,
3636
sequenceNum);
3737
}
3838

39+
// TODO: consider making an iovec-style scatter/gather alternative signature
3940
void MessageEndpointClient::asyncSend(int header,
4041
const uint8_t* buffer,
4142
size_t bufferSize,

0 commit comments

Comments
 (0)