diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index f95a1e25..b01ff18e 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -24,24 +24,24 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. # By default, queries listed here will override any specified in a config file. # Prefix the list here with "+" to use these queries and those in the config file. - + # Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs # queries: security-extended,security-and-quality - + - name: Autobuild - uses: github/codeql-action/autobuild@v2 + uses: github/codeql-action/autobuild@v3 - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 with: category: "/language:${{matrix.language}}" diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 00000000..5ca6081d --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,14 @@ +name: pre-commit + +on: + pull_request: + push: + branches: [master] + +jobs: + pre-commit: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + - uses: pre-commit/action@v3.0.1 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..78a1e08c --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,22 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: trailing-whitespace + exclude: (^testing/Baseline|^auxil) + - id: end-of-file-fixer + exclude: (^testing/Baseline|examples/.*Baseline.*|^auxil) + +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.8.1 + hooks: + - id: ruff-format + - id: ruff + args: [--fix] + +- repo: https://github.com/rhysd/actionlint + rev: v1.7.4 + hooks: + - id: actionlint diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 3754ea08..00000000 --- a/.travis.yml +++ /dev/null @@ -1,42 +0,0 @@ -dist: bionic - -language: python -python: - - "3.5" - - "3.6" - - "3.7" - - "3.8" - - "3.9" - -addons: - apt: - packages: - - libpcap-dev - - swig - -branches: - only: - - master - -notifications: - email: - recipients: - - zeek-commits-internal@zeek.org - -before_install: - # Clone the zeek git repo and replace zeekctl. - - cd .. - - tar cf zeekctl.tar --exclude=.git zeekctl - - rm -rf zeekctl - - git clone --recursive https://github.com/zeek/zeek zeek - - rm -rf zeek/auxil/zeekctl - - tar xf zeekctl.tar -C zeek/auxil - - rm -f zeekctl.tar - - cd zeek/auxil/zeekctl/testing - -install: - - make buildzeek - -script: ../../btest/btest -f diag.log -A - -after_failure: cat diag.log diff --git a/BroControl/__init__.py b/BroControl/__init__.py index a5915533..72063999 100644 --- a/BroControl/__init__.py +++ b/BroControl/__init__.py @@ -1,7 +1,6 @@ -from __future__ import print_function import sys -msg=""" +msg = """ Error: ZeekControl plugin uses legacy BroControl API. 
Use 'import ZeekControl.plugin' instead of 'import BroControl.plugin'. """ diff --git a/BroControl/cmdresult.py b/BroControl/cmdresult.py index b1317fa5..d366779e 100644 --- a/BroControl/cmdresult.py +++ b/BroControl/cmdresult.py @@ -3,4 +3,5 @@ # import ZeekControl.cmdresult + CmdResult = ZeekControl.cmdresult.CmdResult diff --git a/BroControl/config.py b/BroControl/config.py index f9595515..2592b58b 100644 --- a/BroControl/config.py +++ b/BroControl/config.py @@ -3,4 +3,5 @@ # import ZeekControl.config + Config = ZeekControl.config.Config diff --git a/BroControl/plugin.py b/BroControl/plugin.py index 43b43674..456a730a 100644 --- a/BroControl/plugin.py +++ b/BroControl/plugin.py @@ -3,4 +3,5 @@ # import ZeekControl.plugin + Plugin = ZeekControl.plugin.Plugin diff --git a/CHANGES b/CHANGES index 53b63021..48c2a002 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,38 @@ +2.5.0-74 | 2024-12-10 17:22:23 -0700 + + * Update cmake submodule to master (Tim Wojtulewicz, Corelight) + + * Update trace-summary submodule for python upgrade (Tim Wojtulewicz, Corelight) + + * Update pysubnettree submodule (Tim Wojtulewicz, Corelight) + + * Add workflow for running pre-commit (Tim Wojtulewicz, Corelight) + + * Update codeql action versions, add linting for workflows (Tim Wojtulewicz, Corelight) + + * Fix references to python 3.5 in CMakeLists and docs (Tim Wojtulewicz, Corelight) + + * Add 'F' to ruff, fix findings (Tim Wojtulewicz, Corelight) + + * Add 'ISC' to ruff, fix findings (there weren't any) (Tim Wojtulewicz, Corelight) + + * Add 'I' to ruff, fix findings (Tim Wojtulewicz, Corelight) + + * Add 'C4' to ruff, fix findings (Tim Wojtulewicz, Corelight) + + * Use f-strings or .format() for string formatting (Tim Wojtulewicz, Corelight) + + * Add ruff linting, enabling and fixing the 'upgrade' finds (Tim Wojtulewicz, Corelight) + + This disables the format string finding (UP031) temporarily. It is + handled in a separate commit because it's so many changes. + + * Add pre-commit hook for ruff-format, fix all of the findings (Tim Wojtulewicz, Corelight) + + * Add pre-commit for trailing whitespace, fix findings (Tim Wojtulewicz, Corelight) + + * Remove long-outdated travis configuration (Tim Wojtulewicz, Corelight) + 2.5.0-58 | 2024-08-08 09:25:59 -0700 * Stop installing the broctl symlink (Tim Wojtulewicz, Corelight) diff --git a/CMakeLists.txt b/CMakeLists.txt index a38aea35..a6a1fb62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,7 @@ endif () find_package(Zeek) list(APPEND Python_ADDITIONAL_VERSIONS 3) -set(ZEEKCTL_PYTHON_MIN 3.5.0) +set(ZEEKCTL_PYTHON_MIN 3.9.0) find_package(Python ${ZEEKCTL_PYTHON_MIN} REQUIRED COMPONENTS Interpreter) find_package(SubnetTree) find_package(PCAP) diff --git a/VERSION b/VERSION index 4eb1f5dc..66c3ccec 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.5.0-58 +2.5.0-74 diff --git a/ZeekControl/cmdresult.py b/ZeekControl/cmdresult.py index 4a2c32ac..e2e538d7 100644 --- a/ZeekControl/cmdresult.py +++ b/ZeekControl/cmdresult.py @@ -2,6 +2,7 @@ from ZeekControl import node as node_mod + class CmdResult: """Class representing the result of a zeekctl command.""" @@ -104,4 +105,3 @@ def set_node_data(self, node, success, data): else: self.fail_count += 1 self.ok = False - diff --git a/ZeekControl/config.py b/ZeekControl/config.py index 169e69c1..87c6e73b 100644 --- a/ZeekControl/config.py +++ b/ZeekControl/config.py @@ -1,20 +1,20 @@ # Functions to read and access the zeekctl configuration. 
 
+import configparser
 import hashlib
 import os
+import re
 import socket
 import subprocess
-import re
 import sys
-import configparser
 
 from ZeekControl import node as node_mod
 from ZeekControl import options
 from ZeekControl.exceptions import ConfigurationError, RuntimeEnvironmentError
+
 from .state import SqliteState
 from .version import VERSION
 
-
 # Class storing the zeekctl configuration.
 #
 # This class provides access to four types of configuration/state:
@@ -23,7 +23,8 @@
 # - the node configuration from node.cfg
 # - dynamic state variables which are kept across restarts in spool/state.db
 
-Config = None # Globally accessible instance of Configuration.
+Config = None  # Globally accessible instance of Configuration.
+
 
 class NodeStore:
     def __init__(self):
@@ -46,14 +47,18 @@ def add_node(self, node):
                 if nn.lower() == namelower:
                     matchname = nn
                     break
-            raise ConfigurationError('node name "%s" is a duplicate of "%s"' % (node.name, matchname))
+            raise ConfigurationError(
+                f'node name "{node.name}" is a duplicate of "{matchname}"'
+            )
 
         self.nodestore[node.name] = node
         self.nodenameslower.append(namelower)
 
 
 class Configuration:
-    def __init__(self, basedir, libdir, libdirinternal, cfgfile, zeekscriptdir, ui, state=None):
+    def __init__(
+        self, basedir, libdir, libdirinternal, cfgfile, zeekscriptdir, ui, state=None
+    ):
         self.ui = ui
         self.basedir = basedir
         self.libdir = libdir
@@ -108,7 +113,9 @@ def _initialize_options(self):
             if opt.legacy_name:
                 old_key = opt.legacy_name.lower()
                 if old_key in self.config:
-                    self.ui.error("option '%s' is no longer supported, please use '%s' instead" % (opt.legacy_name, opt.name))
+                    self.ui.error(
+                        f"option '{opt.legacy_name}' is no longer supported, please use '{opt.name}' instead"
+                    )
                     errors = True
                     continue
 
@@ -118,14 +125,14 @@
             sys.exit(1)
 
         # Set defaults for options we derive dynamically.
-        self.init_option("mailto", "%s" % os.getenv("USER"))
-        self.init_option("mailfrom", "Zeek <zeek@%s>" % socket.gethostname())
+        self.init_option("mailto", "{}".format(os.getenv("USER")))
+        self.init_option("mailfrom", f"Zeek <zeek@{socket.gethostname()}>")
         self.init_option("mailalarmsto", self.config["mailto"])
 
         # Determine operating system.
         success, output = execute.run_localcmd("uname")
         if not success or not output:
-            raise RuntimeEnvironmentError("failed to run uname: %s" % output)
+            raise RuntimeEnvironmentError(f"failed to run uname: {output}")
         self.init_option("os", output.strip())
 
         # Determine the CPU pinning command.
@@ -161,42 +168,64 @@ def _check_options(self):
         for key, value in self.config.items():
             if re.match(allowedchars, key) is None:
-                raise ConfigurationError('zeekctl option name "%s" contains invalid characters (allowed characters: a-z, 0-9, ., and _)' % key)
+                raise ConfigurationError(
+                    f'zeekctl option name "{key}" contains invalid characters (allowed characters: a-z, 0-9, ., and _)'
+                )
 
             if re.match(nostartdigit, key) is None:
-                raise ConfigurationError('zeekctl option name "%s" cannot start with a number' % key)
+                raise ConfigurationError(
+                    f'zeekctl option name "{key}" cannot start with a number'
+                )
 
             # No zeekctl option ever requires the entire value to be wrapped in
             # quotes, and since doing so can cause problems, we don't allow it.
if isinstance(value, str): - if (value.startswith('"') and value.endswith('"') or - value.startswith("'") and value.endswith("'")): - raise ConfigurationError('value of zeekctl option "%s" cannot be wrapped in quotes' % key) - - dirs = ("zeekbase", "logdir", "spooldir", "cfgdir", "zeekscriptdir", - "bindir", "libdirinternal", "plugindir", "scriptsdir") - files = ("makearchivename", ) + if ( + value.startswith('"') + and value.endswith('"') + or value.startswith("'") + and value.endswith("'") + ): + raise ConfigurationError( + f'value of zeekctl option "{key}" cannot be wrapped in quotes' + ) + + dirs = ( + "zeekbase", + "logdir", + "spooldir", + "cfgdir", + "zeekscriptdir", + "bindir", + "libdirinternal", + "plugindir", + "scriptsdir", + ) + files = ("makearchivename",) for d in dirs: v = self.config[d] if not os.path.isdir(v): - raise ConfigurationError('zeekctl option "%s" directory not found: %s' % (d, v)) + raise ConfigurationError( + f'zeekctl option "{d}" directory not found: {v}' + ) for f in files: v = self.config[f] if not os.path.isfile(v): - raise ConfigurationError('zeekctl option "%s" file not found: %s' % (f, v)) + raise ConfigurationError(f'zeekctl option "{f}" file not found: {v}') # Verify that logs don't expire more quickly than the rotation interval logexpireseconds = 60 * self.config["logexpireminutes"] if 0 < logexpireseconds < self.config["logrotationinterval"]: - raise ConfigurationError("Log expire interval cannot be shorter than the log rotation interval") - + raise ConfigurationError( + "Log expire interval cannot be shorter than the log rotation interval" + ) # Convert a time interval string (from the value of the given option name) # to an integer number of minutes. def _get_interval_minutes(self, optname): # Conversion table for time units to minutes. - units = {"day": 24*60, "hr": 60, "min": 1} + units = {"day": 24 * 60, "hr": 60, "min": 1} ss = self.config[optname] try: @@ -210,7 +239,9 @@ def _get_interval_minutes(self, optname): # space, followed by a time unit. mm = re.match("([0-9]+) ?(day|hr|min)s?$", ss) if mm is None: - raise ConfigurationError('value of zeekctl option "%s" is invalid (value must be integer followed by a time unit "day", "hr", or "min"): %s' % (optname, ss)) + raise ConfigurationError( + f'value of zeekctl option "{optname}" is invalid (value must be integer followed by a time unit "day", "hr", or "min"): {ss}' + ) v = int(mm.group(1)) v *= units[mm.group(2)] @@ -227,10 +258,10 @@ def initPostPlugins(self): try: global_env_vars = self._get_env_var_dict(varlist) except ConfigurationError as err: - raise ConfigurationError("zeekctl config: %s" % err) + raise ConfigurationError(f"zeekctl config: {err}") for node in self.nodes(): - for (key, val) in global_env_vars.items(): + for key, val in global_env_vars.items(): # Values from node.cfg take precedence over zeekctl.cfg node.env_vars.setdefault(key, val) @@ -251,7 +282,7 @@ def __getattr__(self, attr): return self.config[attr] if attr in self.state: return self.state[attr] - raise AttributeError("unknown config attribute %s" % attr) + raise AttributeError(f"unknown config attribute {attr}") # Returns a sorted list of all zeekctl.cfg entries. # Includes dynamic variables if dynamic is true. 
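The config.py churn above and below follows one mechanical pattern from the ruff adoption listed in CHANGES: the "upgrade" (pyupgrade, UP) rules replace printf-style "%" interpolation with f-strings, and ruff-format then re-wraps the long raise/error calls. A minimal, illustrative sketch of that rewrite, using a hypothetical option name not taken from the patch:

key = "mailto"  # hypothetical option name, for illustration only

# Before: printf-style interpolation, as flagged by ruff's pyupgrade (UP) rules.
old_msg = 'zeekctl option name "%s" cannot start with a number' % key

# After: the equivalent f-string, the form used throughout these hunks.
new_msg = f'zeekctl option name "{key}" cannot start with a number'

# The rewrite is purely syntactic; both produce the same message.
assert old_msg == new_msg
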
@@ -342,8 +373,7 @@ def subst(self, text): if value is None: value = "" - text = text[0:match.start(1)] + value + text[match.end(1):] - + text = text[0 : match.start(1)] + value + text[match.end(1) :] # Convert string into list of integers (ValueError is raised if any # item in the list is not a non-negative integer). @@ -374,11 +404,15 @@ def _get_env_var_dict(self, text): try: key, val = keyval.split("=", 1) except ValueError: - raise ConfigurationError("missing '=' in env_vars option value: %s" % keyval) + raise ConfigurationError( + f"missing '=' in env_vars option value: {keyval}" + ) key = key.strip() if not key: - raise ConfigurationError("env_vars option value must contain at least one environment variable name: %s" % keyval) + raise ConfigurationError( + f"env_vars option value must contain at least one environment variable name: {keyval}" + ) env_vars[key] = val.strip() @@ -390,7 +424,7 @@ def _read_nodes(self): fname = self.nodecfg try: if not config.read(fname): - raise ConfigurationError("cannot read node config file: %s" % fname) + raise ConfigurationError(f"cannot read node config file: {fname}") except configparser.MissingSectionHeaderError as err: raise ConfigurationError(err) @@ -401,12 +435,13 @@ def _read_nodes(self): node = node_mod.Node(self, sec) # Note that the keys are converted to lowercase by configparser. - for (key, val) in config.items(sec): - + for key, val in config.items(sec): key = key.replace(".", "_") if key not in node_mod.Node._keys: - self.ui.warn("ignoring unrecognized node config option '%s' given for node '%s'" % (key, sec)) + self.ui.warn( + f"ignoring unrecognized node config option '{key}' given for node '{sec}'" + ) continue node.__dict__[key] = val @@ -422,18 +457,22 @@ def _read_nodes(self): def _check_node(self, node, nodestore, counts): if not node.type: - raise ConfigurationError("no type given for node %s" % node.name) + raise ConfigurationError(f"no type given for node {node.name}") if node.type not in node_mod.node_types(): - raise ConfigurationError("unknown node type '%s' given for node '%s'" % (node.type, node.name)) + raise ConfigurationError( + f"unknown node type '{node.type}' given for node '{node.name}'" + ) if not node.host: - raise ConfigurationError("no host given for node '%s'" % node.name) + raise ConfigurationError(f"no host given for node '{node.name}'") try: addrinfo = socket.getaddrinfo(node.host, None, 0, 0, socket.SOL_TCP) except socket.gaierror as e: - raise ConfigurationError("hostname lookup failed for '%s' in node config [%s]" % (node.host, e.args[1])) + raise ConfigurationError( + f"hostname lookup failed for '{node.host}' in node config [{e.args[1]}]" + ) addrs = [addr[4][0] for addr in addrinfo] @@ -453,7 +492,7 @@ def _check_node(self, node, nodestore, counts): try: node.env_vars = self._get_env_var_dict(node.env_vars) except ConfigurationError as err: - raise ConfigurationError("node '%s' config: %s" % (node.name, err)) + raise ConfigurationError(f"node '{node.name}' config: {err}") # Each node gets a number unique across its type. 
try: @@ -467,57 +506,81 @@ def _check_node(self, node, nodestore, counts): if node.lb_procs: if not node_mod.is_worker(node): - raise ConfigurationError("node '%s' config: load balancing node config options are only for worker nodes" % node.name) + raise ConfigurationError( + f"node '{node.name}' config: load balancing node config options are only for worker nodes" + ) try: numprocs = int(node.lb_procs) except ValueError: - raise ConfigurationError("number of load-balanced processes must be an integer for node '%s'" % node.name) + raise ConfigurationError( + f"number of load-balanced processes must be an integer for node '{node.name}'" + ) if numprocs < 1: - raise ConfigurationError("number of load-balanced processes must be greater than zero for node '%s'" % node.name) + raise ConfigurationError( + f"number of load-balanced processes must be greater than zero for node '{node.name}'" + ) elif node.lb_method: - raise ConfigurationError("number of load-balanced processes not specified for node '%s'" % node.name) + raise ConfigurationError( + f"number of load-balanced processes not specified for node '{node.name}'" + ) try: pin_cpus = self._get_pin_cpu_list(node.pin_cpus, numprocs) except ValueError: - raise ConfigurationError("pin cpus list must contain only non-negative integers for node '%s'" % node.name) + raise ConfigurationError( + f"pin cpus list must contain only non-negative integers for node '{node.name}'" + ) if pin_cpus: node.pin_cpus = pin_cpus[0] if node.lb_procs: if not node.lb_method: - raise ConfigurationError("no load balancing method given for node '%s'" % node.name) - - if node.lb_method not in ("af_packet", "pf_ring", "myricom", "custom", "interfaces"): - raise ConfigurationError("unknown load balancing method '%s' given for node '%s'" % (node.lb_method, node.name)) + raise ConfigurationError( + f"no load balancing method given for node '{node.name}'" + ) + + if node.lb_method not in ( + "af_packet", + "pf_ring", + "myricom", + "custom", + "interfaces", + ): + raise ConfigurationError( + f"unknown load balancing method '{node.lb_method}' given for node '{node.name}'" + ) if node.lb_method == "interfaces": if not node.lb_interfaces: - raise ConfigurationError("list of load-balanced interfaces not specified for node '%s'" % node.name) + raise ConfigurationError( + f"list of load-balanced interfaces not specified for node '{node.name}'" + ) # get list of interfaces to use, and assign one to each node netifs = node.lb_interfaces.split(",") if len(netifs) != numprocs: - raise ConfigurationError("number of load-balanced interfaces is not same as number of load-balanced processes for node '%s'" % node.name) + raise ConfigurationError( + f"number of load-balanced interfaces is not same as number of load-balanced processes for node '{node.name}'" + ) node.interface = netifs.pop().strip() origname = node.name # node names will have a numerical suffix - node.name = "%s-1" % node.name + node.name = f"{node.name}-1" for num in range(2, numprocs + 1): newnode = node.copy() # Update the node attrs that need to be changed - newname = "%s-%d" % (origname, num) + newname = f"{origname}-{num:d}" newnode.name = newname counts[node.type] += 1 newnode.count = counts[node.type] if pin_cpus: - newnode.pin_cpus = pin_cpus[num-1] + newnode.pin_cpus = pin_cpus[num - 1] if newnode.lb_method == "interfaces": newnode.interface = netifs.pop().strip() @@ -541,13 +604,19 @@ def _check_nodestore(self, nodestore): for n in nodestore.values(): if node_mod.is_manager(n): if manager: - raise 
ConfigurationError("only one manager can be defined in node config") + raise ConfigurationError( + "only one manager can be defined in node config" + ) manager = True if n.addr in localhostaddrs: manageronlocalhost = True if n.addr not in self.localaddrs: - raise ConfigurationError("must run zeekctl on same machine as the manager node. The manager node has IP address %s and this machine has IP addresses: %s" % (n.addr, ", ".join(self.localaddrs))) + raise ConfigurationError( + "must run zeekctl on same machine as the manager node. The manager node has IP address {} and this machine has IP addresses: {}".format( + n.addr, ", ".join(self.localaddrs) + ) + ) elif node_mod.is_proxy(n): proxy = True @@ -555,11 +624,17 @@ def _check_nodestore(self, nodestore): elif node_mod.is_standalone(n): standalone = True if n.addr not in self.localaddrs: - raise ConfigurationError("must run zeekctl on same machine as the standalone node. The standalone node has IP address %s and this machine has IP addresses: %s" % (n.addr, ", ".join(self.localaddrs))) + raise ConfigurationError( + "must run zeekctl on same machine as the standalone node. The standalone node has IP address {} and this machine has IP addresses: {}".format( + n.addr, ", ".join(self.localaddrs) + ) + ) if standalone: if len(nodestore) > 1: - raise ConfigurationError("more than one node defined in standalone node config") + raise ConfigurationError( + "more than one node defined in standalone node config" + ) else: if not manager: raise ConfigurationError("no manager defined in node config") @@ -570,15 +645,16 @@ def _check_nodestore(self, nodestore): if manageronlocalhost: for n in nodestore.values(): if not node_mod.is_manager(n) and n.addr not in localhostaddrs: - raise ConfigurationError("all nodes must use localhost/127.0.0.1/::1 when manager uses it") - + raise ConfigurationError( + "all nodes must use localhost/127.0.0.1/::1 when manager uses it" + ) def _to_bool(self, val): if val.lower() in ("1", "true"): return True if val.lower() in ("0", "false"): return False - raise ValueError("invalid boolean: '%s'" % val) + raise ValueError(f"invalid boolean: '{val}'") # Parses zeekctl.cfg and returns a dictionary of all entries. def _read_config(self, fname): @@ -593,7 +669,7 @@ def _read_config(self, fname): if opt.legacy_name: opt_names.add(opt.legacy_name.lower()) - with open(fname, "r") as f: + with open(fname) as f: for line in f: line = line.strip() if not line or line.startswith("#"): @@ -601,7 +677,7 @@ def _read_config(self, fname): args = line.split("=", 1) if len(args) != 2: - raise ConfigurationError("zeekctl config syntax error: %s" % line) + raise ConfigurationError(f"zeekctl config syntax error: {line}") key, val = args # Option names are not case-sensitive. @@ -610,7 +686,7 @@ def _read_config(self, fname): # Warn about unrecognized options, but we can't check plugin # options here because no plugins have been loaded yet. if "." 
not in key and key not in opt_names: - self.ui.warn("ignoring unrecognized zeekctl option: %s" % key) + self.ui.warn(f"ignoring unrecognized zeekctl option: {key}") continue # if the key already exists, just overwrite with new value @@ -624,7 +700,9 @@ def _read_config(self, fname): try: config[key] = type_converters[opt.type](config[key]) except ValueError: - raise ConfigurationError("zeekctl option '%s' has invalid value '%s' for type %s" % (key, config[key], opt.type)) + raise ConfigurationError( + f"zeekctl option '{key}' has invalid value '{config[key]}' for type {opt.type}" + ) return config @@ -675,7 +753,12 @@ def _get_local_addrs_ifconfig(self): # On Linux, ifconfig is often not in the user's standard PATH. # Also need to set LANG here to ensure that the output of ifconfig # is consistent regardless of which locale the system is using. - proc = subprocess.Popen(["PATH=$PATH:/sbin:/usr/sbin LANG=C ifconfig", "-a"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + proc = subprocess.Popen( + ["PATH=$PATH:/sbin:/usr/sbin LANG=C ifconfig", "-a"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + ) out, err = proc.communicate() except OSError: return False @@ -720,7 +803,9 @@ def _get_local_addrs_ifconfig(self): localaddrs.append(addrstr) if not localaddrs: - self.ui.warn('failed to extract IP addresses from the "ifconfig -a" command output') + self.ui.warn( + 'failed to extract IP addresses from the "ifconfig -a" command output' + ) return localaddrs @@ -728,7 +813,12 @@ def _get_local_addrs_ifconfig(self): def _get_local_addrs_ip(self): try: # On Linux, "ip" is sometimes not in the user's standard PATH. - proc = subprocess.Popen(["PATH=$PATH:/sbin:/usr/sbin ip address"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + proc = subprocess.Popen( + ["PATH=$PATH:/sbin:/usr/sbin ip address"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + ) out, err = proc.communicate() except OSError: return False @@ -759,7 +849,9 @@ def _get_local_addrs_ip(self): localaddrs.append(addrstr) if not localaddrs: - self.ui.warn('failed to extract IP addresses from the "ip address" command output') + self.ui.warn( + 'failed to extract IP addresses from the "ip address" command output' + ) return localaddrs @@ -775,11 +867,15 @@ def _get_local_addrs(self): # Fallback to localhost if we did not find any IP addrs. if not localaddrs: - self.ui.warn('failed to find local IP addresses with "ifconfig -a" or "ip address" commands') + self.ui.warn( + 'failed to find local IP addresses with "ifconfig -a" or "ip address" commands' + ) localaddrs = ["127.0.0.1", "::1"] try: - addrinfo = socket.getaddrinfo(socket.gethostname(), None, 0, 0, socket.SOL_TCP) + addrinfo = socket.getaddrinfo( + socket.gethostname(), None, 0, 0, socket.SOL_TCP + ) except Exception: addrinfo = [] @@ -802,13 +898,17 @@ def _update_cfg_state(self): def is_cfg_changed(self): try: if "configchksum" in self.state: - if self.state["configchksum"] != self._get_zeekctlcfg_hash(filehash=True): + if self.state["configchksum"] != self._get_zeekctlcfg_hash( + filehash=True + ): return True if "confignodechksum" in self.state: - if self.state["confignodechksum"] != self._get_nodecfg_hash(filehash=True): + if self.state["confignodechksum"] != self._get_nodecfg_hash( + filehash=True + ): return True - except IOError: + except OSError: # If we can't read the config files, then do nothing. 
pass @@ -816,7 +916,9 @@ def is_cfg_changed(self): # Check if the user has already run the "install" or "deploy" commands. def is_zeekctl_installed(self): - return os.path.isfile(os.path.join(self.config["policydirsiteinstallauto"], "zeekctl-config.zeek")) + return os.path.isfile( + os.path.join(self.config["policydirsiteinstallauto"], "zeekctl-config.zeek") + ) # Warn user to run zeekctl deploy if any changes are detected to zeekctl # config options, node config, Zeek version, or if certain state variables @@ -829,7 +931,9 @@ def warn_zeekctl_install(self): nodehash = self._get_nodecfg_hash() if nodehash != self.state["hash-nodecfg"]: - self.ui.warn('zeekctl node config has changed (run the zeekctl "deploy" command)') + self.ui.warn( + 'zeekctl node config has changed (run the zeekctl "deploy" command)' + ) return else: @@ -849,7 +953,9 @@ def warn_zeekctl_install(self): version = self._get_zeek_version() if version != oldversion: - self.ui.warn('new zeek version detected (run the zeekctl "deploy" command)') + self.ui.warn( + 'new zeek version detected (run the zeekctl "deploy" command)' + ) return else: missingstate = True @@ -858,7 +964,9 @@ def warn_zeekctl_install(self): if "hash-zeekctlcfg" in self.state: cfghash = self._get_zeekctlcfg_hash() if cfghash != self.state["hash-zeekctlcfg"]: - self.ui.warn('zeekctl config has changed (run the zeekctl "deploy" command)') + self.ui.warn( + 'zeekctl config has changed (run the zeekctl "deploy" command)' + ) return else: missingstate = True @@ -867,7 +975,9 @@ def warn_zeekctl_install(self): # (this would most likely indicate an upgrade install was performed # over an old version that didn't have the state.db file). if missingstate: - self.ui.warn('state database needs updating (run the zeekctl "deploy" command)') + self.ui.warn( + 'state database needs updating (run the zeekctl "deploy" command)' + ) return # Warn if there might be any dangling Zeek nodes (i.e., nodes that are @@ -899,7 +1009,9 @@ def _warn_dangling_zeek(self): # If node is not a known node or if host has changed, then # we must warn about dangling Zeek node. if nname not in nodes or hname != nodes[nname]: - self.ui.warn('Zeek node "%s" possibly still running on host "%s" (PID %s)' % (nname, hname, pid)) + self.ui.warn( + f'Zeek node "{nname}" possibly still running on host "{hname}" (PID {pid})' + ) # Set the "expected running" flag to False so cron doesn't try # to start this node. expectkey = key.replace("-pid", "-expect-running") @@ -910,7 +1022,7 @@ def _warn_dangling_zeek(self): # Return a hash value (as a string) of the current zeekctl configuration. def _get_zeekctlcfg_hash(self, filehash=False): if filehash: - with open(self.cfgfile, "r") as ff: + with open(self.cfgfile) as ff: data = ff.read() else: data = str(sorted(self.config.items())) @@ -924,12 +1036,20 @@ def _get_zeekctlcfg_hash(self, filehash=False): # Return a hash value (as a string) of the current zeekctl node config. 
def _get_nodecfg_hash(self, filehash=False): if filehash: - with open(self.nodecfg, "r") as ff: + with open(self.nodecfg) as ff: data = ff.read() else: nn = [] for n in self.nodes(): - nn.append(tuple([(key, val) for key, val in n.items() if not key.startswith("_")])) + nn.append( + tuple( + [ + (key, val) + for key, val in n.items() + if not key.startswith("_") + ] + ) + ) data = str(nn) data = data.encode() @@ -952,21 +1072,23 @@ def _get_zeek_version(self): zeek = self.config["zeek"] if not os.path.lexists(zeek): - raise ConfigurationError("cannot find Zeek binary: %s" % zeek) + raise ConfigurationError(f"cannot find Zeek binary: {zeek}") version = "" - success, output = execute.run_localcmd("%s -v" % zeek) + success, output = execute.run_localcmd(f"{zeek} -v") if success and output: version = output.splitlines()[-1] else: msg = " with no output" if output: - msg = " with output:\n%s" % output - raise RuntimeEnvironmentError('running "zeek -v" failed%s' % msg) + msg = f" with output:\n{output}" + raise RuntimeEnvironmentError(f'running "zeek -v" failed{msg}') match = re.search(".* version ([^ ]*).*$", version) if not match: - raise RuntimeEnvironmentError('cannot determine Zeek version ("zeek -v" output: %s)' % version.strip()) + raise RuntimeEnvironmentError( + f'cannot determine Zeek version ("zeek -v" output: {version.strip()})' + ) version = match.group(1) # If zeek is built with the "--enable-debug" configure option, then it @@ -984,8 +1106,7 @@ def _is_valid_addr(ipstr): socket.inet_pton(socket.AF_INET6, ipstr) else: socket.inet_pton(socket.AF_INET, ipstr) - except socket.error: + except OSError: return False return True - diff --git a/ZeekControl/control.py b/ZeekControl/control.py index d9d72788..a653023b 100644 --- a/ZeekControl/control.py +++ b/ZeekControl/control.py @@ -1,20 +1,14 @@ # Functions to control the nodes' operations. -from collections import namedtuple import glob +import logging import os import shutil import time -import logging +from collections import namedtuple -from ZeekControl import execute -from ZeekControl import events -from ZeekControl import util -from ZeekControl import config -from ZeekControl import install -from ZeekControl import cron +from ZeekControl import cmdresult, config, cron, events, execute, install, util from ZeekControl import node as node_mod -from ZeekControl import cmdresult # Waits for the nodes' Zeek processes to reach the given status. @@ -27,7 +21,7 @@ def _make_zeek_params(node, live): try: # Interface name needs quotes so that shell doesn't interpret any # potential metacharacters in the name. - args += ["-i", "'%s'" % node.interface] + args += ["-i", f"'{node.interface}'"] except AttributeError: pass @@ -44,9 +38,9 @@ def _make_zeek_params(node, live): args += ["-p", "standalone"] for prefix in config.Config.prefixes.split(":"): - args += ["-p", "%s" % prefix] + args += ["-p", f"{prefix}"] - args += ["-p", "%s" % node.name] + args += ["-p", f"{node.name}"] # The order of loaded scripts is as follows: # 1) SitePolicyScripts (local.zeek by default) gives a common set of loaded @@ -78,13 +72,14 @@ def _make_zeek_params(node, live): return args + # Build the environment variables for the given node. 
def _make_env_params(node, returnlist=False): envs = [] if not node_mod.is_standalone(node): - envs.append("CLUSTER_NODE=%s" % node.name) + envs.append(f"CLUSTER_NODE={node.name}") - envs += ["%s=%s" % (key, val) for (key, val) in sorted(node.env_vars.items())] + envs += [f"{key}={val}" for (key, val) in sorted(node.env_vars.items())] if returnlist: envlist = [("-v", i) for i in envs] @@ -121,7 +116,7 @@ def start(self, nodes): self._start_nodes(loggers, results) if not results.ok: - for n in (manager + proxies + workers): + for n in manager + proxies + workers: results.set_node_fail(n) return results @@ -129,7 +124,7 @@ def start(self, nodes): self._start_nodes(manager, results) if not results.ok: - for n in (proxies + workers): + for n in proxies + workers: results.set_node_fail(n) return results @@ -146,14 +141,13 @@ def start(self, nodes): return results - # Starts the given nodes. def _start_nodes(self, nodes, results): - self.ui.info("starting %s ..." % node_mod.nodes_describe(nodes)) + self.ui.info(f"starting {node_mod.nodes_describe(nodes)} ...") filtered = [] # Ignore nodes which are still running. - for (node, isrunning) in self._isrunning(nodes): + for node, isrunning in self._isrunning(nodes): if not isrunning: filtered += [node] @@ -162,17 +156,21 @@ def _start_nodes(self, nodes, results): # Generate crash report for any crashed nodes. crashed = [node for node in nodes if node.hasCrashed()] if crashed: - self.ui.info("creating crash report for previously crashed nodes: %s" % ", ".join([n.name for n in crashed])) + self.ui.info( + "creating crash report for previously crashed nodes: {}".format( + ", ".join([n.name for n in crashed]) + ) + ) self._make_crash_reports(crashed) # Make working directories. dirs = [(node, node.cwd()) for node in nodes] nodes = [] - for (node, success, output) in self.executor.mkdirs(dirs): + for node, success, output in self.executor.mkdirs(dirs): if success: nodes += [node] else: - self.ui.error("cannot create working directory for %s" % node.name) + self.ui.error(f"cannot create working directory for {node.name}") results.set_node_fail(node) # Start Zeek process. @@ -186,15 +184,21 @@ def _start_nodes(self, nodes, results): pin_cpu = -1 envs = _make_env_params(node, True) - cmds += [(node, "start", envs + [node.cwd(), str(pin_cpu)] + _make_zeek_params(node, True))] + cmds += [ + ( + node, + "start", + envs + [node.cwd(), str(pin_cpu)] + _make_zeek_params(node, True), + ) + ] nodes = [] # Note: the shell is used to interpret the command because zeekargs # might contain quoted arguments. 
- for (node, success, output) in self.executor.run_helper(cmds, shell=True): + for node, success, output in self.executor.run_helper(cmds, shell=True): if success: if not output: - self.ui.error("failed to get PID of %s" % node.name) + self.ui.error(f"failed to get PID of {node.name}") results.set_node_fail(node) continue @@ -202,14 +206,14 @@ def _start_nodes(self, nodes, results): try: pid = int(pidstr) except ValueError: - self.ui.error("invalid PID for %s: %s" % (node.name, pidstr)) + self.ui.error(f"invalid PID for {node.name}: {pidstr}") results.set_node_fail(node) continue nodes += [node] node.setPID(pid) else: - self.ui.error('cannot start %s; check output of "diag"' % node.name) + self.ui.error(f'cannot start {node.name}; check output of "diag"') results.set_node_fail(node) if output: self.ui.error(output) @@ -218,7 +222,7 @@ def _start_nodes(self, nodes, results): hanging = [] running = [] - for (node, success) in self._waitforzeeks(nodes, "RUNNING", 3, True): + for node, success in self._waitforzeeks(nodes, "RUNNING", 3, True): if success: running += [node] else: @@ -229,13 +233,15 @@ def _start_nodes(self, nodes, results): # that the process has been started (_waitforzeeks ensures that). # If by now there is not a TERMINATED status, we assume that it # is doing fine and will move on to RUNNING once DNS is done. - for (node, success) in self._waitforzeeks(hanging, "TERMINATED", 0, False): + for node, success in self._waitforzeeks(hanging, "TERMINATED", 0, False): if success: - self.ui.error('%s terminated immediately after starting; check output with "diag"' % node.name) + self.ui.error( + f'{node.name} terminated immediately after starting; check output with "diag"' + ) node.clearPID() results.set_node_fail(node) else: - self.ui.info("(%s still initializing)" % node.name) + self.ui.info(f"({node.name} still initializing)") running += [node] for node in running: @@ -245,7 +251,6 @@ def _start_nodes(self, nodes, results): return results def _isrunning(self, nodes, setcrashed=True): - results = [] cmds = [] @@ -257,11 +262,11 @@ def _isrunning(self, nodes, setcrashed=True): cmds += [(node, "check-pid", [str(pid)])] - for (node, success, output) in self.executor.run_helper(cmds): + for node, success, output in self.executor.run_helper(cmds): # If we cannot run the helper script, then we ignore this node # because the process might actually be running but we can't tell. if not success: - self.ui.error("failed to run check-pid on node %s" % node.name) + self.ui.error(f"failed to run check-pid on node {node.name}") continue running = output.strip() == "running" @@ -287,7 +292,7 @@ def _waitforzeeks(self, nodes, status, timeout, ensurerunning): # Determine set of nodes still to check. 
todo = {} - for (node, isrunning) in running: + for node, isrunning in running: if isrunning: todo[node.name] = node else: @@ -303,9 +308,9 @@ def _waitforzeeks(self, nodes, status, timeout, ensurerunning): # Check nodes' .status file cmds = [] for node in nodelist: - cmds += [(node, "first-line", ["%s/.status" % node.cwd()])] + cmds += [(node, "first-line", [f"{node.cwd()}/.status"])] - for (node, success, output) in self.executor.run_helper(cmds): + for node, success, output in self.executor.run_helper(cmds): if not success or not output: continue @@ -320,7 +325,7 @@ def _waitforzeeks(self, nodes, status, timeout, ensurerunning): del todo[node.name] results += [(node, False)] - for (node, isrunning) in running: + for node, isrunning in running: if node.name in todo and not isrunning: # Alright, a dead node's status will not change anymore. del todo[node.name] @@ -354,7 +359,7 @@ def _log_action(self, node, action): return t = time.time() with open(self.config.statslog, "a") as out: - out.write("%s %s action %s\n" % (t, node, action)) + out.write(f"{t} {node} action {action}\n") # Do a "post-terminate crash" for the given nodes. def _make_crash_reports(self, nodes): @@ -366,9 +371,11 @@ def _make_crash_reports(self, nodes): msg_header_no_backtrace = "This crash report does not include a backtrace. In order for crash reports\nto be useful when Zeek crashes, a backtrace is needed.\n" postterminate = os.path.join(self.config.scriptsdir, "post-terminate") - cmds = [(node, postterminate, [node.type, node.cwd(), "crash"]) for node in nodes] + cmds = [ + (node, postterminate, [node.type, node.cwd(), "crash"]) for node in nodes + ] - for (node, success, output) in self.executor.run_cmds(cmds): + for node, success, output in self.executor.run_cmds(cmds): if success: crashreport = output @@ -381,11 +388,17 @@ def _make_crash_reports(self, nodes): else: msg = msg_header_no_backtrace + crashreport - msuccess, moutput = self._sendmail("Crash report from %s" % node.name, msg) + msuccess, moutput = self._sendmail( + f"Crash report from {node.name}", msg + ) if not msuccess: - self.ui.error("error occurred while trying to send mail: %s" % moutput) + self.ui.error( + f"error occurred while trying to send mail: {moutput}" + ) else: - self.ui.error("error running post-terminate for %s:\n%s" % (node.name, output)) + self.ui.error( + f"error running post-terminate for {node.name}:\n{output}" + ) node.clearCrashed() @@ -393,7 +406,9 @@ def _sendmail(self, subject, body): if not self.config.sendmail: return True, "" - cmd = "%s '%s'" % (os.path.join(self.config.scriptsdir, "send-mail"), subject) + cmd = "{} '{}'".format( + os.path.join(self.config.scriptsdir, "send-mail"), subject + ) return execute.run_localcmd(cmd, inputtext=body) # Stop Zeek processes on nodes. @@ -411,7 +426,7 @@ def stop(self, nodes): self._stop_nodes(workers, results) if not results.ok: - for n in (proxies + manager + loggers): + for n in proxies + manager + loggers: results.set_node_fail(n) return results @@ -419,7 +434,7 @@ def stop(self, nodes): self._stop_nodes(proxies, results) if not results.ok: - for n in (manager + loggers): + for n in manager + loggers: results.set_node_fail(n) return results @@ -437,12 +452,12 @@ def stop(self, nodes): return results def _stop_nodes(self, nodes, results): - self.ui.info("stopping %s ..." % node_mod.nodes_describe(nodes)) + self.ui.info(f"stopping {node_mod.nodes_describe(nodes)} ...") running = [] # Check which nodes are still running. 
- for (node, isrunning) in self._isrunning(nodes): + for node, isrunning in self._isrunning(nodes): if isrunning: running += [node] else: @@ -451,7 +466,11 @@ def _stop_nodes(self, nodes, results): # Generate crash report for any crashed nodes. crashed = [node for node in nodes if node.hasCrashed()] if crashed: - self.ui.info("creating crash report for previously crashed nodes: %s" % ", ".join([n.name for n in crashed])) + self.ui.info( + "creating crash report for previously crashed nodes: {}".format( + ", ".join([n.name for n in crashed]) + ) + ) self._make_crash_reports(crashed) # Helper function to stop nodes with given signal. @@ -463,11 +482,11 @@ def stop(nodes, signal): return self.executor.run_helper(cmds) # Stop nodes. - for (node, success, output) in stop(running, 15): + for node, success, output in stop(running, 15): if not success: # Give up on this node. Most likely either we cannot connect # to the host, or we don't have permission to kill the process. - self.ui.error("unable to stop %s: %s" % (node.name, output)) + self.ui.error(f"unable to stop {node.name}: {output}") results.set_node_fail(node) running.remove(node) @@ -477,17 +496,19 @@ def stop(nodes, signal): # Check whether they terminated. terminated = [] kill = [] - for (node, success) in self._waitforzeeks(running, "TERMINATED", self.config.stoptimeout, False): + for node, success in self._waitforzeeks( + running, "TERMINATED", self.config.stoptimeout, False + ): if not success: # Check whether it crashed during shutdown ... result = self._isrunning([node]) - for (node, isrunning) in result: + for node, isrunning in result: if isrunning: - self.ui.info("%s did not terminate ... killing ..." % node.name) + self.ui.info(f"{node.name} did not terminate ... killing ...") kill += [node] else: # crashed flag is set by _isrunning(). - self.ui.info("%s crashed during shutdown" % node.name) + self.ui.info(f"{node.name} crashed during shutdown") if kill: # Kill those which did not terminate gracefully. @@ -504,11 +525,10 @@ def stop(nodes, signal): todo[node.name] = node while True: - nodelist = sorted(todo.values(), key=node_mod.sortnode) running = self._isrunning(nodelist, setcrashed=False) - for (node, isrunning) in running: + for node, isrunning in running: if node.name in todo and not isrunning: # Alright, it's gone. del todo[node.name] @@ -540,11 +560,13 @@ def stop(nodes, signal): cmds += [(node, postterminate, [node.type, node.cwd(), crashflag])] - for (node, success, output) in self.executor.run_cmds(cmds): + for node, success, output in self.executor.run_cmds(cmds): if success: self._log_action(node, "stopped") else: - self.ui.error("error running post-terminate for %s:\n%s" % (node.name, output)) + self.ui.error( + f"error running post-terminate for {node.name}:\n{output}" + ) self._log_action(node, "stopped (failed)") node.clearPID() @@ -552,7 +574,6 @@ def stop(nodes, signal): return results - # Output status summary for nodes. 
def status(self, nodes): results = cmdresult.CmdResult() @@ -566,14 +587,20 @@ def status(self, nodes): running = [] cmds = [] - for (node, isrunning) in nodestatus: + for node, isrunning in nodestatus: if isrunning: running += [node] - cmds += [(node, "first-line", ["%s/.status" % node.cwd(), "%s/.startup" % node.cwd()])] + cmds += [ + ( + node, + "first-line", + [f"{node.cwd()}/.status", f"{node.cwd()}/.startup"], + ) + ] statuses = {} startups = {} - for (n, success, output) in self.executor.run_helper(cmds): + for n, success, output in self.executor.run_helper(cmds): out = output.splitlines() try: val = out[0].split()[0].lower() if (success and out[0]) else "???" @@ -593,7 +620,7 @@ def status(self, nodes): self.ui.info("Getting peer status ...") peers = {} nodes = [n for n in running if statuses[n.name] == "running"] - for (node, success, args) in self._query_peerstatus(nodes): + for node, success, args in self._query_peerstatus(nodes): if success and args: peers[node.name] = [] for f in args[0].split(): @@ -604,7 +631,7 @@ def status(self, nodes): if val: peers[node.name] += [val] - for (node, isrunning) in nodestatus: + for node, isrunning in nodestatus: node_info = { "name": node.name, "type": node.type, @@ -645,34 +672,35 @@ def check(self, nodes): def scripts(self, nodes, check): return self._check_config(nodes, not check, True) - def _check_config(self, nodes, installed, list_scripts): results = cmdresult.CmdResult() - nodetmpdirs = [(node, os.path.join(self.config.tmpdir, "check-config-%s" % node.name)) for node in nodes] + nodetmpdirs = [ + (node, os.path.join(self.config.tmpdir, f"check-config-{node.name}")) + for node in nodes + ] nodes = [] - for (node, cwd) in nodetmpdirs: + for node, cwd in nodetmpdirs: if os.path.isdir(cwd): try: shutil.rmtree(cwd) except OSError as err: - self.ui.error("cannot remove directory %s: %s" % (cwd, err)) + self.ui.error(f"cannot remove directory {cwd}: {err}") results.ok = False return results try: os.makedirs(cwd) except OSError as err: - self.ui.error("cannot create temporary directory: %s" % err) + self.ui.error(f"cannot create temporary directory: {err}") results.ok = False return results nodes += [(node, cwd)] cmds = [] - for (node, cwd) in nodes: - + for node, cwd in nodes: env = _make_env_params(node) installed_policies = "1" if installed else "0" @@ -685,20 +713,29 @@ def _check_config(self, nodes, installed, list_scripts): results.ok = False return results - if not install.make_zeekctl_config_policy(cwd, self.ui, self.pluginregistry): + if not install.make_zeekctl_config_policy( + cwd, self.ui, self.pluginregistry + ): results.ok = False return results - cmd = os.path.join(self.config.scriptsdir, "check-config") + " %s %s %s %s" % (installed_policies, print_scripts, cwd, " ".join(_make_zeek_params(node, False))) + cmd = os.path.join( + self.config.scriptsdir, "check-config" + ) + " {} {} {} {}".format( + installed_policies, + print_scripts, + cwd, + " ".join(_make_zeek_params(node, False)), + ) cmd += " zeekctl/check" cmds += [((node, cwd), cmd, env, None)] - for ((node, cwd), success, output) in execute.run_localcmds(cmds): + for (node, cwd), success, output in execute.run_localcmds(cmds): results.set_node_output(node, success, output) try: shutil.rmtree(cwd) - except OSError as err: + except OSError: # Don't bother reporting an error now. 
pass @@ -708,16 +745,25 @@ def _query_peerstatus(self, nodes): running = self._isrunning(nodes) eventlist = [] - for (node, isrunning) in running: + for node, isrunning in running: if isrunning: - eventlist += [(node, "Control::peer_status_request", [], "Control::peer_status_response")] + eventlist += [ + ( + node, + "Control::peer_status_request", + [], + "Control::peer_status_response", + ) + ] return events.send_events_parallel(eventlist, config.Config.controltopic) def execute_cmd(self, nodes, cmd): results = cmdresult.CmdResult() - for node, success, out in self.executor.run_shell_cmds([(n, cmd) for n in nodes]): + for node, success, out in self.executor.run_shell_cmds( + [(n, cmd) for n in nodes] + ): results.set_node_output(node, success, out) return results @@ -729,22 +775,21 @@ def cleanup(self, nodes, cleantmp=False): # Given a set of node names "orig" and command results "res", add # all node names to "orig" that have a failed result in "res". def addfailed(orig, res): - for (node, status, output) in res: + for node, status, output in res: # if status is Fail, then add the node name if not status: orig.add(node.name) return orig - results = cmdresult.CmdResult() result = self._isrunning(nodes) - running = [node for (node, on) in result if on] + running = [node for (node, on) in result if on] notrunning = [node for (node, on) in result if not on] for node in running: - self.ui.info(" %s is still running, not cleaning work directory" % node) + self.ui.info(f" {node} is still running, not cleaning work directory") results1 = self.executor.rmdirs([(n, n.cwd()) for n in notrunning]) results2 = self.executor.mkdirs([(n, n.cwd()) for n in notrunning]) @@ -756,9 +801,13 @@ def addfailed(orig, res): node.clearCrashed() if cleantmp: - self.ui.info("cleaning %s ..." 
% self.config.tmpdir) - results3 = self.executor.rmdirs([(n, self.config.tmpdir) for n in running + notrunning]) - results4 = self.executor.mkdirs([(n, self.config.tmpdir) for n in running + notrunning]) + self.ui.info(f"cleaning {self.config.tmpdir} ...") + results3 = self.executor.rmdirs( + [(n, self.config.tmpdir) for n in running + notrunning] + ) + results4 = self.executor.mkdirs( + [(n, self.config.tmpdir) for n in running + notrunning] + ) failed = addfailed(failed, results3) failed = addfailed(failed, results4) @@ -777,9 +826,9 @@ def diag(self, nodes): crashdiag = os.path.join(self.config.scriptsdir, "crash-diag") cmds = [(node, crashdiag, [node.cwd()]) for node in nodes] - for (node, success, output) in self.executor.run_cmds(cmds): + for node, success, output in self.executor.run_cmds(cmds): if not success: - errmsgs = "error running crash-diag for %s\n" % node.name + errmsgs = f"error running crash-diag for {node.name}\n" errmsgs += output results.set_node_output(node, False, errmsgs) continue @@ -792,10 +841,16 @@ def capstats(self, nodes, interval): results = cmdresult.CmdResult() if not self.config.capstatspath: - results.set_node_data(nodes[0], False, {"output": 'Error: cannot run capstats because zeekctl option "capstatspath" is not defined'}) + results.set_node_data( + nodes[0], + False, + { + "output": 'Error: cannot run capstats because zeekctl option "capstatspath" is not defined' + }, + ) return results - for (node, netif, success, vals) in self.get_capstats_output(nodes, interval): + for node, netif, success, vals in self.get_capstats_output(nodes, interval): if not success: vals = {"output": vals} results.set_node_data(node, success, vals) @@ -837,13 +892,16 @@ def get_capstats_output(self, nodes, interval): nodenetifs.append((node, netif)) capstats = self.config.capstatspath - cmds = [(node, capstats, ["-I", str(interval), "-n", "1", "-i", interface]) for (node, interface) in nodenetifs] + cmds = [ + (node, capstats, ["-I", str(interval), "-n", "1", "-i", interface]) + for (node, interface) in nodenetifs + ] outputs = self.executor.run_cmds(cmds) totals = {} - for (node, success, output) in outputs: + for node, success, output in outputs: netif = self._capstats_interface(node) if output: @@ -853,19 +911,35 @@ def get_capstats_output(self, nodes, interval): if not success: if output: - results += [(node, netif, False, "%s: capstats failed (%s)" % (node.name, outputline))] + results += [ + ( + node, + netif, + False, + f"{node.name}: capstats failed ({outputline})", + ) + ] else: - results += [(node, netif, False, "%s: cannot execute capstats" % node.name)] + results += [ + (node, netif, False, f"{node.name}: cannot execute capstats") + ] continue if not output: - results += [(node, netif, False, "%s: no capstats output" % node.name)] + results += [(node, netif, False, f"{node.name}: no capstats output")] continue fields = outputline.split()[1:] if not fields: - results += [(node, netif, False, "%s: unexpected capstats output: %s" % (node.name, outputline))] + results += [ + ( + node, + netif, + False, + f"{node.name}: unexpected capstats output: {outputline}", + ) + ] continue vals = {} @@ -882,7 +956,14 @@ def get_capstats_output(self, nodes, interval): totals[key] = val except ValueError: - results += [(node, netif, False, "%s: unexpected capstats output: %s" % (node.name, outputline))] + results += [ + ( + node, + netif, + False, + f"{node.name}: unexpected capstats output: {outputline}", + ) + ] continue results += [(node, netif, True, vals)] @@ -893,7 +974,6 @@ 
def get_capstats_output(self, nodes, interval): return results - # Convert a Zeek network interface name to one that capstats can use. def _capstats_interface(self, node): netif = node.interface @@ -924,9 +1004,22 @@ def _capstats_interface(self, node): def df(self, nodes): results = cmdresult.CmdResult() - DiskInfo = namedtuple("DiskInfo", ("fs", "total", "used", "available", "percent")) - dirs = ("logdir", "bindir", "helperdir", "cfgdir", "spooldir", - "policydir", "libdir", "libdir64", "tmpdir", "staticdir", "scriptsdir") + DiskInfo = namedtuple( + "DiskInfo", ("fs", "total", "used", "available", "percent") + ) + dirs = ( + "logdir", + "bindir", + "helperdir", + "cfgdir", + "spooldir", + "policydir", + "libdir", + "libdir64", + "tmpdir", + "staticdir", + "scriptsdir", + ) df = {} for node in nodes: @@ -935,7 +1028,11 @@ def df(self, nodes): cmds = [] for node in nodes: for key in dirs: - if key == "logdir" and not (node_mod.is_logger(node) or node_mod.is_manager(node) or node_mod.is_standalone(node)): + if key == "logdir" and not ( + node_mod.is_logger(node) + or node_mod.is_manager(node) + or node_mod.is_standalone(node) + ): # Don't need to check this on nodes that don't write logs. continue @@ -947,7 +1044,7 @@ def df(self, nodes): cmds += [(node, "df", [path])] - for (node, success, output) in self.executor.run_helper(cmds): + for node, success, output in self.executor.run_helper(cmds): if success: fields = output.split() if len(fields) != 4: @@ -964,7 +1061,7 @@ def df(self, nodes): used = float(fields[2]) avail = float(fields[3]) except ValueError as err: - df[node.name]["FAIL"] = "bad output from df helper: %s" % err + df[node.name]["FAIL"] = f"bad output from df helper: {err}" continue perc = used * 100.0 / (used + avail) @@ -983,7 +1080,6 @@ def df(self, nodes): # dict which maps tags to their values. Tags are "pid", "vsize", # "rss", "cpu", and "cmd". def get_top_output(self, nodes): - results = [] running = self._isrunning(nodes) @@ -992,7 +1088,7 @@ def get_top_output(self, nodes): pids = {} - for (node, isrunning) in running: + for node, isrunning in running: if isrunning: pids[node.name] = node.getPID() else: @@ -1006,7 +1102,7 @@ def get_top_output(self, nodes): hosts = {} # Now run top once per host. - for node in nodes: # Do the loop again to keep the order. + for node in nodes: # Do the loop again to keep the order. if node.name not in pids: continue @@ -1021,7 +1117,7 @@ def get_top_output(self, nodes): return results res = {} - for (node, success, output) in self.executor.run_helper(cmds): + for node, success, output in self.executor.run_helper(cmds): res[node.host] = success, output # Gather results for all the nodes that are running @@ -1035,7 +1131,7 @@ def get_top_output(self, nodes): # The error msg gets written to stats.log, so we only want # the first line. 
errmsg = output.splitlines()[0] if output else "" - results += [(node, "top failed: %s" % errmsg, {})] + results += [(node, f"top failed: {errmsg}", {})] continue if not output: @@ -1051,7 +1147,7 @@ def get_top_output(self, nodes): procinfo = line.split() break except (IndexError, ValueError) as err: - results += [(node, "bad output from top: %s" % err, {})] + results += [(node, f"bad output from top: {err}", {})] continue if not procinfo: @@ -1064,12 +1160,14 @@ def get_top_output(self, nodes): try: pid = int(procinfo[0]) vals["pid"] = pid - vals["vsize"] = int(float(procinfo[1])) #May be something like 2.17684e+9 + vals["vsize"] = int( + float(procinfo[1]) + ) # May be something like 2.17684e+9 vals["rss"] = int(float(procinfo[2])) vals["cpu"] = procinfo[3] vals["cmd"] = " ".join(procinfo[4:]) except (IndexError, ValueError) as err: - results += [(node, "unexpected top output: %s" % err, {})] + results += [(node, f"unexpected top output: {err}", {})] continue results += [(node, None, vals)] @@ -1080,11 +1178,18 @@ def get_top_output(self, nodes): def top(self, nodes): results = cmdresult.CmdResult() - for (node, error, vals) in self.get_top_output(nodes): - top_info = {"name": node.name, "type": node.type, - "host": node.host, "pid": None, - "vsize": None, "rss": None, "cpu": None, - "cmd": None, "error": None} + for node, error, vals in self.get_top_output(nodes): + top_info = { + "name": node.name, + "type": node.type, + "host": node.host, + "pid": None, + "vsize": None, + "rss": None, + "cpu": None, + "cmd": None, + "error": None, + } if error: top_info["error"] = error results.set_node_data(node, False, {"procs": top_info}) @@ -1102,15 +1207,24 @@ def print_id(self, nodes, id): running = self._isrunning(nodes) eventlist = [] - for (node, isrunning) in running: + for node, isrunning in running: if isrunning: - eventlist += [(node, "Control::id_value_request", [id], "Control::id_value_response")] + eventlist += [ + ( + node, + "Control::id_value_request", + [id], + "Control::id_value_response", + ) + ] if not eventlist: results.set_node_output(nodes[0], False, "no running instances of Zeek") return results - for (node, success, args) in events.send_events_parallel(eventlist, config.Config.controltopic): + for node, success, args in events.send_events_parallel( + eventlist, config.Config.controltopic + ): if success: out = "\n".join(args) else: @@ -1119,20 +1233,26 @@ def print_id(self, nodes, id): return results - def _query_netstats(self, nodes): running = self._isrunning(nodes) eventlist = [] - for (node, isrunning) in running: + for node, isrunning in running: if isrunning: - eventlist += [(node, "Control::net_stats_request", [], "Control::net_stats_response")] + eventlist += [ + ( + node, + "Control::net_stats_request", + [], + "Control::net_stats_response", + ) + ] return events.send_events_parallel(eventlist, config.Config.controltopic) def peerstatus(self, nodes): results = cmdresult.CmdResult() - for (node, success, args) in self._query_peerstatus(nodes): + for node, success, args in self._query_peerstatus(nodes): if success: if args: out = args[0] @@ -1149,7 +1269,7 @@ def peerstatus(self, nodes): def netstats(self, nodes): results = cmdresult.CmdResult() - for (node, success, args) in self._query_netstats(nodes): + for node, success, args in self._query_netstats(nodes): if success: if args: out = args[0].strip() @@ -1168,7 +1288,7 @@ def process(self, trace, zeek_options, zeek_scripts): results = cmdresult.CmdResult() if not os.path.isfile(trace): - self.ui.error("trace file 
not found: %s" % trace) + self.ui.error(f"trace file not found: {trace}") results.ok = False return results @@ -1183,14 +1303,14 @@ def process(self, trace, zeek_options, zeek_scripts): try: shutil.rmtree(cwd) except OSError as err: - self.ui.error("cannot remove directory: %s" % err) + self.ui.error(f"cannot remove directory: {err}") results.ok = False return results try: os.makedirs(cwd) except OSError as err: - self.ui.error("cannot create directory: %s" % err) + self.ui.error(f"cannot create directory: {err}") results.ok = False return results @@ -1202,7 +1322,10 @@ def process(self, trace, zeek_options, zeek_scripts): if zeek_scripts: zeek_args += " " + " ".join(zeek_scripts) - cmd = os.path.join(self.config.scriptsdir, "run-zeek-on-trace") + " %s %s %s %s" % (0, cwd, trace, zeek_args) + cmd = ( + os.path.join(self.config.scriptsdir, "run-zeek-on-trace") + + f" {0} {cwd} {trace} {zeek_args}" + ) self.ui.info(cmd) @@ -1212,7 +1335,7 @@ def process(self, trace, zeek_options, zeek_scripts): results.ok = False self.ui.info(output) - self.ui.info("### Zeek output in %s" % cwd) + self.ui.info(f"### Zeek output in {cwd}") return results @@ -1222,22 +1345,25 @@ def install(self, local_only): try: self.config.record_zeek_version() except config.ConfigurationError as err: - self.ui.error("%s" % err) + self.ui.error(f"{err}") results.ok = False return results manager = self.config.manager() # Delete previously installed policy files to not mix things up. - policies = [self.config.policydirsiteinstall, self.config.policydirsiteinstallauto] + policies = [ + self.config.policydirsiteinstall, + self.config.policydirsiteinstallauto, + ] for dirpath in policies: if os.path.isdir(dirpath): - self.ui.info("removing old policies in %s ..." % dirpath) + self.ui.info(f"removing old policies in {dirpath} ...") try: shutil.rmtree(dirpath) except OSError as err: - self.ui.error("failed to remove directory %s: %s" % (dirpath, err)) + self.ui.error(f"failed to remove directory {dirpath}: {err}") results.ok = False return results @@ -1246,7 +1372,7 @@ def install(self, local_only): try: os.makedirs(dirpath) except OSError as err: - self.ui.error("failed to create directory: %s" % err) + self.ui.error(f"failed to create directory: {err}") results.ok = False return results @@ -1267,12 +1393,16 @@ def install(self, local_only): return results self.ui.info("generating local-networks.zeek ...") - if not install.make_local_networks(self.config.policydirsiteinstallauto, self.ui): + if not install.make_local_networks( + self.config.policydirsiteinstallauto, self.ui + ): results.ok = False return results self.ui.info("generating zeekctl-config.zeek ...") - if not install.make_zeekctl_config_policy(self.config.policydirsiteinstallauto, self.ui, self.pluginregistry): + if not install.make_zeekctl_config_policy( + self.config.policydirsiteinstallauto, self.ui, self.pluginregistry + ): results.ok = False return results @@ -1286,9 +1416,9 @@ def install(self, local_only): current = self.config.subst(os.path.join(self.config.logdir, "current")) try: util.force_symlink(node_cwd, current) - except (IOError, OSError) as err: + except OSError as err: results.ok = False - self.ui.error("failed to update symlink '%s': %s" % (current, err)) + self.ui.error(f"failed to update symlink '{current}': {err}") return results self.ui.info("generating zeekctl-config.sh ...") @@ -1324,16 +1454,20 @@ def install(self, local_only): syncs = install.get_nfssyncs() - syncs = [(dir, mirror) for (dir, mirror, optional) in syncs if not optional or 
os.path.exists(self.config.subst(dir))] + syncs = [ + (dir, mirror) + for (dir, mirror, optional) in syncs + if not optional or os.path.exists(self.config.subst(dir)) + ] createdirs = [self.config.subst(dir) for (dir, mirror) in syncs if not mirror] for n in nodes: for dir in createdirs: dirs.append((n, dir)) - for (node, success, output) in self.executor.mkdirs(dirs): + for node, success, output in self.executor.mkdirs(dirs): if not success: - self.ui.error("cannot create a directory on node %s" % node.name) + self.ui.error(f"cannot create a directory on node {node.name}") if output: self.ui.error(output) results.ok = False @@ -1349,7 +1483,6 @@ def install(self, local_only): return results - # Triggers all activity which is to be done regularly via cron. def cron(self, watch): if not self.config.cronenabled: @@ -1363,7 +1496,9 @@ def cron(self, watch): return cronui = cron.CronUI() - tasks = cron.CronTasks(cronui, self.config, self, self.executor, self.pluginregistry) + tasks = cron.CronTasks( + cronui, self.config, self, self.executor, self.pluginregistry + ) cronui.buffer_output() @@ -1372,7 +1507,7 @@ def cron(self, watch): # necessary. startlist = [] stoplist = [] - for (node, isrunning) in self._isrunning(self.config.nodes()): + for node, isrunning in self._isrunning(self.config.nodes()): expectrunning = node.getExpectRunning() if not isrunning and expectrunning: @@ -1381,9 +1516,9 @@ def cron(self, watch): stoplist.append(node) if startlist: - results = self.start(startlist) + self.start(startlist) if stoplist: - results = self.stop(stoplist) + self.stop(stoplist) # Check for dead hosts. tasks.check_hosts() @@ -1411,8 +1546,7 @@ def cron(self, watch): if output: success, out = self._sendmail("cron: " + output.splitlines()[0], output) if not success: - self.ui.error("zeekctl cron failed to send mail: %s" % out) - self.ui.info("Output of zeekctl cron:\n%s" % output) + self.ui.error(f"zeekctl cron failed to send mail: {out}") + self.ui.info(f"Output of zeekctl cron:\n{output}") logging.debug("cron done") - diff --git a/ZeekControl/cron.py b/ZeekControl/cron.py index 4958bc24..262d02d8 100644 --- a/ZeekControl/cron.py +++ b/ZeekControl/cron.py @@ -1,22 +1,23 @@ # Tasks which are to be done on a regular basis from cron. 
-from __future__ import print_function import io import os -import time import shutil +import time from ZeekControl import execute from ZeekControl import node as node_mod + class CronUI: def __init__(self): self.buffer = None def info(self, txt): if self.buffer: - self.buffer.write("%s\n" % txt) + self.buffer.write(f"{txt}\n") else: print(txt) + error = info warn = info @@ -55,38 +56,42 @@ def log_stats(self, interval): try: with open(self.config.statslog, "a") as out: - for (node, error, vals) in top: + for node, error, vals in top: if not error: - for (val, key) in sorted(vals.items()): - out.write("%s %s parent %s %s\n" % (t, node, val, key)) + for val, key in sorted(vals.items()): + out.write(f"{t} {node} parent {val} {key}\n") else: - out.write("%s %s error error %s\n" % (t, node, error)) + out.write(f"{t} {node} error error {error}\n") - for (node, netif, success, vals) in capstats: + for node, netif, success, vals in capstats: if not success: - out.write("%s %s error error %s\n" % (t, node, vals)) + out.write(f"{t} {node} error error {vals}\n") continue - for (key, val) in sorted(vals.items()): - out.write("%s %s interface %s %s\n" % (t, node, key, val)) + for key, val in sorted(vals.items()): + out.write(f"{t} {node} interface {key} {val}\n") if key == "pkts" and str(node) != "$total": # Report if we don't see packets on an interface. - tag = "lastpkts-%s" % node.name + tag = f"lastpkts-{node.name}" last = self.config.get_state(tag, default=-1.0) if self.config.mailreceivingpackets: if val == 0.0 and last != 0.0: - self.ui.info("%s is not seeing any packets on interface %s" % (node.host, netif)) + self.ui.info( + f"{node.host} is not seeing any packets on interface {netif}" + ) if val != 0.0 and last == 0.0: - self.ui.info("%s is seeing packets again on interface %s" % (node.host, netif)) + self.ui.info( + f"{node.host} is seeing packets again on interface {netif}" + ) self.config.set_state(tag, val) - except IOError as err: - self.ui.error("failed to append to file: %s" % err) + except OSError as err: + self.ui.error(f"failed to append to file: {err}") return def check_disk_space(self): @@ -95,7 +100,7 @@ def check_disk_space(self): return results = self.controller.df(self.config.hosts()) - for (node, _, dfs) in results.get_node_data(): + for node, _, dfs in results.get_node_data(): host = node.host for key, df in dfs.items(): @@ -106,7 +111,7 @@ def check_disk_space(self): fs = df.fs perc = df.percent - key = ("disk-space-%s%s" % (host, fs.replace("/", "-"))) + key = "disk-space-{}{}".format(host, fs.replace("/", "-")) if perc > 100 - minspace: last = self.config.get_state(key, default=-1) @@ -114,19 +119,24 @@ def check_disk_space(self): # Already reported. continue - self.ui.warn("Disk space low on %s:%s - %.1f%% used." 
% (host, fs, perc)) + self.ui.warn(f"Disk space low on {host}:{fs} - {perc:.1f}% used.") self.config.set_state(key, perc) def expire_logs(self): - if self.config.logexpireminutes == 0 and self.config.statslogexpireinterval == 0: + if ( + self.config.logexpireminutes == 0 + and self.config.statslogexpireinterval == 0 + ): return if self.config.standalone: - success, output = execute.run_localcmd(os.path.join(self.config.scriptsdir, "expire-logs")) + success, output = execute.run_localcmd( + os.path.join(self.config.scriptsdir, "expire-logs") + ) if not success: - self.ui.error("expire-logs failed\n%s" % output) + self.ui.error(f"expire-logs failed\n{output}") else: nodes = self.config.hosts(tag=node_mod.logger_group()) @@ -136,13 +146,12 @@ def expire_logs(self): expirelogs = os.path.join(self.config.scriptsdir, "expire-logs") cmds = [(node, expirelogs, []) for node in nodes] - for (node, success, output) in self.executor.run_cmds(cmds): + for node, success, output in self.executor.run_cmds(cmds): if not success: - self.ui.error("expire-logs failed for node %s\n" % node) + self.ui.error(f"expire-logs failed for node {node}\n") if output: self.ui.error(output) - def expire_crash(self): if self.config.crashexpireinterval == 0: return @@ -150,15 +159,15 @@ def expire_crash(self): expirecrash = os.path.join(self.config.scriptsdir, "expire-crash") cmds = [(node, expirecrash, []) for node in self.config.hosts()] - for (node, success, output) in self.executor.run_cmds(cmds): + for node, success, output in self.executor.run_cmds(cmds): if not success: - self.ui.error("expire-crash failed for node %s\n" % node) + self.ui.error(f"expire-crash failed for node {node}\n") if output: self.ui.error(output) def check_hosts(self): for host, status in self.executor.host_status(): - tag = "alive-%s" % host + tag = f"alive-{host}" alive = status previous = self.config.get_state(tag) @@ -167,7 +176,7 @@ def check_hosts(self): self.pluginregistry.hostStatusChanged(host, alive) if self.config.mailhostupdown: up_or_down = "up" if alive else "down" - self.ui.info("host %s %s" % (host, up_or_down)) + self.ui.info(f"host {host} {up_or_down}") self.config.set_state(tag, alive) @@ -180,36 +189,38 @@ def update_http_stats(self): try: os.makedirs(self.config.statsdir) except OSError as err: - self.ui.error("failure creating directory in zeekctl option statsdir: %s" % err) + self.ui.error( + f"failure creating directory in zeekctl option statsdir: {err}" + ) return - self.ui.info("creating directory for stats file: %s" % self.config.statsdir) + self.ui.info(f"creating directory for stats file: {self.config.statsdir}") metadat = os.path.join(self.config.statsdir, "meta.dat") try: with open(metadat, "w") as meta: for node in self.config.hosts(): - meta.write("node %s %s %s\n" % (node, node.type, node.host)) + meta.write(f"node {node} {node.type} {node.host}\n") - meta.write("time %s\n" % time.asctime()) - meta.write("version %s\n" % self.config.version) + meta.write(f"time {time.asctime()}\n") + meta.write(f"version {self.config.version}\n") success, output = execute.run_localcmd("uname -a") if success and output: # Note: "output" already has a '\n' - meta.write("os %s" % output) + meta.write(f"os {output}") else: meta.write("os \n") success, output = execute.run_localcmd("hostname") if success and output: # Note: "output" already has a '\n' - meta.write("host %s" % output) + meta.write(f"host {output}") else: meta.write("host \n") - except IOError as err: - self.ui.error("failure creating file: %s" % err) + except OSError 
as err: + self.ui.error(f"failure creating file: {err}") return wwwdir = os.path.join(self.config.statsdir, "www") @@ -217,34 +228,35 @@ def update_http_stats(self): try: os.makedirs(wwwdir) except OSError as err: - self.ui.error("failed to create directory: %s" % err) + self.ui.error(f"failed to create directory: {err}") return # Update the WWW data statstocsv = os.path.join(self.config.scriptsdir, "stats-to-csv") - success, output = execute.run_localcmd("%s %s %s %s" % (statstocsv, self.config.statslog, metadat, wwwdir)) + success, output = execute.run_localcmd( + f"{statstocsv} {self.config.statslog} {metadat} {wwwdir}" + ) if success: shutil.copy(metadat, wwwdir) else: - self.ui.error("error reported by stats-to-csv\n%s" % output) + self.ui.error(f"error reported by stats-to-csv\n{output}") # Append the current stats.log in spool to the one in ${statsdir} dst = os.path.join(self.config.statsdir, os.path.basename(self.config.statslog)) try: - with open(self.config.statslog, "r") as fsrc: + with open(self.config.statslog) as fsrc: with open(dst, "a") as fdst: shutil.copyfileobj(fsrc, fdst) - except IOError as err: - self.ui.error("failed to append file: %s" % err) + except OSError as err: + self.ui.error(f"failed to append file: {err}") return os.unlink(self.config.statslog) - def run_cron_cmd(self): # Run external command if we have one. if self.config.croncmd: success, output = execute.run_localcmd(self.config.croncmd) if not success: - self.ui.error("failure running croncmd: %s" % self.config.croncmd) + self.ui.error(f"failure running croncmd: {self.config.croncmd}") diff --git a/ZeekControl/doc.py b/ZeekControl/doc.py index e2cd5994..3dca2719 100644 --- a/ZeekControl/doc.py +++ b/ZeekControl/doc.py @@ -4,6 +4,7 @@ import inspect + def api(*deco_args): if len(deco_args) == 1 and callable(deco_args[0]): # No argument to decorator. @@ -16,19 +17,22 @@ def api(*deco_args): def _api(method): method._doc = deco_args[0] return method + return _api + def print_indented(text, level): out = "" if not isinstance(text, list): text = text.splitlines() for line in text: - out += "%s %s\n" % (" " * level, line) + out += "{} {}\n".format(" " * level, line) out += "\n" return out + # Prints API documentation for a class. Includes all methods tagged with # @api(tag). (Use an unknown tag to not exclude all methods.) If header is # False, the class's name and doc string is not included. @@ -36,7 +40,7 @@ def print_class(cls, tag="", header=True): out = "" methods = {} - for (key, val) in cls.__dict__.items(): + for key, val in cls.__dict__.items(): if not inspect.isfunction(val): continue @@ -47,11 +51,11 @@ def print_class(cls, tag="", header=True): methods[key] = val if header: - out += ".. _%s:\n\n" % cls.__name__ - out += "Class ``%s``\n" % cls.__name__ + out += f".. _{cls.__name__}:\n\n" + out += f"Class ``{cls.__name__}``\n" out += "~~~~~~~~%s~~" % ("~" * len(cls.__name__)) out += "\n\n" - out += "class **%s**\n" % cls.__name__ + out += f"class **{cls.__name__}**\n" out += print_indented(inspect.getdoc(cls), 1) for name in sorted(methods.keys()): @@ -59,9 +63,8 @@ def print_class(cls, tag="", header=True): args, varargs, keywords, defaults = inspect.getargspec(func) - out += print_indented(".. _%s.%s:" % (cls.__name__, name), 1) - out += print_indented("**%s** (%s)" % (name, ", ".join(args)), 1) + out += print_indented(f".. 
_{cls.__name__}.{name}:", 1) + out += print_indented("**{}** ({})".format(name, ", ".join(args)), 1) out += print_indented(inspect.getdoc(func), 2) return out - diff --git a/ZeekControl/events.py b/ZeekControl/events.py index 839ed449..bae2f1b2 100644 --- a/ZeekControl/events.py +++ b/ZeekControl/events.py @@ -29,18 +29,19 @@ # result event, or [] if no result_event was specified. # If success is False, results_args is a string with an error message. -def send_events_parallel(events, topic): +def send_events_parallel(events, topic): results = [] sent = [] - for (node, event, args, result_event) in events: - + for node, event, args, result_event in events: if not broker: - results += [(node, False, "Python bindings for Broker: %s" % errmsg)] + results += [(node, False, f"Python bindings for Broker: {errmsg}")] continue - success, endpoint, sub = _send_event_init(node, event, args, result_event, topic) + success, endpoint, sub = _send_event_init( + node, event, args, result_event, topic + ) if success and result_event: sent += [(node, result_event, endpoint, sub)] @@ -49,7 +50,7 @@ def send_events_parallel(events, topic): endpoint.shutdown() results += [(node, success, "")] - for (node, result_event, endpoint, sub) in sent: + for node, result_event, endpoint, sub in sent: success, result_args = _send_event_wait(node, result_event, endpoint, sub) sub.reset() endpoint.shutdown() @@ -57,8 +58,8 @@ def send_events_parallel(events, topic): return results -def _send_event_init(node, event, args, result_event, topic): +def _send_event_init(node, event, args, result_event, topic): host = node.addr endpoint = broker.Endpoint() subscriber = endpoint.make_subscriber(topic) @@ -76,8 +77,12 @@ def _send_event_init(node, event, args, result_event, topic): if msg.code() == broker.SC.PeerAdded: ev = broker.zeek.Event(event, *args) endpoint.publish(topic + "/" + repr(msg.context()), ev) - logging.debug("broker: %s(%s) to node %s", event, - ", ".join(args), node.name) + logging.debug( + "broker: %s(%s) to node %s", + event, + ", ".join(args), + node.name, + ) return (True, endpoint, subscriber) tries += 1 @@ -85,6 +90,7 @@ def _send_event_init(node, event, args, result_event, topic): if tries > config.Config.commtimeout: return (False, "time-out", None) + def _send_event_wait(node, result_event, bc, sub): if not result_event: return (True, []) @@ -99,8 +105,9 @@ def _send_event_wait(node, result_event, bc, sub): (topic, event) = msg ev = broker.zeek.Event(event) args = ev.args() - logging.debug("broker: %s(%s) from node %s", result_event, - ", ".join(args), node.name) + logging.debug( + "broker: %s(%s) from node %s", result_event, ", ".join(args), node.name + ) return (True, args) tries += 1 @@ -108,4 +115,3 @@ def _send_event_wait(node, result_event, bc, sub): if tries > config.Config.commtimeout: logging.debug("broker: timeout during receive from node %s", node.name) return (False, "time-out") - diff --git a/ZeekControl/exceptions.py b/ZeekControl/exceptions.py index b8dcbef9..071ab731 100644 --- a/ZeekControl/exceptions.py +++ b/ZeekControl/exceptions.py @@ -8,21 +8,27 @@ # exceptions are not expected to occur, so if one is raised a stack trace # can provide valuable information on the source of the problem. + class ZeekControlError(Exception): """This is the base class for ZeekControl exceptions.""" + class LockError(ZeekControlError): """Indicates that ZeekControl was unable to obtain a lock.""" + class RuntimeEnvironmentError(ZeekControlError): """Indicates an error in the runtime environment (e.g. 
running as wrong user, or some files/directories have wrong permissions or location).""" + class InvalidNodeError(ZeekControlError): """Indicates an attempt to lookup an invalid node name.""" + class ConfigurationError(ZeekControlError): """Indicates a problem with the ZeekControl configuration.""" + class CommandSyntaxError(ZeekControlError): """Indicates a syntax error in a ZeekControl command.""" diff --git a/ZeekControl/execute.py b/ZeekControl/execute.py index 56319f61..af761c82 100644 --- a/ZeekControl/execute.py +++ b/ZeekControl/execute.py @@ -1,13 +1,12 @@ # These modules provides a set of functions to execute actions on a host. # If the host is local, it's done direcly; if it's remote we log in via SSH. +import logging import os import shutil import subprocess -import logging -from ZeekControl import ssh_runner -from ZeekControl import util +from ZeekControl import ssh_runner, util # Copy src to dstdir, preserving permission bits and file type. The src @@ -15,7 +14,7 @@ # recursively). If the target pathname already exists, it is not clobbered. def install(src, dstdir, cmdout): if not os.path.lexists(src): - cmdout.error("pathname not found: %s" % src) + cmdout.error(f"pathname not found: {src}") return False dst = os.path.join(dstdir, os.path.basename(src)) @@ -34,32 +33,37 @@ def install(src, dstdir, cmdout): elif os.path.isdir(src): shutil.copytree(src, dst, symlinks=True) else: - cmdout.error("failed to copy %s: not a file, dir, or symlink" % src) + cmdout.error(f"failed to copy {src}: not a file, dir, or symlink") return False except OSError: # Python 2.6 has a bug where this may fail on NFS. So we just # ignore errors. pass - except IOError as err: - cmdout.error("failed to copy: %s" % err) + except OSError as err: + cmdout.error(f"failed to copy: {err}") return False return True + # rsyncs paths from localhost to destination hosts. def sync(nodes, paths, cmdout): result = True cmds = [] for n in nodes: - args = ['-rRl', '--delete', '--rsh="ssh -o BatchMode=yes -o LogLevel=error -o ConnectTimeout=30"'] - dst = ["%s:/" % util.format_rsync_addr(n.addr)] + args = [ + "-rRl", + "--delete", + '--rsh="ssh -o BatchMode=yes -o LogLevel=error -o ConnectTimeout=30"', + ] + dst = [f"{util.format_rsync_addr(n.addr)}:/"] args += paths + dst - cmdline = "rsync %s" % " ".join(args) + cmdline = "rsync {}".format(" ".join(args)) cmds += [(n, cmdline, "", None)] - for (id, success, output) in run_localcmds(cmds): + for id, success, output in run_localcmds(cmds): if not success: - cmdout.error("rsync to %s failed: %s" % (id.addr, output)) + cmdout.error(f"rsync to {id.addr} failed: {output}") result = False return result @@ -75,6 +79,7 @@ def run_localcmd(cmd, env=None, inputtext=None): proc = _run_localcmd_init("single", cmd, env) return _run_localcmd_wait(proc, inputtext) + # Same as run_localcmd() but runs a set of local commands in parallel. # Cmds is a list of (id, cmd, envs, inputtext) tuples, where id is # an arbitrary cookie identifying each command. 
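The comment above documents the tuple format that run_localcmds() expects, and the loops in this file show it returning (id, success, output) triples. A minimal usage sketch, with made-up ids and shell commands purely for illustration (not part of this patch):

    # Hypothetical caller of ZeekControl.execute.run_localcmds(); the ids and
    # commands below are illustrative only.
    from ZeekControl import execute

    cmds = [
        # (id, cmd, envs, inputtext): id is an arbitrary cookie echoed back in
        # the results, envs is an optional environment-prefix string prepended
        # to the command line, and inputtext (or None) is passed to the command.
        ("uptime", "uptime", "", None),
        ("disk", "df -h /", "", None),
    ]

    for id, success, output in execute.run_localcmds(cmds):
        if not success:
            print(f"command {id} failed: {output}")
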
@@ -83,18 +88,18 @@ def run_localcmds(cmds): results = [] running = [] - for (id, cmd, envs, inputtext) in cmds: + for id, cmd, envs, inputtext in cmds: proc = _run_localcmd_init(id, cmd, envs) running += [(id, proc, inputtext)] - for (id, proc, inputtext) in running: + for id, proc, inputtext in running: success, output = _run_localcmd_wait(proc, inputtext) results += [(id, success, output)] return results -def _run_localcmd_init(id, cmd, env): +def _run_localcmd_init(id, cmd, env): if env: cmdline = env + " " + cmd else: @@ -103,12 +108,19 @@ def _run_localcmd_init(id, cmd, env): logging.debug(cmdline) # os.setsid makes sure that the child process doesn't receive our CTRL-Cs. - proc = subprocess.Popen([cmdline], stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - close_fds=True, shell=True, preexec_fn=os.setsid) + proc = subprocess.Popen( + [cmdline], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + close_fds=True, + shell=True, + preexec_fn=os.setsid, + ) return proc + def _run_localcmd_wait(proc, inputtext): if inputtext: inputtext = inputtext.encode() @@ -131,9 +143,10 @@ def _run_localcmd_wait(proc, inputtext): # only fix I can come up with. def _emptyDel(self): pass -subprocess.Popen.__del__ = _emptyDel +subprocess.Popen.__del__ = _emptyDel + class Executor: def __init__(self, config): @@ -183,14 +196,16 @@ def run_cmds(self, cmds, shell=False, helper=False): if shell: if args: - cmdargs = ["%s %s" % (cmdargs[0], " ".join(args))] + cmdargs = ["{} {}".format(cmdargs[0], " ".join(args))] else: cmdargs += args nodecmdlist.append((zeeknode.addr, cmdargs)) logging.debug("%s: %s", zeeknode.host, " ".join(cmdargs)) - for host, result in self.sshrunner.exec_multihost_commands(nodecmdlist, shell, self.config.commandtimeout): + for host, result in self.sshrunner.exec_multihost_commands( + nodecmdlist, shell, self.config.commandtimeout + ): nodecmd = dd[host].pop(0) zeeknode = nodecmd[0] if not isinstance(result, Exception): @@ -228,10 +243,10 @@ def mkdirs(self, dirs): results = [] cmds = [] - for (node, dir) in dirs: + for node, dir in dirs: cmds += [(node, "mkdir", ["-p", dir])] - for (node, success, output) in self.run_cmds(cmds): + for node, success, output in self.run_cmds(cmds): results += [(node, success, output)] return results @@ -247,14 +262,13 @@ def rmdirs(self, dirs): results = [] cmds = [] - for (node, dir) in dirs: - cmds += [(node, "if [ -d %s ]; then rm -rf %s ; fi" % (dir, dir), [])] + for node, dir in dirs: + cmds += [(node, f"if [ -d {dir} ]; then rm -rf {dir} ; fi", [])] - for (node, success, output) in self.run_cmds(cmds, shell=True): + for node, success, output in self.run_cmds(cmds, shell=True): results += [(node, success, output)] return results def host_status(self): return self.sshrunner.host_status() - diff --git a/ZeekControl/install.py b/ZeekControl/install.py index b941bbac..507977c4 100644 --- a/ZeekControl/install.py +++ b/ZeekControl/install.py @@ -1,14 +1,14 @@ # Functions to install files on all nodes. -import os import binascii +import os -from ZeekControl import util -from ZeekControl import config +from ZeekControl import config, util # In all paths given in this file, ${