From 5689d9e9b1bb141ea372b34794cb2525f5a008b7 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 26 Nov 2024 22:05:53 +0000 Subject: [PATCH 1/2] refactor --- xprof/xprof.rb.in | 104 +++++++++++++++++-------------------------- ze/sampling_daemon.c | 79 ++++++++++++++++---------------- 2 files changed, 83 insertions(+), 100 deletions(-) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index b5d805f7..c68af762 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -270,14 +270,12 @@ def thapi_trace_dir_root end # _ -# |_) _. ._ ._ o _ ._ -# |_) (_| | | | (/_ | +# | \ _. _ ._ _ _ ._ +# |_/ (_| (/_ | | | (_) | | # -class SyncDaemon +class SpawnDaemon SIGRTMIN = 34 RT_SIGNAL_READY = SIGRTMIN - RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1 - RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2 RT_SIGNAL_FINISH = SIGRTMIN + 3 def _lazy_exec(&block) @@ -297,6 +295,15 @@ class SyncDaemon _lazy_exec(&block) end end +end + +# _ +# |_) _. ._ ._ o _ ._ +# |_) (_| | | | (/_ | +# +class SyncDaemon < SpawnDaemon + RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1 + RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2 def initialize daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') @@ -358,6 +365,33 @@ class SyncDaemon end end +# __ +# (_ _. ._ _ ._ | o ._ _ +# __) (_| | | | |_) | | | | (_| +# | _| + +def sampling? + OPTIONS[:sample] && mpi_local_master? +end + +class SamplingDaemon < SpawnDaemon + def initialize + daemon_path = "#{__dir__}/sampling_daemon" + raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) + + LOGGER.debug { "spawn(sampling_daemon) #{Process.pid})" } + lazy_exec('Initialize SamplingDaemon}') do + @pid = spawn("#{daemon_path} #{Process.pid}") + end + end + + def finalize + lazy_exec('Finalize SamplingDaemon') do + `kill -#{RT_SIGNAL_FINISH} #{@pid}` + end + end +end + # # | _|_ _|_ ._ _ # |_ |_ |_ | | (_| @@ -366,11 +400,6 @@ def lttng_session_uuid Digest::MD5.hexdigest(lttng_trace_dir_tmp) end -def sampling? - return false unless OPTIONS[:sample] - mpi_local_master? -end - def env_tracers # Return the list of backends (used by local master to enable lttng events) # and the ENV used by any traced-ranks to preload THAPI tracers @@ -420,9 +449,6 @@ def env_tracers end # Sample - # Currently the same `so` does the tracing, and the sampling - # This mean that is the local rank is not part of the `traced-ranks` - # No sampling will be performed if sampling? LOGGER.debug('Sampling Enabled') h['LTTNG_UST_SAMPLING'] = 1 @@ -605,7 +631,7 @@ def lm_setup_lttng(backends) profiling: OPTIONS[:profile]) end - if OPTIONS.include?(:sample) + if sampling? channel_name = 'non-blocking-channel' exec("lttng enable-channel --userspace --session=#{lttng_session_uuid} #{channel_name}") exec("lttng add-context --userspace --session=#{lttng_session_uuid} --channel=#{channel_name} -t vpid -t vtid") @@ -732,48 +758,6 @@ def gm_rename_folder thapi_trace_dir_root end -class SamplingDaemon - SIGRTMIN = 34 - SIG_SAMPLING_READY = SIGRTMIN - SIG_SAMPLING_FINISH = SIGRTMIN + 1 - - attr_reader :pid - - def initialize - @pid = nil - end - - def start(parent_pid) - return unless sampling? - - daemon_path = "#{__dir__}/sampling_daemon" - raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) - - @pid = spawn("#{daemon_path} #{parent_pid}") - Process.detach(@pid) - - wait_for_ready_signal - end - - def finalize - return unless @pid - - Process.kill(SIG_SAMPLING_FINISH, @pid) - - wait_for_ready_signal - end - - private - - def wait_for_ready_signal - received_ready = false - Signal.trap(SIG_SAMPLING_READY) do - received_ready = true - end - sleep(0.1) until received_ready # Wait until READY signal is received - end -end - # Start, Stop lttng, amd do the on-node analsysis def trace_and_on_node_processing(usr_argv) def teardown_lttng(syncd, sampling_daemon, pids) @@ -800,7 +784,6 @@ def trace_and_on_node_processing(usr_argv) end SyncDaemon.open do |syncd| - sampling_daemon = nil # Load Tracers and APILoaders Lib backends, h = env_tracers @@ -812,11 +795,8 @@ def trace_and_on_node_processing(usr_argv) lm_setup_lttng(backends) lm_babeltrace(backends) if OPTIONS[:archive] end - - if sampling? - sampling_daemon = SamplingDaemon.new - sampling_daemon&.start(Process.pid) - end + # Spawn sampling daemon before starting user apps + sampling_daemon = SamplingDaemon.new if sampling? syncd.local_barrier('waiting_for_lttng_setup') diff --git a/ze/sampling_daemon.c b/ze/sampling_daemon.c index db87a0a2..d23e2646 100644 --- a/ze/sampling_daemon.c +++ b/ze/sampling_daemon.c @@ -4,17 +4,16 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include -#include -#define SIG_SAMPLING_READY (SIGRTMIN) -#define SIG_SAMPLING_FINISH (SIGRTMIN + 1) +#define RT_SIGNAL_READY (SIGRTMIN) +#define RT_SIGNAL_FINISH (SIGRTMIN + 3) #define ZES_INIT_PTR zesInit_ptr #define ZES_DRIVER_GET_PTR zesDriverGet_ptr @@ -247,21 +246,27 @@ static uint32_t **_sampling_powerDomainCounts = NULL; static uint32_t **_sampling_engineCounts = NULL; //////////////////////////////////////////// -#define _ZE_ERROR_MSG(NAME,RES) do {\ - fprintf(stderr,"%s() failed at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\ -} while (0) -#define _ZE_ERROR_MSG_NOTERMINATE(NAME,RES) do {\ - fprintf(stderr,"%s() error at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\ -} while (0) -#define _ERROR_MSG(MSG) do {\ -perror((MSG));fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__);\ -} while (0) -#define _USAGE_MSG(MSG, ARGV0) do {\ - fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG));\ -} while (0) -#define _DL_ERROR_MSG() do {\ - fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__);\ -} while(0) +#define _ZE_ERROR_MSG(NAME, RES) \ + do { \ + fprintf(stderr, "%s() failed at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \ + } while (0) +#define _ZE_ERROR_MSG_NOTERMINATE(NAME, RES) \ + do { \ + fprintf(stderr, "%s() error at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \ + } while (0) +#define _ERROR_MSG(MSG) \ + do { \ + perror((MSG)); \ + fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__); \ + } while (0) +#define _USAGE_MSG(MSG, ARGV0) \ + do { \ + fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG)); \ + } while (0) +#define _DL_ERROR_MSG() \ + do { \ + fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__); \ + } while (0) static void intializeFrequency() { ze_result_t res; @@ -730,34 +735,31 @@ void cleanup_sampling() { } } -void signal_handler(int signum) { - if (signum == SIG_SAMPLING_FINISH) { +void signal_handler_finish(int signum) { + if (signum == RT_SIGNAL_FINISH) { cleanup_sampling(); running = false; } } int main(int argc, char **argv) { - - int parent_pid = 0; - int verbose = 0; - void *handle = NULL; + int parent_pid = 0; if (argc < 2) { _USAGE_MSG("", argv[0]); return 1; } - parent_pid = atoi(argv[1]); if (parent_pid <= 0) { - _ERROR_MSG("Invalid or missing parent PID."); - return 1; + _ERROR_MSG("Invalid or missing parent PID."); + return 1; } - - thapi_sampling_init();// Initialize sampling + + thapi_sampling_init(); // Initialize sampling // Load necessary libraries char *s = getenv("LTTNG_UST_ZE_LIBZE_LOADER"); + void *handle = NULL; if (s) { handle = dlopen(s, RTLD_LAZY | RTLD_LOCAL | RTLD_DEEPBIND); } else { @@ -768,23 +770,24 @@ int main(int argc, char **argv) { _DL_ERROR_MSG(); return 1; } - //Find zes symbols + // Find zes symbols + int verbose = 0; find_ze_symbols(handle, verbose); - //Initialize device and telemetry handles + // Initialize device and telemetry handles initializeHandles(); + // Run the signal loop - signal(SIG_SAMPLING_FINISH, signal_handler); + signal(RT_SIGNAL_FINISH, signal_handler_finish); - if (kill(parent_pid, SIG_SAMPLING_READY) != 0) { + if (kill(parent_pid, RT_SIGNAL_READY) != 0) { _ERROR_MSG("Failed to send READY signal to parent"); } // Process_sampling loop until SIG_SAMPLING_FINISH signal while (running) { process_sampling(); } - if (parent_pid != 0) - kill(parent_pid, SIG_SAMPLING_READY); - dlclose(handle); + if (parent_pid != 0) + kill(parent_pid, RT_SIGNAL_READY); return 0; } From afcfa74a70f46155a09e2e9819df54b28575a6c4 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Wed, 27 Nov 2024 21:05:52 +0000 Subject: [PATCH 2/2] fix sampling without mpi --- xprof/xprof.rb.in | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index c68af762..deda0fc9 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -278,9 +278,7 @@ class SpawnDaemon RT_SIGNAL_READY = SIGRTMIN RT_SIGNAL_FINISH = SIGRTMIN + 3 - def _lazy_exec(&block) - return unless in_mpi_env? - + def _lazy_exec(_cond, &block) Signal.trap(RT_SIGNAL_READY) do return end @@ -288,11 +286,15 @@ class SpawnDaemon sleep end - def lazy_exec(message = nil, &block) + def lazy_exec(message = nil, cond, &block) LOGGER.info_block(message) do - # Don't inline, it Trap doesn't work with logger + unless cond + LOGGER.debug("#{message}: No-op") + return + end + # Don't inline, Trap doesn't work with logger # https://bugs.ruby-lang.org/issues/7917 - _lazy_exec(&block) + _lazy_exec(cond, &block) end end end @@ -305,6 +307,10 @@ class SyncDaemon < SpawnDaemon RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1 RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2 + def lazy_exec_mpi(message = nil, &block) + lazy_exec(message, in_mpi_env?, &block) + end + def initialize daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') daemon = case daemon_type @@ -327,25 +333,25 @@ class SyncDaemon < SpawnDaemon end LOGGER.debug { "spawn(#{daemon} #{Process.pid})" } - lazy_exec("Initialize SyncDaemon #{daemon_type}") do + lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do @pid = spawn("#{daemon} #{Process.pid}") end end def finalize - lazy_exec('Finalize SyncDaemon') do + lazy_exec_mpi('Finalize SyncDaemon') do `kill -#{RT_SIGNAL_FINISH} #{@pid}` end end def local_barrier(name) - lazy_exec("Local_barrier #{name}") do + lazy_exec_mpi("Local_barrier #{name}") do `kill -#{RT_SIGNAL_LOCAL_BARRIER} #{@pid}` end end def global_barrier - lazy_exec('Global_barrier') do + lazy_exec_mpi('Global_barrier') do `kill -#{RT_SIGNAL_GLOBAL_BARRIER} #{@pid}` end end @@ -375,18 +381,22 @@ def sampling? end class SamplingDaemon < SpawnDaemon + def lazy_exec_sampling(message = nil, &block) + lazy_exec(message, sampling?, &block) + end + def initialize daemon_path = "#{__dir__}/sampling_daemon" raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) LOGGER.debug { "spawn(sampling_daemon) #{Process.pid})" } - lazy_exec('Initialize SamplingDaemon}') do + lazy_exec_sampling('Initialize SamplingDaemon') do @pid = spawn("#{daemon_path} #{Process.pid}") end end def finalize - lazy_exec('Finalize SamplingDaemon') do + lazy_exec_sampling('Finalize SamplingDaemon') do `kill -#{RT_SIGNAL_FINISH} #{@pid}` end end @@ -780,7 +790,7 @@ def trace_and_on_node_processing(usr_argv) end # we can kill the session daemon lm_lttng_kill_sessiond - sampling_daemon&.finalize + sampling_daemon.finalize end SyncDaemon.open do |syncd| @@ -796,7 +806,7 @@ def trace_and_on_node_processing(usr_argv) lm_babeltrace(backends) if OPTIONS[:archive] end # Spawn sampling daemon before starting user apps - sampling_daemon = SamplingDaemon.new if sampling? + sampling_daemon = SamplingDaemon.new syncd.local_barrier('waiting_for_lttng_setup')