Skip to content
This repository has been archived by the owner on Jan 5, 2025. It is now read-only.

Commit

Permalink
feat: Add option ato redact email addresses in logs
Browse files Browse the repository at this point in the history
fix: Multiple "Re: " prefixes
fix: Regex only working with a single "Re: " or "Fwd: " prefix
  • Loading branch information
slashtechno committed May 18, 2024
1 parent fe2c78f commit 411e840
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 49 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ ALIAS = "LLMail"
# This message will be prepended to the message history sent to the LLM as a message from the system role
# Use this for customizing the behavior of the LLM and hence the nature of the responses
SYSTEM_PROMPT=
# Optionally redact email addresses in logs
# Set to "true" to redact email addresses or you can remove it (or set it to "false") to not redact email addresses
REDACT_EMAIL_ADDRESSES=true

OPENAI_API_KEY=""
# At least when writing this, openrouter.ai has a free tier
Expand Down
117 changes: 79 additions & 38 deletions llmail/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


from llmail.utils.cli_args import argparser
from llmail.utils.utils import set_primary_logger


class EmailThread:
Expand Down Expand Up @@ -48,11 +49,15 @@ def sort_replies(self):
# However, this **significantly** reduces complexity so for now, it's fine

def __repr__(self):
return f"EmailThread(initial_email={self.initial_email}, replies={self.replies})"
return (
f"EmailThread(initial_email={self.initial_email}, replies={self.replies})"
)


class Email:
def __init__(self, imap_id, message_id, subject, sender, timestamp, body, references):
def __init__(
self, imap_id, message_id, subject, sender, timestamp, body, references
):
self.imap_id = imap_id
self.message_id = message_id
self.subject = subject
Expand Down Expand Up @@ -90,10 +95,12 @@ def main():
bot_email = args.imap_username

