From bf14a7af610b34a55af351058de33a266bc783cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Stelmach?= Date: Sun, 8 Dec 2019 22:08:31 +0100 Subject: [PATCH] Add support for dns challenge Since there is no single interface for administrating DNS servers a custom script (specified with --challenge-script) is called by acme_tiny.py. The script needs to support the following interface: challenge_script (--add|--remove) --domain DOMAIN TXTRECORD --add - add a TXT record --remove - remove a TXT record --domain DOMAIN - specify a domain name TXTRECORD - value of a TXT record to be added or removed --- acme_tiny.py | 61 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/acme_tiny.py b/acme_tiny.py index 511fa93b..fc3e8d0e 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -13,7 +13,7 @@ LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.INFO) -def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None): +def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, challenge_type="http", challenge_script=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables # helper functions - base64 encode for jose spec @@ -70,6 +70,12 @@ def _poll_until_not(url, pending_statuses, err_msg): result, _, _ = _send_signed_request(url, None, err_msg) return result + if challenge_type not in ("http", "dns"): + raise ValueError("Unsupported challenge type: {0}".format(challenge_type)) + + if challenge_type == "dns" and challenge_script is None: + raise ValueError("Challenge script is required for dns challenge") + # parse account key to get public key log.info("Parsing account key...") out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error") @@ -127,27 +133,46 @@ def _poll_until_not(url, pending_statuses, err_msg): domain = authorization['identifier']['value'] log.info("Verifying {0}...".format(domain)) - # find the http-01 challenge and write the challenge file - challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0] - token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) - keyauthorization = "{0}.{1}".format(token, thumbprint) - wellknown_path = os.path.join(acme_dir, token) - with open(wellknown_path, "w") as wellknown_file: - wellknown_file.write(keyauthorization) - - # check that the file is in place - try: - wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) - assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization) - except (AssertionError, ValueError) as e: - raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) + challenge = None + wellknown_path = None + if challenge_type == "http": + # find the http-01 challenge and write the challenge file + challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0] + token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) + keyauthorization = "{0}.{1}".format(token, thumbprint) + wellknown_path = os.path.join(acme_dir, token) + with open(wellknown_path, "w") as wellknown_file: + wellknown_file.write(keyauthorization) + + # check that the file is in place + try: + wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token) + assert(disable_check or _do_request(wellknown_url)[0] == keyauthorization) + except (AssertionError, ValueError) as e: + os.remove(wellknown_path) + raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e)) + elif challenge_type == "dns": + challenge = [c for c in authorization['challenges'] if c['type'] == "dns-01"][0] + token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) + keyauthorization = "{0}.{1}".format(token, thumbprint) + txtrecord = _b64(hashlib.sha256(keyauthorization).digest()) + subprocess.call([challenge_script, "--add", "--domain", domain, txtrecord]) + try: + subprocess.call(["host", "-t", "TXT", "_acme-challenge.{0}".format(domain)]) + assert(disable_check or True) # TODO + except AssertionError: + subprocess.call([challenge_script, "--remove", "--domain", domain, txtrecord]) + raise ValueError("Set up the DNS challenge, but couldn't verify: {0}".format(e)) # say the challenge is done _send_signed_request(challenge['url'], {}, "Error submitting challenges: {0}".format(domain)) authorization = _poll_until_not(auth_url, ["pending"], "Error checking challenge status for {0}".format(domain)) if authorization['status'] != "valid": raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization)) - os.remove(wellknown_path) + if challenge_type == "http": + os.remove(wellknown_path) + elif challenge_type == "dns": + subprocess.call([challenge_script, "--remove", "--domain", domain, txtrecord]) log.info("{0} verified!".format(domain)) # finalize the order with the csr @@ -188,10 +213,12 @@ def main(argv=None): parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt") parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!") parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key") + parser.add_argument("--challenge-type", required=False, default="http", help="type of ACME challenge, supported: http, dns") + parser.add_argument("--challenge-script", required=False, default=None, help="script to set up challenge on the server (required for dns challenge)") args = parser.parse_args(argv) LOGGER.setLevel(args.quiet or LOGGER.level) - signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact) + signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, challenge_type=args.challenge_type, challenge_script=args.challenge_script) sys.stdout.write(signed_crt) if __name__ == "__main__": # pragma: no cover