diff --git a/Dockerfile b/Dockerfile index 6519824..12867af 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,6 +8,7 @@ EXPOSE 80 25 587 ARG DEBIAN_FRONTEND=noninteractive RUN echo "postfix postfix/mailname string ${MAIL_DOMAIN}" | debconf-set-selections && \ echo "postfix postfix/main_mailer_type string 'Internet Site'" | debconf-set-selections +# https://stackoverflow.com/a/51752997/9878135 We'll use `gnupg` instead of `gnupg2` RUN apt-get update \ && apt-get -y install libpq-dev gcc \ && apt install python3 python3-pip gunicorn3 gnupg2 postfix postfix-pgsql postfix-policyd-spf-python opendkim opendkim-tools dnsutils -y \ diff --git a/alembic/versions/052c520f18a8_.py b/alembic/versions/052c520f18a8_.py new file mode 100644 index 0000000..87090ab --- /dev/null +++ b/alembic/versions/052c520f18a8_.py @@ -0,0 +1,28 @@ +"""empty message + +Revision ID: 052c520f18a8 +Revises: db19ec3696ae +Create Date: 2023-04-08 17:13:48.415749 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '052c520f18a8' +down_revision = 'db19ec3696ae' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('user_preferences', sa.Column('email_gpg_public_key', sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('user_preferences', 'email_gpg_public_key') + # ### end Alembic commands ### diff --git a/app/constants.py b/app/constants.py index 3e32e9a..9ecf905 100644 --- a/app/constants.py +++ b/app/constants.py @@ -62,7 +62,8 @@ PUBLIC_KEY_MAX_LENGTH = 10_000 ENCRYPTED_NOTES_MAX_LENGTH = 10_000 ENCRYPTED_PASSWORD_MAX_LENGTH = 6_000 -PUBLIC_KEY_REGEX = r"-----((BEGIN PUBLIC KEY)|(BEGIN PGP PUBLIC KEY BLOCK))-----(.*)-----((END PUBLIC KEY)|(END PGP PUBLIC KEY BLOCK))-----" +PUBLIC_KEY_REGEX = \ + r"^-+((BEGIN PUBLIC KEY)|(BEGIN PGP PUBLIC KEY BLOCK))-+[\w\s\/+=]*-+((END PUBLIC KEY)|(END PGP PUBLIC KEY BLOCK))-+$" ACCESS_TOKEN_COOKIE_NAME = "access_token_cookie" REFRESH_TOKEN_COOKIE_NAME = "refresh_token_cookie" EMAIL_REPORT_ENCRYPTED_CONTENT_MAX_LENGTH = 200_000 @@ -86,5 +87,8 @@ API_KEY_HEADER_REGEX = re.compile(r"^Api-Key (.*)$") API_KEY_MAX_LABEL_LENGTH = 80 +GPG_AUTO_LOCATE_KEY_TYPE_REGEX = r"^pub (\w+) ([0-9\-]+) \[\w+\]$" +GPG_AUTO_LOCATE_KEY_FINGERPRINT_REGEX = r"^ +(\w+)$" +GPG_AUTO_LOCATE_KEY_EMAIL_REGEX = rf"^uid +\[[\w\s]+\] ({EMAIL_REGEX[1:-1]}) <{EMAIL_REGEX[1:-1]}>$" TESTING_DB = None diff --git a/app/controllers/user_preferences.py b/app/controllers/user_preferences.py index fe6d2e5..c414ba6 100644 --- a/app/controllers/user_preferences.py +++ b/app/controllers/user_preferences.py @@ -30,7 +30,7 @@ def update_user_preferences( user = preferences.user - update_data = update.dict(exclude_unset=True, exclude_none=True) + update_data = update.dict(exclude_unset=True) update_all = update_data.pop("update_all_instances", None) for key, value in update_data.items(): diff --git a/app/default_life_constants.py b/app/default_life_constants.py index f30f568..ae93665 100644 --- a/app/default_life_constants.py +++ b/app/default_life_constants.py @@ -123,3 +123,4 @@ API_KEY_LENGTH = 36 API_KEY_MAX_DAYS = 365 ALLOW_REGISTRATIONS = "True" +ENABLE_PGP_KEY_DISCOVERY = "True" diff --git a/app/gpg_handler.py b/app/gpg_handler.py index 68b6fd2..381ea6d 100644 --- a/app/gpg_handler.py +++ b/app/gpg_handler.py @@ -1,8 +1,7 @@ import base64 import sys -from pretty_bad_protocol import gnupg -from pretty_bad_protocol._parsers import ImportResult +import gnupg from app import life_constants @@ -10,31 +9,43 @@ "gpg", "encrypt_message", "SERVER_PUBLIC_KEY", - "sign_message" + "sign_message", + "get_public_key_from_fingerprint", ] + PATHS = { "darwin": "/opt/homebrew/bin/gpg" } -gpg = gnupg.GPG(PATHS[sys.platform] if sys.platform in PATHS else None) +gpg = gnupg.GPG(PATHS[sys.platform] if sys.platform in PATHS else "gpg") gpg.encoding = "utf-8" -__private_key: ImportResult = gpg.import_keys( +__private_key: gnupg.ImportResult = gpg.import_keys( base64.b64decode(life_constants.SERVER_PRIVATE_KEY).decode("utf-8") ) SERVER_PUBLIC_KEY = gpg.export_keys(__private_key.fingerprints[0]) -def sign_message(message: str) -> str: +def sign_message(message: str, clearsign: bool = True, detach: bool = True) -> str: return gpg.sign( message, - default_key=__private_key.fingerprints[0], - clearsign=True, + keyid=__private_key.fingerprints[0], + clearsign=clearsign, + detach=detach, ) -def encrypt_message(message: str, public_key_in_str: str) -> str: - public_key = gpg.import_keys(public_key_in_str) +def encrypt_message(message: str, public_key_in_str: str) -> gnupg.Crypt: + public_key: gnupg.ImportResult = gpg.import_keys(public_key_in_str) + + result = gpg.trust_keys(public_key.fingerprints[0], "TRUST_ULTIMATE") + + if not public_key.fingerprints: + raise ValueError("This is not a valid PGP public key.") return gpg.encrypt(message, public_key.fingerprints[0]) + + +def get_public_key_from_fingerprint(fingerprint: str) -> gnupg.GPG: + return gpg.export_keys(fingerprint, minimal=True) diff --git a/app/life_constants.py b/app/life_constants.py index 5ee33f2..4c8a8a7 100644 --- a/app/life_constants.py +++ b/app/life_constants.py @@ -66,6 +66,7 @@ "API_KEY_LENGTH", "API_KEY_MAX_DAYS", "ALLOW_REGISTRATIONS", + "ENABLE_PGP_KEY_DISCOVERY", ] load_dotenv() @@ -183,3 +184,4 @@ def get_list(name: str, default: list = None) -> list: API_KEY_LENGTH = get_int("API_KEY_LENGTH") API_KEY_MAX_DAYS = get_int("API_KEY_MAX_DAYS") ALLOW_REGISTRATIONS = get_bool("ALLOW_REGISTRATIONS") +ENABLE_PGP_KEY_DISCOVERY = get_bool("ENABLE_PGP_KEY_DISCOVERY") diff --git a/app/mails/send_email_login_token.py b/app/mails/send_email_login_token.py index d884ee4..bd8ab09 100644 --- a/app/mails/send_email_login_token.py +++ b/app/mails/send_email_login_token.py @@ -18,6 +18,7 @@ def send_email_login_token(user: User, token: str) -> None: "code": token, "server_url": life_constants.APP_DOMAIN, }, + gpg_public_key=user.preferences.email_gpg_public_key, ), to_mail=user.email.address, ) diff --git a/app/models/user_preferences.py b/app/models/user_preferences.py index e26baba..b93e96d 100644 --- a/app/models/user_preferences.py +++ b/app/models/user_preferences.py @@ -33,6 +33,11 @@ class UserPreferences(Base, IDMixin): UUID(as_uuid=True), ForeignKey("user.id"), ) + email_gpg_public_key = sa.Column( + sa.String, + default=None, + nullable=True, + ) alias_remove_trackers = sa.Column( sa.Boolean, diff --git a/app/routes/server.py b/app/routes/server.py index 8abef11..62f075c 100644 --- a/app/routes/server.py +++ b/app/routes/server.py @@ -44,6 +44,7 @@ def get_settings( "max_aliases_per_user": settings.get(db, "MAX_ALIASES_PER_USER"), "api_key_max_days": life_constants.API_KEY_MAX_DAYS, "allow_registrations": settings.get(db, "ALLOW_REGISTRATIONS"), + "allow_pgp_key_discovery": life_constants.ENABLE_PGP_KEY_DISCOVERY, } diff --git a/app/routes/user_preference.py b/app/routes/user_preference.py index 6dae2f6..72f50ac 100644 --- a/app/routes/user_preference.py +++ b/app/routes/user_preference.py @@ -1,11 +1,21 @@ -from fastapi import APIRouter, Depends +from datetime import date + +from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session +from starlette.responses import JSONResponse +from app import gpg_handler, life_constants from app.controllers.user_preferences import update_user_preferences from app.database.dependencies import get_db -from app.dependencies.auth import AuthResult, get_auth +from app.dependencies.auth import AuthResult, AuthResultMethod, get_auth from app.models.enums.api_key import APIKeyScope -from app.schemas.user_preferences import UserPreferencesUpdate +from app.schemas._basic import HTTPBadRequestExceptionModel, HTTPNotFoundExceptionModel +from app.schemas.user_preferences import ( + FindPublicKeyGPGKeyDiscoveryDisabledResponseModel, + FindPublicKeyResponseModel, UserPreferencesUpdate, +) +from app.utils.email import normalize_email +from email_utils.web_key_discovery import find_public_key router = APIRouter() @@ -13,6 +23,12 @@ @router.patch( "/", response_model=None, + responses={ + 403: { + "model": HTTPBadRequestExceptionModel, + "description": "You cannot update your GPG public key with an API key." + } + } ) def update_user_preferences_api( update: UserPreferencesUpdate, @@ -22,6 +38,12 @@ def update_user_preferences_api( )), db: Session = Depends(get_db), ): + if update.email_gpg_public_key is not None and auth.method == AuthResultMethod.API_KEY: + raise HTTPException( + status_code=403, + detail="You cannot update your GPG public key with an API key." + ) + update_user_preferences( db, preferences=auth.user.preferences, @@ -31,3 +53,51 @@ def update_user_preferences_api( return { "detail": "Updated preferences successfully!" } + + +@router.post( + "/find-public-key", + response_model=FindPublicKeyResponseModel, + responses={ + 404: { + "model": HTTPNotFoundExceptionModel, + "description": "No public key found for the email address." + }, + 202: { + "model": FindPublicKeyGPGKeyDiscoveryDisabledResponseModel, + "description": "PGP key discovery is disabled." + } + } +) +async def find_public_key_api( + auth: AuthResult = Depends(get_auth( + allow_api=True, + api_key_scope=APIKeyScope.PREFERENCES_READ, + )), +): + if not life_constants.ENABLE_PGP_KEY_DISCOVERY: + return JSONResponse({ + "detail": "PGP key discovery is disabled." + }, status_code=202) + + result = find_public_key(auth.user.email.address) + + if result is None: + return JSONResponse({ + "detail": "No public key found for the email address." + }, status_code=404) + + normalized_result_email = await normalize_email(result.raw_email) + + if normalized_result_email != auth.user.email.address: + return JSONResponse({ + "detail": "No public key found for the email address." + }, status_code=404) + + public_key = gpg_handler.get_public_key_from_fingerprint(result.fingerprint) + + return { + "public_key": str(public_key), + "type": result.type, + "created_at": result.created_at + } diff --git a/app/schemas/server.py b/app/schemas/server.py index 54b3c0d..7d0a4d3 100644 --- a/app/schemas/server.py +++ b/app/schemas/server.py @@ -31,6 +31,7 @@ class SettingsModel(BaseModel): allow_alias_deletion: bool api_key_max_days: int allow_registrations: bool + allow_pgp_key_discovery: bool class ServerStatisticsModel(BaseModel): diff --git a/app/schemas/user.py b/app/schemas/user.py index 71f8881..0b8d930 100644 --- a/app/schemas/user.py +++ b/app/schemas/user.py @@ -97,6 +97,7 @@ class UserPreferences(BaseModel): alias_proxy_user_agent: ProxyUserAgentType alias_expand_url_shorteners: bool alias_reject_on_privacy_leak: bool + email_gpg_public_key: Optional[str] class Config: orm_mode = True diff --git a/app/schemas/user_preferences.py b/app/schemas/user_preferences.py index 3dacdb7..47ffd9f 100644 --- a/app/schemas/user_preferences.py +++ b/app/schemas/user_preferences.py @@ -1,13 +1,24 @@ -from pydantic import BaseModel, Field, root_validator +from datetime import date +from typing import Optional +from pydantic import BaseModel, Field, root_validator, validator + +from app import constants, gpg_handler from app.models.enums.alias import ImageProxyFormatType, ProxyUserAgentType __all__ = [ "UserPreferencesUpdate", + "FindPublicKeyResponseModel", + "FindPublicKeyGPGKeyDiscoveryDisabledResponseModel", ] class UserPreferencesUpdate(BaseModel): + email_gpg_public_key: Optional[str] = Field( + None, + regex=constants.PUBLIC_KEY_REGEX, + max_length=constants.PUBLIC_KEY_MAX_LENGTH, + ) alias_remove_trackers: bool = None alias_create_mail_report: bool = None alias_proxy_images: bool = None @@ -21,7 +32,10 @@ class UserPreferencesUpdate(BaseModel): @root_validator() def validate_any_value_set(cls, values: dict) -> dict: data = values.copy() - data.pop("update_all_instances", None) + update_all_instances = data.pop("update_all_instances", False) + + if not update_all_instances: + return values if all( value is None @@ -30,3 +44,36 @@ def validate_any_value_set(cls, values: dict) -> dict: raise ValueError("You must set at least one preference to update.") return values + + @validator("email_gpg_public_key") + def validate_email_gpg_public_key(cls, value: Optional[str]) -> Optional[str]: + if not value: + return + + value = value.strip() + message = f"PGP verification. Your public key is: {value}" + + try: + result = gpg_handler.encrypt_message(message, value) + + if not result.ok: + raise ValueError( + "This is not a valid PGP public key; we could not encrypt a test message." + ) + except ValueError: + raise ValueError( + "This is not a valid PGP public key; we could not encrypt a test message." + ) + + return value + + +class FindPublicKeyResponseModel(BaseModel): + public_key: str + type: str + created_at: date + + +class FindPublicKeyGPGKeyDiscoveryDisabledResponseModel(BaseModel): + detail: str + code = "error:settings:gpg_disabled" diff --git a/email_utils/headers.py b/email_utils/headers.py index 8fc927a..354e572 100644 --- a/email_utils/headers.py +++ b/email_utils/headers.py @@ -25,6 +25,7 @@ RETURN_PATH = "Return-Path" X_SPAM_STATUS = "X-Spam-Status" KLECK_FORWARD_STATUS = "X-Kleck-Forward-Status" +CONTENT_DESCRIPTION = "Content-Description" # headers used to DKIM sign in order of preference DKIM_HEADERS = [ diff --git a/email_utils/send_mail.py b/email_utils/send_mail.py index 28aac01..ce4fadb 100644 --- a/email_utils/send_mail.py +++ b/email_utils/send_mail.py @@ -2,10 +2,11 @@ import time from email.message import Message from email.mime.multipart import MIMEMultipart +from email.mime.nonmultipart import MIMENonMultipart from email.mime.text import MIMEText from typing import Any, Optional -from app import life_constants, logger +from app import life_constants, logger, gpg_handler from . import formatters, headers from .bounce_messages import generate_forward_status, StatusType from .headers import set_header @@ -56,6 +57,32 @@ def _debug_email( logger.info("<===============> SEND email --- END --- <===============>") +# Create a email message +def _m( + klaas: Any, + *args, + extra_headers: dict[str, str] = None, + attachments: list[Message] = None, + payload: Optional[str] = None, + name: Optional[str] = None, + protocol: Optional[str] = None, +) -> Message: + attachments = attachments or [] + extra_headers = extra_headers or {} + message = klaas(*args, name=name, protocol=protocol) + + for header, value in extra_headers.items(): + set_header(message, header, value) + + if payload: + message.set_payload(payload) + + for attachment in attachments: + message.attach(attachment) + + return message + + def send_mail( message: Message, to_mail: str, @@ -101,6 +128,7 @@ def draft_message( template: str, bounce_status: StatusType = StatusType.OFFICIAL, context: dict[str, Any] = None, + gpg_public_key: str = None, ) -> Message: html = render( f"{template}.html", @@ -111,18 +139,136 @@ def draft_message( **context, ) - message = MIMEMultipart("alternative") - message.attach(MIMEText(html, "html")) - message.attach(MIMEText(plaintext, "plain")) + if gpg_public_key: + # Thanks to https://datawookie.dev/blog/2021/11/understanding-encrypted-email/ + content_message = _m( + MIMEMultipart, + "mixed", + attachments=[ + MIMEText(html, "html"), + MIMEText(plaintext, "plain"), + _m( + MIMENonMultipart, + "application", + "pgp-keys", + name="public_key.asc", + extra_headers={ + headers.CONTENT_DESCRIPTION: "OpenPGP public key", + headers.CONTENT_TRANSFER_ENCODING: "quoted-printable", + }, + payload=gpg_handler.SERVER_PUBLIC_KEY, + ), + ] + ) + decrypted_message = _m( + MIMEMultipart, + "application", + "signed", + protocol="application/pgp-signature", + attachments=[ + content_message, + _m( + MIMENonMultipart, + "application", + "pgp-signature", + name="signature.asc", + extra_headers={ + headers.CONTENT_DESCRIPTION: "OpenPGP digital signature", + }, + payload=str( + gpg_handler.sign_message( + content_message.as_string(), + clearsign=False, + detach=True, + ) + ), + ), + ], + ) + + message = _m( + MIMEMultipart, + "encrypted", + protocol="application/pgp-encrypted", + extra_headers={ + headers.SUBJECT: subject, + headers.DATE: formatters.format_date(), + headers.MIME_VERSION: "1.0", + }, + attachments=[ + MIMEText("Version: 1", "pgp-encrypted"), + _m( + MIMENonMultipart, + "application", + "octet-stream", + name="encrypted.asc", + extra_headers={ + headers.CONTENT_DESCRIPTION: "OpenPGP encrypted message", + }, + payload=str( + gpg_handler.encrypt_message( + decrypted_message.as_string(), + gpg_public_key, + ) + ), + ), + ] + ) + + else: + content_message = _m( + MIMEMultipart, + "alternative", + attachments=[ + MIMEText(html, "html"), + MIMEText(plaintext, "plain"), + _m( + MIMENonMultipart, + "application", + "pgp-keys", + name="public_key.asc", + extra_headers={ + headers.CONTENT_DESCRIPTION: "OpenPGP public key", + headers.CONTENT_TRANSFER_ENCODING: "quoted-printable", + }, + payload=gpg_handler.SERVER_PUBLIC_KEY, + ), + ], + ) + + message = _m( + MIMEMultipart, + "signed", + protocol="application/pgp-signature", + extra_headers={ + headers.SUBJECT: subject, + headers.DATE: formatters.format_date(), + headers.MIME_VERSION: "1.0", + }, + attachments=[ + content_message, + _m( + MIMENonMultipart, + "application", + "pgp-signature", + name="signature.asc", + extra_headers={ + headers.CONTENT_DESCRIPTION: "OpenPGP digital signature", + }, + payload=str( + gpg_handler.sign_message( + content_message.as_string(), + clearsign=False, + ) + ), + ), + ], + ) # Those headers will be replaced by `send_mail` message[headers.FROM] = "ReplaceMe" message[headers.TO] = "ReplaceMe" - message[headers.SUBJECT] = subject - message[headers.DATE] = formatters.format_date() - message[headers.MIME_VERSION] = "1.0" - message[headers.KLECK_FORWARD_STATUS] = generate_forward_status(bounce_status) return message diff --git a/email_utils/web_key_discovery.py b/email_utils/web_key_discovery.py new file mode 100644 index 0000000..dddc984 --- /dev/null +++ b/email_utils/web_key_discovery.py @@ -0,0 +1,66 @@ +import re +import subprocess +from dataclasses import dataclass +from datetime import date +from typing import Optional +from app import life_constants, constants + + +@dataclass +class PublicKeyResult: + fingerprint: str + type: str + created_at: date + # The raw email imported. This is not normalized + raw_email: str + + +def find_public_key(email: str) -> Optional[PublicKeyResult]: + """Try to find the public key for the given `email`. + Uses web key discovery to find the key. + If a key is found, it's fingerprint is returned. + + :raises ValueError: If the email is invalid. + """ + # We validate the email again to avoid any possible injection + email = email.strip() + if not re.match(constants.EMAIL_REGEX, email): + return None + + # We don't want to locate keys locally, as the user can't access the server internally. + # If there was a key locally, it is most likely fake. + process = subprocess.Popen( + ["gpg", "--auto-key-locate", "wkd,ntds,ldap,cert,dane,nodefault", "--locate-key", email], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = process.communicate() + + if process.returncode != 0: + raise ValueError(stderr.decode("utf-8")) + + value = stdout.decode("utf-8") + type_line, fingerprint_line, email_line, _, _ = value.splitlines() + + type_regex_result = re.match( + constants.GPG_AUTO_LOCATE_KEY_TYPE_REGEX, + type_line, + ) + fingerprint_regex_result = re.match( + constants.GPG_AUTO_LOCATE_KEY_FINGERPRINT_REGEX, + fingerprint_line, + ) + email_regex_result = re.match( + constants.GPG_AUTO_LOCATE_KEY_EMAIL_REGEX, + email_line, + ) + + if not all((type_regex_result, fingerprint_regex_result, email_regex_result)): + return None + + return PublicKeyResult( + fingerprint=fingerprint_regex_result.group(1), + type=type_regex_result.group(1), + created_at=date.fromisoformat(type_regex_result.group(2)), + raw_email=email_regex_result.group(1), + ) diff --git a/poetry.lock b/poetry.lock index 90f25d1..b3cbec0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -366,7 +366,7 @@ scylla-driver = ["scylla-driver (>=3.25.6,<4.0.0)"] [[package]] name = "fastapi-users" -version = "10.4.0" +version = "10.4.1" description = "Ready-to-use and customizable users management for FastAPI" category = "main" optional = false @@ -379,7 +379,7 @@ fastapi-users-db-sqlalchemy = {version = ">=4.0.0", optional = true, markers = " makefun = ">=1.11.2,<2.0.0" passlib = {version = "1.7.4", extras = ["bcrypt"]} pyjwt = {version = "2.6.0", extras = ["crypto"]} -python-multipart = "0.0.5" +python-multipart = "0.0.6" [package.extras] beanie = ["fastapi-users-db-beanie (>=1.0.0)"] @@ -796,6 +796,14 @@ python-versions = ">=3.7" [package.extras] cli = ["click (>=5.0)"] +[[package]] +name = "python-gnupg" +version = "0.5.0" +description = "A wrapper for the Gnu Privacy Guard (GPG or GnuPG)" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "python-jose" version = "3.3.0" @@ -817,14 +825,14 @@ pycryptodome = ["pycryptodome (>=3.3.1,<4.0.0)", "pyasn1"] [[package]] name = "python-multipart" -version = "0.0.5" +version = "0.0.6" description = "A streaming multipart parser for Python" category = "main" optional = false -python-versions = "*" +python-versions = ">=3.7" -[package.dependencies] -six = ">=1.4.0" +[package.extras] +dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pytest (==7.2.0)", "pyyaml (==5.1)"] [[package]] name = "pyyaml" @@ -1058,7 +1066,7 @@ python-versions = ">=3.7" [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "1b779ee65e2cf81d561c76c6152abb2fd647edee88a07f368bff37dcfec98833" +content-hash = "3b32d6ce25100febe7eed2a689ff799de57938d151632f74eea77280209c4d9d" [metadata.files] aiodns = [] @@ -1259,8 +1267,8 @@ fastapi-pagination = [ {file = "fastapi_pagination-0.11.4.tar.gz", hash = "sha256:9457b78cd5d0be590cbda506d5913f9d196a7b50768b7943cd0ad6a5f5aa090c"}, ] fastapi-users = [ - {file = "fastapi_users-10.4.0-py3-none-any.whl", hash = "sha256:c8edc0c16b3bca41c4b4cea479b85b8f37f6682414e8649e01a05c0b135dbe7d"}, - {file = "fastapi_users-10.4.0.tar.gz", hash = "sha256:7f77a2a74be8833daaba5d60f8e8c5b445bc26f26aecc445eea9d80e877d5629"}, + {file = "fastapi_users-10.4.1-py3-none-any.whl", hash = "sha256:42e24cd7dab59989c40b4beae970292d9a380ada234334d9d656488e533adc0f"}, + {file = "fastapi_users-10.4.1.tar.gz", hash = "sha256:cbc589278ce35b4e6218ec00987c64bb4dfbfbde39c192d8327b51eda171b9f1"}, ] fastapi-users-db-sqlalchemy = [ {file = "fastapi_users_db_sqlalchemy-4.0.5-py3-none-any.whl", hash = "sha256:83c955cdd3a3c8c95077c087cecdb51d067906ad397e2f3febf02784bcb5d13b"}, @@ -1806,8 +1814,15 @@ python-dotenv = [ {file = "python-dotenv-0.21.1.tar.gz", hash = "sha256:1c93de8f636cde3ce377292818d0e440b6e45a82f215c3744979151fa8151c49"}, {file = "python_dotenv-0.21.1-py3-none-any.whl", hash = "sha256:41e12e0318bebc859fcc4d97d4db8d20ad21721a6aa5047dd59f090391cb549a"}, ] +python-gnupg = [ + {file = "python-gnupg-0.5.0.tar.gz", hash = "sha256:70758e387fc0e0c4badbcb394f61acbe68b34970a8fed7e0f7c89469fe17912a"}, + {file = "python_gnupg-0.5.0-py2.py3-none-any.whl", hash = "sha256:345723a03e67b82aba0ea8ae2328b2e4a3906fbe2c18c4082285c3b01068f270"}, +] python-jose = [] -python-multipart = [] +python-multipart = [ + {file = "python_multipart-0.0.6-py3-none-any.whl", hash = "sha256:ee698bab5ef148b0a760751c261902cd096e57e10558e11aca17646b74ee1c18"}, + {file = "python_multipart-0.0.6.tar.gz", hash = "sha256:e9925a80bb668529f1b67c7fdb0a5dacdd7cbfc6fb0bff3ea443fe22bdd62132"}, +] pyyaml = [] requests = [ {file = "requests-2.28.2-py3-none-any.whl", hash = "sha256:64299f4909223da747622c030b781c0d7811e359c37124b4bd368fb8c6518baa"}, diff --git a/pyproject.toml b/pyproject.toml index 5ebb87b..9224c4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ gunicorn = "^20.1.0" python-dotenv = "^0.21.0" dkimpy = "^1.0.5" pyotp = "^2.8.0" +python-gnupg = "^0.5.0" [tool.poetry.dev-dependencies] pytest = "^7.1.3" diff --git a/tests/conftest.py b/tests/conftest.py index 125b971..e64952b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -76,11 +76,11 @@ def client(db): @pytest.fixture def create_email(db): - def _method() -> Email: + def _method(address: str = None) -> Email: return create_item( db, { - "address": f"mail.{random.randint(10000, 99999)}@example.com", + "address": address or f"mail.{random.randint(10000, 99999)}@example.com", "is_verified": False, "token": "abc", }, @@ -92,8 +92,8 @@ def _method() -> Email: @pytest.fixture def create_user(db, create_email): - def _method(is_verified=False, is_admin=False, password=None) -> User: - email = create_email() + def _method(is_verified=False, is_admin=False, email=None, password=None) -> User: + email = create_email(email) user = create_item( db, { diff --git a/tests/test_preferences.py b/tests/test_preferences.py index 55cc019..9ff3f68 100644 --- a/tests/test_preferences.py +++ b/tests/test_preferences.py @@ -1,6 +1,23 @@ +import base64 + from starlette.testclient import TestClient +PUBLIC_KEY = """-----BEGIN PGP PUBLIC KEY BLOCK----- + +xjMEY1BWURYJKwYBBAHaRw8BAQdAF7V3c/or1pqMSO+K1NdlTzX8M3OWMIsM +fRaXjBpKcfrNG0pvbiBTbWl0aCA8am9uQGV4YW1wbGUuY29tPsKMBBAWCgA+ +BQJjUFZRBAsJBwgJEPYcYW9Bd+RzAxUICgQWAAIBAhkBAhsDAh4BFiEEoXES +0EdOGluYpuHr9hxhb0F35HMAAHPbAQDKUYRKK4fBmx0oY51NFngIWlgh37r2 +jh43FGyEfPtiiAD/ar4x4hYdzdTgstCd5IgHGN0rHePn8buFQ+BTclK3UwjO +OARjUFZREgorBgEEAZdVAQUBAQdAfNbp3wadPhBZd8PA0RuQbsWLQMkKozDF +x/vu1H34bQQDAQgHwngEGBYIACoFAmNQVlEJEPYcYW9Bd+RzAhsMFiEEoXES +0EdOGluYpuHr9hxhb0F35HMAAPaXAP90baEdk1ughlSfxwr1/qdXYasj4eXD +CY/XrzMgKvnwawEAtMHyvho1Les1B+7jJsKpNmOjssHIbDSeWTc0Dl8hugc= +=/9/F +-----END PGP PUBLIC KEY BLOCK-----""" + + def test_user_can_update_single_preferences( create_user, create_auth_tokens, @@ -47,3 +64,91 @@ def test_user_can_update_single_preferences_with_instances_update( assert response.status_code == 200 assert user.preferences.alias_proxy_images is False assert user.email_aliases[0].pref_proxy_images is False + + +def test_can_update_gpg_key_preference( + create_user, + create_auth_tokens, + client: TestClient, +) -> None: + user = create_user(is_verified=True) + auth = create_auth_tokens(user) + + response = client.patch( + "/v1/preferences/", + json={ + "email_gpg_public_key": PUBLIC_KEY, + }, + headers=auth["headers"], + ) + print(response.json()) + + assert response.status_code == 200 + + +def test_can_not_update_preferences_with_invalid_public_key( + create_user, + create_auth_tokens, + client: TestClient, +) -> None: + user = create_user(is_verified=True) + auth = create_auth_tokens(user) + + invalid_public_key = """-----BEGIN PGP PUBLIC KEY BLOCK----- +Not a valid public key +-----END PGP PUBLIC KEY BLOCK-----""" + + response = client.patch( + "/v1/preferences/", + json={ + "email_gpg_public_key": invalid_public_key, + }, + headers=auth["headers"], + ) + + assert response.status_code == 422 + + +def test_can_find_public_key( + create_user, + create_auth_tokens, + client: TestClient, +): + # "proton.me" strangely gives for every email address a public key + email = "no-reply@protonmail.com" + + user = create_user(is_verified=True, email=email) + auth = create_auth_tokens(user) + + response = client.post( + "/v1/preferences/find-public-key", + headers=auth["headers"], + ) + + assert response.status_code == 200 + assert response.json()["public_key"] is not None + + +def test_can_not_find_public_key_for_nonexistent_mail( + create_user, + create_auth_tokens, + client: TestClient, +): + # Please do not register this email just to fuck up our tests + # We base64 encode it so that bots don't register the domain + email = base64.b64decode( + "c2FoY3lpeGl1YnV2aWZk" + "aGlqYnNoem" + "5mdWhiY2l2eEBkb25vdHJlZ2lzdGVydGhpc2RvbWFpbnV" + "pamFzaWRta256anVma2hhc2RmaGp2eGNuaGp1eXhuY3YuY29t" + ) + + user = create_user(is_verified=True, email=email) + auth = create_auth_tokens(user) + + response = client.post( + "/v1/preferences/find-public-key", + headers=auth["headers"], + ) + + assert response.status_code == 404