Skip to content

Commit

Permalink
improve tests and lint
Browse files Browse the repository at this point in the history
  • Loading branch information
lilatomic committed Nov 11, 2023
1 parent e354d3d commit 772d214
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 43 deletions.
24 changes: 20 additions & 4 deletions test/crypt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def process_openssl_objects(objs: dict):
if k in {"private-key", "public-key", "P"}:
# out[k] = bytes.fromhex(v.replace(":", ""))
out[k] = v.replace(":", "").encode("ascii")
elif re.match("\d+ \(0x\d+\)", v): # matches '2 (0x2)'
match = re.match("\d+ \(0x(\d+)\)", v)
elif re.match(r"\d+ \(0x\d+\)", v): # matches '2 (0x2)'
match = re.match(r"\d+ \(0x(\d+)\)", v)
out[k] = match.group(0).encode("utf-8")
out[k] = b"02" # TODO: this is a shim
else:
Expand Down Expand Up @@ -131,7 +131,23 @@ def gen_child_cert(workdir, cadir, regen: bool):
der_path = str(workdir / "rsa.x509.der")

subprocess.run(["openssl", "req", "-new", "-key", privkey_path, "-out", csr_path, "-subj", "/C=CA/O=example/CN=turkeyutils-leaf"])
subprocess.run(["openssl", "x509", "-req", "-in", csr_path, "-CA", str(cadir/"rsa.x509.der"), "-CAkey", str(cadir/"rsa.pem"), "-CAcreateserial", "-days", "365", "-outform", "DER", "-out", der_path])
subprocess.run([
"openssl",
"x509",
"-req",
"-in",
csr_path,
"-CA",
str(cadir / "rsa.x509.der"),
"-CAkey", str(cadir / "rsa.pem"),
"-CAcreateserial",
"-days",
"365",
"-outform",
"DER",
"-out",
der_path
])

