Skip to content

Commit

Permalink
fix(sage_imap): fix merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
sepehr-akbarzadeh committed Jul 15, 2024
2 parents 48dcb61 + 71becd0 commit 723140c
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
29 changes: 29 additions & 0 deletions sage_imap/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,35 @@ class IMAPMailboxFetchError(IMAPMailboxError):
default_code = "fetch_error"


class IMAPEmptyFileError(IMAPClientError):
"""Exception raised when the specified file is empty."""

status_code = 400
default_detail = "The specified file is empty."
default_code = "empty_file_error"

class IMAPInvalidEmailDateError(IMAPClientError):
"""Exception raised when the email does not have a valid Date header."""

status_code = 400
default_detail = "The email does not have a valid Date header."
default_code = "invalid_email_date_error"

class IMAPAppendError(IMAPMailboxError):
"""Exception raised when appending to the IMAP server fails."""

status_code = 500
default_detail = "Failed to append to the IMAP server."
default_code = "imap_append_error"

class IMAPMailboxUploadError(IMAPMailboxError):
"""Exception raised for upload file"""

status_code = 500
default_detail = "Failed to upload email file."
default_code = "upload_error"


class EmailException(Exception):
"""Custom exception for email related errors."""

Expand Down
156 changes: 156 additions & 0 deletions sage_imap/services/mailbox.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import email
import imaplib
import logging
import os
import time
from email import policy
from email.utils import parsedate_to_datetime
from typing import Any, List, Optional

from sage_imap.exceptions import (
IMAPAppendError,
IMAPEmptyFileError,
IMAPInvalidEmailDateError,
IMAPMailboxCheckError,
IMAPMailboxClosureError,
IMAPMailboxDeleteError,
Expand All @@ -12,13 +20,15 @@
IMAPMailboxSaveSentError,
IMAPMailboxSelectionError,
IMAPMailboxStatusError,
IMAPMailboxUploadError,
IMAPSearchError,
)
from sage_imap.helpers.email import EmailIterator, EmailMessage
from sage_imap.helpers.flags import FlagCommand, Flags
from sage_imap.helpers.mailbox import DefaultMailboxes, MailboxStatusItems
from sage_imap.helpers.message import MessageParts, MessageSet
from sage_imap.helpers.search import IMAPSearchCriteria
from sage_imap.utils import is_english

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -637,3 +647,149 @@ def __combine_status_items(self, *status_items: MailboxStatusItems) -> str:
The combined status items string.
"""
return " ".join(item.value for item in status_items)

def upload_eml_file(
self,
file_path: str,
folder: str = DefaultMailboxes.INBOX,
flags: Optional[List[Flags]] = None,
max_file_size: int = 10 * 1024 * 1024,
strict_date_validation: bool = False,
) -> None:
"""
Uploads a .eml file to the specified folder on the IMAP server.
Purpose
-------
This method uploads the raw email data from a .eml file to the specified folder
on the IMAP server.
Parameters
----------
file_path : str
The path to the .eml file to be uploaded.
folder : str, optional
The name of the folder to upload the email to (default is DefaultMailboxes.INBOX).
flags : List[str], optional
The flags to set for the uploaded email (default is [Flags.SEEN]).
max_file_size : int, optional
The maximum allowed file size for the .eml file (default is 10 MB).
strict_date_validation : bool, optional
If True, raise an error if the email does not have a valid Date header. If False, use the current time.
Raises
------
FileNotFoundError
If the specified file does not exist or is not a file.
IMAPEmptyFileError
If the specified file is empty.
InvalidEmailDateError
If the email does not have a valid Date header and strict_date_validation is True.
IMAPAppendError
If the append operation to the IMAP server fails.
"""
try:
logger.debug("Uploading .eml file to folder: %s", folder)
if not os.path.exists(file_path) or not os.path.isfile(file_path):
logger.error("File not found or is not a file: %s", file_path)
raise FileNotFoundError(f"File not found or is not a file: {file_path}")

if os.path.getsize(file_path) > max_file_size:
logger.error("File size exceeds limit: %s", file_path)
raise ValueError(
f"File size exceeds limit of {max_file_size / (1024 * 1024)} MB."
)

with open(file_path, "rb") as f:
raw_email = f.read()

if not raw_email:
logger.error("Empty file content: %s", file_path)
raise IMAPEmptyFileError("Empty file content.")

# Parse the email to extract the date and validate its structure
email_message = email.message_from_bytes(raw_email, policy=policy.default)
date_header = email_message.get("Date")

if date_header:
try:
date = parsedate_to_datetime(date_header)
date = imaplib.Time2Internaldate(time.mktime(date.timetuple()))
except (TypeError, ValueError):
logger.error("Invalid date value or format in Date header.")
if strict_date_validation:
raise IMAPInvalidEmailDateError(
"Email does not have a valid Date header."
)
else:
logger.warning(
"Using current time instead of invalid Date header."
)
date = imaplib.Time2Internaldate(time.time())
else:
if strict_date_validation:
logger.error("Email does not have a Date header.")
raise IMAPInvalidEmailDateError(
"Email does not have a Date header."
)
else:
logger.warning(
"Email does not have a Date header. Using current time instead."
)
date = imaplib.Time2Internaldate(time.time())

logger.debug("Email date: %s", date)

# Ensure all attachments and body are in UTF-8 and rename non-English filenames
attachment_count = 0
for part in email_message.walk():
content_disposition = str(part.get("Content-Disposition"))

if part.get_content_maintype() == "multipart":
continue

if "attachment" in content_disposition:
filename = part.get_filename()
if filename:
if not is_english(filename):
attachment_count += 1
extension = os.path.splitext(filename)[1]
new_filename = f"attachment-{attachment_count}{extension}"
part.set_param(
"filename", new_filename, header="Content-Disposition"
)
part.set_param("name", new_filename, header="Content-Type")
logger.debug(
"Non-English attachment filename '%s' changed to '%s'",
filename,
new_filename,
)
else:
logger.debug("Attachment found: %s", filename)

if part.get_content_type() in ["text/plain", "text/html"]:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
try:
part.set_payload(payload.decode(charset), charset="utf-8")
except (UnicodeDecodeError, LookupError) as e:
logger.error("Error re-encoding part to UTF-8: %s", e)
raise ValueError("Failed to re-encode email part to UTF-8.")
logger.debug("Body part re-encoded to UTF-8.")

# Convert the email back to bytes after potential modifications
raw_email = email_message.as_bytes()

# Append the email to the specified folder
status, _ = self.client.append(folder, flags, date, raw_email) # type: ignore[attr-defined]
if status != "OK":
logger.error("Failed to upload .eml file to folder: %s", status)
raise IMAPAppendError("Failed to upload .eml file.")
logger.info("Uploaded .eml file to folder: %s", folder)

self.check() # Ensure all changes are synchronized
except Exception as e:
logger.error(
"Exception occurred while uploading .eml file to folder: %s", e
)
raise IMAPMailboxUploadError("Failed to upload .eml file.") from e
19 changes: 19 additions & 0 deletions sage_imap/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,22 @@ def read_eml_files_from_zip(zip_path: Path) -> EmailIterator:
email_message = EmailMessage.read_from_eml_bytes(eml_bytes)
email_list.append(email_message)
return EmailIterator(email_list)
def is_english(s: str) -> bool:
"""
Checks if a string contains only ASCII characters.
Parameters
----------
s : str
The string to check.
Returns
-------
bool
True if the string contains only ASCII characters, False otherwise.
"""
try:
s.encode('ascii')
except UnicodeEncodeError:
return False
return True

0 comments on commit 723140c

Please sign in to comment.