Skip to content

Commit

Permalink
[fix] sanitizing certificates.py and unittests
Browse files Browse the repository at this point in the history
  • Loading branch information
grindsa committed Feb 28, 2023
1 parent e859fc9 commit bb77401
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
2 changes: 1 addition & 1 deletion acme_srv/authorization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
""" Order class """
# pylint: disable=C0209
# pylint: disable=C0209, R0913
from __future__ import print_function
import json
from acme_srv.db_handler import DBstore
Expand Down
4 changes: 2 additions & 2 deletions acme_srv/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
""" certificate class """
from __future__ import print_function
import json
from acme_srv.helper import b64_url_recode, generate_random_string, cert_san_get, cert_extensions_get, hooks_load, uts_now, uts_to_date_utc, date_to_uts_utc, load_config, csr_san_get, csr_extensions_get, cert_dates_get, ca_handler_load, error_dic_get
from acme_srv.helper import b64_url_recode, generate_random_string, cert_san_get, cert_extensions_get, hooks_load, uts_now, uts_to_date_utc, date_to_uts_utc, load_config, csr_san_get, csr_extensions_get, cert_dates_get, ca_handler_load, error_dic_get, string_sanitize
from acme_srv.db_handler import DBstore
from acme_srv.message import Message
from acme_srv.threadwithreturnvalue import ThreadWithReturnValue
Expand Down Expand Up @@ -782,7 +782,7 @@ def enroll_and_store(self, certificate_name, csr, order_name=None):

def new_get(self, url):
""" get request """
certificate_name = url.replace('{0}{1}'.format(self.server_name, self.path_dic['cert_path']), '')
certificate_name = string_sanitize(self.logger, url.replace('{0}{1}'.format(self.server_name, self.path_dic['cert_path']), ''))
self.logger.debug('Certificate.new_get({0})'.format(certificate_name))
# fetch certificate dictionary from DB
certificate_dic = self._info(certificate_name, ['name', 'csr', 'cert', 'order__name', 'order__status_id'])
Expand Down
13 changes: 6 additions & 7 deletions acme_srv/helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/python
# pylint: disable=c0209, c0302, e0401, r0913
# -*- coding: utf-8 -*-
""" helper functions for acme2certifier """
# pylint: disable=c0209, e0401, r0913
from __future__ import print_function
import re
import base64
Expand All @@ -20,9 +19,9 @@
import ssl
import logging
import hashlib
import socks
from urllib.parse import urlparse, quote
from urllib3.util import connection
import socks
from jwcrypto import jwk, jws
from dateutil.parser import parse
import pytz
Expand Down Expand Up @@ -700,10 +699,10 @@ def string_sanitize(logger, unsafe_str):
allowed_range = set(range(32, 127))
safe_str = ''
for char in unsafe_str:
cp = ord(char)
if cp in allowed_range:
cp_ = ord(char)
if cp_ in allowed_range:
safe_str += char
elif cp == 9:
elif cp_ == 9:
safe_str += ' ' * 4
return re.sub(r'\s+', ' ', safe_str)

