Skip to content

Commit

Permalink
Merge pull request #1889 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
NikkiWines authored Oct 8, 2024
2 parents 56ec22e + f440475 commit 766d766
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 14 deletions.
12 changes: 7 additions & 5 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <libstuff/libstuff.h>
#include <libstuff/SRandom.h>
#include <libstuff/AutoTimer.h>
#include <libstuff/ResourceMonitorThread.h>
#include <PageLockGuard.h>
#include <sqlitecluster/SQLitePeer.h>

Expand Down Expand Up @@ -116,9 +117,9 @@ void BedrockServer::sync()
// our worker threads now. We don't wait until the node is `LEADING` or `FOLLOWING`, as its state can change while
// it's running, and our workers will have to maintain awareness of that state anyway.
SINFO("Starting " << workerThreads << " worker threads.");
list<thread> workerThreadList;
list<ResourceMonitorThread> workerThreadList;
for (int threadId = 0; threadId < workerThreads; threadId++) {
workerThreadList.emplace_back(&BedrockServer::worker, this, threadId);
workerThreadList.emplace_back([this, threadId](){this->worker(threadId);});
}

// Now we jump into our main command processing loop.
Expand Down Expand Up @@ -953,6 +954,7 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
if (!canWriteParallel) {
// Roll back the transaction, it'll get re-run in the sync thread.
core.rollback();
dbScope.release();
auto _clusterMessengerCopy = _clusterMessenger;
if (getState() == SQLiteNodeState::LEADING) {
// Limit the command timeout to 20s to avoid blocking the sync thread long enough to cause the cluster to give up and elect a new leader (causing a fork), which happens
Expand Down Expand Up @@ -1318,7 +1320,7 @@ BedrockServer::BedrockServer(const SData& args_)

// Start the sync thread, which will start the worker threads.
SINFO("Launching sync thread '" << _syncThreadName << "'");
_syncThread = thread(&BedrockServer::syncWrapper, this);
_syncThread = ResourceMonitorThread(&BedrockServer::syncWrapper, this);
}

BedrockServer::~BedrockServer() {
Expand Down Expand Up @@ -1867,7 +1869,7 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
if (__quiesceThread) {
response.methodLine = "400 Already Blocked";
} else {
__quiesceThread = new thread([&]() {
__quiesceThread = new ResourceMonitorThread([&]() {
shared_ptr<SQLitePool> dbPoolCopy = _dbPool;
if (dbPoolCopy) {
SQLiteScopedHandle dbScope(*_dbPool, _dbPool->getIndex());
Expand Down Expand Up @@ -2097,7 +2099,7 @@ void BedrockServer::_acceptSockets() {
bool threadStarted = false;
while (!threadStarted) {
try {
t = thread(&BedrockServer::handleSocket, this, move(socket), port == _controlPort, port == _commandPortPublic, port == _commandPortPrivate);
t = ResourceMonitorThread(&BedrockServer::handleSocket, this, move(socket), port == _controlPort, port == _commandPortPublic, port == _commandPortPrivate);
threadStarted = true;
} catch (const system_error& e) {
// We don't care about this lock here from a performance perspective, it only happens when we
Expand Down
26 changes: 26 additions & 0 deletions libstuff/ResourceMonitorThread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include "ResourceMonitorThread.h"
#include "libstuff/libstuff.h"
#include <format>
#include <cmath>

// Per-thread baselines, written by beforeProcessStart() and read back by afterProcessFinished().
// Value of STimeNow() sampled when monitoring started (presumably microseconds -- confirm against STimeNow()).
thread_local uint64_t ResourceMonitorThread::threadStartTime;
// Value of SGetCPUUserTime() sampled when monitoring started.
thread_local double ResourceMonitorThread::cpuStartTime;

// Captures the starting point for the measurements reported by afterProcessFinished(): the current time and the
// user CPU time already consumed by the calling thread. Both values are stored in thread-local storage.
void ResourceMonitorThread::beforeProcessStart() {
    const auto wallClockBaseline = STimeNow();
    const auto cpuBaseline = SGetCPUUserTime();

    threadStartTime = wallClockBaseline;
    cpuStartTime = cpuBaseline;
}

// Logs the user CPU time consumed by the calling thread since beforeProcessStart() ran on it, both as an absolute
// value and as a percentage (rounded to three decimal places) of the time elapsed over the same period.
void ResourceMonitorThread::afterProcessFinished() {
    const uint64_t elapsedTime = STimeNow() - threadStartTime;
    const double cpuTimeSpent = SGetCPUUserTime() - cpuStartTime;

    // The elapsed time is the divisor of the percentage below. It is not expected to ever be 0, but if it is we
    // skip the report entirely rather than divide by zero.
    if (elapsedTime != 0) {
        const double cpuRatio = cpuTimeSpent / static_cast<double>(elapsedTime);
        const double cpuPercentage = round(cpuRatio * 100 * 1000) / 1000;

        // Kernel-level ID of the calling thread, used to identify it in the log line.
        const pid_t tid = syscall(SYS_gettid);
        SINFO(format("Thread finished. pID: '{}', CPUTime: '{}µs', CPUPercentage: '{}%'", tid, cpuTimeSpent, cpuPercentage));
    }
}
31 changes: 31 additions & 0 deletions libstuff/ResourceMonitorThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include "libstuff/libstuff.h"
#include <functional>
#include <thread>
#include <type_traits>
#include <utility>

using namespace std;

// This class is a wrapper around the default thread. We use it to collect the thread CPU usage. That allows us
// to investigate if we have any threads using more resources than they should, which can cause CPU usage peaks in
// the cluster.
//
// NOTE(review): std::thread has no virtual destructor. This class adds no non-static data members, but callers
// that own an instance through a `thread*` should be aware of that -- confirm how owning pointers are declared.
class ResourceMonitorThread : public std::thread
{
public:
    // Starts a new thread that runs `f(args...)` bracketed by beforeProcessStart() / afterProcessFinished().
    //
    // Arguments follow the same rules as the std::thread constructor: the callable and every argument are
    // decay-copied (or moved) into the new thread, and the callable is invoked with those copies. Lvalues, rvalues,
    // plain functions and member function pointers can therefore all be passed directly; wrapping the call in a
    // lambda is not required. To share an object with the thread by reference, pass it with std::ref().
    template<typename F, typename... Args>
    ResourceMonitorThread(F&& f, Args&&... args)
        : std::thread(ResourceMonitorThread::wrapper<std::decay_t<F>, std::decay_t<Args>...>,
                      std::forward<F>(f), std::forward<Args>(args)...)
    {
    }

private:
    // Baselines recorded by beforeProcessStart() for the thread that called it.
    thread_local static uint64_t threadStartTime;
    thread_local static double cpuStartTime;

    // Record the baselines / log the resources consumed since the baselines were recorded.
    static void beforeProcessStart();
    static void afterProcessFinished();

    // Entry point of the new thread. It is instantiated with the *decayed* argument types because std::thread hands
    // us its own decayed copies as rvalues; instantiating it with the caller's reference types would make any lvalue
    // argument fail to compile (an rvalue can't bind to a non-const lvalue reference).
    template<typename F, typename... Args>
    static void wrapper(F&& f, Args&&... args) {
        beforeProcessStart();
        std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
        afterProcessFinished();
    }
};
9 changes: 9 additions & 0 deletions libstuff/libstuff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
// C library
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <execinfo.h>
#include <sys/un.h>
#include <cxxabi.h>
Expand Down Expand Up @@ -3201,3 +3203,10 @@ SString& SString::operator=(const bool from) {
return *this;
}

// Returns the user-mode CPU time consumed so far by the calling thread, in microseconds.
// Returns 0 if the usage counters can't be read.
double SGetCPUUserTime() {
    // Zero-initialize so that we never read indeterminate values, and bail out if the call fails.
    struct rusage usage = {};
    if (getrusage(RUSAGE_THREAD, &usage) != 0) {
        return 0.0;
    }

    // ru_utime is split into whole seconds and remaining microseconds; combine them into microseconds.
    return static_cast<double>(usage.ru_utime.tv_sec) * 1e6 + static_cast<double>(usage.ru_utime.tv_usec);
}
3 changes: 3 additions & 0 deletions libstuff/libstuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,7 @@ string SGUnzip(const string& content);
// Command-line helpers
STable SParseCommandLine(int argc, char* argv[]);

// Returns the user CPU time consumed by the current thread, in microseconds
double SGetCPUUserTime();

#endif // LIBSTUFF_H
Loading

0 comments on commit 766d766

Please sign in to comment.