Skip to content

Commit

Permalink
lmp-el2go-auto-register: fix PKCS#11 slot initialization
Browse files Browse the repository at this point in the history
This patch fixes the potential issues when running the auto-registration
script on the device:

1. 1st slot gets overwritten
If the 1st slot is not labeled 'aktualizr' and lmp-el2go-auto-register
can't log into it with it's default PIN it would overwrite it. This
might mean data loss for some othe deamon which initialized the slot

2. Objects not stored in proper slots
When writing to pkcs#11 lmp-el2go-auto-register was not using the slot
label consistently. This means that some objects were written in proper
slot while some other not. This would only happen if the 1st slot (0x0)
was initialized with default PIN.

3. has_labels method doesn't use slot label
For this reason it would always try to access 1st slot (0x0) and ignore
the label. If slot with label 'aktualizr' was initialized in different
position this would mean the certificate/key objects would not be stored
properly.

Signed-off-by: Milosz Wasilewski <[email protected]>
  • Loading branch information
mwasilew committed Oct 11, 2022
1 parent a4ee068 commit 7707a49
Showing 1 changed file with 54 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Copyright (c) 2022, Foundries.io Ltd.

from contextlib import contextmanager
import re
import os
import logging
import subprocess
Expand All @@ -18,6 +19,7 @@ PIN = os.environ.get("PKCS11_PIN", "87654321")
SO_PIN = os.environ.get("PKCS11_SOPIN", "12345678")
SOTA_DIR = os.environ.get("SOTA_DIR", "/var/sota")
HANDLERS = os.environ.get("HANDLERS", "aws-iot,aktualizr-lite")
TOKEN_LABEL = os.environ.get("TOKEN_LABEL", "aktualizr")

REPO_ID = os.environ["REPOID"]

Expand All @@ -34,6 +36,10 @@ def run_quietly(args: List[str]):
sys.exit(1)


class Pkcs11Error(Exception):
pass


class Pkcs11:
LIB = "/usr/lib/libckteec.so.0"

Expand All @@ -46,7 +52,7 @@ class Pkcs11:
"--key-type=EC:prime256v1",
f"--id={slot}",
f"--label={label}",
"--token-label=aktualizr",
f"--token-label={TOKEN_LABEL}",
]
run_quietly(args)

Expand All @@ -60,11 +66,12 @@ class Pkcs11:
f"--type={type}",
f"--id={slot}",
f"--label={label}",
f"--token-label={TOKEN_LABEL}",
]
run_quietly(args)

def has_labels(self, labels: List[str]) -> bool:
args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", "--list-objects"]
args = ["pkcs11-tool", f"--module={self.LIB}", f"--pin={PIN}", f"--token-label={TOKEN_LABEL}", "--list-objects"]
out = subprocess.check_output(args)
missing = {x: 1 for x in labels}
for line in out.decode().splitlines():
Expand All @@ -76,20 +83,59 @@ class Pkcs11:
pass # already removed
return len(missing) == 0

@classmethod
def _init_slot_dictionary(cls, pkcs11_output: str) -> dict:
slot_dict = {}
slot_id = None
r = re.compile("\((?P<slot_id>[0-9x]{3})\)")

for line in pkcs11_output.splitlines():
if line.startswith("Slot"):
# decode slot ID
# assume output in format:
# Slot 0 (0x0): 710be08c-e795-597d-bf30-2a98bab41050
g = r.search(line)
slot_id = g.group("slot_id")
slot_dict.update({slot_id: {}})
if line.startswith(" "):
# decode slot dictionary
# assume output in format:
# token label : testtoken
parts = line.split(":", 1)
slot_dict[slot_id].update({parts[0].strip(): parts[1].strip()})
return slot_dict

@classmethod
def _is_initialized(cls) -> bool:
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--pin={PIN}", "--list-objects"]
args = ["pkcs11-tool", f"--module={cls.LIB}", "--list-slots"]
try:
subprocess.check_output(args, stderr=subprocess.STDOUT)
out = subprocess.check_output(args, stderr=subprocess.STDOUT)
slot_dict = cls._init_slot_dictionary(out.decode())
for slot_id, slot in slot_dict.items():
token_label = slot.get("token label")
if token_label == f"{TOKEN_LABEL}":
return True
except subprocess.CalledProcessError:
return False
return True
return False

@classmethod
def _initialize(cls):
args = ["pkcs11-tool", f"--module={cls.LIB}", "--list-slots"]
out = subprocess.check_output(args, stderr=subprocess.STDOUT)
slot_dict = cls._init_slot_dictionary(out.decode())
empty_slot = None
for slot_id, slot in slot_dict.items():
token_state = slot.get("token state")
if token_state == "uninitialized":
empty_slot = slot_id
break
if empty_slot is None:
# raise exception as there are no more available slots
raise Pkcs11Error("No free PKCS11 slots")
args = ["pkcs11-tool", f"--module={cls.LIB}", f"--so-pin={SO_PIN}"]
subprocess.check_call(args + ["--init-token", "--label", "aktualizr"])
subprocess.check_call(args + ["--init-pin", f"--pin={PIN}"])
subprocess.check_call(args + ["--init-token", "--label", f"{TOKEN_LABEL}", "--slot", f"{empty_slot}"])
subprocess.check_call(args + ["--init-pin", f"--pin={PIN}", "--slot", f"{empty_slot}"])

@classmethod
def get_initialized(cls) -> "Pkcs11":
Expand Down Expand Up @@ -256,3 +302,4 @@ def main():

if __name__ == "__main__":
main()

0 comments on commit 7707a49

Please sign in to comment.