From 36bdb1cba731938c19b1ace0e9712bbf6fc83069 Mon Sep 17 00:00:00 2001 From: Matt Molyneaux Date: Fri, 3 Jan 2020 00:28:20 +0000 Subject: [PATCH] Implement asyncio receivers --- salmon/server.py | 105 +++++++++++++++++++++++++++++++++++------------ setup.py | 1 + 2 files changed, 80 insertions(+), 26 deletions(-) diff --git a/salmon/server.py b/salmon/server.py index 0ff3e5e..6dbcf5a 100644 --- a/salmon/server.py +++ b/salmon/server.py @@ -11,14 +11,20 @@ import time import traceback +from aiosmtpd.controller import Controller +from aiosmtpd.lmtp import LMTP +from aiosmtpd.smtp import SMTP from dns import resolver import lmtpd from salmon import __version__, mail, queue, routing from salmon.bounce import COMBINED_STATUS_CODES, PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES -lmtpd.__version__ = "Salmon Mail router LMTPD, version %s" % __version__ -smtpd.__version__ = "Salmon Mail router SMTPD, version %s" % __version__ +ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__ +SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction" + +lmtpd.__version__ = ROUTER_VERSION_STRING +smtpd.__version__ = ROUTER_VERSION_STRING def undeliverable_message(raw_message, failure_type): @@ -153,6 +159,19 @@ def send(self, To, From, Subject, Body): self.deliver(msg) +def _deliver(receiver, Peer, From, To, Data, **kwargs): + try: + logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) + routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) + except SMTPError as err: + # looks like they want to return an error, so send it out + return str(err) + except Exception: + logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", + Peer, From, To) + undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + + class SMTPChannel(smtpd.SMTPChannel): """Replaces the standard SMTPChannel with one that rejects more than one recipient""" @@ -175,7 +194,7 @@ def smtp_RCPT(self, arg): # Of course, if smtpd.SMTPServer or SMTPReceiver implemented a # queue and bounces like you're meant too... logging.warning("Client attempted to deliver mail with multiple RCPT TOs. This is not supported.") - self.push("451 Will not accept multiple recipients in one transaction") + self.push(SMTP_MULTIPLE_RCPTS_ERROR) else: smtpd.SMTPChannel.smtp_RCPT(self, arg) @@ -216,17 +235,7 @@ def process_message(self, Peer, From, To, Data, **kwargs): """ Called by smtpd.SMTPServer when there's a message received. """ - - try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) - routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) - except SMTPError as err: - # looks like they want to return an error, so send it out - return str(err) - except Exception: - logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", - Peer, From, To) - undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + return _deliver(self, Peer, From, To, Data, **kwargs) def close(self): """Doesn't do anything except log who called this, since nobody should. Ever.""" @@ -268,18 +277,7 @@ def process_message(self, Peer, From, To, Data, **kwargs): """ Called by lmtpd.LMTPServer when there's a message received. """ - - try: - logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To) - routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) - except SMTPError as err: - # looks like they want to return an error, so send it out - # and yes, you should still use SMTPError in your handlers - return str(err) - except Exception: - logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.", - Peer, From, To) - undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To)) + return _deliver(self, Peer, From, To, Data, **kwargs) def close(self): """Doesn't do anything except log who called this, since nobody should. Ever.""" @@ -287,6 +285,61 @@ def close(self): logging.error(trace) +class SMTPOnlyOneRcpt(SMTP): + async def smtp_RCPT(self, arg): + if self.envelope.rcpt_tos: + await self.push(SMTP_MULTIPLE_RCPTS_ERROR) + else: + await super().smtp_RCPT(arg) + + +class SMTPHandler: + async def handle_DATA(self, server, session, envelope): + assert len(envelope.rcpt_tos) == 1, "There should only be one RCPT TO" + return _deliver(self, session.peer, envelope.mail_from, envelope.rcpt_tos[0], envelope.content) + + +class AsyncSMTPReceiver(Controller): + """Receives emails and hands it to the Router for further processing.""" + def __init__(self, handler=None, **kwargs): + if handler is None: + handler = SMTPHandler() + super().__init__(handler, **kwargs) + + def factory(self): + return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + def stop(self): + """Doesn't do anything except log who called this, since nobody should. Ever.""" + trace = traceback.format_exc(chain=False) + logging.error(trace) + + +class LMTPHandler: + async def handle_DATA(self, server, session, envelope): + statuses = [] + for rcpt in envelope.rcpt_tos: + statuses.append(_deliver(self, session.peer, envelope.mail_from, rcpt, envelope.content) or "250 Ok") + return "\r\n".join(statuses) + + +class AsyncLMTPReceiver(Controller): + """Receives emails and hands it to the Router for further processing.""" + # TODO: override Controller._run and make it choose between create_server and create_unix_server + def __init__(self, handler=None, **kwargs): + if handler is None: + handler = LMTPHandler() + super().__init__(handler, **kwargs) + + def factory(self): + return LMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING) + + def stop(self): + """Doesn't do anything except log who called this, since nobody should. Ever.""" + trace = traceback.format_exc(chain=False) + logging.error(trace) + + class QueueReceiver: """ Rather than listen on a socket this will watch a queue directory and diff --git a/setup.py b/setup.py index 2c0af7e..d274498 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ import versioneer install_requires = [ + 'aiosmtpd', 'chardet', 'click', 'dnspython',