From d52bc6424bf74a7c0abd585143de25cbc7d1a81c Mon Sep 17 00:00:00 2001
From: Adriaan Schmidt
Date: Wed, 17 Apr 2024 14:14:09 +0200
Subject: [PATCH 1/5] plugin_scheduler: add switch to disable processing of
 kthreads

Signed-off-by: Adriaan Schmidt
---
 tuned/plugins/plugin_scheduler.py | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/tuned/plugins/plugin_scheduler.py b/tuned/plugins/plugin_scheduler.py
index 7d9bcbe0..f69fe8e9 100644
--- a/tuned/plugins/plugin_scheduler.py
+++ b/tuned/plugins/plugin_scheduler.py
@@ -446,6 +446,7 @@ def __init__(self, monitor_repository, storage_factory, hardware_inventory, devi
 		# default is to whitelist all and blacklist none
 		self._ps_whitelist = ".*"
 		self._ps_blacklist = ""
+		self._kthread_process = True
 		self._cgroup_ps_blacklist_re = ""
 		self._cpus = perf.cpu_map()
 		self._scheduler_storage_key = self._storage_key(
@@ -550,6 +551,7 @@ def _get_config_options(cls):
 			"cgroup_ps_blacklist": None,
 			"ps_whitelist": None,
 			"ps_blacklist": None,
+			"kthread_process": True,
 			"irq_process": True,
 			"default_irq_smp_affinity": "calc",
 			"perf_mmap_pages": None,
@@ -586,6 +588,8 @@ def get_processes(self):
 		processes = {}
 		for proc in ps.values():
 			try:
+				if not self._kthread_process and self._is_kthread(proc):
+					continue
 				cmd = self._get_cmdline(proc)
 				pid = proc["pid"]
 				processes[pid] = cmd
@@ -1053,6 +1057,9 @@ def _instance_verify_static(self, instance, ignore_missing, devices):
 
 	def _add_pid(self, instance, pid, r):
 		try:
+			proc = procfs.process(pid)
+			if not self._kthread_process and self._is_kthread(proc):
+				return
 			cmd = self._get_cmdline(pid)
 		except (OSError, IOError) as e:
 			if e.errno == errno.ENOENT \
@@ -1128,6 +1135,14 @@ def _ps_blacklist(self, enabling, value, verify, ignore_missing):
 		if enabling and value is not None:
 			self._ps_blacklist = "|".join(["(%s)" % v for v in re.split(r"(?<!\\);", str(value))])
+
+	@command_custom("kthread_process", per_device = False)
+	def _kthread_process(self, enabling, value, verify, ignore_missing):
+		# currently unsupported
+		if verify:
+			return None
+		if enabling and value is not None:
+			self._kthread_process = self._cmd.get_bool(value) == "1"

From: Adriaan Schmidt
Date: Wed, 22 May 2024 09:00:00 +0200
Subject: [PATCH 2/5] feat: add plugin_kthread

Signed-off-by: Adriaan Schmidt
---
 tuned/plugins/plugin_kthread.py | 562 ++++++++++++++++++++++++++++++++
 1 file changed, 562 insertions(+)
 create mode 100644 tuned/plugins/plugin_kthread.py

diff --git a/tuned/plugins/plugin_kthread.py b/tuned/plugins/plugin_kthread.py
new file mode 100644
index 00000000..78bdc639
--- /dev/null
+++ b/tuned/plugins/plugin_kthread.py
@@ -0,0 +1,562 @@
+from . import hotplug
+from .decorators import *
+import tuned.consts as consts
+import tuned.logs
+
+# The scheduler interface in os was introduced in Python 3.3
+# To also support earlier versions, we import some logic from plugin_scheduler
+from .plugin_scheduler import SchedulerUtils, SchedulerUtilsSchedutils
+
+import perf
+import procfs
+
+import errno
+import os
+import re
+import select
+import threading
+
+log = tuned.logs.get()
+
+# threads can disappear at any time. in that case, we raise a custom exception
+class ThreadNoLongerExists(Exception):
+	pass
+
+# another custom exception to signal non-changeability of affinities
+class AffinityNotChangeable(Exception):
+	pass
+
+# the plugin manages each kthread as a "device", and keeps a KthreadInfo object
+class KthreadInfo(object):
+	def __init__(self, proc):
+		self.pid = proc.pid
+		self.comm = procfs.process_cmdline(proc)
+		self.device = "%d:%s" % (self.pid, self.comm)
+		# it seems we can check for fixed affinities too early, i.e. procfs
+		# indicates "changeable", but we fail later when we try to actually change it
+		# so: delay the check until later
+		self.affinity_changeable = None
+		self.sched_orig = None
+
+# scheduling options
+class SchedOpts(object):
+	def __init__(self, policy=None, priority=None, affinity=None):
+		self.policy = policy
+		self.priority = priority
+		self.affinity = affinity
+
+# group.* definitions from the instance options
+class GroupCmd(object):
+	def __init__(self, name, prio, sched, regex):
+		self.name = name
+		self.prio = prio
+		self.sched = sched
+		self.regex = regex
+
+class KthreadPlugin(hotplug.Plugin):
+	r"""
+	`kthread`::
+
+	Allows tuning of kernel threads by setting their CPU affinities and
+	scheduling parameters. The plugin re-implements functionality already
+	present in the `scheduler` plugin. However, this plugin offers more
+	flexibility, as it allows tuning of individual kernel threads, which
+	are handled as `devices`. Multiple plugin instances can be defined,
+	each addressing different groups of kernel threads.
+	When using the `kthread` plugin, make sure to disable processing of kernel
+	threads in the `scheduler` plugin by setting its option
+	[option]`kthread_process=false`.
+	===
+	Tuning options are controlled by [option]`group` definitions.
+	+
+	A group definition has the form
+	`group.<name> = <rule_prio>:<schedopts>:<affinity>:<regex>`
+	+
+	with four required fields:
+	+
+	--
+	`rule_prio`::
+	priority of the group within this plugin instance (lower number indicates
+	higher priority)
+	`schedopts`::
+	desired scheduling policy and priority, or either "*" or an empty string
+	to leave the scheduling options unchanged.
+	The first character defines the policy
+
+	- f: SCHED_FIFO
+	- b: SCHED_BATCH
+	- r: SCHED_RR
+	- o: SCHED_OTHER
+	- i: SCHED_IDLE
+
+	The remainder is the desired priority in the range 0..99.
+	For SCHED_OTHER, only a priority of 0 is allowed.
+	Examples: `f50` to set SCHED_FIFO with priority 50, `o0` for SCHED_OTHER
+	`affinity`::
+	desired affinity (as cpulist string), or either "*" or an empty string
+	to leave the affinity unchanged
+	`regex`::
+	regular expression to match kernel threads. Note that the thread name needs
+	to match the full regex, i.e. matching happens with re.fullmatch().
+	--
+	The [option]`group` options of the `kthread` plugin differ from those of
+	the `scheduler` plugin:
+
+	- scheduling policy and priority are combined into one option
+	- affinities are specified as cpulist strings instead of masks
+	- regular expressions need to fully match the thread names
+	- no square brackets are added to the kernel thread names
+
+	Example:
+	The `scheduler` definition
+
+	group.ksoftirqd=0:f:2:*:^\[ksoftirqd
+
+	is translated to the `kthread` definition
+
+	group.ksoftirqd=0:f2:*:ksoftirqd.*
+	"""
+	def __init__(self, monitor_repository, storage_factory, hardware_inventory, device_matcher, device_matcher_udev, plugin_instance_factory, global_cfg, variables):
+		super(KthreadPlugin, self).__init__(monitor_repository, storage_factory, hardware_inventory, device_matcher, device_matcher_udev, plugin_instance_factory, global_cfg, variables)
+		self._has_dynamic_options = True
+		self._kthreads = {}
+		self._lock = threading.RLock()
+		self._instance_count = 0
+
+		try:
+			self._scheduler_utils = SchedulerUtils()
+		except AttributeError:
+			self._scheduler_utils = SchedulerUtilsSchedutils()
+
+		self._perf_setup()
+
+	def cleanup(self):
+		super(KthreadPlugin, self).cleanup()
+		self._perf_shutdown()
+
+	#
+	# plugin-level methods: devices and plugin options
+	#
+	def _init_devices(self):
+		self._devices_supported = True
+		self._free_devices = set()
+		self._assigned_devices = set()
+		self._kthread_scan(initial=True)
+
+	@classmethod
+	def _get_config_options(cls):
+		return {
+			# nothing here, the group.* options are covered by self._has_dynamic_options
+		}
+
+	#
+	# helper functions
+	#
+	def _convert_device_to_pid(self, device):
+		"""Extract the PID (as int) from the device name string"""
+		pid, _ = device.split(":", 1)
+		return int(pid)
+
+	#
+	# instance-level methods: implement the Instance interface
+	#
+	def _instance_init(self, instance):
+		instance._has_static_tuning = True
+		instance._has_dynamic_tuning = False
+		# set our instance name, so the _instance_kthread command can find us
+		instance.options["_instance"] = instance.name
+		# process group.* options if not already done via _get_matching_devices
+		if not hasattr(instance, "_groups"):
+			self._instance_prepare_device_matching(instance)
+		# warn in case we have device or udev expressions... those don't work in this plugin
+		if instance._devices_expression not in [None, "*"]:
+			log.warning("Ignoring devices expression '%s' of instance '%s'" % (instance._devices_expression, instance.name))
+		if instance._devices_udev_regex is not None:
+			log.warning("Ignoring devices udev regex '%s' of instance '%s'" % (instance._devices_udev_regex, instance.name))
+
+	def _instance_cleanup(self, instance):
+		pass
+
+	def _instance_prepare_device_matching(self, instance):
+		"""Process all group.* options and populate instance._groups"""
+		groups = []
+		for k, v in instance.options.items():
+			# group definitions have the format:
+			# group.<name> = <rule_prio>:<schedopts>:<affinity>:<regex>
+			if not k.startswith("group."):
+				continue
+			name = k[len("group."):]
+			opt = self._variables.expand(v).split(":", 3)
+			if not len(opt) == 4:
+				log.error("Invalid definition for '%s': need exactly 4 arguments" % k)
+				continue
+			opt_rule_prio, opt_schedopts, opt_affinity, opt_regex = opt
+			# parse rule prio
+			try:
+				rule_prio = int(opt_rule_prio)
+			except ValueError:
+				log.error("Could not parse rule prio for '%s': '%s' is not a number" % (k, opt_rule_prio))
+				continue
+			# parse scheduling options
+			policy, priority, affinity = None, None, None
+			if opt_schedopts in ["", "*"]:
+				pass
+			elif len(opt_schedopts) > 1 and opt_schedopts[0] in self._scheduler_utils._dict_schedcfg2num.keys():
+				policy = self._scheduler_utils.sched_cfg_to_num(opt_schedopts[0])
+				try:
+					priority = int(opt_schedopts[1:])
+				except ValueError:
+					log.error("Could not parse scheduling priority for '%s': '%s' is not a number" % (k, opt_schedopts[1:]))
+					continue
+				if policy == os.SCHED_OTHER and priority != 0:
+					log.error("Could not parse scheduling priority for '%s': SCHED_OTHER requires priority 0" % k)
+					continue
+				if priority < 0 or priority > 99:
+					log.error("Could not parse scheduling priority for '%s': value '%d' out of range" % (k, priority))
+					continue
+			else:
+				log.error("Could not parse scheduling priority for '%s': '%s' has wrong format" % (k, opt_schedopts))
+				continue
+			if not opt_affinity in ["", "*"]:
+				affinity = set(self._cmd.cpulist_unpack(opt_affinity))
+				if len(affinity) == 0:
+					log.error("Could not parse affinity for '%s': '%s' has wrong format" % (k, opt_affinity))
+					continue
+			sched = SchedOpts(policy=policy, priority=priority, affinity=affinity)
+			# parse the regex
+			try:
+				regex = re.compile(opt_regex)
+			except re.error as e:
+				log.error("Could not compile regex for '%s': '%s'" % (k, e.msg))
+				continue
+			groups.append(GroupCmd(name, rule_prio, sched, regex))
+		instance._groups = sorted(groups, key=lambda x: x.prio)
+
+	def _get_instance_sched_options(self, instance, kthread):
+		"""
+		determine options an instance would set for a kthread, None if the
+		instance would not set any (because none of the group.* regexes matches)
+		"""
+		for group in instance._groups:
+			if group.regex.fullmatch(kthread.comm):
+				return group.sched
+		return None
+
+	def _get_matching_devices(self, instance, devices):
+		"""
+		overrides method in base.Plugin
+		instead of matching with devices/udev regexes, we use the group.*
+		definitions to determine threads that fit the instance
+		"""
+		# this can be called before the instance is initialized via
+		# _instance_init(), so we need to make sure that we process our
+		# group.* options
+		if not hasattr(instance, "_groups"):
+			self._instance_prepare_device_matching(instance)
+		matching_devices = set()
+		for device in devices:
+			pid = self._convert_device_to_pid(device)
+			try:
+				kthread = self._kthread_get(pid)
+			except ThreadNoLongerExists:
+				self._kthread_remove(pid)
+				continue
+			if self._get_instance_sched_options(instance, kthread) is not None:
+				matching_devices.add(device)
+		return matching_devices
+
+	def _instance_apply_static(self, instance):
+		if self._instance_count == 0:
+			# scan for kthreads that have appeared since plugin initialization
+			self._kthread_scan(initial=False)
+			self._perf_monitor_start()
+		self._instance_count += 1
+		super(KthreadPlugin, self)._instance_apply_static(instance)
+
+	def _instance_unapply_static(self, instance, rollback):
+		super(KthreadPlugin, self)._instance_unapply_static(instance, rollback)
+		self._instance_count -= 1
+		if self._instance_count == 0:
+			self._perf_monitor_shutdown()
+
+	#
+	# internal bookkeeping (self._kthreads)
+	# as these methods are called from the main thread and the perf monitor
+	# thread, we need to lock all accesses to self._kthreads and the
+	# hotplug.Plugin methods _add_device()/_remove_device()
+	#
+	def _kthread_scan(self, initial=False):
+		"""Scan procfs for kernel threads and add them to our bookkeeping
+
+		Args:
+			initial (bool): is this the initial scan? passed on to _kthread_add()
+		"""
+		ps = procfs.pidstats()
+		for pid in ps.keys():
+			self._kthread_add(pid, initial)
+
+	def _kthread_add(self, pid, initial=False):
+		"""Add kernel thread to internal bookkeeping
+
+		Args:
+			pid (int): kernel thread pid
+			initial (bool): is this the initial scan? if yes, then add the new
+				kthread to _free_devices, else initiate hotplug mechanism via
+				_add_device()
+		"""
+		try:
+			proc = procfs.process(pid)
+			if not self._is_kthread(proc):
+				return
+			kthread = KthreadInfo(proc)
+		except (FileNotFoundError, ProcessLookupError):
+			return
+
+		with self._lock:
+			if kthread.pid in self._kthreads:
+				return
+			self._kthreads[kthread.pid] = kthread
+			if initial:
+				self._free_devices.add(kthread.device)
+			else:
+				self._add_device(kthread.device)
+		log.debug("Added kthread %s" % kthread.device)
+
+	def _kthread_remove(self, pid):
+		"""Remove kernel thread from internal bookkeeping
+
+		Args:
+			pid (int): kernel thread pid
+		"""
+		try:
+			with self._lock:
+				device = self._kthreads[pid].device
+				del self._kthreads[pid]
+				self._remove_device(device)
+		except KeyError:
+			return
+		log.debug("Removed kthread %d" % pid)
+
+	def _kthread_get(self, pid):
+		"""Get KthreadInfo object for a given PID
+
+		Args:
+			pid (int): kernel thread pid
+		"""
+		try:
+			with self._lock:
+				return self._kthreads[pid]
+		except KeyError:
+			raise ThreadNoLongerExists()
+
+	def _is_kthread(self, proc):
+		"""helper to determine if a procfs process is a kernel thread"""
+		return proc["stat"]["flags"] & procfs.pidstat.PF_KTHREAD != 0
+
+	#
+	# methods to interact with perf
+	#
+	def _perf_setup(self):
+		self._cpus = perf.cpu_map()
+		self._threads = perf.thread_map()
+		self._evlist = perf.evlist(self._cpus, self._threads)
+		evsel = perf.evsel(
+			type=perf.TYPE_SOFTWARE,
+			config=perf.COUNT_SW_DUMMY,
+			task=1,
+			comm=1,
+			mmap=0,
+			freq=0,
+			wakeup_events=1,
+			watermark=1,
+			sample_type=perf.SAMPLE_TID|perf.SAMPLE_CPU,
+		)
+		evsel.open(cpus=self._cpus, threads=self._threads)
+		self._evlist.add(evsel)
+		self._evlist.mmap()
+
+	def _perf_shutdown(self):
+		if self._evlist:
+			for fd in self._evlist.get_pollfd():
+				os.close(fd.name)
+
+	def _perf_monitor_start(self):
+		self._terminate = threading.Event()
+		self._thread = threading.Thread(target=self._perf_monitor_thread)
+		self._thread.start()
+
+	def _perf_monitor_shutdown(self):
+		self._terminate.set()
+		self._thread.join()
+
+	def _perf_monitor_thread(self):
+		log.debug("perf monitor thread starting")
+		poll = select.poll()
+		fds = self._evlist.get_pollfd()
+		for fd in fds:
+			poll.register(fd)
+		while not self._terminate.is_set():
+			if len(poll.poll(1000)) == 0:
+				continue
+			have_events = True
+			while have_events:
+				have_events = False
+				for cpu in self._cpus:
+					event = self._evlist.read_on_cpu(cpu)
+					if event and hasattr(event, "type"):
+						have_events = True
+						if event.type == perf.RECORD_COMM:
+							self._kthread_add(event.tid)
+						elif event.type == perf.RECORD_EXIT:
+							self._kthread_remove(event.tid)
+		log.debug("perf monitor thread shutting down")
+
+	#
+	# methods for low-level manipulation of scheduling options
+	# via SchedulerUtils from .plugin_scheduler
+	#
+	def _set_affinity(self, pid, affinity):
+		try:
+			self._scheduler_utils.set_affinity(pid, affinity)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to set affinity of PID %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				try:
+					proc = procfs.process(pid)
+					changeable = not proc["stat"].is_bound_to_cpu()
+				except (OSError, IOError):
+					raise ThreadNoLongerExists()
+				if not changeable:
+					raise AffinityNotChangeable()
+				log.error("Failed to set affinity of PID %d to '%s': %s" % (pid, affinity, e))
+				raise e
+
+	def _get_affinity(self, pid):
+		try:
+			return self._scheduler_utils.get_affinity(pid)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to get affinity of PID %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				log.error("Failed to get affinity of PID %d: %s" % (pid, e))
+				raise e
+
+	def _set_schedopts(self, pid, policy, priority):
+		try:
+			self._scheduler_utils.set_scheduler(pid, policy, priority)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to set scheduling of kthread %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				log.error("Failed to set scheduling of kthread %d: %s" % (pid, e))
+				raise e
+
+	def _get_schedopts(self, pid):
+		try:
+			return self._scheduler_utils.get_scheduler(pid), self._scheduler_utils.get_priority(pid)
+		except OSError as e:
+			if hasattr(e, "errno") and e.errno == errno.ESRCH:
+				log.debug("Failed to get scheduling of kthread %d, the task vanished." % pid)
+				raise ThreadNoLongerExists()
+			else:
+				log.error("Failed to get scheduling of kthread %d: %s" % (pid, e))
+				raise e
+
+	def _format_schedopts(self, policy, priority):
+		return "%s:%d" % (self._scheduler_utils.sched_num_to_const(policy), priority)
+
+	#
+	# "high-level" methods that work on KthreadInfo objects:
+	# apply tuning while saving original settings
+	#
+	def _apply_kthread_tuning(self, kthread, opts):
+		current_affinity = self._get_affinity(kthread.pid)
+		current_policy, current_priority = self._get_schedopts(kthread.pid)
+		if kthread.sched_orig is None:
+			orig_opts = SchedOpts(policy=current_policy, priority=current_priority, affinity=current_affinity)
+			kthread.sched_orig = orig_opts
+
+		if opts.affinity is not None and opts.affinity != current_affinity:
+			try:
+				self._set_affinity(kthread.pid, opts.affinity)
+				kthread.affinity_changeable = True
+				log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread.device, opts.affinity))
+			except AffinityNotChangeable:
+				kthread.affinity_changeable = False
+				log.debug("The CPU affinity of kthread %s is not changeable"% kthread.device)
+		if opts.policy is not None or opts.priority is not None:
+			if opts.policy != current_policy or opts.priority != current_priority:
+				self._set_schedopts(kthread.pid, opts.policy, opts.priority)
+				log.debug("Set scheduling of kthread %s to '%s'"
+					% (kthread.device, self._format_schedopts(opts.policy, opts.priority)))
+
+	def _restore_kthread_tuning(self, kthread):
+		opts = kthread.sched_orig
+		current_affinity = self._get_affinity(kthread.pid)
+		current_policy, current_priority = self._get_schedopts(kthread.pid)
+		if kthread.affinity_changeable and opts.affinity != current_affinity:
+			try:
+				self._set_affinity(kthread.pid, opts.affinity)
+				log.debug("Restored CPU affinity of kthread %s to '%s'"
+					% (kthread.device, opts.affinity))
+			except AffinityNotChangeable:
+				log.debug("Failed to restore CPU affinity of kthread %s to '%s'"
+					% (kthread.device, opts.affinity))
+		if opts.policy != current_policy or opts.priority != current_priority:
+			self._set_schedopts(kthread.pid, opts.policy, opts.priority)
+			log.debug("Restored scheduling of kthread %s to '%s'"
+				% (kthread.device, self._format_schedopts(opts.policy, opts.priority)))
+
+	def _verify_kthread_tuning(self, kthread, opts):
+		affinity_ok, priority_ok = True, True
+		current_affinity = self._get_affinity(kthread.pid)
+		current_policy, current_priority = self._get_schedopts(kthread.pid)
+		if opts.affinity is not None and kthread.affinity_changeable:
+			desc = "CPU affinity of kthread %s" % kthread.device
+			current = self._cmd.cpulist2string(self._cmd.cpulist_pack(current_affinity))
+			if opts.affinity == current_affinity:
+				log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current))
+			else:
+				desired = self._cmd.cpulist2string(self._cmd.cpulist_pack(opts.affinity))
+				log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired))
+				affinity_ok = False
+		if opts.policy is not None or opts.priority is not None:
+			desc = "scheduling of kthread %s" % kthread.device
+			current = self._format_schedopts(current_policy, current_priority)
+			if opts.policy == current_policy and opts.priority == current_priority:
+				log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current))
+			else:
+				desired = self._format_schedopts(opts.policy, opts.priority)
+				log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired))
+				priority_ok = False
+		return affinity_ok and priority_ok
+
+	#
+	# command definitions: entry point for device tuning
+	#
+	@command_custom("_instance", per_device=True)
+	def _instance_kthread(self, start, value, device, verify, ignore_missing):
+		"""
+		This is the actual entry point for tuning.
+		value (of the option "_instance") is the name of the instance, set in _instance_init
+		"""
+		pid = self._convert_device_to_pid(device)
+		try:
+			kthread = self._kthread_get(pid)
+			if verify:
+				instance = self._instances[value]
+				opts = self._get_instance_sched_options(instance, kthread)
+				return self._verify_kthread_tuning(kthread, opts)
+			if start:
+				instance = self._instances[value]
+				opts = self._get_instance_sched_options(instance, kthread)
+				self._apply_kthread_tuning(kthread, opts)
+			else:
+				self._restore_kthread_tuning(kthread)
+		except ThreadNoLongerExists:
+			self._kthread_remove(pid)
+		return None

From 41cac0fd7b5d46ed3ed120d125bbf757d1d7a429 Mon Sep 17 00:00:00 2001
From: Adriaan Schmidt
Date: Thu, 25 Jul 2024 07:44:02 +0200
Subject: [PATCH 3/5] fix: workaround for #662

Calls to our _instance_[un]apply_static() are unreliable [1], so we
can't use them to keep track of the active-instance count and
start/stop the perf monitor thread on demand. Instead, this keeps the
thread running continuously.
[1] https://github.com/redhat-performance/tuned/issues/662 Signed-off-by: Adriaan Schmidt --- tuned/plugins/plugin_kthread.py | 34 ++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/tuned/plugins/plugin_kthread.py b/tuned/plugins/plugin_kthread.py index 78bdc639..5d72a1e8 100644 --- a/tuned/plugins/plugin_kthread.py +++ b/tuned/plugins/plugin_kthread.py @@ -132,6 +132,8 @@ def __init__(self, monitor_repository, storage_factory, hardware_inventory, devi def cleanup(self): super(KthreadPlugin, self).cleanup() + # workaround for #662: shut down the monitor thread + self._perf_monitor_shutdown() self._perf_shutdown() # @@ -142,6 +144,8 @@ def _init_devices(self): self._free_devices = set() self._assigned_devices = set() self._kthread_scan(initial=True) + # workaround for #662: always run the monitor thread + self._perf_monitor_start() @classmethod def _get_config_options(cls): @@ -265,19 +269,23 @@ def _get_matching_devices(self, instance, devices): matching_devices.add(device) return matching_devices - def _instance_apply_static(self, instance): - if self._instance_count == 0: - # scan for kthreads that have appeared since plugin initialization - self._kthread_scan(initial=False) - self._perf_monitor_start() - self._instance_count += 1 - super(KthreadPlugin, self)._instance_apply_static(instance) - - def _instance_unapply_static(self, instance, rollback): - super(KthreadPlugin, self)._instance_unapply_static(instance, rollback) - self._instance_count -= 1 - if self._instance_count == 0: - self._perf_monitor_shutdown() + # workaround for #662: + # calls to our _instance_[un]apply_static() are unreliable, so we can't + # use them to count active instances and start/stop the monitor thread + # on demand (https://github.com/redhat-performance/tuned/issues/662) + #def _instance_apply_static(self, instance): + # if self._instance_count == 0: + # # scan for kthreads that have appeared since plugin initialization + # self._kthread_scan(initial=False) + # self._perf_monitor_start() + # self._instance_count += 1 + # super(KthreadPlugin, self)._instance_apply_static(instance) + + #def _instance_unapply_static(self, instance, rollback): + # super(KthreadPlugin, self)._instance_unapply_static(instance, rollback) + # self._instance_count -= 1 + # if self._instance_count == 0: + # self._perf_monitor_shutdown() # # internal bookkeeping (self._kthreads) From cc9a1e1833ce7d710109bdc86eb9c497634716d4 Mon Sep 17 00:00:00 2001 From: Adriaan Schmidt Date: Tue, 3 Dec 2024 08:50:53 +0100 Subject: [PATCH 4/5] feat: allow dynamic instances of non-hotplug plugins Signed-off-by: Adriaan Schmidt --- tuned/daemon/controller.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/tuned/daemon/controller.py b/tuned/daemon/controller.py index 726e3a23..4510a202 100644 --- a/tuned/daemon/controller.py +++ b/tuned/daemon/controller.py @@ -467,10 +467,6 @@ def instance_create(self, plugin_name, instance_name, options, caller = None): log.error(rets) return (False, rets) plugin = plugins[plugin_name] - if not isinstance(plugin, hotplug.Plugin): - rets = "Plugin '%s' does not support hotplugging or dynamic instances." 
% plugin.name - log.error(rets) - return (False, rets) devices = options.pop("devices", None) devices_udev_regex = options.pop("devices_udev_regex", None) script_pre = options.pop("script_pre", None) @@ -525,13 +521,11 @@ def instance_destroy(self, instance_name, caller = None): log.error(rets) return (False, rets) plugin = instance.plugin - if not isinstance(plugin, hotplug.Plugin): - rets = "Plugin '%s' does not support hotplugging or dynamic instances." % plugin.name - log.error(rets) - return (False, rets) + has_devices = isinstance(plugin, hotplug.Plugin) devices = instance.processed_devices.copy() try: - plugin._remove_devices_nocheck(instance, devices) + if has_devices: + plugin._remove_devices_nocheck(instance, devices) self._daemon._unit_manager.instances.remove(instance) plugin.instance_unapply_tuning(instance) plugin.destroy_instance(instance) @@ -540,7 +534,8 @@ def instance_destroy(self, instance_name, caller = None): log.error(rets) return (False, rets) log.info("Deleted instance '%s'" % instance_name) - for device in devices: - # _add_device() will find a suitable plugin instance - plugin._add_device(device) + if has_devices: + for device in devices: + # _add_device() will find a suitable plugin instance + plugin._add_device(device) return (True, "OK") From 4c924b9b741dbbf1a6e4ffec3f53e397605dd94e Mon Sep 17 00:00:00 2001 From: Adriaan Schmidt Date: Tue, 3 Dec 2024 11:41:30 +0100 Subject: [PATCH 5/5] refactor: base plugin_kthread on base.Plugin Signed-off-by: Adriaan Schmidt --- tuned/plugins/plugin_kthread.py | 266 +++++++++++++++++++------------- 1 file changed, 163 insertions(+), 103 deletions(-) diff --git a/tuned/plugins/plugin_kthread.py b/tuned/plugins/plugin_kthread.py index 5d72a1e8..72f889b0 100644 --- a/tuned/plugins/plugin_kthread.py +++ b/tuned/plugins/plugin_kthread.py @@ -1,4 +1,4 @@ -from . import hotplug +from . import base from .decorators import * import tuned.consts as consts import tuned.logs @@ -31,12 +31,13 @@ class KthreadInfo(object): def __init__(self, proc): self.pid = proc.pid self.comm = procfs.process_cmdline(proc) - self.device = "%d:%s" % (self.pid, self.comm) # it seems we can check for fixed affinities too early, i.e. 
procfs # indicates "changeable", but we fail later when we try to actually change it # so: delay the check until later self.affinity_changeable = None self.sched_orig = None + def __str__(self): + return "%d:%s" % (self.pid, self.comm) # scheduling options class SchedOpts(object): @@ -53,7 +54,7 @@ def __init__(self, name, prio, sched, regex): self.sched = sched self.regex = regex -class KthreadPlugin(hotplug.Plugin): +class KthreadPlugin(base.Plugin): r""" `kthread`:: @@ -132,20 +133,15 @@ def __init__(self, monitor_repository, storage_factory, hardware_inventory, devi def cleanup(self): super(KthreadPlugin, self).cleanup() - # workaround for #662: shut down the monitor thread - self._perf_monitor_shutdown() self._perf_shutdown() # # plugin-level methods: devices and plugin options # def _init_devices(self): - self._devices_supported = True - self._free_devices = set() - self._assigned_devices = set() + self._devices_supported = False + self._kthread_pids_unassigned = set() self._kthread_scan(initial=True) - # workaround for #662: always run the monitor thread - self._perf_monitor_start() @classmethod def _get_config_options(cls): @@ -153,13 +149,16 @@ def _get_config_options(cls): # nothing here, the group.* options are covered by self._has_dynamic_options } - # - # helper functions - # - def _convert_device_to_pid(self, device): - """Extract the PID (as int) from the device name string""" - pid, _ = device.split(":", 1) - return int(pid) + def _plugin_add_kthread(self, pid): + for instance in self._instances.values(): + if self._get_matching_kthreads(instance, [pid]): + self._instance_add_kthread(instance, pid) + return + self._kthread_pids_unassigned.add(pid) + + def _plugin_remove_kthread(self, pid): + for instance in self._instances.values(): + self._instance_remove_kthread(instance, pid) # # instance-level methods: implement the Instance interface @@ -169,19 +168,20 @@ def _instance_init(self, instance): instance._has_dynamic_tuning = False # set our instance name, so the _instance_kthread command can find us instance.options["_instance"] = instance.name - # process group.* options if not already done via _get_matching_devices - if not hasattr(instance, "_groups"): - self._instance_prepare_device_matching(instance) - # warn in case we have device or udev expressions... 
those don't work in this plugin - if instance._devices_expression not in [None, "*"]: - log.warning("Ignoring devices expression '%s' of instance '%s'" % (instance._devices_expression, instance.name)) - if instance._devices_udev_regex is not None: - log.warning("Ignoring devices udev regex '%s' of instance '%s'" % (instance._devices_udev_regex, instance.name)) + # threads handled by instance, assigned and processed + instance._kthreads_assigned = set() + instance._kthreads_processed = set() + instance._tuning_active = False + instance._lock = threading.RLock() + # process group.* options + self._instance_prepare_matching(instance) + # grab initial set of kthreads + self._instance_acquire_kthreads(instance) def _instance_cleanup(self, instance): - pass + self._instance_release_kthreads(instance) - def _instance_prepare_device_matching(self, instance): + def _instance_prepare_matching(self, instance): """Process all group.* options and populate instance._groups""" groups = [] for k, v in instance.options.items(): @@ -246,52 +246,139 @@ def _get_instance_sched_options(self, instance, kthread): return group.sched return None - def _get_matching_devices(self, instance, devices): + def _get_matching_kthreads(self, instance, pids): """ overrides method in base.Plugin instead of matching with devices/udev regexes, we use the group.* definitions to determine threads that fit the instance """ - # this can be called before the instance is initialized via - # _instance_init(), so we need to make sure that we process our - # group.* options - if not hasattr(instance, "_groups"): - self._instance_prepare_device_matching(instance) - matching_devices = set() - for device in devices: - pid = self._convert_device_to_pid(device) + matching_kthreads = set() + for pid in pids: try: kthread = self._kthread_get(pid) except ThreadNoLongerExists: - self._kthread_remove(pid) + self._kthread_internal_remove(pid) continue if self._get_instance_sched_options(instance, kthread) is not None: - matching_devices.add(device) - return matching_devices - - # workaround for #662: - # calls to our _instance_[un]apply_static() are unreliable, so we can't - # use them to count active instances and start/stop the monitor thread - # on demand (https://github.com/redhat-performance/tuned/issues/662) - #def _instance_apply_static(self, instance): - # if self._instance_count == 0: - # # scan for kthreads that have appeared since plugin initialization - # self._kthread_scan(initial=False) - # self._perf_monitor_start() - # self._instance_count += 1 - # super(KthreadPlugin, self)._instance_apply_static(instance) - - #def _instance_unapply_static(self, instance, rollback): - # super(KthreadPlugin, self)._instance_unapply_static(instance, rollback) - # self._instance_count -= 1 - # if self._instance_count == 0: - # self._perf_monitor_shutdown() + matching_kthreads.add(pid) + return matching_kthreads + + def _instance_add_kthread(self, instance, pid): + """add a kthread to an instance, and tune it""" + with instance._lock: + if instance._tuning_active: + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + self._apply_kthread_tuning(kthread, opts) + instance._kthreads_processed.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + else: + instance._kthreads_assigned.add(pid) + + def _instance_remove_kthread(self, instance, pid): + """remove a kthread from an instance, and unapply tuning""" + with instance._lock: + if pid in instance._kthreads_assigned: + 
instance._kthreads_assigned.remove(pid) + elif pid in instance._kthreads_processed: + try: + instance._kthreads_processed.remove(pid) + kthread = self._kthread_get(pid) + self._restore_kthread_tuning(kthread) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + else: + # kthread does not belong to instance. ignore. + pass + + def _instance_transfer_kthread(self, instance_from, instance_to, pid): + # TODO: implement transfer that moves directly from one tuning to the next, + # without restoring the original state + self._instance_remove_kthread(instance_from, pid) + self._instance_add_kthread(instance_to, pid) + + def _instance_acquire_kthreads(self, instance): + """assign all matching kthreads to an instance""" + # first the ones that are currently unassigned + with self._lock: + acquire_kthreads = self._get_matching_kthreads(instance, self._kthread_pids_unassigned) + self._kthread_pids_unassigned -= acquire_kthreads + for pid in acquire_kthreads: + self._instance_add_kthread(instance, pid) + # and then the ones from other instances + for other_instance in self._instances.values(): + if (other_instance == instance or instance.priority > other_instance.priority): + continue + transfer_kthreads = self._get_matching_kthreads(instance, other_instance._kthreads_assigned | other_instance._kthreads_processed) + for pid in transfer_kthreads: + self._instance_transfer_kthread(other_instance, instance, pid) + + def _instance_release_kthreads(self, instance): + """release all kthreads from an instance""" + free_kthreads = instance._kthreads_assigned | instance._kthreads_processed + # first the ones now claimed by other instances + for other_instance in self._instances.values(): + transfer_kthreads = self._get_matching_kthreads(other_instance, free_kthreads) + for pid in list(transfer_kthreads): + self._instance_transfer_kthread(instance, other_instance, pid) + free_kthreads.remove(pid) + # the remaining ones go back to unassigned + with self._lock: + for pid in free_kthreads: + self._instance_remove_kthread(instance, pid) + self._kthread_pids_unassigned.add(pid) + + def _instance_apply_static(self, instance): + if self._instance_count == 0: + # scan for kthreads that have appeared since plugin initialization + self._kthread_scan(initial=False) + self._perf_monitor_start() + self._instance_count += 1 + with instance._lock: + instance._tuning_active = True + for pid in list(instance._kthreads_assigned): + instance._kthreads_assigned.remove(pid) + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + self._apply_kthread_tuning(kthread, opts) + instance._kthreads_processed.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + + def _instance_verify_static(self, instance, ignore_missing, devices): + result = True + with instance._lock: + for pid in list(instance._kthreads_processed): + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + result &= self._verify_kthread_tuning(kthread, opts) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + return result + + def _instance_unapply_static(self, instance, rollback): + with instance._lock: + instance._tuning_active = False + for pid in list(instance._kthreads_processed): + instance._kthreads_processed.remove(pid) + try: + kthread = self._kthread_get(pid) + self._restore_kthread_tuning(kthread) + instance._kthreads_assigned.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + 
self._instance_count -= 1
+		if self._instance_count == 0:
+			self._perf_monitor_shutdown()
 
 	#
 	# internal bookkeeping (self._kthreads)
 	# as these methods are called from the main thread and the perf monitor
-	# thread, we need to lock all accesses to self._kthreads and the
-	# hotplug.Plugin methods _add_device()/_remove_device()
+	# thread, we need to lock all accesses to self._kthreads
 	#
 	def _kthread_scan(self, initial=False):
 		"""Scan procfs for kernel threads and add them to our bookkeeping
@@ -301,9 +388,9 @@ def _kthread_scan(self, initial=False):
 		"""
 		ps = procfs.pidstats()
 		for pid in ps.keys():
-			self._kthread_add(pid, initial)
+			self._kthread_internal_add(pid, initial)
 
-	def _kthread_add(self, pid, initial=False):
+	def _kthread_internal_add(self, pid, initial=False):
 		"""Add kernel thread to internal bookkeeping
 
@@ -325,12 +412,12 @@ def _kthread_internal_add(self, pid, initial=False):
 				return
 			self._kthreads[kthread.pid] = kthread
 			if initial:
-				self._free_devices.add(kthread.device)
+				self._kthread_pids_unassigned.add(kthread.pid)
 			else:
-				self._add_device(kthread.device)
-		log.debug("Added kthread %s" % kthread.device)
+				self._plugin_add_kthread(kthread.pid)
+		log.debug("Added kthread %s" % kthread)
 
-	def _kthread_remove(self, pid):
+	def _kthread_internal_remove(self, pid):
 		"""Remove kernel thread from internal bookkeeping
 
@@ -338,9 +425,8 @@ def _kthread_internal_remove(self, pid):
 		"""
 		try:
 			with self._lock:
-				device = self._kthreads[pid].device
 				del self._kthreads[pid]
-				self._remove_device(device)
+				self._plugin_remove_kthread(pid)
 		except KeyError:
 			return
 		log.debug("Removed kthread %d" % pid)
@@ -414,9 +500,9 @@ def _perf_monitor_thread(self):
 					if event and hasattr(event, "type"):
 						have_events = True
 						if event.type == perf.RECORD_COMM:
-							self._kthread_add(event.tid)
+							self._kthread_internal_add(event.tid)
 						elif event.type == perf.RECORD_EXIT:
-							self._kthread_remove(event.tid)
+							self._kthread_internal_remove(event.tid)
 		log.debug("perf monitor thread shutting down")
 
 	#
@@ -492,15 +578,15 @@ def _apply_kthread_tuning(self, kthread, opts):
 			try:
 				self._set_affinity(kthread.pid, opts.affinity)
 				kthread.affinity_changeable = True
-				log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread.device, opts.affinity))
+				log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread, opts.affinity))
 			except AffinityNotChangeable:
 				kthread.affinity_changeable = False
-				log.debug("The CPU affinity of kthread %s is not changeable"% kthread.device)
+				log.debug("The CPU affinity of kthread %s is not changeable" % kthread)
 		if opts.policy is not None or opts.priority is not None:
 			if opts.policy != current_policy or opts.priority != current_priority:
 				self._set_schedopts(kthread.pid, opts.policy, opts.priority)
 				log.debug("Set scheduling of kthread %s to '%s'"
-					% (kthread.device, self._format_schedopts(opts.policy, opts.priority)))
+					% (kthread, self._format_schedopts(opts.policy, opts.priority)))
 
 	def _restore_kthread_tuning(self, kthread):
 		opts = kthread.sched_orig
@@ -510,21 +596,21 @@ def _restore_kthread_tuning(self, kthread):
 			try:
 				self._set_affinity(kthread.pid, opts.affinity)
 				log.debug("Restored CPU affinity of kthread %s to '%s'"
-					% (kthread.device, opts.affinity))
+					% (kthread, opts.affinity))
 			except AffinityNotChangeable:
 				log.debug("Failed to restore CPU affinity of kthread %s to '%s'"
-					% (kthread.device, opts.affinity))
+					% (kthread, opts.affinity))
 		if opts.policy != current_policy or opts.priority != current_priority:
 			self._set_schedopts(kthread.pid, opts.policy, opts.priority)
 			log.debug("Restored
scheduling of kthread %s to '%s'" - % (kthread.device, self._format_schedopts(opts.policy, opts.priority))) + % (kthread, self._format_schedopts(opts.policy, opts.priority))) def _verify_kthread_tuning(self, kthread, opts): affinity_ok, priority_ok = True, True current_affinity = self._get_affinity(kthread.pid) current_policy, current_priority = self._get_schedopts(kthread.pid) if opts.affinity is not None and kthread.affinity_changeable: - desc = "CPU affinity of kthread %s" % kthread.device + desc = "CPU affinity of kthread %s" % kthread current = self._cmd.cpulist2string(self._cmd.cpulist_pack(current_affinity)) if opts.affinity == current_affinity: log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current)) @@ -533,7 +619,7 @@ def _verify_kthread_tuning(self, kthread, opts): log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired)) affinity_ok = False if opts.policy is not None or opts.priority is not None: - desc = "scheduling of kthread %s" % kthread.device + desc = "scheduling of kthread %s" % kthread current = self._format_schedopts(current_policy, current_priority) if opts.policy == current_policy and opts.priority == current_priority: log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current)) @@ -542,29 +628,3 @@ def _verify_kthread_tuning(self, kthread, opts): log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired)) priority_ok = False return affinity_ok and priority_ok - - # - # command definitions: entry point for device tuning - # - @command_custom("_instance", per_device=True) - def _instance_kthread(self, start, value, device, verify, ignore_missing): - """ - This is the actual entry point for tuning. - value (of the option "_instance") is the name of the instance, set in _instance_init - """ - pid = self._convert_device_to_pid(device) - try: - kthread = self._kthread_get(pid) - if verify: - instance = self._instances[value] - opts = self._get_instance_sched_options(instance, kthread) - return self._verify_kthread_tuning(kthread, opts) - if start: - instance = self._instances[value] - opts = self._get_instance_sched_options(instance, kthread) - self._apply_kthread_tuning(kthread, opts) - else: - self._restore_kthread_tuning(kthread) - except ThreadNoLongerExists: - self._kthread_remove(pid) - return None
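
Usage example (illustrative): a profile fragment that hands kernel-thread tuning
to the new plugin while the `scheduler` plugin keeps handling user-space
processes, as the plugin docstring recommends. This is a minimal sketch, not
part of the patches; the group names, rule priorities, and CPU lists below are
assumptions for demonstration only:

	[scheduler]
	kthread_process=false

	[kthread]
	# pin RCU offload threads to CPUs 0-1 and run them SCHED_FIFO, priority 2
	group.rcu=0:f2:0-1:rcu.*
	# pin ksoftirqd to CPUs 0-1, leaving its scheduling policy unchanged
	group.ksoftirqd=1:*:0-1:ksoftirqd.*

Each group follows the `group.<name> = <rule_prio>:<schedopts>:<affinity>:<regex>`
format from the docstring; note the regexes must fully match the thread names
(re.fullmatch()), hence the trailing `.*`.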