Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sampling Refactoring #303

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 63 additions & 73 deletions xprof/xprof.rb.in
Original file line number Diff line number Diff line change
Expand Up @@ -270,33 +270,46 @@ def thapi_trace_dir_root
end

# _
# |_) _. ._ ._ o _ ._
# |_) (_| | | | (/_ |
# | \ _. _ ._ _ _ ._
# |_/ (_| (/_ | | | (_) | |
#
class SyncDaemon
class SpawnDaemon
SIGRTMIN = 34
RT_SIGNAL_READY = SIGRTMIN
RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1
RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2
RT_SIGNAL_FINISH = SIGRTMIN + 3

def _lazy_exec(&block)
return unless in_mpi_env?

def _lazy_exec(_cond, &block)
Signal.trap(RT_SIGNAL_READY) do
return
end
block.call
sleep
end

def lazy_exec(message = nil, &block)
def lazy_exec(message = nil, cond, &block)
LOGGER.info_block(message) do
# Don't inline, it Trap doesn't work with logger
unless cond
LOGGER.debug("#{message}: No-op")
return
end
# Don't inline, Trap doesn't work with logger
# https://bugs.ruby-lang.org/issues/7917
_lazy_exec(&block)
_lazy_exec(cond, &block)
end
end
end

# _
# |_) _. ._ ._ o _ ._
# |_) (_| | | | (/_ |
#
class SyncDaemon < SpawnDaemon
RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1
RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2

def lazy_exec_mpi(message = nil, &block)
lazy_exec(message, in_mpi_env?, &block)
end

def initialize
daemon_type = env_fetch_first('THAPI_SYNC_DAEMON')
Expand All @@ -320,25 +333,25 @@ class SyncDaemon
end

LOGGER.debug { "spawn(#{daemon} #{Process.pid})" }
lazy_exec("Initialize SyncDaemon #{daemon_type}") do
lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do
@pid = spawn("#{daemon} #{Process.pid}")
end
end

def finalize
lazy_exec('Finalize SyncDaemon') do
lazy_exec_mpi('Finalize SyncDaemon') do
`kill -#{RT_SIGNAL_FINISH} #{@pid}`
end
end

def local_barrier(name)
lazy_exec("Local_barrier #{name}") do
lazy_exec_mpi("Local_barrier #{name}") do
`kill -#{RT_SIGNAL_LOCAL_BARRIER} #{@pid}`
end
end

def global_barrier
lazy_exec('Global_barrier') do
lazy_exec_mpi('Global_barrier') do
`kill -#{RT_SIGNAL_GLOBAL_BARRIER} #{@pid}`
end
end
Expand All @@ -358,6 +371,37 @@ class SyncDaemon
end
end

# __
# (_ _. ._ _ ._ | o ._ _
# __) (_| | | | |_) | | | | (_|
# | _|

def sampling?
OPTIONS[:sample] && mpi_local_master?
sbekele81 marked this conversation as resolved.
Show resolved Hide resolved
end

# Wrapper around the external `sampling_daemon` binary that lives in the same
# directory as this file. The daemon is spawned in `initialize` and asked to
# stop in `finalize`; both steps go through `lazy_exec` (inherited from
# SpawnDaemon) and are gated on `sampling?`.
class SamplingDaemon < SpawnDaemon
# Run `block` through the inherited `lazy_exec`, using `sampling?` as the
# condition argument.
# NOTE(review): presumably `lazy_exec` turns the call into a no-op when the
# condition is false and otherwise waits for the daemon's READY signal --
# confirm against SpawnDaemon#lazy_exec.
def lazy_exec_sampling(message = nil, &block)
lazy_exec(message, sampling?, &block)
end

# Locate the daemon binary and spawn it, passing this process's PID as its
# only command-line argument.
#
# Raises RuntimeError when the binary is not found next to this file.
# NOTE(review): the existence check runs before (and outside of) the
# `sampling?` gate, so it raises even when sampling is disabled -- confirm
# this is intended.
def initialize
daemon_path = "#{__dir__}/sampling_daemon"
raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path)

LOGGER.debug { "spawn(sampling_daemon) #{Process.pid})" }
lazy_exec_sampling('Initialize SamplingDaemon') do
@pid = spawn("#{daemon_path} #{Process.pid}")
end
end

# Send RT_SIGNAL_FINISH to the spawned daemon through the `kill` shell
# command, gated on `sampling?` like the spawn itself.
def finalize
lazy_exec_sampling('Finalize SamplingDaemon') do
`kill -#{RT_SIGNAL_FINISH} #{@pid}`
end
end
end

#
# | _|_ _|_ ._ _
# |_ |_ |_ | | (_|
Expand All @@ -366,11 +410,6 @@ def lttng_session_uuid
Digest::MD5.hexdigest(lttng_trace_dir_tmp)
end

