From ed097ade2916f388833bcdcf5d0a913f3efbecc2 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Fri, 15 Nov 2024 15:43:58 -0800 Subject: [PATCH] [2/2] Fix: Content Security Policy (CSP) Not Implemented (DataBiosphere/azul-private#6) --- lambdas/service/app.py | 8 +- src/azul/chalice.py | 146 ++------------------------ src/azul/csp.py | 214 +++++++++++++++++++++++++++++++++++++++ test/integration_test.py | 33 ++++-- test/test_doctests.py | 4 +- 5 files changed, 257 insertions(+), 148 deletions(-) create mode 100644 src/azul/csp.py diff --git a/lambdas/service/app.py b/lambdas/service/app.py index de670f370..824f1350a 100644 --- a/lambdas/service/app.py +++ b/lambdas/service/app.py @@ -57,6 +57,9 @@ from azul.collections import ( OrderedSet, ) +from azul.csp import ( + CSP, +) from azul.drs import ( AccessMethod, ) @@ -490,14 +493,15 @@ def manifest_url(self, def oauth2_redirect(): file_name = 'oauth2-redirect.html.template.mustache' template = app.load_static_resource('swagger', file_name) - nonce = app.csp_nonce() + nonce = CSP.new_nonce() html = chevron.render(template, { 'CSP_NONCE': json.dumps(nonce) }) + csp = CSP.for_azul(nonce) return Response(status_code=200, headers={ 'Content-Type': 'text/html', - 'Content-Security-Policy': app.content_security_policy(nonce) + 'Content-Security-Policy': str(csp) }, body=html) diff --git a/src/azul/chalice.py b/src/azul/chalice.py index 1960d24de..f5d1e4876 100644 --- a/src/azul/chalice.py +++ b/src/azul/chalice.py @@ -1,7 +1,6 @@ from abc import ( ABCMeta, ) -import base64 from collections.abc import ( Iterable, ) @@ -16,8 +15,6 @@ import mimetypes import os import pathlib -import re -import secrets from typing import ( Any, Iterator, @@ -61,6 +58,9 @@ from azul.collections import ( deep_dict_merge, ) +from azul.csp import ( + CSP, +) from azul.enums import ( auto, ) @@ -76,7 +76,6 @@ ) from azul.strings import ( join_words as jw, - single_quote as sq, ) from azul.types import ( JSON, @@ -208,137 +207,6 @@ def _api_gateway_context_middleware(self, event, get_response): finally: config.lambda_is_handling_api_gateway_request = False - @classmethod - def csp_nonce(cls) -> str: - """ - Return a randomly generated nonce value for use in a Content Security - Policy header. - """ - return base64.b64encode(secrets.token_bytes(32)).decode('ascii').rstrip('=') - - @classmethod - def content_security_policy(cls, nonce: str | None = None) -> str: - """ - >>> from azul.doctests import assert_json - >>> assert_json(AzulChaliceApp.content_security_policy(None).split(';')) - [ - "default-src 'self'", - "img-src 'self' data:", - "script-src 'self'", - "style-src 'self'", - "frame-ancestors 'none'" - ] - - >>> assert_json(AzulChaliceApp.content_security_policy(nonce='foo').split(';')) - [ - "default-src 'self'", - "img-src 'self' data:", - "script-src 'self' 'nonce-foo'", - "style-src 'self' 'nonce-foo'", - "frame-ancestors 'none'" - ] - """ - self_ = sq('self') - none = sq('none') - nonce = [] if nonce is None else [sq('nonce-' + nonce)] - - return ';'.join([ - jw('default-src', self_), - jw('img-src', self_, 'data:'), - jw('script-src', self_, *nonce), - jw('style-src', self_, *nonce), - jw('frame-ancestors', none), - ]) - - @classmethod - def validate_csp(cls, csp: str, has_nonce: bool) -> str: - """ - Raise an exception if the CSP is invalid, otherwise return the validated - CSP. - - >>> cls = AzulChaliceApp - >>> cls.validate_csp("default-src 'self';img-src 'self' data:;" - ... "script-src 'self';" - ... "style-src 'self';" - ... "frame-ancestors 'none'", has_nonce=False) - "default-src 'self';img-src 'self' data:;script-src 'self';style-src 'self';frame-ancestors 'none'" - - Fails if nonce violates the RFC - - >>> cls.validate_csp("default-src 'self';img-src 'self' data:;" - ... "script-src 'self' 'nonce-1234567890123456789012345678901234567890***';" - ... "style-src 'self' 'nonce-1234567890123456789012345678901234567890***';" - ... "frame-ancestors 'none'", has_nonce=True) - Traceback (most recent call last): - ... - AssertionError: 'nonce-1234567890123456789012345678901234567890***' - - Fails if nonce is shorter than expected - - >>> cls.validate_csp("default-src 'self';img-src 'self' data:;" - ... "script-src 'self' 'nonce-1234567890';" - ... "style-src 'self' 'nonce-1234567890';" - ... "frame-ancestors 'none'", has_nonce=True) - Traceback (most recent call last): - ... - AssertionError: 'nonce-1234567890' - - Fails if nonce is longer than expected - - >>> cls.validate_csp("default-src 'self';img-src 'self' data:;" - ... "script-src 'self' 'nonce-12345678901234567890123456789012345678901234567890';" - ... "style-src 'self' 'nonce-12345678901234567890123456789012345678901234567890';" - ... "frame-ancestors 'none'", has_nonce=True) - Traceback (most recent call last): - ... - AssertionError: 'nonce-12345678901234567890123456789012345678901234567890' - """ - # https://www.w3.org/TR/CSP2/#policy-syntax - directive_re = re.compile(r'[ \t]*([a-zA-Z0-9-]+)' - # Space, tab and any visible character - # (0x21-0xFE) except for comma (0x2C) or - # semicolon (0x3B). - r'(?:[ \t]([ \t\x21-\x2B\x2D-\x3A\x3C-\xFE]*))?') - nonce_re = re.compile(r"'nonce-([a-zA-Z0-9+/]{43})'") - expected_directives = [ - 'default-src', - 'frame-ancestors', - 'img-src', - 'script-src', - 'style-src', - ] - expected_expressions = [ - sq('none'), - sq('self'), - 'data:', - ] - directives = list() - expressions = list() - nonces = dict() - - for directive in csp.split(';'): - match = directive_re.fullmatch(directive) - assert match is not None - name, value = match.groups() - assert name not in directives, name - directives.append(name) - for expression in value.split(' '): - if expression in expected_expressions: - expressions.append(expression) - else: - match = nonce_re.fullmatch(expression) - assert match is not None, expression - assert name not in nonces, name - nonces[name] = match.group(1) - if has_nonce: - assert ['script-src', 'style-src'] == sorted(nonces.keys()), nonces.keys() - assert len(set(nonces.values())) == 1, sorted(set(nonces.values())) - else: - assert nonces == {}, nonces - assert expected_directives == sorted(directives), sorted(directives) - assert expected_expressions == sorted(set(expressions)), sorted(set(expressions)) - return csp - @classmethod def security_headers(cls) -> dict[str, str]: """ @@ -347,8 +215,9 @@ def security_headers(cls) -> dict[str, str]: addresses known security vulnerabilities. """ hsts_max_age = 60 * 60 * 24 * 365 * 2 + csp = CSP.for_azul() return { - 'Content-Security-Policy': cls.content_security_policy(), + 'Content-Security-Policy': str(csp), 'Referrer-Policy': 'strict-origin-when-cross-origin', 'Strict-Transport-Security': jw(f'max-age={hsts_max_age};', 'includeSubDomains;', @@ -667,7 +536,7 @@ def swagger_ui(self) -> Response: base_url = self.base_url redirect_url = furl(base_url).add(path='oauth2_redirect') deployment_url = furl(base_url).add(path='openapi') - nonce = self.csp_nonce() + nonce = CSP.new_nonce() html = chevron.render(template, { 'CSP_NONCE': json.dumps(nonce), 'DEPLOYMENT_PATH': json.dumps(str(deployment_url.path)), @@ -678,10 +547,11 @@ def swagger_ui(self) -> Response: for path, method in self.non_interactive_routes ]) }) + csp = CSP.for_azul(nonce) return Response(status_code=200, headers={ 'Content-Type': 'text/html', - 'Content-Security-Policy': self.content_security_policy(nonce) + 'Content-Security-Policy': str(csp) }, body=html) diff --git a/src/azul/csp.py b/src/azul/csp.py new file mode 100644 index 000000000..717767953 --- /dev/null +++ b/src/azul/csp.py @@ -0,0 +1,214 @@ +import base64 +from collections import ( + defaultdict, +) +import logging +import re +import secrets +from typing import ( + Self, +) + +import attrs +from more_itertools import ( + only, + prepend, +) + +from azul import ( + require, +) +from azul.strings import ( + single_quote as sq, +) + +log = logging.getLogger(__name__) + + +@attrs.frozen +class CSP: + directives: dict[str, list[str]] + + @classmethod + def for_azul(cls, nonce: str | None = None) -> Self: + self, none, data = sq('self'), sq('none'), 'data:' + nonce = [] if nonce is None else [sq('nonce-' + nonce)] + return cls({ + 'default-src': [self], + 'img-src': [self, data], + 'script-src': [self, *nonce], + 'style-src': [self, *nonce], + 'frame-ancestors': [none] + }) + + @classmethod + def new_nonce(cls) -> str: + """ + A random nonce for use in a CSP. + """ + return base64.b64encode(secrets.token_bytes(32)).decode('ascii').rstrip('=') + + @classmethod + def parse(cls, csp: str) -> Self: + """( + + Parse the given CSP or raise RequirementError if it is not syntactically + valid against the specification at https://www.w3.org/TR/CSP2. + + >>> from azul.doctests import ( + ... assert_json, + ... ) + + >>> def parse(s): return CSP.parse(s).directives + + A valid CSP: + + >>> valid_csp = "img-src 'self' data:;frame-ancestors 'none'" + >>> assert_json(parse(valid_csp)) + { + "img-src": [ + "'self'", + "data:" + ], + "frame-ancestors": [ + "'none'" + ] + } + + Insignificant whitespace is removed: + + >>> fluffy_csp = " \timg-src\t'self' data:\t;\tframe-ancestors\t 'none' \t" + >>> parse(valid_csp) == parse(fluffy_csp) + True + + Multiple multiple directives of the same name are consolidated: + + >>> assert_json(parse("img-src data:;img-src 'self':")) + { + "img-src": [ + "data:", + "'self':" + ] + } + + Invalid CSPs: + + >>> parse(";") + Traceback (most recent call last): + ... + azul.RequirementError: ('Invalid directive', '') + + >>> parse('img_src;') + Traceback (most recent call last): + ... + azul.RequirementError: ('Invalid directive', 'img_src') + + >>> parse('img-src a,b') + Traceback (most recent call last): + ... + azul.RequirementError: ('Invalid directive', 'img-src a,b') + """ + # https://www.w3.org/TR/CSP2/#policy-syntax + directive_re = re.compile(r'[ \t]*([a-zA-Z0-9-]+)' + # Space, tab and any visible character + # (0x21-0xFE) except for comma (0x2C) or + # semicolon (0x3B). + r'(?:[ \t]([ \t\x21-\x2B\x2D-\x3A\x3C-\xFE]*))?') + wsp_re = re.compile(r'[ \t]+') + directives = defaultdict(list) + for directive in csp.split(';'): + match = directive_re.fullmatch(directive) + require(match is not None, 'Invalid directive', directive) + name, values = match.groups() + values = [] if values is None else filter(None, wsp_re.split(values)) + directives[name].extend(values) + return cls(directives) + + # Matches only Azul nonces, specifically + nonce_re = re.compile(sq(r'nonce-([a-zA-Z0-9+/]{43})')) + + def validate(self): + """ + Validate the directive values against a subset of the Source List + grammar from the specification. Of that grammar, only the productions + used in CSPs for Azul are supported. + + >>> def validate(s): return CSP.parse(s).validate() + + >>> valid = ('0a+/' * 11)[:43] + >>> validate(f"script-src 'self' 'nonce-{valid}'") + + Disallowed characters in nonce: + + >>> invalid = valid.replace('+','*') + >>> validate(f"script-src 'self' 'nonce-{invalid}'") + Traceback (most recent call last): + ... + azul.RequirementError: ('Invalid value', "'nonce-0a*/0a*/0a*/0a*/0a*/0a*/0a*/0a*/0a*/0a*/0a*'") + + Nonce is too short: + + >>> invalid = valid[:-1] + >>> validate(f"script-src 'self' 'nonce-{invalid}'") + Traceback (most recent call last): + ... + azul.RequirementError: ('Invalid value', "'nonce-0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a'") + + Nonce is too long: + + >>> invalid = valid + '/' + >>> validate(f"script-src 'self' 'nonce-{invalid}'") + Traceback (most recent call last): + ... + azul.RequirementError: ('Invalid value', "'nonce-0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/0a+/'") + + Other invalid combinations: + + >>> validate("frame-ancestors 'none' 'none'") + Traceback (most recent call last): + ... + azul.RequirementError: ("'none' can only appear alone", ["'none'", "'none'"]) + + >>> validate("frame-ancestors 'self' 'none'") + Traceback (most recent call last): + ... + azul.RequirementError: ("'none' can only appear alone", ["'self'", "'none'"]) + + >>> validate("img-src 'self' data: 'self'") + Traceback (most recent call last): + ... + azul.RequirementError: ('Duplicated value', ["'self'", 'data:', "'self'"]) + """ + self_, none, data = sq('self'), sq('none'), 'data:' + value_res = prepend(self.nonce_re.pattern, map(re.escape, [self_, none, data])) + value_re = re.compile('|'.join(value_res)) + for name, values in self.directives.items(): + for value in values: + match = value_re.fullmatch(value) + require(match is not None, 'Invalid value', value) + require(values == [none] or none not in values, + f'{none} can only appear alone', values) + require(len(values) == len(set(values)), 'Duplicated value', values) + + def nonce(self) -> str | None: + """ + Extract the Azul nonce from this CSP, if present. If there are multiple + occurrances of a nonce, they must all be equal. + """ + return only(set( + value + for name, values in self.directives.items() + for value in values + if self.nonce_re.fullmatch(value) is not None + )) + + def __str__(self) -> str: + """ + >>> s = "img-src 'self' data:;frame-ancestors 'none'" + >>> s == str(CSP.parse(s)) + True + """ + return ';'.join( + ' '.join(value for value in prepend(name, values)) + for name, values in self.directives.items() + ) diff --git a/test/integration_test.py b/test/integration_test.py index d1a89ca7b..38f49f733 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -106,6 +106,9 @@ from azul.collections import ( alist, ) +from azul.csp import ( + CSP, +) from azul.drs import ( AccessMethod, ) @@ -2002,18 +2005,36 @@ def test_response_security_headers(self): } for endpoint in (config.service_endpoint, config.indexer_endpoint): for path, cache_control in test_cases.items(): - expected_headers = {'Cache-Control': cache_control} with self.subTest(endpoint=endpoint, path=path): if path == '/oauth2_redirect' and endpoint == config.indexer_endpoint: pass # no oauth2 endpoint on indexer Lambda else: response = requests.get(str(endpoint / path)) response.raise_for_status() - expected = AzulChaliceApp.security_headers() | expected_headers - csp = AzulChaliceApp.validate_csp(response.headers['Content-Security-Policy'], - has_nonce=path in ['/', '/oauth2_redirect']) - expected['Content-Security-Policy'] = csp - self.assertIsSubset(expected.items(), response.headers.items()) + actual_csp = response.headers['Content-Security-Policy'] + parsed_csp = CSP.parse(actual_csp) + parsed_csp.validate() + nonce = parsed_csp.nonce() + # We only expect a CSP nonce for specific endpoints. + self.assertIs(nonce is None, path not in ['/', '/oauth2_redirect']) + expected_headers = { + # The fact that most headers are hard-coded in + # security_headers() gives us license to use that + # method here to compose the expected value, even + # though it constitutes code under test. There is + # not much that can break in that method, and even + # if one of the literals in it had an error, that + # error would likely be repeated in a literal here. + **AzulChaliceApp.security_headers(), + 'Cache-Control': cache_control, + # The random nonce in the actual CSP makes it hard + # to compose an expected value for it. Instead, we + # parse and validate the actual CSP, then serialize + # it again and interpolate the result into the + # expected value. + 'Content-Security-Policy': str(parsed_csp) + } + self.assertIsSubset(expected_headers.items(), response.headers.items()) def test_default_4xx_response_headers(self): for endpoint in (config.service_endpoint, config.indexer_endpoint): diff --git a/test/test_doctests.py b/test/test_doctests.py index f9cd8188e..048b13d8b 100644 --- a/test/test_doctests.py +++ b/test/test_doctests.py @@ -8,8 +8,8 @@ import azul.bigquery import azul.bytes import azul.caching -import azul.chalice import azul.collections +import azul.csp import azul.docker import azul.doctests import azul.dss @@ -72,8 +72,8 @@ def load_tests(_loader, azul.bigquery, azul.bytes, azul.caching, - azul.chalice, azul.collections, + azul.csp, azul.doctests, azul.docker, azul.dss,