Skip to content

Commit

Permalink
revert f197688
Browse files Browse the repository at this point in the history
  • Loading branch information
boecks authored Oct 27, 2024
1 parent f197688 commit 446ac03
Showing 1 changed file with 42 additions and 40 deletions.
82 changes: 42 additions & 40 deletions smtp2mqtt/smtp2mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,8 @@
config[key] = config_defaults[key]
elif value.lower() in ["true", "false"]:
config[key] = value.lower() == "true"
elif key in ["SMTP_LISTEN_PORT", "SMTP_RELAY_PORT", "MQTT_PORT", "SMTP_RELAY_TIMEOUT_SECS"]:
try:
config[key] = int(value)
except ValueError:
config[key] = int(config_defaults[key] or 0)
else:
config[key] = value

# Logging configuration
log_level = logging.DEBUG if config["DEBUG"] else logging.INFO
Expand All @@ -64,10 +61,6 @@
ch.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(uuid)s - %(message)s"))
log.addHandler(ch)

def log_extra(uuid=None):
"""Generates a log extra dictionary with a UUID for logging."""
return {'uuid': uuid or "00000000"}

class SMTP2MQTTHandler:
"""Handles SMTP messages and publishes them to MQTT."""

Expand All @@ -80,33 +73,33 @@ def __init__(self, loop):

async def handle_DATA(self, server, session, envelope):
"""Handles incoming email messages."""
log_uuid = str(uuid.uuid4())[:8]
log.info("Received message from %s", envelope.mail_from, extra=log_extra(log_uuid))
log_extra = {'uuid': str(uuid.uuid4())[:8]}
log.info("Received message from %s", envelope.mail_from, extra=log_extra)

msg = email.message_from_bytes(envelope.original_content, policy=default)
payload = {
'uuid': log_uuid,
'uuid': log_extra['uuid'],
'headers': {k.lower(): v for k, v in msg.items()},
'mime_parts': []
}

# Extract message body and attachments
self.extract_body(msg, payload)
self.handle_attachments(msg, payload, log_extra(log_uuid))
self.handle_attachments(msg, payload, log_extra)

# MQTT Publishing (Only if MQTT_HOST is configured)
if config["MQTT_HOST"]:
topic = f"{config['MQTT_TOPIC']}/{envelope.mail_from.replace('/', '')}"
log.info("Publishing message to MQTT...", extra=log_extra(log_uuid))
await self.mqtt_publish(topic, json.dumps(payload), log_extra(log_uuid))
log.info("Publishing message to MQTT...", extra=log_extra)
await self.mqtt_publish(topic, json.dumps(payload), log_extra)
else:
log.info("MQTT publishing is disabled (no MQTT_HOST configured)", extra=log_extra(log_uuid))
log.info("MQTT publishing is disabled (no MQTT_HOST configured)", extra=log_extra)

# Relay the original message if SMTP relay is configured
if config["SMTP_RELAY_HOST"]:
self.smtp_relay(msg, envelope.mail_from, envelope.rcpt_tos, log_extra(log_uuid))
self.smtp_relay(msg, envelope.mail_from, envelope.rcpt_tos, log_extra)
else:
log.info("SMTP relaying is disabled (no SMTP_RELAY_HOST configured)", extra=log_extra(log_uuid))
log.info("SMTP relaying is disabled (no SMTP_RELAY_HOST configured)", extra=log_extra)

return "250 Message accepted for delivery"

Expand All @@ -122,7 +115,7 @@ def extract_body(self, msg, payload):
body_part = self.create_body_part(msg)
payload['mime_parts'].append(body_part)
except Exception as e:
log.error("Failed to extract body: %s", e, extra=log_extra(payload['uuid']))
log.error("Failed to extract body: %s", e, extra={'uuid': payload['uuid']})

def create_body_part(self, msg_part):
"""Creates a body part dictionary from a message part."""
Expand Down Expand Up @@ -157,7 +150,6 @@ def handle_attachments(self, msg, payload, log_extra):
def save_attachment(self, attachment, log_extra, content, mime_part):
"""Saves the attachment to the configured directory."""
if config["SAVE_ATTACHMENTS_DIR"]:
os.makedirs(config["SAVE_ATTACHMENTS_DIR"], exist_ok=True) # Ensure the directory exists
filename = f"{log_extra['uuid']}_{os.path.basename(attachment.get_filename())}"
file_path = os.path.join(config["SAVE_ATTACHMENTS_DIR"], filename)
log.info(f"Saving attachment to {file_path}", extra=log_extra)
Expand All @@ -171,6 +163,7 @@ def save_attachment(self, attachment, log_extra, content, mime_part):

async def mqtt_publish(self, topic, payload, log_extra):
"""Publishes the payload to the specified MQTT topic."""
# Ensure payload is a dictionary
try:
payload = json.loads(payload)
except json.JSONDecodeError:
Expand All @@ -179,8 +172,9 @@ async def mqtt_publish(self, topic, payload, log_extra):

message_uuid = payload.get("uuid")

# Check for duplicate messages
if message_uuid in self.published_message_uuids:
log.warning(f"Message with UUID {message_uuid} has already been published. Skipping...", extra=log_extra)
log.warning(f"Message with UUID {message_uuid} has already been published. Skipping...")
return

log.debug(f'Publishing to {topic}', extra=log_extra)
Expand Down Expand Up @@ -220,29 +214,37 @@ def smtp_relay(self, msg, mail_from, rcpt_tos, log_extra):

def set_quit(self, *args):
"""Handles termination signals to gracefully shut down the server."""
log.info("Quitting...", extra=log_extra("main thread"))
log.info("Quitting...", extra={'uuid': 'main thread'})
self.quit = True

def dummy_auth_function(server, session, envelope, mechanism, auth_data):
"""Dummy authentication function that always succeeds."""
log.info("Authenticating...", extra={'uuid': 'main thread'})
return AuthResult(success=True)

# Main execution logic
loop = asyncio.get_event_loop()
controller = Controller(
handler=SMTP2MQTTHandler(loop),
hostname=config["SMTP_BIND_ADDRESS"],
port=int(config["SMTP_LISTEN_PORT"] or 25),
auth_required=(config["SMTP_AUTH_REQUIRED"] or False),
auth_require_tls=False,
auth_callback=dummy_auth_function,
)

try:
if __name__ == "__main__":
log.debug(", ".join([f"{k}={v}" for k, v in config.items()]), extra={'uuid': 'main thread'})

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
controller = Controller(
handler=SMTP2MQTTHandler(loop),
loop=loop,
hostname=config["SMTP_BIND_ADDRESS"],
port=int(config["SMTP_LISTEN_PORT"]),
authenticator=dummy_auth_function,
auth_required=config["SMTP_AUTH_REQUIRED"],
auth_require_tls=not config["SMTP_AUTH_REQUIRED"],
)

controller.start()
log.info("Running", extra=log_extra("main thread"))
while not controller.handler.quit:
time.sleep(0.1)
finally:
log.info("Stopping controller...", extra=log_extra("main thread"))
controller.stop()
log.info("Running", extra={'uuid': 'main thread'})

try:
while not controller.handler.quit:
time.sleep(0.5)
controller.stop()
except Exception as e:
controller.stop()
log.exception("Error occurred while running the server", extra={'uuid': 'main thread'})
raise

0 comments on commit 446ac03

Please sign in to comment.