def sampling?
return false unless OPTIONS[:sample]
mpi_local_master?
end

def env_tracers
# Return the list of backends (used by local master to enable lttng events)
# and the ENV used by any traced-ranks to preload THAPI tracers
Expand Down Expand Up @@ -420,9 +459,6 @@ def env_tracers
end

# Sample
# Currently the same `so` does the tracing, and the sampling
# This mean that is the local rank is not part of the `traced-ranks`
# No sampling will be performed
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old documentation needed to be removed

if sampling?
LOGGER.debug('Sampling Enabled')
h['LTTNG_UST_SAMPLING'] = 1
Expand Down Expand Up @@ -605,7 +641,7 @@ def lm_setup_lttng(backends)
profiling: OPTIONS[:profile])
end

if OPTIONS.include?(:sample)
if sampling?
sbekele81 marked this conversation as resolved.
Show resolved Hide resolved
channel_name = 'non-blocking-channel'
exec("lttng enable-channel --userspace --session=#{lttng_session_uuid} #{channel_name}")
exec("lttng add-context --userspace --session=#{lttng_session_uuid} --channel=#{channel_name} -t vpid -t vtid")
Expand Down Expand Up @@ -732,48 +768,6 @@ def gm_rename_folder
thapi_trace_dir_root
end

# Manages the lifetime of the external `sampling_daemon` binary: `start`
# spawns it and blocks until it signals READY, `finalize` asks it to stop and
# blocks until it signals READY again.
class SamplingDaemon
# Real-time signal numbers used to talk to the daemon.
# NOTE(review): SIGRTMIN is hard-coded to 34 -- presumably the value on the
# target platform; confirm it matches what the C daemon sees as SIGRTMIN.
SIGRTMIN = 34
SIG_SAMPLING_READY = SIGRTMIN
SIG_SAMPLING_FINISH = SIGRTMIN + 1

# PID of the spawned daemon, or nil when it has not been started.
attr_reader :pid

def initialize
@pid = nil
end

# Spawn the daemon and wait for its READY signal.
#
# parent_pid - PID passed to the daemon as its only command-line argument.
#
# Does nothing when `sampling?` is false.
# Raises RuntimeError when the binary is not found next to this file.
def start(parent_pid)
return unless sampling?

daemon_path = "#{__dir__}/sampling_daemon"
raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path)

@pid = spawn("#{daemon_path} #{parent_pid}")
# Detach so the child is reaped without an explicit wait.
Process.detach(@pid)

# NOTE(review): the trap is installed only after the spawn, so a READY
# signal sent before the trap is in place would not be recorded here.
wait_for_ready_signal
end

# Send SIG_SAMPLING_FINISH to the daemon and wait for its READY signal.
# Does nothing when the daemon was never started (@pid is nil).
def finalize
return unless @pid

Process.kill(SIG_SAMPLING_FINISH, @pid)

wait_for_ready_signal
end

private

# Install a trap for SIG_SAMPLING_READY, then poll every 0.1 s until the
# trap has fired. There is no timeout: this loops until the signal arrives.
def wait_for_ready_signal
received_ready = false
Signal.trap(SIG_SAMPLING_READY) do
received_ready = true
end
sleep(0.1) until received_ready # Wait until READY signal is received
end
end

# Start and stop lttng, and do the on-node analysis
def trace_and_on_node_processing(usr_argv)
def teardown_lttng(syncd, sampling_daemon, pids)
Expand All @@ -796,11 +790,10 @@ def trace_and_on_node_processing(usr_argv)
end
# we can kill the session daemon
lm_lttng_kill_sessiond
sampling_daemon&.finalize
sampling_daemon.finalize
end

SyncDaemon.open do |syncd|
sampling_daemon = nil
sbekele81 marked this conversation as resolved.
Show resolved Hide resolved
# Load Tracers and APILoaders Lib
backends, h = env_tracers

Expand All @@ -812,11 +805,8 @@ def trace_and_on_node_processing(usr_argv)
lm_setup_lttng(backends)
lm_babeltrace(backends) if OPTIONS[:archive]
end

if sampling?
sampling_daemon = SamplingDaemon.new
sampling_daemon&.start(Process.pid)
Comment on lines -817 to -818
Copy link
Collaborator Author

@TApplencourt TApplencourt Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to have both a `start` and an `initialize`; they are now merged.

end
# Spawn sampling daemon before starting user apps
sampling_daemon = SamplingDaemon.new

syncd.local_barrier('waiting_for_lttng_setup')

Expand Down
79 changes: 41 additions & 38 deletions ze/sampling_daemon.c
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name is bad; it should be `ze_sampling_daemon`.
I think that, right now, `--sampling` without a ze backend is broken.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense, as we don't have support for the others yet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we need to move it out of the folder and make it generic at some point (we may want to sample other platform counters). Each sampling backend should be activated by its own environment variable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kerilk I agree that associating the option with one backend is not ideal for the long term. Making it generic and adding support for the others is a good idea.

Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
#include <dlfcn.h>
#include <errno.h>
#include <ffi.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
#include <signal.h>
#include <unistd.h>
#include <stdbool.h>