with open(der_path, mode="rb") as der_file:
der = der_file.read()
Expand All @@ -157,7 +173,7 @@ def rsa_keys(workdir: Path, regen=True):
ca_key, ca_cert = gen_rsa(workdir / "ca", regen)
unsigned_key, unsigned_cert = gen_rsa(workdir / "unsigned", regen)
leaf_key, leaf_cert = gen_child_cert(workdir / "leaf", workdir / "ca", regen)
pkcs8 = rsa_to_pkcs8(workdir/ "unsigned")
pkcs8 = rsa_to_pkcs8(workdir / "unsigned")
return {
"ca": ca_cert,
"unsigned": unsigned_cert,
Expand Down
47 changes: 42 additions & 5 deletions test/key_req.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
#!/tmp/venv/bin/python3
import ast
import os
import sys
import textwrap
from pathlib import Path


def report(s):
with open("/tmp/file.txt", mode="a") as f:
f.write(str(s))
f.write("\n")


def debug(keyid, desc, callout, keyring_id):
import keyutils

Expand All @@ -27,14 +32,46 @@ def debug(keyid, desc, callout, keyring_id):
keyutils.instantiate(keyid, f"Debug {callout}".encode("utf-8"), keyring_id)


def install(tmpdir, install):
"""Install the keyutils config file"""
if install:
report("install request-key handler")
# Create a launcher script to get around a parser limitation in `/sbin/request-key`.
# request-key will take the executable path as everything up to the last "/",
# which is a problem for having both the python and the script file with absolute paths.
# eg "/path/to/bin/python /path/to/script.py %k %d %c %S"
# is parsed as ["/path/to/bin/python /path/to/script.py", "%k", "%d", "%c", "%S"]
launcher_path = Path(tmpdir) / "key_req.sh"
with open(launcher_path, mode="w", encoding="utf-8") as launcher:
launcher.write(textwrap.dedent(f"""\
#!/bin/bash
{sys.executable} {Path(__file__)} $@
"""))
os.chmod(launcher_path, 0o755)

with open(Path("/etc/request-key.d/turkeyutils.conf"), mode="w", encoding="utf-8") as config:
# add a config for to call into our executable for key requests
config.write(textwrap.dedent(f"""\
#OP TYPE DESCRIPTION CALLOUT INFO PROGRAM ARG1 ARG2 ARG3 ...
#====== ======= =============== =============== ===============================
create user turkeyutils:* * {launcher_path} %k %d %c %S
"""))
else:
report("uninstall request-key handler")
Path("/etc/request-key.d/turkeyutils.conf").unlink(missing_ok=True)


if __name__ == "__main__":
report(sys.argv)
if len(sys.argv) != 5:
raise ValueError("need to pass 4 arguments")

_, keyid, desc, callout, keyring_id = sys.argv
try:
debug(int(keyid), desc, callout, int(keyring_id))
if len(sys.argv) == 5:
_, keyid, desc, callout, keyring_id = sys.argv
debug(int(keyid), desc, callout, int(keyring_id))
elif len(sys.argv) == 3:
_, tmpdir, install_raw = sys.argv
install(tmpdir, bool(ast.literal_eval(install_raw)))
else:
raise ValueError("wrong number of args passed")
except Exception as e:
report(e)
sys.exit(1)
Expand Down
73 changes: 39 additions & 34 deletions test/keyutils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import os
import subprocess
import sys
import textwrap
import time
import unittest
from pathlib import Path
Expand All @@ -29,6 +28,17 @@
from test import crypt_utils


def has_sudo():
"""
Test that we have root capabilities which we need for privileged key operations
"""
proc = subprocess.run(["sudo", "-v", "-n"], capture_output=True, text=True)
return proc.returncode == 0


needs_sudo = pytest.mark.skipif(not has_sudo(), reason="requires sudo permissions")


@pytest.fixture(scope="function")
def ring(request):
return keyutils.add_ring(request.function.__name__.encode("utf-8"), keyutils.KEY_SPEC_THREAD_KEYRING)
Expand All @@ -41,7 +51,6 @@ def rings(parent: int, n: int = 2):
return rings



class BasicTest(unittest.TestCase):
def testSet(self):
keyDesc = b"test:key:01"
Expand Down Expand Up @@ -123,6 +132,9 @@ def testLink(self):
keyutils.link(child, parent)
self.assertEqual(keyutils.search(parent, desc), keyId)

keyutils.unlink(child, parent)
self.assertEqual(keyutils.search(parent, desc), None)

def testTimeout(self):
desc = b"dummyKey"
value = b"dummyValue"
Expand Down Expand Up @@ -238,44 +250,30 @@ def test_move_exclusive(self, ring):
def test_capabilities(self):
caps = keyutils.capabilities()
assert caps
assert not caps.startswith(b'\x00') # the first byte will contain the results, and it should contain _something_
assert not caps.startswith(b'\x00') # the first byte will contain the results, and it should contain _something_


def test_get_keyring_id():
keyring = keyutils.get_keyring_id(keyutils.KEY_SPEC_THREAD_KEYRING, False)
assert keyring is not None and keyring != 0


@pytest.mark.skip
class TestNeedsSudo:
def test_keyring_chown(self):
key_id = keyutils.add_key(b"chown_n", b"chown_v", keyutils.KEY_SPEC_THREAD_KEYRING)
@pytest.mark.xfail(reason="Not implemented")
@needs_sudo
def test_keyring_chown(self):
# TODO: implement this
keyutils.add_key(b"chown_n", b"chown_v", keyutils.KEY_SPEC_THREAD_KEYRING)
raise NotImplementedError()


@pytest.fixture
def request_key(tmpdir):
# Create a launcher script to get around a parser limitation in `/sbin/request-key`.
# request-key will take the executable path as everything up to the last "/",
# which is a problem for having both the python and the script file with absolute paths.
# eg "/path/to/bin/python /path/to/script.py %k %d %c %S"
# is parsed as ["/path/to/bin/python /path/to/script.py", "%k", "%d", "%c", "%S"]
launcher_path = Path(tmpdir) / "key_req.sh"
with open(launcher_path, mode="w", encoding="utf-8") as launcher:
launcher.write(textwrap.dedent(f"""\
#!/bin/bash
{sys.executable} {Path(__file__).parent / "key_req.py"} $@
"""))
os.chmod(launcher_path, 0o755)

with open(Path("/etc/request-key.d/turkeyutils.conf"), mode="w", encoding="utf-8") as config:
# add a config for to call into our executable for key requests
config.write(textwrap.dedent(f"""\
#OP TYPE DESCRIPTION CALLOUT INFO PROGRAM ARG1 ARG2 ARG3 ...
#====== ======= =============== =============== ===============================
create user turkeyutils:* * {launcher_path} %k %d %c %S
"""))


@pytest.mark.skip
subprocess.run(["sudo", "-n", sys.executable, Path(__file__).parent / "key_req.py", tmpdir, "True"])
yield
subprocess.run(["sudo", "-n", sys.executable, Path(__file__).parent / "key_req.py", tmpdir, "False"])


@needs_sudo
class TestInstantiate:
def test_instantiate(self, request_key):
key = keyutils.request_key(b"turkeyutils:instantiate", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"pytest")
Expand All @@ -291,9 +289,13 @@ def test_negate(self, request_key):

def test_reject(self, request_key):
with pytest.raises(keyutils.KeyutilsError) as e:
key = keyutils.request_key(b"turkeyutils:reject", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"reject")
keyutils.request_key(b"turkeyutils:reject", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"reject")
assert e.value.args[0] == 128

def test_instantiation_failed(self, request_key):
key = keyutils.request_key(b"aaaa:aaaa", keyutils.KEY_SPEC_THREAD_KEYRING, callout_info=b"pytest")
assert key is None


@pytest.fixture
def dh_keys(tmpdir):
Expand Down Expand Up @@ -360,11 +362,15 @@ def test_restrict_keyring(self, rsa_keys):


def supports_pkcs8():
"""
We need the pkcs8_key_parser to be loaded in order to handle asymmetric operations
Try loading it with `sudo modprobe pkcs8_key_parser`
"""
lsmod = subprocess.run(["lsmod"], capture_output=True, text=True)
return "pkcs8_key_parser" in lsmod.stdout


needs_pkcs8 = pytest.mark.skipif(not supports_pkcs8(), reason="requires pkcs8 parser to insert ")
needs_pkcs8 = pytest.mark.skipif(not supports_pkcs8(), reason="requires pkcs8 parser to insert asymmetric keys")


class TestPKey:
Expand Down Expand Up @@ -394,10 +400,9 @@ def test_sign(self, ring, rsa_keys):

bad_data = b"some bad data"
with pytest.raises(keyutils.KeyutilsError) as e:
bad_verify = keyutils.pkey_verify(pkey, bad_data, sig, info=b'enc=pkcs1 hash=sha256')
keyutils.pkey_verify(pkey, bad_data, sig, info=b'enc=pkcs1 hash=sha256')
assert e.value.args[1] == 'Key was rejected by service'



if __name__ == "__main__":
sys.exit(unittest.main())

0 comments on commit 772d214

Please sign in to comment.