Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: lmp-el2go-auto-register: fix PKCS#11 slot initialization #850

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Copyright (c) 2022, Foundries.io Ltd.

from contextlib import contextmanager
import re
import os
import logging
import subprocess
Expand All @@ -18,6 +19,7 @@ PIN = os.environ.get("PKCS11_PIN", "87654321")
SO_PIN = os.environ.get("PKCS11_SOPIN", "12345678")
SOTA_DIR = os.environ.get("SOTA_DIR", "/var/sota")
HANDLERS = os.environ.get("HANDLERS", "aws-iot,aktualizr-lite")
TOKEN_LABEL = os.environ.get("TOKEN_LABEL", "aktualizr")

REPO_ID = os.environ["REPOID"]

Expand Down Expand Up @@ -46,7 +48,7 @@ class Pkcs11:
"--key-type=EC:prime256v1",
f"--id={slot}",
f"--label={label}",
"--token-label=aktualizr",
f"--token-label={TOKEN_LABEL}",
]
run_quietly(args)

Expand All @@ -60,11 +62,12 @@ class Pkcs11:
f"--type={type}",
f"--id={slot}",
f"--label={label}",
f"--token-label={TOKEN_LABEL}",
]
run_quietly(args)

def has_labels(self, labels: List[str]) -> bool:
args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", "--list-objects"]
args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", f"--token-label={TOKEN_LABEL}", "--list-objects"]
out = subprocess.check_output(args)
missing = {x: 1 for x in labels}
for line in out.decode().splitlines():
Expand All @@ -76,20 +79,59 @@ class Pkcs11:
pass # already removed
return len(missing) == 0

@classmethod
def _init_slot_dictionary(cls, pkcs11_output: str) -> dict:
slot_dict = {}
slot_id = None
r = re.compile("\((?P<slot_id>[0-9x]{3})\)")

for line in pkcs11_output.splitlines():
if line.startswith("Slot"):
# decode slot ID
# assume output in format:
# Slot 0 (0x0): 710be08c-e795-597d-bf30-2a98bab41050
g = r.search(line)
slot_id = g.group("slot_id")
slot_dict.update({slot_id: {}})
if line.startswith(" "):
# decode slot dictionary
# assume output in format:
# token label : testtoken
parts = line.split(":", 1)
slot_dict[slot_id].update({parts[0].strip(): parts[1].strip()})
return slot_dict

@classmethod
def _is_initialized(cls) -> bool:
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--pin={PIN}", "--list-objects"]
args = ["pkcs11-tool", f"--module={cls.LIB}", "--list-slots"]
try:
subprocess.check_output(args, stderr=subprocess.STDOUT)
out = subprocess.check_output(args, stderr=subprocess.STDOUT)
slot_dict = cls._init_slot_dictionary(out.decode())
for slot_id, slot in slot_dict.items():
token_label = slot.get("token label")
if token_label == f"{TOKEN_LABEL}":
return True
except subprocess.CalledProcessError:
return False
return True
return False

@classmethod
def _initialize(cls):
args = ["pkcs11-tool", f"--module={cls.LIB}", "--list-slots"]
out = subprocess.check_output(args, stderr=subprocess.STDOUT)
slot_dict = cls._init_slot_dictionary(out.decode())
empty_slot = None
for slot_id, slot in slot_dict.items():
token_state = slot.get("token state")
if token_state == "uninitialized":
empty_slot = slot_id
break
if empty_slot is None:
# raise exception as there are no more available slots
raise RuntimeError("No free PKCS11 slots")
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--so-pin={SO_PIN}"]
subprocess.check_call(args + ["--init-token", "--label", "aktualizr"])
subprocess.check_call(args + ["--init-pin", f"--pin={PIN}"])
subprocess.check_call(args + ["--init-token", "--label", f"{TOKEN_LABEL}", "--slot", f"{empty_slot}"])
subprocess.check_call(args + ["--init-pin", f"--pin={PIN}", "--slot", f"{empty_slot}"])

@classmethod
def get_initialized(cls) -> "Pkcs11":
Expand Down