#define SIG_SAMPLING_READY (SIGRTMIN)
#define SIG_SAMPLING_FINISH (SIGRTMIN + 1)
#define RT_SIGNAL_READY (SIGRTMIN)
#define RT_SIGNAL_FINISH (SIGRTMIN + 3)

#define ZES_INIT_PTR zesInit_ptr
#define ZES_DRIVER_GET_PTR zesDriverGet_ptr
Expand Down Expand Up @@ -247,21 +246,27 @@ static uint32_t **_sampling_powerDomainCounts = NULL;
static uint32_t **_sampling_engineCounts = NULL;

////////////////////////////////////////////
#define _ZE_ERROR_MSG(NAME,RES) do {\
fprintf(stderr,"%s() failed at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\
} while (0)
#define _ZE_ERROR_MSG_NOTERMINATE(NAME,RES) do {\
fprintf(stderr,"%s() error at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\
} while (0)
#define _ERROR_MSG(MSG) do {\
perror((MSG));fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__);\
} while (0)
#define _USAGE_MSG(MSG, ARGV0) do {\
fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG));\
} while (0)
#define _DL_ERROR_MSG() do {\
fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__);\
} while(0)
#define _ZE_ERROR_MSG(NAME, RES) \
do { \
fprintf(stderr, "%s() failed at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \
} while (0)
#define _ZE_ERROR_MSG_NOTERMINATE(NAME, RES) \
do { \
fprintf(stderr, "%s() error at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \
} while (0)
#define _ERROR_MSG(MSG) \
do { \
perror((MSG)); \
fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__); \
} while (0)
#define _USAGE_MSG(MSG, ARGV0) \
do { \
fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG)); \
} while (0)
#define _DL_ERROR_MSG() \
do { \
fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__); \
} while (0)

static void intializeFrequency() {
ze_result_t res;
Expand Down Expand Up @@ -730,34 +735,31 @@ void cleanup_sampling() {
}
}

void signal_handler(int signum) {
if (signum == SIG_SAMPLING_FINISH) {
void signal_handler_finish(int signum) {
if (signum == RT_SIGNAL_FINISH) {
cleanup_sampling();
running = false;
}
}

int main(int argc, char **argv) {

int parent_pid = 0;
int verbose = 0;
void *handle = NULL;

int parent_pid = 0;
if (argc < 2) {
_USAGE_MSG("<parent_pid>", argv[0]);
return 1;
}

parent_pid = atoi(argv[1]);
if (parent_pid <= 0) {
_ERROR_MSG("Invalid or missing parent PID.");
return 1;
_ERROR_MSG("Invalid or missing parent PID.");
return 1;
}
thapi_sampling_init();// Initialize sampling

thapi_sampling_init(); // Initialize sampling

// Load necessary libraries
char *s = getenv("LTTNG_UST_ZE_LIBZE_LOADER");
void *handle = NULL;
if (s) {
handle = dlopen(s, RTLD_LAZY | RTLD_LOCAL | RTLD_DEEPBIND);
} else {
Expand All @@ -768,23 +770,24 @@ int main(int argc, char **argv) {
_DL_ERROR_MSG();
return 1;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will deadlock, as the host will wait for READY. I didn't try it; maybe we are lucky and the non-zero exit code will trigger something, so that everybody will be happy and bail.

}
//Find zes symbols
// Find zes symbols
int verbose = 0;
find_ze_symbols(handle, verbose);
//Initialize device and telemetry handles
// Initialize device and telemetry handles
initializeHandles();

// Run the signal loop
signal(SIG_SAMPLING_FINISH, signal_handler);
signal(RT_SIGNAL_FINISH, signal_handler_finish);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about `signal` (the doc says: "Avoid its use: use [sigaction(2)](https://man7.org/linux/man-pages/man2/sigaction.2.html) instead. See Portability below.").
IMO we should just copy what we did for MPI, but 🤷🏽 I haven't changed it yet, as it works.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, MPI and this should use the same code, maybe we can even de-duplicate using inlined functions in a header.


if (kill(parent_pid, SIG_SAMPLING_READY) != 0) {
if (kill(parent_pid, RT_SIGNAL_READY) != 0) {
_ERROR_MSG("Failed to send READY signal to parent");
}
// Run process_sampling() in a loop until the RT_SIGNAL_FINISH signal is received
while (running) {
process_sampling();
}
if (parent_pid != 0)
kill(parent_pid, SIG_SAMPLING_READY);

dlclose(handle);
if (parent_pid != 0)
kill(parent_pid, RT_SIGNAL_READY);
return 0;
}