Skip to content

Commit

Permalink
feat(sage_imap): add html support to send email message
Browse files Browse the repository at this point in the history
also we increase small email message security robus
  • Loading branch information
sepehr-akbarzadeh committed Jul 13, 2024
1 parent de85d57 commit 8d317d9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 42 deletions.
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ setuptools = "^70.2.0"
wheel = "^0.43.0"
twine = "^5.1.1"
sphinx-rtd-theme = "^2.0.0"
requests = "^2.32.3"

[tool.poetry.group.dev.dependencies]
black = "^24.4.2"
Expand Down
107 changes: 75 additions & 32 deletions sage_imap/services/email.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import os
import re
import smtplib
import socket
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
Expand All @@ -20,29 +22,34 @@
SpamResult,
)

EmailAddress = str
HeaderDict = Dict[str, str]
AttachmentList = List[Path]


class SmartEmailMessage:
def __init__(
self,
subject: str,
body: str,
from_email: Optional[str] = None,
to: Optional[List[str]] = None,
cc: Optional[List[str]] = None,
bcc: Optional[List[str]] = None,
extra_headers: Optional[Dict[str, str]] = None,
from_email: Optional[EmailAddress] = None,
to: Optional[List[EmailAddress]] = None,
cc: Optional[List[EmailAddress]] = None,
bcc: Optional[List[EmailAddress]] = None,
extra_headers: Optional[HeaderDict] = None,
body_html: Optional[str] = None, # Add body_html parameter
**kwargs: Any,
):
logging.info("Initializing SmartEmailMessage")
self.subject = subject
self.body = body
self.body_html = body_html # Store HTML body
self.from_email = from_email
self.to = to or []
self.cc = cc or []
self.bcc = bcc or []
self.to = self._sanitize_email_list(to)
self.cc = self._sanitize_email_list(cc)
self.bcc = self._sanitize_email_list(bcc)

# Combine additional attributes into dictionaries
self.attachments = kwargs.get("attachments", [])
self.attachments: AttachmentList = kwargs.get("attachments", [])

