diff --git a/tuned/plugins/plugin_kthread.py b/tuned/plugins/plugin_kthread.py index 5d72a1e8..72f889b0 100644 --- a/tuned/plugins/plugin_kthread.py +++ b/tuned/plugins/plugin_kthread.py @@ -1,4 +1,4 @@ -from . import hotplug +from . import base from .decorators import * import tuned.consts as consts import tuned.logs @@ -31,12 +31,13 @@ class KthreadInfo(object): def __init__(self, proc): self.pid = proc.pid self.comm = procfs.process_cmdline(proc) - self.device = "%d:%s" % (self.pid, self.comm) # it seems we can check for fixed affinities too early, i.e. procfs # indicates "changeable", but we fail later when we try to actually change it # so: delay the check until later self.affinity_changeable = None self.sched_orig = None + def __str__(self): + return "%d:%s" % (self.pid, self.comm) # scheduling options class SchedOpts(object): @@ -53,7 +54,7 @@ def __init__(self, name, prio, sched, regex): self.sched = sched self.regex = regex -class KthreadPlugin(hotplug.Plugin): +class KthreadPlugin(base.Plugin): r""" `kthread`:: @@ -132,20 +133,15 @@ def __init__(self, monitor_repository, storage_factory, hardware_inventory, devi def cleanup(self): super(KthreadPlugin, self).cleanup() - # workaround for #662: shut down the monitor thread - self._perf_monitor_shutdown() self._perf_shutdown() # # plugin-level methods: devices and plugin options # def _init_devices(self): - self._devices_supported = True - self._free_devices = set() - self._assigned_devices = set() + self._devices_supported = False + self._kthread_pids_unassigned = set() self._kthread_scan(initial=True) - # workaround for #662: always run the monitor thread - self._perf_monitor_start() @classmethod def _get_config_options(cls): @@ -153,13 +149,16 @@ def _get_config_options(cls): # nothing here, the group.* options are covered by self._has_dynamic_options } - # - # helper functions - # - def _convert_device_to_pid(self, device): - """Extract the PID (as int) from the device name string""" - pid, _ = device.split(":", 1) - return int(pid) + def _plugin_add_kthread(self, pid): + for instance in self._instances.values(): + if self._get_matching_kthreads(instance, [pid]): + self._instance_add_kthread(instance, pid) + return + self._kthread_pids_unassigned.add(pid) + + def _plugin_remove_kthread(self, pid): + for instance in self._instances.values(): + self._instance_remove_kthread(instance, pid) # # instance-level methods: implement the Instance interface @@ -169,19 +168,20 @@ def _instance_init(self, instance): instance._has_dynamic_tuning = False # set our instance name, so the _instance_kthread command can find us instance.options["_instance"] = instance.name - # process group.* options if not already done via _get_matching_devices - if not hasattr(instance, "_groups"): - self._instance_prepare_device_matching(instance) - # warn in case we have device or udev expressions... those don't work in this plugin - if instance._devices_expression not in [None, "*"]: - log.warning("Ignoring devices expression '%s' of instance '%s'" % (instance._devices_expression, instance.name)) - if instance._devices_udev_regex is not None: - log.warning("Ignoring devices udev regex '%s' of instance '%s'" % (instance._devices_udev_regex, instance.name)) + # threads handled by instance, assigned and processed + instance._kthreads_assigned = set() + instance._kthreads_processed = set() + instance._tuning_active = False + instance._lock = threading.RLock() + # process group.* options + self._instance_prepare_matching(instance) + # grab initial set of kthreads + self._instance_acquire_kthreads(instance) def _instance_cleanup(self, instance): - pass + self._instance_release_kthreads(instance) - def _instance_prepare_device_matching(self, instance): + def _instance_prepare_matching(self, instance): """Process all group.* options and populate instance._groups""" groups = [] for k, v in instance.options.items(): @@ -246,52 +246,139 @@ def _get_instance_sched_options(self, instance, kthread): return group.sched return None - def _get_matching_devices(self, instance, devices): + def _get_matching_kthreads(self, instance, pids): """ overrides method in base.Plugin instead of matching with devices/udev regexes, we use the group.* definitions to determine threads that fit the instance """ - # this can be called before the instance is initialized via - # _instance_init(), so we need to make sure that we process our - # group.* options - if not hasattr(instance, "_groups"): - self._instance_prepare_device_matching(instance) - matching_devices = set() - for device in devices: - pid = self._convert_device_to_pid(device) + matching_kthreads = set() + for pid in pids: try: kthread = self._kthread_get(pid) except ThreadNoLongerExists: - self._kthread_remove(pid) + self._kthread_internal_remove(pid) continue if self._get_instance_sched_options(instance, kthread) is not None: - matching_devices.add(device) - return matching_devices - - # workaround for #662: - # calls to our _instance_[un]apply_static() are unreliable, so we can't - # use them to count active instances and start/stop the monitor thread - # on demand (https://github.com/redhat-performance/tuned/issues/662) - #def _instance_apply_static(self, instance): - # if self._instance_count == 0: - # # scan for kthreads that have appeared since plugin initialization - # self._kthread_scan(initial=False) - # self._perf_monitor_start() - # self._instance_count += 1 - # super(KthreadPlugin, self)._instance_apply_static(instance) - - #def _instance_unapply_static(self, instance, rollback): - # super(KthreadPlugin, self)._instance_unapply_static(instance, rollback) - # self._instance_count -= 1 - # if self._instance_count == 0: - # self._perf_monitor_shutdown() + matching_kthreads.add(pid) + return matching_kthreads + + def _instance_add_kthread(self, instance, pid): + """add a kthread to an instance, and tune it""" + with instance._lock: + if instance._tuning_active: + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + self._apply_kthread_tuning(kthread, opts) + instance._kthreads_processed.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + else: + instance._kthreads_assigned.add(pid) + + def _instance_remove_kthread(self, instance, pid): + """remove a kthread from an instance, and unapply tuning""" + with instance._lock: + if pid in instance._kthreads_assigned: + instance._kthreads_assigned.remove(pid) + elif pid in instance._kthreads_processed: + try: + instance._kthreads_processed.remove(pid) + kthread = self._kthread_get(pid) + self._restore_kthread_tuning(kthread) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + else: + # kthread does not belong to instance. ignore. + pass + + def _instance_transfer_kthread(self, instance_from, instance_to, pid): + # TODO: implement transfer that moves directly from one tuning to the next, + # without restoring the original state + self._instance_remove_kthread(instance_from, pid) + self._instance_add_kthread(instance_to, pid) + + def _instance_acquire_kthreads(self, instance): + """assign all matching kthreads to an instance""" + # first the ones that are currently unassigned + with self._lock: + acquire_kthreads = self._get_matching_kthreads(instance, self._kthread_pids_unassigned) + self._kthread_pids_unassigned -= acquire_kthreads + for pid in acquire_kthreads: + self._instance_add_kthread(instance, pid) + # and then the ones from other instances + for other_instance in self._instances.values(): + if (other_instance == instance or instance.priority > other_instance.priority): + continue + transfer_kthreads = self._get_matching_kthreads(instance, other_instance._kthreads_assigned | other_instance._kthreads_processed) + for pid in transfer_kthreads: + self._instance_transfer_kthread(other_instance, instance, pid) + + def _instance_release_kthreads(self, instance): + """release all kthreads from an instance""" + free_kthreads = instance._kthreads_assigned | instance._kthreads_processed + # first the ones now claimed by other instances + for other_instance in self._instances.values(): + transfer_kthreads = self._get_matching_kthreads(other_instance, free_kthreads) + for pid in list(transfer_kthreads): + self._instance_transfer_kthread(instance, other_instance, pid) + free_kthreads.remove(pid) + # the remaining ones go back to unassigned + with self._lock: + for pid in free_kthreads: + self._instance_remove_kthread(instance, pid) + self._kthread_pids_unassigned.add(pid) + + def _instance_apply_static(self, instance): + if self._instance_count == 0: + # scan for kthreads that have appeared since plugin initialization + self._kthread_scan(initial=False) + self._perf_monitor_start() + self._instance_count += 1 + with instance._lock: + instance._tuning_active = True + for pid in list(instance._kthreads_assigned): + instance._kthreads_assigned.remove(pid) + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + self._apply_kthread_tuning(kthread, opts) + instance._kthreads_processed.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + + def _instance_verify_static(self, instance, ignore_missing, devices): + result = True + with instance._lock: + for pid in list(instance._kthreads_processed): + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + result &= self._verify_kthread_tuning(kthread, opts) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + return result + + def _instance_unapply_static(self, instance, rollback): + with instance._lock: + instance._tuning_active = False + for pid in list(instance._kthreads_processed): + instance._kthreads_processed.remove(pid) + try: + kthread = self._kthread_get(pid) + self._restore_kthread_tuning(kthread) + instance._kthreads_assigned.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + self._instance_count -= 1 + if self._instance_count == 0: + self._perf_monitor_shutdown() # # internal bookkeeping (self._kthreads) # as these methods are called from the main thred and the perf monitor - # thread, we need to lock all accesses to self._kthreads and the - # hotplug.Plugin methods _add_device()/_remove_device() + # thread, we need to lock all accesses to self._kthreads # def _kthread_scan(self, initial=False): """Scan procfs for kernel threads and add them to our bookkeeping @@ -301,9 +388,9 @@ def _kthread_scan(self, initial=False): """ ps = procfs.pidstats() for pid in ps.keys(): - self._kthread_add(pid, initial) + self._kthread_internal_add(pid, initial) - def _kthread_add(self, pid, initial=False): + def _kthread_internal_add(self, pid, initial=False): """Add kernel thread to internal bookkeeping Args: @@ -325,12 +412,12 @@ def _kthread_add(self, pid, initial=False): return self._kthreads[kthread.pid] = kthread if initial: - self._free_devices.add(kthread.device) + self._kthread_pids_unassigned.add(kthread.pid) else: - self._add_device(kthread.device) - log.debug("Added kthread %s" % kthread.device) + self._plugin_add_kthread(kthread.pid) + log.debug("Added kthread %s" % kthread) - def _kthread_remove(self, pid): + def _kthread_internal_remove(self, pid): """Remove kernel thread from internal bookkeeping Args: @@ -338,9 +425,8 @@ def _kthread_remove(self, pid): """ try: with self._lock: - device = self._kthreads[pid].device del self._kthreads[pid] - self._remove_device(device) + self._plugin_remove_kthread(pid) except KeyError: return log.debug("Removed kthread %d" % pid) @@ -414,9 +500,9 @@ def _perf_monitor_thread(self): if event and hasattr(event, "type"): have_events = True if event.type == perf.RECORD_COMM: - self._kthread_add(event.tid) + self._kthread_internal_add(event.tid) elif event.type == perf.RECORD_EXIT: - self._kthread_remove(event.tid) + self._kthread_internal_remove(event.tid) log.debug("perf monitor thread shutting down") # @@ -492,15 +578,15 @@ def _apply_kthread_tuning(self, kthread, opts): try: self._set_affinity(kthread.pid, opts.affinity) kthread.affinity_changeable = True - log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread.device, opts.affinity)) + log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread, opts.affinity)) except AffinityNotChangeable: kthread.affinity_changeable = False - log.debug("The CPU affinity of kthread %s is not changeable"% kthread.device) + log.debug("The CPU affinity of kthread %s is not changeable" % kthread) if opts.policy is not None or opts.priority is not None: if opts.policy != current_policy or opts.priority != current_priority: self._set_schedopts(kthread.pid, opts.policy, opts.priority) log.debug("Set scheduling of kthread %s to '%s'" - % (kthread.device, self._format_schedopts(opts.policy, opts.priority))) + % (kthread, self._format_schedopts(opts.policy, opts.priority))) def _restore_kthread_tuning(self, kthread): opts = kthread.sched_orig @@ -510,21 +596,21 @@ def _restore_kthread_tuning(self, kthread): try: self._set_affinity(kthread.pid, opts.affinity) log.debug("Restored CPU affinity of kthread %s to '%s'" - % (kthread.device, opts.affinity)) + % (kthread, opts.affinity)) except AffinityNotChangeable: log.debug("Failed to restore CPU affinity of kthread %s to '%s'" - % (kthread.device, opts.affinity)) + % (kthread, opts.affinity)) if opts.policy != current_policy or opts.priority != current_priority: self._set_schedopts(kthread.pid, opts.policy, opts.priority) log.debug("Restored scheduling of kthread %s to '%s'" - % (kthread.device, self._format_schedopts(opts.policy, opts.priority))) + % (kthread, self._format_schedopts(opts.policy, opts.priority))) def _verify_kthread_tuning(self, kthread, opts): affinity_ok, priority_ok = True, True current_affinity = self._get_affinity(kthread.pid) current_policy, current_priority = self._get_schedopts(kthread.pid) if opts.affinity is not None and kthread.affinity_changeable: - desc = "CPU affinity of kthread %s" % kthread.device + desc = "CPU affinity of kthread %s" % kthread current = self._cmd.cpulist2string(self._cmd.cpulist_pack(current_affinity)) if opts.affinity == current_affinity: log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current)) @@ -533,7 +619,7 @@ def _verify_kthread_tuning(self, kthread, opts): log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired)) affinity_ok = False if opts.policy is not None or opts.priority is not None: - desc = "scheduling of kthread %s" % kthread.device + desc = "scheduling of kthread %s" % kthread current = self._format_schedopts(current_policy, current_priority) if opts.policy == current_policy and opts.priority == current_priority: log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current)) @@ -542,29 +628,3 @@ def _verify_kthread_tuning(self, kthread, opts): log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired)) priority_ok = False return affinity_ok and priority_ok - - # - # command definitions: entry point for device tuning - # - @command_custom("_instance", per_device=True) - def _instance_kthread(self, start, value, device, verify, ignore_missing): - """ - This is the actual entry point for tuning. - value (of the option "_instance") is the name of the instance, set in _instance_init - """ - pid = self._convert_device_to_pid(device) - try: - kthread = self._kthread_get(pid) - if verify: - instance = self._instances[value] - opts = self._get_instance_sched_options(instance, kthread) - return self._verify_kthread_tuning(kthread, opts) - if start: - instance = self._instances[value] - opts = self._get_instance_sched_options(instance, kthread) - self._apply_kthread_tuning(kthread, opts) - else: - self._restore_kthread_tuning(kthread) - except ThreadNoLongerExists: - self._kthread_remove(pid) - return None