-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathskeleton_ca_handler.py
94 lines (70 loc) · 3.05 KB
/
skeleton_ca_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# -*- coding: utf-8 -*-
""" skeleton for customized CA handler """
from __future__ import print_function
from typing import Tuple
# pylint: disable=e0401
from acme_srv.helper import load_config, header_info_get
class CAhandler(object):
""" EST CA handler """
def __init__(self, _debug: bool = None, logger: object = None):
self.logger = logger
self.parameter = None
def __enter__(self):
""" Makes CAhandler a Context Manager """
if not self.parameter:
self._config_load()
return self
def __exit__(self, *args):
""" cose the connection at the end of the context """
def _config_load(self):
"""" load config from file """
self.logger.debug('CAhandler._config_load()')
config_dic = load_config(self.logger, 'CAhandler')
if 'CAhandler' in config_dic and 'parameter' in config_dic['CAhandler']:
self.parameter = config_dic['CAhandler']['parameter']
self.logger.debug('CAhandler._config_load() ended')
def _stub_func(self, parameter: str):
"""" load config from file """
self.logger.debug('CAhandler._stub_func(%s)', parameter)
self.logger.debug('CAhandler._stub_func() ended')
def enroll(self, csr: str) -> Tuple[str, str, str, str]:
""" enroll certificate """
self.logger.debug('CAhandler.enroll()')
cert_bundle = None
error = None
cert_raw = None
poll_indentifier = None
# optional: lookup http header information from request
qset = header_info_get(self.logger, csr=csr)
if qset:
self.logger.info(qset[-1]['header_info'])
self._stub_func(csr)
self.logger.debug('Certificate.enroll() ended')
return (error, cert_bundle, cert_raw, poll_indentifier)
def poll(self, cert_name: str, poll_identifier: str, _csr: str) -> Tuple[str, str, str, str, bool]:
""" poll status of pending CSR and download certificates """
self.logger.debug('CAhandler.poll()')
error = None
cert_bundle = None
cert_raw = None
rejected = False
self._stub_func(cert_name)
self.logger.debug('CAhandler.poll() ended')
return (error, cert_bundle, cert_raw, poll_identifier, rejected)
def revoke(self, _cert: str, _rev_reason: str, _rev_date: str) -> Tuple[int, str, str]:
""" revoke certificate """
self.logger.debug('CAhandler.revoke()')
code = 500
message = 'urn:ietf:params:acme:error:serverInternal'
detail = 'Revocation is not supported.'
self.logger.debug('Certificate.revoke() ended')
return (code, message, detail)
def trigger(self, payload: str) -> Tuple[str, str, str]:
""" process trigger message and return certificate """
self.logger.debug('CAhandler.trigger()')
error = None
cert_bundle = None
cert_raw = None
self._stub_func(payload)
self.logger.debug('CAhandler.trigger() ended with error: %s', error)
return (error, cert_bundle, cert_raw)