From 8b1d84d714ce6d354c21b2b338e3648d578a97e0 Mon Sep 17 00:00:00 2001 From: sbekele Date: Wed, 13 Nov 2024 20:29:50 +0000 Subject: [PATCH] updated sampling daemon --- xprof/xprof.rb.in | 93 ++++++++++++++++++++++---------------------- ze/sampling_daemon.c | 91 ++++++++++++++++--------------------------- 2 files changed, 80 insertions(+), 104 deletions(-) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index bb099e3e..348b223a 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -739,46 +739,52 @@ def gm_rename_folder thapi_trace_dir_root end -SIGRTMIN = 34 -SIG_SAMPLING_READY = SIGRTMIN -SIG_SAMPLING_FINISH = SIGRTMIN + 1 +class SamplingDaemon + SIGRTMIN = 34 + SIG_SAMPLING_READY = SIGRTMIN + SIG_SAMPLING_FINISH = SIGRTMIN + 1 -def start_sampling_daemon(parent_pid) - puts "Starting sampling daemon for parent process PID #{parent_pid}" - sampling_daemon_pid = spawn("/home/sbekele/sampling_daemon/bin/sampling_daemon #{parent_pid}") - Process.detach(sampling_daemon_pid) - sampling_daemon_pid -end + attr_reader :pid -# Wait for the READY signal from the sampling daemon -def wait_for_ready_signal - received_ready = false - Signal.trap(SIG_SAMPLING_READY) do - puts "Received READY signal from sampling daemon" - received_ready = true + def initialize + @pid = nil end - sleep(0.1) while !received_ready # Wait loop until READY signal is received -end -# Send the FINISH signal to terminate the sampling daemon -def send_finish_signal(sampling_daemon_pid) - Process.kill(SIG_SAMPLING_FINISH, sampling_daemon_pid) - puts "Sent FINISH signal to sampling daemon PID #{sampling_daemon_pid}" + def start(parent_pid) + return unless sampling? + + daemon_path = "#{__dir__}/sampling_daemon" + raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) + @pid = spawn("#{daemon_path} #{parent_pid}") + Process.detach(@pid) + + wait_for_ready_signal + end + + def finalize + return unless @pid + + Process.kill(SIG_SAMPLING_FINISH, @pid) + LOGGER.debug("Sent FINISH signal to sampling daemon PID #{@pid}") + end + + private + + def wait_for_ready_signal + received_ready = false + Signal.trap(SIG_SAMPLING_READY) do + received_ready = true + end + sleep(0.1) while !received_ready # Wait until READY signal is received + end end - def trace_and_on_node_processing(usr_argv) - def teardown_lttng(syncd, pids, sampling_daemon_pid = nil) - # We need to be sure that all the local ranks are finished + def teardown_lttng(syncd, sampling_daemon, pids) syncd.local_barrier('waiting_for_application_ending') - # Everything from now on, is some local-master processing - # The `Sync_daemon` context will handle the call to the global barrier - # for the early exiting ranks return unless mpi_local_master? - #stop_sampling_daemon(sampling_daemon_pid) if sampling_daemon_pid - # Stop Lttng session and babeltrace daemons lm_lttng_teardown_session if OPTIONS[:archive] LOGGER.debug("Waiting for babeltrace_thapi and dirwatch (#{pids}) to finish") @@ -787,47 +793,40 @@ def trace_and_on_node_processing(usr_argv) XprofExitCode.update(status, "babeltrace_thapi or dirwatch #{pid}") end end - # we can kill the session daemon lm_lttng_kill_sessiond + sampling_daemon.finalize if sampling_daemon end SyncDaemon.open do |syncd| - # Load Tracers and APILoaders Lib + sampling_daemon = nil + sampling_daemon = SamplingDaemon.new if sampling? + sampling_daemon&.start(Process.pid) + backends, h = env_tracers - # All ranks need to set the LLTTNG_HOME env - # so they can have access to the daemon ENV['LTTNG_HOME'] = lttng_home_dir - # Only local master spawn LTTNG daemon and start session pids = if mpi_local_master? lm_setup_lttng(backends) lm_babeltrace(backends) if OPTIONS[:archive] - end - + end + syncd.local_barrier('waiting_for_lttng_setup') - if sampling? - sampling_daemon_pid = start_sampling_daemon(Process.pid) - puts "Started sampling daemon with PID #{sampling_daemon_pid}" - wait_for_ready_signal - end # Launch User Command begin XprofExitCode.update(launch_usr_bin(h, usr_argv), usr_argv.join(' ')) rescue Errno::ENOENT - teardown_lttng(syncd, pids) + teardown_lttng(syncd, sampling_daemon, pids) raise end - send_finish_signal(sampling_daemon_pid) if sampling_daemon_pid - teardown_lttng(syncd, pids, sampling_daemon_pid) + + teardown_lttng(syncd, sampling_daemon, pids) return unless mpi_local_master? - # Preprocess trace lm_babeltrace(backends) unless OPTIONS[:archive] lm_move_to_shared end - # Global master rename the unique trace folder to a more - # human friendly name + gm_rename_folder if mpi_master? end diff --git a/ze/sampling_daemon.c b/ze/sampling_daemon.c index 0b118266..2bdd973d 100644 --- a/ze/sampling_daemon.c +++ b/ze/sampling_daemon.c @@ -1,37 +1,20 @@ #include "sampling_daemon.h" #include "../sampling/thapi_sampling.h" -#include "uthash.h" -#include "utlist.h" -#include "ze.h.include" #include "ze_build.h" -#include "ze_profiling.h" -#include "ze_properties.h" #include "ze_sampling.h" -#include "ze_structs_tracepoints.h" -#include "ze_tracepoints.h" -#include "zel_structs_tracepoints.h" -#include "zel_tracepoints.h" -#include "zes_structs_tracepoints.h" -#include "zes_tracepoints.h" -#include "zet_structs_tracepoints.h" -#include "zet_tracepoints.h" -#include "zex_structs_tracepoints.h" -#include "zex_tracepoints.h" #include #include #include -#include -#include -#include -#include -#include #include #include #include #include #include +#include +#include +#include -#define SIG_SAMPLING_READY SIGRTMIN +#define SIG_SAMPLING_READY (SIGRTMIN) #define SIG_SAMPLING_FINISH (SIGRTMIN + 1) #define ZES_INIT_PTR zesInit_ptr @@ -261,7 +244,7 @@ static void find_ze_symbols(void *handle, int verbose) { if (!ZES_MEMORY_GET_BANDWIDTH_PTR && verbose) fprintf(stderr, "Missing symbol zesMemoryGetBandwidth!\n"); } - +volatile bool running = true; thapi_sampling_handle_t _sampling_handle = NULL; static int _sampling_freq_initialized = 0; static int _sampling_fabricPorts_initialized = 0; @@ -285,22 +268,21 @@ static uint32_t **_sampling_powerDomainCounts = NULL; static uint32_t **_sampling_engineCounts = NULL; //////////////////////////////////////////// -#define _ZE_ERROR_MSG(NAME, RES) \ - do { \ - fprintf(stderr, "%s() failed at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \ - } while (0) -#define _ZE_ERROR_MSG_NOTERMINATE(NAME, RES) \ - do { \ - fprintf(stderr, "%s() error at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \ - } while (0) -#define _ERROR_MSG(MSG) \ - { \ - perror((MSG)) do { \ - { \ - perror((MSG)); \ - fprintf(stderr, "errno=%d at %d(%s)", errno, __LINE__, __FILE__); \ - } \ - while (0) +#define _ZE_ERROR_MSG(NAME,RES) do {\ + fprintf(stderr,"%s() failed at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\ +} while (0) +#define _ZE_ERROR_MSG_NOTERMINATE(NAME,RES) do {\ + fprintf(stderr,"%s() error at %d(%s): res=%x\n",(NAME),__LINE__,__FILE__,(RES));\ +} while (0) +#define _ERROR_MSG(MSG) do {\ +perror((MSG));fprintf(stderr, "errno=%d at %d(%s)\n", errno, __LINE__, __FILE__);\ +} while (0) +#define _USAGE_MSG(MSG, ARGV0) do {\ + fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG));\ +} while (0) +#define _DL_ERROR_MSG() do {\ + fprintf(stderr, "dlopen error: %s at %d(%s)\n", dlerror(), __LINE__, __FILE__);\ +} while(0) static void intializeFrequency() { ze_result_t res; @@ -753,7 +735,6 @@ static void thapi_sampling_energy() { } } } -volatile bool running = true; void process_sampling() { struct timespec interval; @@ -771,8 +752,6 @@ void cleanup_sampling() { void signal_handler(int signum) { if (signum == SIG_SAMPLING_FINISH) { - printf("Received FINISH signal, stopping daemon...\n"); - // running = false; cleanup_sampling(); running = false; } @@ -780,15 +759,17 @@ void signal_handler(int signum) { int main(int argc, char **argv) { - fprintf(stderr, "Entering Main.\n"); if (argc < 2) { - fprintf(stderr, "Usage: %s \n", argv[0]); + _USAGE_MSG("", argv[0]); return 1; } int parent_pid = atoi(argv[1]); + if (parent_pid <= 0) { + _ERROR_MSG("Invalid or missing parent PID. A positive integer is required."); + return 1; + } int verbose = 0; - fprintf(stderr, "Thapi sampling init.\n"); - thapi_sampling_init(); + thapi_sampling_init();// Initialize sampling // Load necessary libraries void *handle = NULL; @@ -800,27 +781,23 @@ int main(int argc, char **argv) { } if (!handle) { - fprintf(stderr, "Failure: could not load ze library!\n"); + _DL_ERROR_MSG(); return 1; } - - // Initialize daemon + //Find zes symbols find_ze_symbols(handle, verbose); - fprintf(stderr, "Initialize the system.\n"); + //Initialize device and telemetry handles initializeHandles(); - fprintf(stderr, "Daemon initialized and entering signal loop.\n"); // Run the signal loop signal(SIG_SAMPLING_FINISH, signal_handler); - if (parent_pid > 0) { - kill(parent_pid, SIG_SAMPLING_READY); - fprintf(stderr, "Daemon sent READY signal to parent PID %d\n", parent_pid); + + if (kill(parent_pid, SIG_SAMPLING_READY) != 0) { + _ERROR_MSG("Failed to send READY signal to parent"); } - fprintf(stderr, "Daemon waiting for signals in signal_loop.\n"); - // Clearunningnup before exiting + // Process_sampling loop until SIG_SAMPLING_FINISH signal while (running) { - process_sampling(); // Wait for a signal to be received + process_sampling(); } dlclose(handle); - printf("Daemon exiting \n"); return 0; }