-
Notifications
You must be signed in to change notification settings - Fork 350
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
strip-private: T6355: rework the strip-private filter
to only remove passwords and use an explicit list of secret paths
- Loading branch information
Showing
1 changed file
with
132 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
#!/usr/bin/python3 | ||
|
||
# Copyright 2021-2023 VyOS maintainers and contributors <[email protected]> | ||
# Copyright 2021-2024 VyOS maintainers and contributors <[email protected]> | ||
# | ||
# This library is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU Lesser General Public | ||
|
@@ -15,139 +15,143 @@ | |
# You should have received a copy of the GNU Lesser General Public | ||
# License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
import argparse | ||
import re | ||
import sys | ||
import copy | ||
|
||
from netaddr import IPNetwork, AddrFormatError | ||
|
||
parser = argparse.ArgumentParser(description='strip off private information from VyOS config') | ||
|
||
strictness = parser.add_mutually_exclusive_group() | ||
strictness.add_argument('--loose', action='store_true', help='remove only information specified as arguments') | ||
strictness.add_argument('--strict', action='store_true', help='remove any private information (implies all arguments below). This is the default behavior.') | ||
|
||
parser.add_argument('--mac', action='store_true', help='strip off MAC addresses') | ||
parser.add_argument('--hostname', action='store_true', help='strip off system host and domain names') | ||
parser.add_argument('--username', action='store_true', help='strip off user names') | ||
parser.add_argument('--dhcp', action='store_true', help='strip off DHCP shared network and static mapping names') | ||
parser.add_argument('--domain', action='store_true', help='strip off domain names') | ||
parser.add_argument('--asn', action='store_true', help='strip off BGP ASNs') | ||
parser.add_argument('--snmp', action='store_true', help='strip off SNMP location information') | ||
parser.add_argument('--lldp', action='store_true', help='strip off LLDP location information') | ||
|
||
address_preserval = parser.add_mutually_exclusive_group() | ||
address_preserval.add_argument('--address', action='store_true', help='strip off all IPv4 and IPv6 addresses') | ||
address_preserval.add_argument('--public-address', action='store_true', help='only strip off public IPv4 and IPv6 addresses') | ||
address_preserval.add_argument('--keep-address', action='store_true', help='preserve all IPv4 and IPv6 addresses') | ||
|
||
# Censor the first half of the address. | ||
ipv4_re = re.compile(r'(\d{1,3}\.){2}(\d{1,3}\.\d{1,3})') | ||
ipv4_subst = r'xxx.xxx.\2' | ||
|
||
# Censor all but the first two fields. | ||
ipv6_re = re.compile(r'([0-9a-fA-F]{1,4}\:){2}([0-9a-fA-F:]+)') | ||
ipv6_subst = r'xxxx:xxxx:\2' | ||
|
||
def ip_match(match: re.Match, subst: str) -> str: | ||
""" | ||
Take a Match and a substitution pattern, check if the match contains a valid IP address, strip | ||
information if it is. This routine is intended to be passed to `re.sub' as a replacement pattern. | ||
""" | ||
result = match.group(0) | ||
# Is this a valid IP address? | ||
try: | ||
addr = IPNetwork(result).ip | ||
# No? Then we've got nothing to do with it. | ||
except AddrFormatError: | ||
return result | ||
# Should we strip it? | ||
if args.address or (args.public_address and not addr.is_private()): | ||
return match.expand(subst) | ||
# No? Then we'll leave it as is. | ||
else: | ||
return result | ||
|
||
def strip_address(line: str) -> str: | ||
""" | ||
Strip IPv4 and IPv6 addresses from the given string. | ||
""" | ||
return ipv4_re.sub(lambda match: ip_match(match, ipv4_subst), ipv6_re.sub(lambda match: ip_match(match, ipv6_subst), line)) | ||
|
||
def strip_lines(rules: tuple) -> None: | ||
""" | ||
Read stdin line by line and apply the given stripping rules. | ||
""" | ||
import vyos.configtree | ||
|
||
|
||
def anonymize_password(v): | ||
return "<PASSWORD REDACTED>" | ||
|
||
def anonymize_key(v): | ||
return "<KEY DATA REDACTED>" | ||
|
||
|
||
secret_paths = [ | ||
# System user password hashes | ||
{"base_path": ['system', 'login', 'user'], "secret_path": ["authentication", "encrypted-password"], "func": anonymize_password}, | ||
|
||
# PKI data | ||
{"base_path": ["pki", "ca"], "secret_path": ["private", "key"], "func": anonymize_key}, | ||
{"base_path": ["pki", "certificate"], "secret_path": ["private", "key"], "func": anonymize_key}, | ||
{"base_path": ["pki", "key-pair"], "secret_path": ["private", "key"], "func": anonymize_key}, | ||
{"base_path": ["pki", "openssh"], "secret_path": ["private", "key"], "func": anonymize_key}, | ||
{"base_path": ["pki", "openvpn", "shared-secret"], "secret_path": ["key"], "func": anonymize_key}, | ||
{"base_path": ["pki", "dh"], "secret_path": ["parameters"], "func": anonymize_key}, | ||
|
||
# IPsec pre-shared secrets | ||
{"base_path": ['vpn', 'ipsec', 'authentication', 'psk'], "secret_path": ["secret"], "func": anonymize_password}, | ||
|
||
# IPsec x509 passphrases | ||
{"base_path": ['vpn', 'ipsec', 'site-to-site', 'peer'], "secret_path": ['authentication', 'x509'], "func": anonymize_password}, | ||
|
||
# IPsec remote-access secrets and passwords | ||
{"base_path": ["vpn", "ipsec", "remote-access", "connection"], "secret_path": ["authentication", "pre-shared-secret"], "func": anonymize_password}, | ||
# Passwords in remote-access IPsec local users have their own fixup | ||
# due to deeper nesting. | ||
|
||
# PPTP passwords | ||
{"base_path": ['vpn', 'pptp', 'remote-access', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": anonymize_password}, | ||
|
||
# L2TP passwords | ||
{"base_path": ['vpn', 'l2tp', 'remote-access', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": anonymize_password}, | ||
{"path": ['vpn', 'l2tp', 'remote-access', 'ipsec-settings', 'authentication', 'pre-shared-secret'], "func": anonymize_password}, | ||
|
||
# SSTP passwords | ||
{"base_path": ['vpn', 'sstp', 'remote-access', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": anonymize_password}, | ||
|
||
# OpenConnect passwords | ||
{"base_path": ['vpn', 'openconnect', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": anonymize_password}, | ||
|
||
# PPPoE server passwords | ||
{"base_path": ['service', 'pppoe-server', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": anonymize_password}, | ||
|
||
# RADIUS PSKs for VPN services | ||
{"base_path": ["vpn", "sstp", "authentication", "radius", "server"], "secret_path": ["key"], "func": anonymize_password}, | ||
{"base_path": ["vpn", "l2tp", "authentication", "radius", "server"], "secret_path": ["key"], "func": anonymize_password}, | ||
{"base_path": ["vpn", "pptp", "authentication", "radius", "server"], "secret_path": ["key"], "func": anonymize_password}, | ||
{"base_path": ["vpn", "openconnect", "authentication", "radius", "server"], "secret_path": ["key"], "func": anonymize_password}, | ||
{"base_path": ["service", "ipoe-server", "authentication", "radius", "server"], "secret_path": ["key"], "func": anonymize_password}, | ||
{"base_path": ["service", "pppoe-server", "authentication", "radius", "server"], "secret_path": ["key"], "func": anonymize_password}, | ||
|
||
# VRRP passwords | ||
{"base_path": ['high-availability', 'vrrp', 'group'], "secret_path": ['authentication', 'password'], "func": anonymize_password}, | ||
|
||
# BGP neighbor and peer group passwords | ||
{"base_path": ['protocols', 'bgp', 'neighbor'], "secret_path": ["password"], "func": anonymize_password}, | ||
{"base_path": ['protocols', 'bgp', 'peer-group'], "secret_path": ["password"], "func": anonymize_password}, | ||
|
||
# WireGuard private keys | ||
{"base_path": ["interfaces", "wireguard"], "secret_path": ["private-key"], "func": anonymize_password}, | ||
|
||
# NHRP passwords | ||
{"base_path": ["protocols", "nhrp", "tunnel"], "secret_path": ["cisco-authentication"], "func": anonymize_password}, | ||
|
||
# RIP passwords | ||
{"base_path": ["protocols", "rip", "interface"], "secret_path": ["authentication", "plaintext-password"], "func": anonymize_password}, | ||
|
||
# IS-IS passwords | ||
{"path": ["protocols", "isis", "area-password", "plaintext-password"], "func": anonymize_password}, | ||
{"base_path": ["protocols", "isis", "interface"], "secret_path": ["password", "plaintext-password"], "func": anonymize_password}, | ||
|
||
# HTTP API servers | ||
{"base_path": ["service", "https", "api", "keys", "id"], "secret_path": ["key"], "func": anonymize_password}, | ||
|
||
# Telegraf | ||
{"path": ["service", "monitoring", "telegraf", "prometheus-client", "authentication", "password"], "func": anonymize_password}, | ||
{"path": ["service", "monitoring", "telegraf", "influxdb", "authentication", "token"], "func": anonymize_password}, | ||
{"path": ["service", "monitoring", "telegraf", "azure-data-explorer", "authentication", "client-secret"], "func": anonymize_password}, | ||
{"path": ["service", "monitoring", "telegraf", "splunk", "authentication", "token"], "func": anonymize_password}, | ||
|
||
# SNMPv3 passwords | ||
{"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["privacy", "encrypted-password"], "func": anonymize_password}, | ||
{"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["privacy", "plaintext-password"], "func": anonymize_password}, | ||
{"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["auth", "encrypted-password"], "func": anonymize_password}, | ||
{"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["auth", "encrypted-password"], "func": anonymize_password}, | ||
] | ||
|
||
def strip_private(config_source): | ||
ct = vyos.configtree.ConfigTree(config_source) | ||
|
||
# Fixup for remote-access IPsec local users that are nested under two tag nodes | ||
# We generate the list of their paths dynamically | ||
ipsec_ra_base = {"base_path": ["vpn", "ipsec", "remote-access", "connection"], "func": anonymize_password} | ||
if ct.exists(ipsec_ra_base["base_path"]): | ||
for conn in ct.list_nodes(ipsec_ra_base["base_path"]): | ||
if ct.exists(ipsec_ra_base["base_path"] + [conn] + ["authentication", "local-users", "username"]): | ||
for u in ct.list_nodes(ipsec_ra_base["base_path"] + [conn] + ["authentication", "local-users", "username"]): | ||
p = copy.copy(ipsec_ra_base) | ||
p["base_path"] = p["base_path"] + [conn] + ["authentication", "local-users", "username"] | ||
p["secret_path"] = ["password"] | ||
secret_paths.append(p) | ||
|
||
for sp in secret_paths: | ||
if "base_path" in sp: | ||
if ct.exists(sp["base_path"]): | ||
for n in ct.list_nodes(sp["base_path"]): | ||
if ct.exists(sp["base_path"] + [n] + sp["secret_path"]): | ||
secret = ct.return_value(sp["base_path"] + [n] + sp["secret_path"]) | ||
ct.set(sp["base_path"] + [n] + sp["secret_path"], value=sp["func"](secret)) | ||
elif "path" in sp: | ||
if ct.exists(sp["path"]): | ||
secret = ct.return_value(sp["path"]) | ||
ct.set(sp["path"], value=sp["func"](secret)) | ||
else: | ||
raise ValueError("Malformed secret path dict, has neither base_path nor path in it ") | ||
|
||
|
||
return ct.to_string() | ||
|
||
def read_input(): | ||
try: | ||
for line in sys.stdin: | ||
if not args.keep_address: | ||
line = strip_address(line) | ||
for (condition, regexp, subst) in rules: | ||
if condition: | ||
line = regexp.sub(subst, line) | ||
print(line, end='') | ||
return sys.stdin.read() | ||
# stdin can be cut for any reason, such as user interrupt or the pager terminating before the text can be read. | ||
# All we can do is gracefully exit. | ||
except (BrokenPipeError, EOFError, KeyboardInterrupt): | ||
sys.exit(1) | ||
|
||
if __name__ == "__main__": | ||
args = parser.parse_args() | ||
# Strict mode is the default and the absence of loose mode implies presence of strict mode. | ||
if not args.loose: | ||
args.mac = args.domain = args.hostname = args.username = args.dhcp = args.asn = args.snmp = args.lldp = True | ||
if not args.public_address and not args.keep_address: | ||
args.address = True | ||
elif not args.address and not args.public_address: | ||
args.keep_address = True | ||
|
||
# (condition, precompiled regexp, substitution string) | ||
stripping_rules = [ | ||
# Strip passwords | ||
(True, re.compile(r'password \S+'), 'password xxxxxx'), | ||
(True, re.compile(r'cisco-authentication \S+'), 'cisco-authentication xxxxxx'), | ||
# Strip public key information | ||
(True, re.compile(r'public-keys \S+'), 'public-keys [email protected]'), | ||
(True, re.compile(r'type \'ssh-(rsa|dss)\''), 'type ssh-xxx'), | ||
(True, re.compile(r' key \S+'), ' key xxxxxx'), | ||
# Strip bucket | ||
(True, re.compile(r' bucket \S+'), ' bucket xxxxxx'), | ||
# Strip tokens | ||
(True, re.compile(r' token \S+'), ' token xxxxxx'), | ||
# Strip OpenVPN secrets | ||
(True, re.compile(r'(shared-secret-key-file|ca-cert-file|cert-file|dh-file|key-file|client) (\S+)'), r'\1 xxxxxx'), | ||
# Strip IPSEC secrets | ||
(True, re.compile(r'pre-shared-secret \S+'), 'pre-shared-secret xxxxxx'), | ||
(True, re.compile(r'secret \S+'), 'secret xxxxxx'), | ||
# Strip OSPF md5-key | ||
(True, re.compile(r'md5-key \S+'), 'md5-key xxxxxx'), | ||
# Strip WireGuard private-key | ||
(True, re.compile(r'private-key \S+'), 'private-key xxxxxx'), | ||
|
||
# Strip MAC addresses | ||
(args.mac, re.compile(r'([0-9a-fA-F]{2}\:){5}([0-9a-fA-F]{2}((\:{0,1})){3})'), r'xx:xx:xx:xx:xx:\2'), | ||
|
||
# Strip host-name, domain-name, domain-search and url | ||
(args.hostname, re.compile(r'(host-name|domain-name|domain-search|url) \S+'), r'\1 xxxxxx'), | ||
|
||
# Strip user-names | ||
(args.username, re.compile(r'(user|username|user-id) \S+'), r'\1 xxxxxx'), | ||
# Strip full-name | ||
(args.username, re.compile(r'(full-name) [ -_A-Z a-z]+'), r'\1 xxxxxx'), | ||
|
||
# Strip DHCP static-mapping and shared network names | ||
(args.dhcp, re.compile(r'(shared-network-name|static-mapping) \S+'), r'\1 xxxxxx'), | ||
|
||
# Strip host/domain names | ||
(args.domain, re.compile(r' (peer|remote-host|local-host|server) ([\w-]+\.)+[\w-]+'), r' \1 xxxxx.tld'), | ||
|
||
# Strip BGP ASNs | ||
(args.asn, re.compile(r'(bgp|remote-as) (\d+)'), r'\1 XXXXXX'), | ||
|
||
# Strip LLDP location parameters | ||
(args.lldp, re.compile(r'(altitude|datum|latitude|longitude|ca-value|country-code) (\S+)'), r'\1 xxxxxx'), | ||
|
||
# Strip SNMP location | ||
(args.snmp, re.compile(r'(location) \S+'), r'\1 xxxxxx'), | ||
] | ||
strip_lines(stripping_rules) | ||
config_source = read_input() | ||
stripped_config = strip_private(config_source) | ||
print(stripped_config) |