(Explaining preceding WIP commits and this one) Inspired by attempts …
…to make a hookier, simpler intro to Flow-IPC: I wanted a basic comparison to demonstrate (primarily) perf benefits of our core (but not only) use-case/API, SHM-backed zero-copy capnp-data transmission; and (secondarily) coding-convenience benefits of same. So I wrote this perf_demo test which compares vanilla-capnp use with IPC via local-socket with copy-in and copy-out versus Flow-IPC SHM-backed zero-copy capnp-ing. It is not comprehensive, but it is a nice start at least and will provide some benchmarks to quote in aforementioned Flow-IPC intro.
ygoldfeld committed Mar 3, 2024
1 parent 087b968 commit b2608ba
Showing 2 changed files with 324 additions and 44 deletions.
98 changes: 75 additions & 23 deletions test/suite/perf_demo/main_cli.cpp
Expand Up @@ -15,6 +15,12 @@
* See the License for the specific language governing
* permissions and limitations under the License. */

/* Please see main_srv.cpp top comment for an overview of this client program and the server counterpart.
*
* As is typical in these client-server test/demo programs, the 2 programs mirror each other. So the comments
* are generally in main_srv.cpp, and we keep it light here in main_cli.cpp; except where there's our-side-specific
* stuff. Please refer to the other file, as you go through this one. */

#include "common.hpp"
#include <flow/perf/checkpt_timer.hpp>

Expand All @@ -25,10 +31,14 @@ void verify_rsp(const perf_demo::schema::GetCacheRsp::Reader& rsp_root);
using Timer = flow::perf::Checkpointing_timer;
using Clock_type = flow::perf::Clock_type;

Task_engine g_asio;
flow::Fine_duration g_capnp_over_raw_rtt;
flow::Fine_duration g_capnp_zero_cpy_rtt;
size_t g_total_sz = 0;
static Task_engine g_asio;
/* These globals (which, again, don't really need to be global but are just for expediency for now at least, as they're
* referenced in a few benchmarks) are set by diff benchmarks and then summarized/analyzed a bit at the end of
* main(). */
static flow::Fine_duration g_capnp_over_raw_rtt;
static flow::Fine_duration g_capnp_zero_cpy_rtt;
// Byte count inside the transmitted data. 1st benchmark sets it; 2nd benchmark ensures it got same-sized data too.
static size_t g_total_sz = 0;

int main(int argc, char const * const * argv)
{
Expand All @@ -48,16 +58,12 @@ int main(int argc, char const * const * argv)
constexpr String_view LOG_FILE = "perf_demo_cli.log";
constexpr int BAD_EXIT = 1;

/* Set up logging within this function. We could easily just use `cout` and `cerr` instead, but this
* Flow stuff will give us time stamps and such for free, so why not? Normally, one derives from
* Log_context to do this very trivially, but we just have the one function, main(), so far so: */
// Set up logging.
Config std_log_config;
std_log_config.init_component_to_union_idx_mapping<Flow_log_component>(1000, 999);
std_log_config.init_component_names<Flow_log_component>(flow::S_FLOW_LOG_COMPONENT_NAME_MAP, false, "perf_demo-");

Simple_ostream_logger std_logger(&std_log_config);
FLOW_LOG_SET_CONTEXT(&std_logger, Flow_log_component::S_UNCAT);

// This is separate: the IPC/Flow logging will go into this file.
const auto log_file = (argc >= 2) ? String_view(argv[1]) : LOG_FILE;
FLOW_LOG_INFO("Opening log file [" << log_file << "] for IPC/Flow logs only.");
Expand All @@ -66,8 +72,6 @@ int main(int argc, char const * const * argv)
Async_file_logger log_logger(nullptr, &log_config, log_file, false);

#if JEM_ELSE_CLASSIC
/* Instructed to do so by ipc::session::shm::arena_lend public docs (short version: this is basically a global,
* and it would not be cool for ipc::session non-global objects to impose their individual loggers on it). */
ipc::session::shm::arena_lend::Borrower_shm_pool_collection_repository_singleton::get_instance()
.set_logger(&log_logger);
#endif
Expand All @@ -83,21 +87,39 @@ int main(int argc, char const * const * argv)
FLOW_LOG_INFO("Session-client attempting to open session against session-server; "
"it'll either succeed or fail very soon.");

Session::Channels chans; // Server shall offer us 2 channels.
Session::Channels chans;
session.sync_connect(session.mdt_builder(), nullptr, nullptr, &chans); // Let it throw on error.
FLOW_LOG_INFO("Session/channels opened.");

assert(chans.size() == 2); // Server shall offer us 2 channels. (We could also ask for some above, but we won't.)

auto& chan_raw = chans[0]; // Binary channel for raw-ish tests.
Channel_struc chan_struc(&log_logger, std::move(chans[1]), // Structured channel: SHM-backed underneath.
ipc::transport::struc::Channel_base::S_SERIALIZE_VIA_SESSION_SHM, &session);

run_capnp_over_raw(&std_logger, &chan_raw);
run_capnp_zero_cpy(&std_logger, &chan_struc);
run_capnp_over_raw(&std_logger, &chan_raw); // Benchmark 1. capnp data transmission without Flow-IPC zero-copy.
run_capnp_zero_cpy(&std_logger, &chan_struc); // Benchmark 2. Same but with it.

/* They already printed detailed timing info; now let's summarize the total results. As you can see it
* just prints b1's RTT, b2's RTT, and the ratio; while reminding how much data was transmitted.
* (Ultimately b2's RTT will always be about the same and small; whereas b1's involves a bunch of copying
* into/out of transport and hence will be proportional to data size.)
*
* The only subtlety is that we coarsen the RTT to be a multiple of 100us, rounding up. Reason: It's not
* bulletproof, and it might be different on slower machines, but for now I've found this to be decent in practice:
* There's quite a bit of variation for a small message's RTT, maybe +/- 50us; and the total tends to be, if
* rounded to nearest 100us, at least 100us. Furthermore, if sending small messages, sometimes there are
* paradoxical-ish results like b1-RTT/b2-RTT < 1, but really they're both around 100us, so it's more like 1.
* Once total_sz is increased beyond 10k-or-so, this stuff falls away and the coarsening to 100us-multiples
* doesn't really matter anyway and is easier to read.
*
* Maybe that's silliness. In any case the un-coarsened detailed results are printed by run_*(); here
* we're summarizing. @todo Revisit. */

const auto raw_rtt = ceil_div(round<microseconds>(g_capnp_over_raw_rtt).count(), microseconds::rep(100)) * 100;
const auto zcp_rtt = ceil_div(round<microseconds>(g_capnp_zero_cpy_rtt).count(), microseconds::rep(100)) * 100;

FLOW_LOG_INFO("Benchmark summary (coarsened to 100s of usec): ");
FLOW_LOG_INFO("Benchmark summary (rounded-up to 100-usec multiples): ");
FLOW_LOG_INFO("Transmission of ~[" << (g_total_sz / 1024) << " ki] of Cap'n Proto structured data: ");
FLOW_LOG_INFO("Via raw-local-stream-socket: RTT = [" << raw_rtt << " usec].");
FLOW_LOG_INFO("Via-zero-copy-Flow-IPC-channel ("
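
The coarsening described in the summary comment of this hunk can be sketched in isolation. This is only an illustration: `ceil_div_sketch()` and `coarsen_rtt_usec()` are hypothetical names, and the sketch assumes the demo's own `ceil_div()` helper is plain integer ceiling division for non-negative values.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for the demo's ceil_div(): integer ceiling division.
// Assumption: both arguments are non-negative and divisor is non-zero.
template<typename T>
constexpr T ceil_div_sketch(T dividend, T divisor)
{
  return (dividend + divisor - 1) / divisor;
}

// Rounds an RTT to whole microseconds, then up to the next multiple of 100 usec.
std::int64_t coarsen_rtt_usec(std::chrono::nanoseconds rtt)
{
  using std::chrono::microseconds;
  const auto usec = std::chrono::round<microseconds>(rtt).count();
  return ceil_div_sketch<microseconds::rep>(usec, 100) * 100;
}
```

Under these assumptions, two small-message RTTs of 120 usec and 140 usec both coarsen to 200 usec, so their ratio summarizes as 1 rather than as a noisy value slightly above or below it.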
Expand Down Expand Up @@ -136,7 +158,14 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
using Capnp_word_array_array_ptr = kj::ArrayPtr<const Capnp_word_array_ptr>;
using Capnp_heap_engine = ::capnp::SegmentArrayMessageReader;

struct Algo :// Just so we can arrange functions in chronological order really.
/* Reminder: see main_srv.cpp run_capnp_over_raw() counterpart; we keep comments light except for client-specifics.
*
* In particular the couple of comments there about how we could've had simpler code, had we used this or that technique,
* tend to apply more to *us* rather than main_srv.cpp counterpart (but to it too). We have more receiving logic
* including looping receiving, so we're a bit more complex; so those simplifications would've benefitted us more
* (while trading off other stuff... anyway see that comment in main_srv.cpp!). */

struct Algo :
public Log_context
{
Channel_raw& m_chan;
Expand All @@ -146,6 +175,8 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
size_t m_n_segs;
vector<Blob> m_segs;
bool m_new_seg_next = true;
/* Server sends the stuff, but we time from just before sending request to just-after receiving and accessing reply.
* Ctor call begins the timing; so wait until invoking it. */
std::optional<Timer> m_timer;

Algo(Logger* logger_ptr, Channel_raw* chan_ptr) :
Expand All @@ -161,7 +192,6 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
m_chan.start_send_blob_ops(ev_wait);
m_chan.start_receive_blob_ops(ev_wait);

// Receive a dummy message to synchronize initialization.
FLOW_LOG_INFO("< Expecting handshake SYN for initialization sync.");
m_chan.async_receive_blob(Blob_mutable(&m_n, sizeof(m_n)), &m_err_code, &m_sz,
[&](const Error_code& err_code, size_t) { on_sync(err_code); });
Expand All @@ -176,7 +206,7 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
FLOW_LOG_INFO("= Got handshake SYN.");

FLOW_LOG_INFO("> Issuing get-cache request via tiny message.");
m_timer.emplace(get_logger(), "capnp-raw", Timer::real_clock_types(), 100);
m_timer.emplace(get_logger(), "capnp-raw", Timer::real_clock_types(), 100); // Begin timing.
m_chan.send_blob(Blob_const(&m_n, sizeof(m_n)));
m_timer->checkpoint("sent request");

Expand All @@ -198,6 +228,23 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
m_timer->checkpoint("got seg-count");

m_segs.reserve(m_n_segs);

/* This is where the looping-read code is, and where we need to be careful to not start
* a recursion-loop (stack overflows, oh my) but rather an iteration-loop. That is, if an async_X() yields
* would-block then return; if it yields error then explode; but if it yields success, then do *not*
* call our own function, or some function that would call our own function (that did the async_X()).
* Rather, loop around to the next async_X().
*
* So we just have a simple state machine (with 2 states):
* m_new_seg_next = true; read seg-size; m_new_seg_next = false; read blobs until seg-size bytes are ready,
* placing them contiguously into the currently-being-read segment;
* repeat (until m_n_segs segs have been obtained).
*
* We use a flow::util::Blob (a-la vector<uint8_t>) for each segment; its .capacity() = seg-size, while
* its .size() = how many bytes we've filled out already. (It is formally allowed to write into the area
* [.end(), .begin() + capacity()).)
*/
assert(m_new_seg_next);
read_segs();
}
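
The iteration-loop versus recursion-loop point in the comment above can be illustrated without any Flow-IPC types. In this minimal sketch everything is hypothetical (`Fake_source`, `Reader`, `run_sketch()`, the 10-byte reads): a read either completes immediately or "would block" and stores a handler for later. On immediate success the reader loops around instead of calling itself, so stack depth stays at 1 regardless of how many reads complete back to back.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <optional>
#include <utility>

// Hypothetical source: a read either completes at once (byte count) or
// would-block (nullopt), in which case the handler is kept for later.
struct Fake_source
{
  std::deque<std::optional<std::size_t>> m_script; // nullopt = would-block.
  std::function<void (std::size_t)> m_pending;

  std::optional<std::size_t> async_read(std::function<void (std::size_t)> on_done)
  {
    const auto result = m_script.front();
    m_script.pop_front();
    if (!result) { m_pending = std::move(on_done); }
    return result;
  }

  // Simulates the event loop later delivering data to the stored handler.
  void fire(std::size_t sz)
  {
    auto handler = std::move(m_pending);
    m_pending = nullptr;
    handler(sz);
  }
};

struct Reader
{
  Fake_source& m_src;
  std::size_t m_n_reads_left;
  std::size_t m_total = 0;
  unsigned m_depth = 0;
  unsigned m_max_depth = 0; // How deeply read_loop() ever nested.

  void read_loop()
  {
    m_max_depth = std::max(m_max_depth, ++m_depth);
    while (m_n_reads_left != 0)
    {
      const auto sz = m_src.async_read([this](std::size_t sz_later)
      {
        on_read(sz_later);
        read_loop(); // Fine: runs from the "event loop," on a fresh stack.
      });
      if (!sz) { break; } // Would-block: return; the handler resumes us later.
      on_read(*sz);       // Immediate success: loop around; do not recurse.
    }
    --m_depth;
  }

  void on_read(std::size_t sz) { m_total += sz; --m_n_reads_left; }
};

struct Result { std::size_t m_total; unsigned m_max_depth; };

// Performs n_reads reads of 10 bytes each; the read at index block_at would-blocks once.
Result run_sketch(std::size_t n_reads, std::size_t block_at)
{
  Fake_source src;
  for (std::size_t i = 0; i != n_reads; ++i)
  {
    if (i == block_at) { src.m_script.push_back(std::nullopt); }
    else { src.m_script.push_back(std::size_t{10}); }
  }
  Reader reader{src, n_reads};
  reader.read_loop();
  if (src.m_pending) { src.fire(10); }
  return { reader.m_total, reader.m_max_depth };
}
```

Had `on_read()` called `read_loop()` directly after each immediate success, nesting would grow with the number of consecutive successful reads; the loop form avoids that by construction.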

Expand Down Expand Up @@ -238,11 +285,14 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
m_new_seg_next = false;
assert(m_n != 0);

// New segment's size known; reserve the space and then set .size() = 0, while leaving .capacity() same.
m_segs.emplace_back(m_n);
m_segs.back().clear();
assert(m_segs.back().capacity() == m_n); // Ensure it didn't dealloc.
}
else
{
// Register the received bytes; then see if we finished the segment with that; or maybe even the last one.
auto& seg = m_segs.back();
seg.resize(seg.size() + sz);
if (seg.size() == seg.capacity())
Expand All @@ -255,7 +305,7 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
if (m_segs.size() == m_n_segs)
{
m_timer->checkpoint("got last seg");
on_complete_response();
on_complete_response(); // Yay! Next step of algo.
return true;
}
m_timer->checkpoint("got a seg");
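
The segment bookkeeping in this hunk (capacity = announced segment size, size = bytes received so far) can be approximated with standard containers. The sketch below is an assumption-laden analogue and not the demo's code: since `std::vector` does not permit writing past `.size()`, it sizes the buffer fully up front and tracks the fill level in a separate counter. `Segment_sketch` and `fill_from()` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for one segment being received piecemeal.
struct Segment_sketch
{
  std::vector<std::uint8_t> m_buf; // Sized to the announced segment size.
  std::size_t m_filled = 0;        // Bytes received so far.

  explicit Segment_sketch(std::size_t seg_sz) : m_buf(seg_sz) {}

  // Where the next received chunk should land, and how much room remains.
  std::uint8_t* write_ptr() { return m_buf.data() + m_filled; }
  std::size_t room() const { return m_buf.size() - m_filled; }

  // Registers sz bytes just written at write_ptr(); true once the segment is complete.
  bool commit(std::size_t sz)
  {
    assert(sz <= room());
    m_filled += sz;
    return m_filled == m_buf.size();
  }
};

// Mimics one partial receive: copies up to room() bytes, then registers them.
bool fill_from(Segment_sketch& seg, const std::uint8_t* src, std::size_t sz)
{
  const std::size_t n = std::min(sz, seg.room());
  if (n != 0) { std::memcpy(seg.write_ptr(), src, n); }
  return seg.commit(n);
}
```

A 5-byte segment fed 2 bytes and then 3 bytes reports completion only on the second call, which corresponds to the `seg.size() == seg.capacity()` check in the hunk above.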
Expand All @@ -268,6 +318,8 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)

void on_complete_response()
{
/* Now for vanilla Cap'n Proto work: We have the segments; use SegmentArrayMessageReader as normal to
* interpret it into a capnp-backed structured message. */
vector<Capnp_word_array_ptr> capnp_segs;
capnp_segs.reserve(m_segs.size());

Expand All @@ -280,7 +332,7 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
Capnp_heap_engine capnp_msg(capnp_segs_ptr,
/* Defeat safety limit. Search for ReaderOptions in (e.g.) heap_serializer.hpp
* source code for explanation. We do it here, since we are bypassing all that
* in favor of direct capnp code (in this part of the demo). */
* Flow-IPC goodness in favor of direct capnp code (in this part of the demo). */
::capnp::ReaderOptions{ std::numeric_limits<uint64_t>::max() / sizeof(word), 64 });

const auto rsp_root = capnp_msg.getRoot<perf_demo::schema::Body>().getGetCacheRsp();
Expand All @@ -302,8 +354,6 @@ void run_capnp_over_raw(flow::log::Logger* logger_ptr, Channel_raw* chan_ptr)
post(g_asio, [&]() { algo.start(); });
g_asio.run();
g_asio.restart();
g_asio.poll();
g_asio.restart();
} // run_capnp_over_raw()

void run_capnp_zero_cpy([[maybe_unused]] flow::log::Logger* logger_ptr, Channel_struc* chan_ptr)
Expand All @@ -314,7 +364,9 @@ void run_capnp_zero_cpy([[maybe_unused]] flow::log::Logger* logger_ptr, Channel_
using ::capnp::word;
using boost::asio::post;

struct Algo :// Just so we can arrange functions in chronological order really.
// Reminder: see main_srv.cpp run_capnp_zero_cpy() counterpart; we keep comments light except for client-specifics.

struct Algo :
public Log_context
{
Channel_struc& m_chan;
Expand Down