Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement externalAccountBinding #270

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ and read your private account key and CSR.
python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ > ./signed_chain.crt
```

If your ACME CA mandates externalAccountBinding (eAB), provide those parameters like so:

```
# Run the script on your server
python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ --eabkid 'PAtzxcSFQMQSdm9SLJTxCt0hwvvl5yNKPfnWBWqPk8o' --eabhmackey 'ZndUSkZvVldvMEFiRzQ5VWNCdERtNkNBNnBTcTl4czNKVEVxdUZiaEdpZXZNUVJBVmRuSFREcDJYX2s3X0NxTA' > ./signed_chain.crt
```

Some ACME CA mandate a contact at registration:

```
# Run the script on your server
python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ --contact [email protected] --eabkid 'PAtzxcSFQMQSdm9SLJTxCt0hwvvl5yNKPfnWBWqPk8o' --eabhmackey 'ZndUSkZvVldvMEFiRzQ5VWNCdERtNkNBNnBTcTl4czNKVEVxdUZiaEdpZXZNUVJBVmRuSFREcDJYX2s3X0NxTA' > ./signed_chain.crt
```


### Step 5: Install the certificate

The signed https certificate chain that is output by this script can be used along
Expand Down
40 changes: 19 additions & 21 deletions acme_tiny.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging, hmac
try:
from urllib.request import urlopen, Request # Python 3
except ImportError: # pragma: no cover
Expand All @@ -13,23 +13,20 @@
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)

def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None):
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, eabkid=None, eabhmackey=None):
directory, acct_headers, alg, jwk = None, None, None, None # global variables

# helper functions - base64 encode for jose spec
def _b64(b):
def _b64(b): # helper function - base64 encode for jose spec
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")

# helper function - run external commands
def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): # helper function - run external commands
proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate(cmd_input)
if proc.returncode != 0:
raise IOError("{0}\n{1}".format(err_msg, err))
return out

# helper function - make request and automatically parse json response
def _do_request(url, data=None, err_msg="Error", depth=0):
def _do_request(url, data=None, err_msg="Error", depth=0): # helper function - make request and automatically parse json response
try:
resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}))
resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
Expand All @@ -46,8 +43,7 @@ def _do_request(url, data=None, err_msg="Error", depth=0):
raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data))
return resp_data, code, headers

# helper function - make signed requests
def _send_signed_request(url, payload, err_msg, depth=0):
def _send_signed_request(url, payload, err_msg, depth=0): # helper function - make signed requests
payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8'))
new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce']
protected = {"url": url, "alg": alg, "nonce": new_nonce}
Expand All @@ -61,8 +57,7 @@ def _send_signed_request(url, payload, err_msg, depth=0):
except IndexError: # retry bad nonces (they raise IndexError)
return _send_signed_request(url, payload, err_msg, depth=(depth + 1))

# helper function - poll until complete
def _poll_until_not(url, pending_statuses, err_msg):
def _poll_until_not(url, pending_statuses, err_msg): # helper function - poll until complete
result, t0 = None, time.time()
while result is None or result['status'] in pending_statuses:
assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout
Expand Down Expand Up @@ -108,11 +103,15 @@ def _poll_until_not(url, pending_statuses, err_msg):
# create account, update contact details (if any), and set the global key identifier
log.info("Registering account...")
reg_payload = {"termsOfServiceAgreed": True} if contact is None else {"termsOfServiceAgreed": True, "contact": contact}
account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
if eabkid and eabhmackey: # https://datatracker.ietf.org/doc/html/rfc8555#section-7.3.4
log.info("Building externalAccountBinding...")
reg_payload['externalAccountBinding'] = {"protected": _b64(json.dumps({"alg": "HS256", "kid": eabkid, "url": directory['newAccount']}).encode('utf-8')), "payload": _b64(json.dumps(jwk).encode('utf-8')), "signature": _b64(hmac.new(base64.urlsafe_b64decode(eabhmackey.strip() + '=='), (_b64(json.dumps({"alg": "HS256", "kid": eabkid, "url": directory['newAccount']}).encode('utf-8')) + "." + _b64(json.dumps(jwk).encode('utf-8'))).encode('utf-8'), digestmod=hashlib.sha256).digest())}
response, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
log.info("{0} Account ID: {1}".format("Registered!" if code == 201 else "Already registered!", acct_headers['Location']))
if contact is not None:
account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details")
log.info("Updated contact details:\n{0}".format("\n".join(account['contact'])))
if contact and code == 200: # 200 == already reg --> update
response, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details")
log.info("Updated contact details:\n{0}".format("\n".join(response['contact'])))
log.info("You must agree to updated TOS:\n", response['instance']) if code == 403 and response['type'] == 'urn:ietf:params:acme:error:userActionRequired' else 0 # https://datatracker.ietf.org/doc/html/rfc8555#section-7.3.3 : #userActionRequired only for TOS in RFC8555

# create a new order
log.info("Creating new order...")
Expand Down Expand Up @@ -175,10 +174,8 @@ def main(argv=None):
description=textwrap.dedent("""\
This script automates the process of getting a signed TLS certificate from Let's Encrypt using the ACME protocol.
It will need to be run on your server and have access to your private account key, so PLEASE READ THROUGH IT!
It's only ~200 lines, so it won't take long.

Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt
""")
It's only ~200 lines, so it won't take long.\n\n
Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt""")
)
parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key")
parser.add_argument("--csr", required=True, help="path to your certificate signing request")
Expand All @@ -189,10 +186,11 @@ def main(argv=None):
parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!")
parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:[email protected]) for your account-key")
parser.add_argument("--check-port", metavar="PORT", default=None, help="what port to use when self-checking the challenge file, default is port 80")
parser.add_argument("--eabkid", metavar="KID", default=None, help="Key Identifier for External Account Binding"), parser.add_argument("--eabhmackey", metavar="HMAC", default=None, help="HMAC key for External Account Binding")

args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, eabkid=args.eabkid, eabhmackey=args.eabhmackey)
sys.stdout.write(signed_crt)

if __name__ == "__main__": # pragma: no cover
Expand Down
12 changes: 10 additions & 2 deletions tests/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,23 @@ def test_account_key_domain(self):
self.assertIsInstance(result, ValueError)
self.assertIn(self.account_key_error, result.args[0])

def test_contact(self):
""" Make sure optional contact details can be set """
def test_contact_update(self):
""" Make sure optional contact details can be updated """
# add a logging handler that captures the info log output
log_output = StringIO()
debug_handler = logging.StreamHandler(log_output)
acme_tiny.LOGGER.addHandler(debug_handler)
# call acme_tiny with new contact details
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_tiny.main([
"--account-key", self.KEYS['account_key'].name,
"--csr", self.KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--directory-url", self.DIR_URL,
"--check-port", self.check_port,
"--contact", "mailto:[email protected]", "mailto:[email protected]",
])
result = acme_tiny.main([
"--account-key", self.KEYS['account_key'].name,
"--csr", self.KEYS['domain_csr'].name,
Expand Down