From 5689d9e9b1bb141ea372b34794cb2525f5a008b7 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 26 Nov 2024 22:05:53 +0000 Subject: [PATCH] refactor --- xprof/xprof.rb.in | 104 +++++++++++++++++-------------------------- ze/sampling_daemon.c | 79 ++++++++++++++++---------------- 2 files changed, 83 insertions(+), 100 deletions(-) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index b5d805f7..c68af762 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -270,14 +270,12 @@ def thapi_trace_dir_root end # _ -# |_) _. ._ ._ o _ ._ -# |_) (_| | | | (/_ | +# | \ _. _ ._ _ _ ._ +# |_/ (_| (/_ | | | (_) | | # -class SyncDaemon +class SpawnDaemon SIGRTMIN = 34 RT_SIGNAL_READY = SIGRTMIN - RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1 - RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2 RT_SIGNAL_FINISH = SIGRTMIN + 3 def _lazy_exec(&block) @@ -297,6 +295,15 @@ class SyncDaemon _lazy_exec(&block) end end +end + +# _ +# |_) _. ._ ._ o _ ._ +# |_) (_| | | | (/_ | +# +class SyncDaemon < SpawnDaemon + RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1 + RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2 def initialize daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') @@ -358,6 +365,33 @@ class SyncDaemon end end +# __ +# (_ _. ._ _ ._ | o ._ _ +# __) (_| | | | |_) | | | | (_| +# | _| + +def sampling? + OPTIONS[:sample] && mpi_local_master? +end + +class SamplingDaemon < SpawnDaemon + def initialize + daemon_path = "#{__dir__}/sampling_daemon" + raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) + + LOGGER.debug { "spawn(sampling_daemon) #{Process.pid})" } + lazy_exec('Initialize SamplingDaemon}') do + @pid = spawn("#{daemon_path} #{Process.pid}") + end + end + + def finalize + lazy_exec('Finalize SamplingDaemon') do + `kill -#{RT_SIGNAL_FINISH} #{@pid}` + end + end +end + # # | _|_ _|_ ._ _ # |_ |_ |_ | | (_| @@ -366,11 +400,6 @@ def lttng_session_uuid Digest::MD5.hexdigest(lttng_trace_dir_tmp) end -def sampling? - return false unless OPTIONS[:sample] - mpi_local_master? -end - def env_tracers # Return the list of backends (used by local master to enable lttng events) # and the ENV used by any traced-ranks to preload THAPI tracers @@ -420,9 +449,6 @@ def env_tracers end # Sample - # Currently the same `so` does the tracing, and the sampling - # This mean that is the local rank is not part of the `traced-ranks` - # No sampling will be performed if sampling? LOGGER.debug('Sampling Enabled') h['LTTNG_UST_SAMPLING'] = 1 @@ -605,7 +631,7 @@ def lm_setup_lttng(backends) profiling: OPTIONS[:profile]) end - if OPTIONS.include?(:sample) + if sampling? channel_name = 'non-blocking-channel' exec("lttng enable-channel --userspace --session=#{lttng_session_uuid} #{channel_name}") exec("lttng add-context --userspace --session=#{lttng_session_uuid} --channel=#{channel_name} -t vpid -t vtid") @@ -732,48 +758,6 @@ def gm_rename_folder thapi_trace_dir_root end -class SamplingDaemon - SIGRTMIN = 34 - SIG_SAMPLING_READY = SIGRTMIN - SIG_SAMPLING_FINISH = SIGRTMIN + 1 - - attr_reader :pid - - def initialize - @pid = nil - end - - def start(parent_pid) - return unless sampling? - - daemon_path = "#{__dir__}/sampling_daemon" - raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) - - @pid = spawn("#{daemon_path} #{parent_pid}") - Process.detach(@pid) - - wait_for_ready_signal - end - - def finalize - return unless @pid - - Process.kill(SIG_SAMPLING_FINISH, @pid) - - wait_for_ready_signal - end - - private - - def wait_for_ready_signal - received_ready = false - Signal.trap(SIG_SAMPLING_READY) do - received_ready = true - end - sleep(0.1) until received_ready # Wait until READY signal is received - end -end - # Start, Stop lttng, amd do the on-node analsysis def trace_and_on_node_processing(usr_argv) def teardown_lttng(syncd, sampling_daemon, pids) @@ -800,7 +784,6 @@ def trace_and_on_node_processing(usr_argv) end SyncDaemon.open do |syncd| - sampling_daemon = nil # Load Tracers and APILoaders Lib backends, h = env_tracers @@ -812,11 +795,8 @@ def trace_and_on_node_processing(usr_argv) lm_setup_lttng(backends) lm_babeltrace(backends) if OPTIONS[:archive] end - - if sampling? - sampling_daemon = SamplingDaemon.new - sampling_daemon&.start(Process.pid) - end + # Spawn sampling daemon before starting user apps + sampling_daemon = SamplingDaemon.new if sampling? syncd.local_barrier('waiting_for_lttng_setup') diff --git a/ze/sampling_daemon.c b/ze/sampling_daemon.c index db87a0a2..d23e2646 100644 --- a/ze/sampling_daemon.c +++ b/ze/sampling_daemon.c @@ -4,17 +4,16 @@ #include #include #include +#include +#include #include #include #include #include #include -#include -#include -#include -#define SIG_SAMPLING_READY (SIGRTMIN) -#define SIG_SAMPLING_FINISH (SIGRTMIN + 1) +#define RT_SIGNAL_READY (SIGRTMIN) +#define RT_SIGNAL_FINISH (SIGRTMIN + 3) #define ZES_INIT_PTR zesInit_ptr #define ZES_DRIVER_GET_PTR zesDriverGet_ptr @@ -247,21 +246,27 @@ static uint32_t **_sampling_powerDomainCounts = NULL; static uint32_t **_sampling_engineCounts = NULL; //////////////////////////////////////////// -#define _ZE_ERROR_MSG(NAME,RES) do {\ - fprintf(stderr,"%s() failed at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\ -} while (0) -#define _ZE_ERROR_MSG_NOTERMINATE(NAME,RES) do {\ - fprintf(stderr,"%s() error at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\ -} while (0) -#define _ERROR_MSG(MSG) do {\ -perror((MSG));fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__);\ -} while (0) -#define _USAGE_MSG(MSG, ARGV0) do {\ - fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG));\ -} while (0) -#define _DL_ERROR_MSG() do {\ - fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__);\ -} while(0) +#define _ZE_ERROR_MSG(NAME, RES) \ + do { \ + fprintf(stderr, "%s() failed at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \ + } while (0) +#define _ZE_ERROR_MSG_NOTERMINATE(NAME, RES) \ + do { \ + fprintf(stderr, "%s() error at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \ + } while (0) +#define _ERROR_MSG(MSG) \ + do { \ + perror((MSG)); \ + fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__); \ + } while (0) +#define _USAGE_MSG(MSG, ARGV0) \ + do { \ + fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG)); \ + } while (0) +#define _DL_ERROR_MSG() \ + do { \ + fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__); \ + } while (0) static void intializeFrequency() { ze_result_t res; @@ -730,34 +735,31 @@ void cleanup_sampling() { } } -void signal_handler(int signum) { - if (signum == SIG_SAMPLING_FINISH) { +void signal_handler_finish(int signum) { + if (signum == RT_SIGNAL_FINISH) { cleanup_sampling(); running = false; } } int main(int argc, char **argv) { - - int parent_pid = 0; - int verbose = 0; - void *handle = NULL; + int parent_pid = 0; if (argc < 2) { _USAGE_MSG("", argv[0]); return 1; } - parent_pid = atoi(argv[1]); if (parent_pid <= 0) { - _ERROR_MSG("Invalid or missing parent PID."); - return 1; + _ERROR_MSG("Invalid or missing parent PID."); + return 1; } - - thapi_sampling_init();// Initialize sampling + + thapi_sampling_init(); // Initialize sampling // Load necessary libraries char *s = getenv("LTTNG_UST_ZE_LIBZE_LOADER"); + void *handle = NULL; if (s) { handle = dlopen(s, RTLD_LAZY | RTLD_LOCAL | RTLD_DEEPBIND); } else { @@ -768,23 +770,24 @@ int main(int argc, char **argv) { _DL_ERROR_MSG(); return 1; } - //Find zes symbols + // Find zes symbols + int verbose = 0; find_ze_symbols(handle, verbose); - //Initialize device and telemetry handles + // Initialize device and telemetry handles initializeHandles(); + // Run the signal loop - signal(SIG_SAMPLING_FINISH, signal_handler); + signal(RT_SIGNAL_FINISH, signal_handler_finish); - if (kill(parent_pid, SIG_SAMPLING_READY) != 0) { + if (kill(parent_pid, RT_SIGNAL_READY) != 0) { _ERROR_MSG("Failed to send READY signal to parent"); } // Process_sampling loop until SIG_SAMPLING_FINISH signal while (running) { process_sampling(); } - if (parent_pid != 0) - kill(parent_pid, SIG_SAMPLING_READY); - dlclose(handle); + if (parent_pid != 0) + kill(parent_pid, RT_SIGNAL_READY); return 0; }