Expand Down Expand Up @@ -838,7 +837,7 @@ def url_get_with_own_dns(logger, url, verify=True):
connection._orig_create_connection = connection.create_connection
connection.create_connection = patched_create_connection
try:
req = requests.get(url, verify=verify, headers={'Connection': 'close', 'Accept-Encoding': 'gzip', 'User-Agent': USER_AGENT})
req = requests.get(url, verify=verify, headers={'Connection': 'close', 'Accept-Encoding': 'gzip', 'User-Agent': USER_AGENT}, timeout=20)
result = req.text
except Exception as err_:
result = None
Expand Down
33 changes: 32 additions & 1 deletion test/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def setUp(self):
patch.dict('sys.modules', modules).start()
import logging
logging.basicConfig(level=logging.CRITICAL)
from acme_srv.helper import b64decode_pad, b64_decode, b64_encode, b64_url_encode, b64_url_recode, convert_string_to_byte, convert_byte_to_string, decode_message, decode_deserialize, get_url, generate_random_string, signature_check, validate_email, uts_to_date_utc, date_to_uts_utc, load_config, cert_serial_get, cert_san_get, cert_dates_get, build_pem_file, date_to_datestr, datestr_to_date, dkeys_lower, csr_cn_get, cert_pubkey_get, csr_pubkey_get, url_get, url_get_with_own_dns, dns_server_list_load, csr_san_get, csr_extensions_get, fqdn_resolve, fqdn_in_san_check, sha256_hash, sha256_hash_hex, cert_der2pem, cert_pem2der, cert_extensions_get, csr_dn_get, logger_setup, logger_info, print_debug, jwk_thumbprint_get, allowed_gai_family, patched_create_connection, validate_csr, servercert_get, txt_get, proxystring_convert, proxy_check, handle_exception, ca_handler_load, eab_handler_load, hooks_load, error_dic_get, _logger_nonce_modify, _logger_certificate_modify, _logger_token_modify, _logger_challenges_modify, config_check, cert_issuer_get, cert_cn_get
from acme_srv.helper import b64decode_pad, b64_decode, b64_encode, b64_url_encode, b64_url_recode, convert_string_to_byte, convert_byte_to_string, decode_message, decode_deserialize, get_url, generate_random_string, signature_check, validate_email, uts_to_date_utc, date_to_uts_utc, load_config, cert_serial_get, cert_san_get, cert_dates_get, build_pem_file, date_to_datestr, datestr_to_date, dkeys_lower, csr_cn_get, cert_pubkey_get, csr_pubkey_get, url_get, url_get_with_own_dns, dns_server_list_load, csr_san_get, csr_extensions_get, fqdn_resolve, fqdn_in_san_check, sha256_hash, sha256_hash_hex, cert_der2pem, cert_pem2der, cert_extensions_get, csr_dn_get, logger_setup, logger_info, print_debug, jwk_thumbprint_get, allowed_gai_family, patched_create_connection, validate_csr, servercert_get, txt_get, proxystring_convert, proxy_check, handle_exception, ca_handler_load, eab_handler_load, hooks_load, error_dic_get, _logger_nonce_modify, _logger_certificate_modify, _logger_token_modify, _logger_challenges_modify, config_check, cert_issuer_get, cert_cn_get, string_sanitize
self.logger = logging.getLogger('test_a2c')
self.allowed_gai_family = allowed_gai_family
self.b64_decode = b64_decode
Expand Down Expand Up @@ -91,6 +91,7 @@ def setUp(self):
self.validate_csr = validate_csr
self.sha256_hash = sha256_hash
self.sha256_hash_hex = sha256_hash_hex
self.string_sanitize = string_sanitize
self.proxystring_convert = proxystring_convert
self.handle_exception = handle_exception

Expand Down Expand Up @@ -1737,5 +1738,35 @@ def test_0236_helper_cert_cn_get(self):
t+eRUDECE+0UnjyeCjTn3EU="""
self.assertEqual('foo.example.com', self.cert_cn_get(self.logger, cert))

def test_237_logger_challenges_modify(self):
""" test string_sanitize() """
unsafe_string = 'foo'
self.assertEqual('foo', self.string_sanitize(self.logger, unsafe_string))

def test_238_logger_challenges_modify(self):
""" test string_sanitize() """
unsafe_string = 'foo\n;'
self.assertEqual('foo;', self.string_sanitize(self.logger, unsafe_string))

def test_239_logger_challenges_modify(self):
""" test string_sanitize() """
unsafe_string = 'fooö'
self.assertEqual('foo', self.string_sanitize(self.logger, unsafe_string))

def test_240_logger_challenges_modify(self):
""" test string_sanitize() """
unsafe_string = 'fooö'
self.assertEqual('foo', self.string_sanitize(self.logger, unsafe_string))

def test_241_logger_challenges_modify(self):
""" test string_sanitize() """
unsafe_string = 'foo '
self.assertEqual('foo ', self.string_sanitize(self.logger, unsafe_string))

def test_242_logger_challenges_modify(self):
""" test string_sanitize() """
unsafe_string = 'foo\u0009'
self.assertEqual('foo ', self.string_sanitize(self.logger, unsafe_string))

if __name__ == '__main__':
unittest.main()

0 comments on commit bb77401

Please sign in to comment.