self.message_id = self._generate_message_id()
self.date = self._generate_date()
Expand All @@ -53,8 +60,7 @@ def __init__(
self.update_attachment_status()
self.update_content_type_and_encoding()

# Default headers
self.default_headers = {
self.default_headers: HeaderDict = {
"MIME-Version": "1.0",
"Content-Type": self.content_type,
"Content-Transfer-Encoding": self.content_transfer_encoding,
Expand All @@ -67,14 +73,14 @@ def __init__(
"References": "",
"Reply-To": "",
"X-Originating-IP": self.originating_ip,
"X-Priority": Priority.NORMAL.value,
"X-Priority": Priority.NORMAL,
"X-MS-Has-Attach": self.has_attach,
"X-Report-Abuse-To": "",
"X-Spamd-Result": SpamResult.DEFAULT.value,
"X-Auto-Response-Suppress": AutoResponseSuppress.ALL.value,
"X-Spamd-Result": SpamResult.DEFAULT,
"X-Auto-Response-Suppress": AutoResponseSuppress.ALL,
}

self.extra_headers = self.merge_headers(extra_headers or {})
self.extra_headers: HeaderDict = self.merge_headers(extra_headers or {})
self.validate_headers()

def _generate_message_id(self) -> str:
Expand All @@ -88,7 +94,7 @@ def _generate_date(self) -> str:
def _get_originating_ip(self) -> str:
logging.debug("Getting originating IP")
try:
ip = requests.get("https://api.ipify.org", timeout=5).text
ip = requests.get("https://api.ipify.org", timeout=5, verify=True).text
logging.info(f"Originating IP: {ip}")
return ip
except requests.RequestException as e:
Expand All @@ -102,11 +108,14 @@ def update_attachment_status(self) -> None:
def update_content_type_and_encoding(self) -> None:
logging.debug("Updating content type and encoding")
if self.attachments:
self.content_type = ContentType.MULTIPART.value
self.content_transfer_encoding = ContentTransferEncoding.BASE64.value
self.content_type = "multipart/mixed"
self.content_transfer_encoding = ContentTransferEncoding.BASE64
elif self.body_html:
self.content_type = "multipart/alternative"
self.content_transfer_encoding = ContentTransferEncoding.SEVEN_BIT
else:
self.content_type = ContentType.PLAIN.value
self.content_transfer_encoding = ContentTransferEncoding.SEVEN_BIT.value
self.content_type = ContentType.PLAIN
self.content_transfer_encoding = ContentTransferEncoding.SEVEN_BIT

def _generate_received_header(self) -> str:
logging.debug("Generating received header")
Expand All @@ -128,7 +137,7 @@ def _generate_received_header(self) -> str:
)
return header

def merge_headers(self, extra_headers: Dict[str, str]) -> Dict[str, str]:
def merge_headers(self, extra_headers: HeaderDict) -> HeaderDict:
logging.debug("Merging extra headers with default headers")
headers = self.default_headers.copy()
headers.update(extra_headers)
Expand All @@ -137,19 +146,19 @@ def merge_headers(self, extra_headers: Dict[str, str]) -> Dict[str, str]:
def validate_headers(self) -> None:
logging.debug("Validating headers")
priority = self.extra_headers.get("X-Priority")
if priority and priority not in Priority._value2member_map_:
if priority and priority != Priority.NORMAL:
logging.error(f"Invalid X-Priority header value: {priority}")
raise EmailException(f"Invalid X-Priority header value: {priority}")

spamd_result = self.extra_headers.get("X-Spamd-Result")
if spamd_result and spamd_result not in SpamResult._value2member_map_:
if spamd_result and spamd_result != SpamResult.DEFAULT:
logging.error(f"Invalid X-Spamd-Result header value: {spamd_result}")
raise EmailException(f"Invalid X-Spamd-Result header value: {spamd_result}")

auto_response_suppress = self.extra_headers.get("X-Auto-Response-Suppress")
if (
auto_response_suppress
and auto_response_suppress not in AutoResponseSuppress._value2member_map_
and auto_response_suppress != AutoResponseSuppress.ALL
):
logging.error(
f"Invalid X-Auto-Response-Suppress header value: {auto_response_suppress}"
Expand All @@ -159,15 +168,23 @@ def validate_headers(self) -> None:
)

content_type = self.extra_headers.get("Content-Type")
if content_type and content_type not in ContentType._value2member_map_:
valid_content_types = [
ContentType.PLAIN,
"multipart/mixed",
"multipart/alternative",
]
if content_type and content_type not in valid_content_types:
logging.error(f"Invalid Content-Type header value: {content_type}")
raise EmailException(f"Invalid Content-Type header value: {content_type}")

content_transfer_encoding = self.extra_headers.get("Content-Transfer-Encoding")
valid_encodings = [
ContentTransferEncoding.SEVEN_BIT,
ContentTransferEncoding.BASE64,
]
if (
content_transfer_encoding
and content_transfer_encoding
not in ContentTransferEncoding._value2member_map_
and content_transfer_encoding not in valid_encodings
):
logging.error(
f"Invalid Content-Transfer-Encoding header value: {content_transfer_encoding}"
Expand All @@ -182,12 +199,16 @@ def send(
smtp_port: int,
smtp_user: str,
smtp_password: str,
use_tls: bool = False,
use_tls: bool = True, # Default to TLS
use_ssl: bool = False,
) -> None:
logging.info("Sending email")
try:
msg = MIMEMultipart()
if self.body_html:
msg = MIMEMultipart("alternative")
else:
msg = MIMEMultipart()

msg["Subject"] = self.subject
msg["From"] = self.from_email
msg["To"] = ", ".join(self.to)
Expand All @@ -200,16 +221,23 @@ def send(
msg.add_header(header, value)

msg.attach(MIMEText(self.body, "plain"))
if self.body_html:
msg.attach(MIMEText(self.body_html, "html"))

for attachment_path in self.attachments:
if not os.path.isfile(attachment_path):
logging.error(f"Attachment file does not exist: {attachment_path}")
raise EmailException(
f"Attachment file does not exist: {attachment_path}"
)

with open(attachment_path, "rb") as file:
part = MIMEBase("application", "octet-stream")
part.set_payload(file.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
"attachment",
filename=os.path.basename(attachment_path),
f'attachment; filename="{os.path.basename(attachment_path)}"',
)
msg.attach(part)

Expand All @@ -232,3 +260,18 @@ def send(
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
raise EmailException(f"An unexpected error occurred: {e}")

def _sanitize_email_list(
self, email_list: Optional[List[EmailAddress]]
) -> List[EmailAddress]:
email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
if not email_list:
return []
sanitized_list: List[EmailAddress] = []
for email in email_list:
sanitized_email = email.strip()
if not email_regex.match(sanitized_email):
logging.error(f"Invalid email address: {sanitized_email}")
raise EmailException(f"Invalid email address: {sanitized_email}")
sanitized_list.append(sanitized_email)
return sanitized_list
14 changes: 5 additions & 9 deletions tests/services/test_email.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import pytest
import requests
import smtplib
from unittest.mock import patch, MagicMock
from unittest.mock import patch
from sage_imap.exceptions import EmailException
from sage_imap.helpers.email import (
AutoResponseSuppress,
ContentTransferEncoding,
ContentType,
Priority,
SpamResult,
)
from email.mime.multipart import MIMEMultipart
from sage_imap.services.email import SmartEmailMessage # Correct import path

# Sample data for testing
Expand Down Expand Up @@ -70,14 +66,14 @@ def test_update_attachment_status(email_message):

def test_update_content_type_and_encoding_with_attachments(email_message):
email_message.update_content_type_and_encoding()
assert email_message.content_type == ContentType.MULTIPART.value
assert email_message.content_transfer_encoding == ContentTransferEncoding.BASE64.value
assert email_message.content_type == "multipart/mixed"
assert email_message.content_transfer_encoding == ContentTransferEncoding.BASE64

def test_update_content_type_and_encoding_without_attachments():
email_message = SmartEmailMessage(subject=sample_subject, body=sample_body)
email_message.update_content_type_and_encoding()
assert email_message.content_type == ContentType.PLAIN.value
assert email_message.content_transfer_encoding == ContentTransferEncoding.SEVEN_BIT.value
assert email_message.content_type == ContentType.PLAIN
assert email_message.content_transfer_encoding == ContentTransferEncoding.SEVEN_BIT

def test_merge_headers(email_message):
merged_headers = email_message.merge_headers({"X-New-Header": "NewValue"})
Expand Down

0 comments on commit 8d317d9

Please sign in to comment.