# Set up logging
set_primary_logger(args.log_level)
ic(args)
set_primary_logger(args.log_level, args.redact_email_addresses)
logger.debug(args)
if args.watch_interval:
logger.info(f"Watching for new emails every {args.watch_interval} seconds")
logger.info(
f"Watching for new emails every {args.watch_interval} seconds"
)
while True:
fetch_and_process_emails(
look_for_subject=args.subject_key,
Expand Down Expand Up @@ -123,7 +130,11 @@ def fetch_and_process_emails(
client.login(args.imap_username, args.imap_password)

email_threads = {}
folders = args.folder if args.folder else [folder[2] for folder in client.list_folders()]
folders = (
args.folder
if args.folder
else [folder[2] for folder in client.list_folders()]
)
# for folder in client.list_folders():
# Disabling fetching from all folders due it not being inefficient
# Instead, just fetch from INBOX and get the threads later
Expand All @@ -136,19 +147,29 @@ def fetch_and_process_emails(
continue
# Might be smart to also search for forwarded emails
messages = client.search(
["OR", "SUBJECT", look_for_subject, "SUBJECT", f"Re: {look_for_subject}"]
[
"OR",
"SUBJECT",
look_for_subject,
"SUBJECT",
f"Re: {look_for_subject}",
]
)
for msg_id in messages:
# TODO: It seems this will throw a KeyError if an email is sent while this for loop is running. May have been fixed by emptying email_threads at the end of the while loop? This should be tested again to confirm
msg_data = client.fetch([msg_id], ["ENVELOPE", "BODY[]", "RFC822.HEADER"])
msg_data = client.fetch(
[msg_id], ["ENVELOPE", "BODY[]", "RFC822.HEADER"]
)
envelope = msg_data[msg_id][b"ENVELOPE"]
subject = envelope.subject.decode()
# Use regex to verify that the subject optionally starts with "Fwd: " or "Re: " and then the intended subject (nothing case-sensitive)
# re.escape is used to escape any special characters in the subject
if not re.match(
r"^(Fwd: ?|Re: ?)?" + re.escape(look_for_subject) + r"$", subject, re.IGNORECASE
r"^(Fwd: ?|Re: ?)*" + re.escape(look_for_subject) + r"$",
subject,
re.IGNORECASE,
):
logger.info(
logger.warning(
f"Skipping email with subject '{subject}' as it does not match the intended subject"
)
continue
Expand Down Expand Up @@ -216,8 +237,7 @@ def fetch_and_process_emails(
f"Created new thread for email {message_id} sent at {timestamp}"
)

# ic([thread for thread in email_threads.values()])
ic(email_threads)
logger.debug(email_threads)
# Check if there are any emails wherein the last email in the thread is a user email
# If so, send a reply
for message_id, email_thread in email_threads.items():
Expand All @@ -228,15 +248,23 @@ def fetch_and_process_emails(
message_id = email_thread.initial_email.message_id
msg_id = email_thread.initial_email.imap_id
references_ids = email_thread.initial_email.references
elif len(email_thread.replies) > 0 and email_thread.replies[-1].sender != bot_email:
elif (
len(email_thread.replies) > 0
and email_thread.replies[-1].sender != bot_email
):
logger.debug(
f"Last email in thread for email {message_id} is from {email_thread.replies[-1].sender}"
)
message_id = email_thread.replies[-1].message_id
msg_id = email_thread.replies[-1].imap_id
references_ids = email_thread.replies[-1].references
elif len(email_thread.replies) > 0 and email_thread.replies[-1].sender == bot_email:
logger.debug(f"Last email in thread for email {message_id} is from the bot")
elif (
len(email_thread.replies) > 0
and email_thread.replies[-1].sender == bot_email
):
logger.debug(
f"Last email in thread for email {message_id} is from the bot"
)
continue
else:
ValueError("Invalid email thread")
Expand Down Expand Up @@ -289,7 +317,11 @@ def get_thread_history(
)
for email in message_identifier.replies:
thread_history.append(
{"sender": email.sender, "content": email.body, "timestamp": email.timestamp}
{
"sender": email.sender,
"content": email.body,
"timestamp": email.timestamp,
}
)
return thread_history
elif isinstance(message_identifier, int) or isinstance(message_identifier, str):
Expand Down Expand Up @@ -335,15 +367,19 @@ def get_thread_history(
{
"sender": get_sender(message)["email"],
"content": get_plain_email_content(message),
"timestamp": make_tz_aware(parsedate_to_datetime(message.get("Date"))),
"timestamp": make_tz_aware(
parsedate_to_datetime(message.get("Date"))
),
}
)
message = prev_message
# Sort the thread history by timestamp
thread_history = sorted(thread_history, key=lambda x: x["timestamp"])
return thread_history
else:
raise TypeError("Invalid type for message. Must be an int, str, or EmailThread object.")
raise TypeError(
"Invalid type for message. Must be an int, str, or EmailThread object."
)


def get_sender(message: Message) -> dict:
Expand All @@ -365,7 +401,9 @@ def get_top_level_email(client, msg_id, message_id=None):

# Extract the References header and split it into individual message IDs
references_header = headers.get("References", "")
references_ids = [m_id.strip() for m_id in references_header.split() if m_id.strip()]
references_ids = [
m_id.strip() for m_id in references_header.split() if m_id.strip()
]

# Extract the first message ID, which represents the top-level email in the thread
# If it doesn't exist, use the current message ID. Not msg_id since msg_id is only for IMAP
Expand Down Expand Up @@ -413,14 +451,6 @@ def get_uid_from_message_id(imap_client, message_id):
return None


def set_primary_logger(log_level):
"""Set up the primary logger with the specified log level. Output to stderr and use the format specified."""
logger.remove()
# ^10 is a formatting directive to center with a padding of 10
logger_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> |<level>{level: ^10}</level>| <level>{message}</level>"
logger.add(stderr, format=logger_format, colorize=True, level=log_level)


def send_reply(
thread: list[dict],
subject: str,
Expand All @@ -440,15 +470,6 @@ def send_reply(
if system_prompt:
thread.insert(0, {"role": "system", "content": system_prompt})
references_ids.append(message_id)
# thread_from_msg_id = get_thread_history(client, msg_id)
# logger.debug(f"Thread history (message_identifier): {thread_from_msg_id}")
# logger.debug(f"Thread history length (message_identifier): {len(thread_from_msg_id)}")
# thread_from_object = get_thread_history(client, email_threads[list(email_threads.keys())[-1]])
# logger.debug(f"Thread history (EmailThread object): {thread_from_object}")
# logger.debug(f"Thread history length (EmailThread object): {len(thread_from_object)}")
logger.info(f"Sending reply to email {message_id} to {sender}")
logger.debug(f"Thread history: {thread}")
logger.debug(f"Thread history length: {len(thread)}")
generated_response = openai.chat.completions.create(
model=model,
messages=thread,
Expand All @@ -463,13 +484,23 @@ def send_reply(
)
yag.send(
to=sender,
# subject=f"Re: {subject}" if not subject.startswith("Re: ") else subject,
subject=f"Re: {subject}",
headers={"In-Reply-To": message_id, "References": " ".join(references_ids)},
contents=generated_response,
message_id=make_msgid(
domain=args.message_id_domain if args.message_id_domain else "llmail"
),
)
# thread_from_msg_id = get_thread_history(client, msg_id)
# logger.debug(f"Thread history (message_identifier): {thread_from_msg_id}")
# logger.debug(f"Thread history length (message_identifier): {len(thread_from_msg_id)}")
# thread_from_object = get_thread_history(client, email_threads[list(email_threads.keys())[-1]])
# logger.debug(f"Thread history (EmailThread object): {thread_from_object}")
# logger.debug(f"Thread history length (EmailThread object): {len(thread_from_object)}")
logger.info(f"Sending reply to email {message_id} to {sender}")
logger.debug(f"Thread history: {thread}")
logger.debug(f"Thread history length: {len(thread)}")


def get_plain_email_content(message: Message | str) -> str:
Expand All @@ -485,15 +516,25 @@ def get_plain_email_content(message: Message | str) -> str:
try:
body = part.get_payload(decode=True)
except UnicodeDecodeError:
logger.debug("UnicodeDecodeError occurred. Trying to get payload as string.")
logger.debug(
"UnicodeDecodeError occurred. Trying to get payload as string."
)
body = str(part.get_payload())
if content_type == "text/plain":
markdown = html2text.html2text(str(body.decode("unicode_escape"))).strip()
markdown = html2text.html2text(
str(body.decode("unicode_escape"))
).strip()
# logger.debug(f"Converted to markdown: {markdown}")
# if len(markdown) < 5:
# logger.warning(
# f"Content is less than 5 characters | Content: {markdown}"
# )
return markdown
else:
logger.debug("Message is not multipart. Getting payload as string.")
body = message.get_payload(decode=True).decode()
# if len(body) < 5:
# logger.warning(f"Content is less than 5 characters | Content: {body}")
return html2text.html2text(str(body.decode("unicode_escape")))


Expand Down
54 changes: 43 additions & 11 deletions llmail/utils/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,38 @@ def set_argparse():
title="Subcommands",
)
# Subcommand: list-folders
_ = subparsers.add_parser("list-folders", help="List all folders in the IMAP account and exit")
_ = subparsers.add_parser(
"list-folders", help="List all folders in the IMAP account and exit"
)
# General arguments
argparser.add_argument(
"--log-level",
"-l",
help="Log level",
default=os.getenv("LOG_LEVEL") if os.getenv("LOG_LEVEL") else "INFO",
)
argparser.add_argument(
"--redact-email-addresses",
help="Replace email addresses with '[redacted]' in logs",
action="store_true",
default=(
True
if (
os.getenv("REDACT_EMAIL_ADDRESSES")
and os.getenv("REDACT_EMAIL_ADDRESSES").lower() == "true"
and os.getenv("REDACT_EMAIL_ADDRESSES").lower() != "false"
)
else False
),
)
argparser.add_argument(
"--watch-interval",
"-w",
help="Interval in seconds to check for new emails. If not set, will only check once.",
type=int,
default=int(os.getenv("WATCH_INTERVAL")) if os.getenv("WATCH_INTERVAL") else None,
default=(
int(os.getenv("WATCH_INTERVAL")) if os.getenv("WATCH_INTERVAL") else None
),
)
# OpenAI-compatible API arguments
ai_api = argparser.add_argument_group("OpenAI-compatible API")
Expand All @@ -65,9 +83,11 @@ def set_argparse():
ai_api.add_argument(
"--openai-model",
help="Model to use for the LLM",
default=os.getenv("OPENAI_MODEL")
if os.getenv("OPENAI_MODEL")
else "mistralai/mistral-7b-instruct:free",
default=(
os.getenv("OPENAI_MODEL")
if os.getenv("OPENAI_MODEL")
else "mistralai/mistral-7b-instruct:free"
),
)
ai_api.add_argument(
"--system-prompt",
Expand All @@ -88,16 +108,22 @@ def set_argparse():
"--subject-key",
"-s",
help="Emails with this subject will be replied to",
default=os.getenv("SUBJECT_KEY") if os.getenv("SUBJECT_KEY") else "llmail autoreply",
default=(
os.getenv("SUBJECT_KEY") if os.getenv("SUBJECT_KEY") else "llmail autoreply"
),
)
email.add_argument(
"--alias",
help="Name to use in the 'From' in addition to the email address",
default=os.getenv("ALIAS") if os.getenv("ALIAS") else "LLMail",
)
imap = email.add_argument_group("IMAP")
imap.add_argument("--imap-host", help="IMAP server hostname", default=os.getenv("IMAP_HOST"))
imap.add_argument("--imap-port", help="IMAP server port", default=os.getenv("IMAP_PORT"))
imap.add_argument(
"--imap-host", help="IMAP server hostname", default=os.getenv("IMAP_HOST")
)
imap.add_argument(
"--imap-port", help="IMAP server port", default=os.getenv("IMAP_PORT")
)
imap.add_argument(
"--imap-username",
help="IMAP server username",
Expand All @@ -109,8 +135,12 @@ def set_argparse():
default=os.getenv("IMAP_PASSWORD"),
)
smtp = email.add_argument_group("SMTP")
smtp.add_argument("--smtp-host", help="SMTP server hostname", default=os.getenv("SMTP_HOST"))
smtp.add_argument("--smtp-port", help="SMTP server port", default=os.getenv("SMTP_PORT"))
smtp.add_argument(
"--smtp-host", help="SMTP server hostname", default=os.getenv("SMTP_HOST")
)
smtp.add_argument(
"--smtp-port", help="SMTP server port", default=os.getenv("SMTP_PORT")
)
smtp.add_argument(
"--smtp-username",
help="SMTP server username",
Expand All @@ -124,7 +154,9 @@ def set_argparse():
smtp.add_argument(
"--message-id-domain",
help="Domain to use for Message-ID header",
default=os.getenv("MESSAGE_ID_DOMAIN") if os.getenv("MESSAGE_ID_DOMAIN") else None,
default=(
os.getenv("MESSAGE_ID_DOMAIN") if os.getenv("MESSAGE_ID_DOMAIN") else None
),
)

check_required_args(
Expand Down
21 changes: 21 additions & 0 deletions llmail/utils/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import re
from loguru import logger
from sys import stderr

logging_file = stderr


def redact_email_sink(message: str):
"""Custom sink function that redacts email addresses before logging."""
email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
redacted_message = re.sub(email_pattern, "[redacted]", message)
print(redacted_message, file=logging_file)


def set_primary_logger(log_level, redact_email_addresses):
"""Set up the primary logger with the specified log level. Output to stderr and use the format specified."""
logger.remove()
# ^10 is a formatting directive to center with a padding of 10
logger_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> |<level>{level: ^10}</level>| <level>{message}</level>"
sink = redact_email_sink if redact_email_addresses else stderr
logger.add(sink=sink, format=logger_format, colorize=True, level=log_level)

0 comments on commit 411e840

Please sign in to comment.