Skip to content

Commit

Permalink
Implement asyncio receivers
Browse files Browse the repository at this point in the history
  • Loading branch information
moggers87 committed Jan 7, 2020
1 parent ba9b37f commit 36bdb1c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 26 deletions.
105 changes: 79 additions & 26 deletions salmon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
import time
import traceback

from aiosmtpd.controller import Controller
from aiosmtpd.lmtp import LMTP
from aiosmtpd.smtp import SMTP
from dns import resolver
import lmtpd

from salmon import __version__, mail, queue, routing
from salmon.bounce import COMBINED_STATUS_CODES, PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES

lmtpd.__version__ = "Salmon Mail router LMTPD, version %s" % __version__
smtpd.__version__ = "Salmon Mail router SMTPD, version %s" % __version__
ROUTER_VERSION_STRING = "Salmon Mail router, version %s" % __version__
SMTP_MULTIPLE_RCPTS_ERROR = "451 Will not accept multiple recipients in one transaction"

lmtpd.__version__ = ROUTER_VERSION_STRING
smtpd.__version__ = ROUTER_VERSION_STRING


def undeliverable_message(raw_message, failure_type):
Expand Down Expand Up @@ -153,6 +159,19 @@ def send(self, To, From, Subject, Body):
self.deliver(msg)


def _deliver(receiver, Peer, From, To, Data, **kwargs):
try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))


class SMTPChannel(smtpd.SMTPChannel):
"""Replaces the standard SMTPChannel with one that rejects more than one recipient"""

Expand All @@ -175,7 +194,7 @@ def smtp_RCPT(self, arg):
# Of course, if smtpd.SMTPServer or SMTPReceiver implemented a
# queue and bounces like you're meant too...
logging.warning("Client attempted to deliver mail with multiple RCPT TOs. This is not supported.")
self.push("451 Will not accept multiple recipients in one transaction")
self.push(SMTP_MULTIPLE_RCPTS_ERROR)
else:
smtpd.SMTPChannel.smtp_RCPT(self, arg)

Expand Down Expand Up @@ -216,17 +235,7 @@ def process_message(self, Peer, From, To, Data, **kwargs):
"""
Called by smtpd.SMTPServer when there's a message received.
"""

try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
return _deliver(self, Peer, From, To, Data, **kwargs)

def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
Expand Down Expand Up @@ -268,25 +277,69 @@ def process_message(self, Peer, From, To, Data, **kwargs):
"""
Called by lmtpd.LMTPServer when there's a message received.
"""

try:
logging.debug("Message received from Peer: %r, From: %r, to To %r.", Peer, From, To)
routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
except SMTPError as err:
# looks like they want to return an error, so send it out
# and yes, you should still use SMTPError in your handlers
return str(err)
except Exception:
logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r.",
Peer, From, To)
undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
return _deliver(self, Peer, From, To, Data, **kwargs)

def close(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)


class SMTPOnlyOneRcpt(SMTP):
async def smtp_RCPT(self, arg):
if self.envelope.rcpt_tos:
await self.push(SMTP_MULTIPLE_RCPTS_ERROR)
else:
await super().smtp_RCPT(arg)


class SMTPHandler:
async def handle_DATA(self, server, session, envelope):
assert len(envelope.rcpt_tos) == 1, "There should only be one RCPT TO"
return _deliver(self, session.peer, envelope.mail_from, envelope.rcpt_tos[0], envelope.content)


class AsyncSMTPReceiver(Controller):
"""Receives emails and hands it to the Router for further processing."""
def __init__(self, handler=None, **kwargs):
if handler is None:
handler = SMTPHandler()
super().__init__(handler, **kwargs)

def factory(self):
return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)

def stop(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)


class LMTPHandler:
async def handle_DATA(self, server, session, envelope):
statuses = []
for rcpt in envelope.rcpt_tos:
statuses.append(_deliver(self, session.peer, envelope.mail_from, rcpt, envelope.content) or "250 Ok")
return "\r\n".join(statuses)


class AsyncLMTPReceiver(Controller):
"""Receives emails and hands it to the Router for further processing."""
# TODO: override Controller._run and make it choose between create_server and create_unix_server
def __init__(self, handler=None, **kwargs):
if handler is None:
handler = LMTPHandler()
super().__init__(handler, **kwargs)

def factory(self):
return LMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)

def stop(self):
"""Doesn't do anything except log who called this, since nobody should. Ever."""
trace = traceback.format_exc(chain=False)
logging.error(trace)


class QueueReceiver:
"""
Rather than listen on a socket this will watch a queue directory and
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import versioneer

install_requires = [
'aiosmtpd',
'chardet',
'click',
'dnspython',
Expand Down

0 comments on commit 36bdb1c

Please sign in to comment.